华为云用户手册

  • CDL同步任务支持的数据类型及映射关系 主要介绍CDL同步任务支持的数据类型,以及源端数据库数据类型跟Spark数据类型的映射关系。 表3 PgSQL和Spark数据类型映射关系 PostgreSQL数据类型 Spark(Hudi)数据类型 int2 int int4 int int8 bigint numeric(p, s) decimal[p,s] bool boolean char string varchar string text string timestamptz timestamp timestamp timestamp date date json, jsonb string float4 float float8 double 表4 MySQL和Spark数据类型映射关系 MySQL数据类型 Spark(Hudi)数据类型 int int integer int bigint bigint double double decimal[p,s] decimal[p,s] varchar string char string text string timestamp timestamp datetime timestamp date date json string float double 表5 Ogg/Ogg Oracle Avro( MRS 3.3.0及之后版本)和Spark数据类型映射关系 Oracle数据类型 Spark(Hudi)数据类型 NUMBER(3),NUMBER(5) bigint INTEGER decimal NUMBER(20) decimal NUMBER decimal BINARY_DOUBLE double CHAR string VARCHAR string TIMESTAMP, DATETIME timestamp timestamp with time zone timestamp DATE timestamp 表6 DRS Opengauss Json和Spark数据类型映射关系(MRS 3.3.0及之后版本支持) Opengauss Json数据类型 Spark(Hudi)数据类型 int2 int int4 int int8 bigint numeric(p,s) decimal[p,s] bool boolean varchar string timestamp timestamp timestampz timestamp date date jsonb string json string float4 float float8 double text string 表7 DRS Oracle Json和Spark数据类型映射关系(MRS 3.3.0及之后版本支持) Oracle Json数据类型 Spark(Hudi)数据类型 number(p,s) decimal[p,s] binary double double char string varchar2 string nvarchar2 string timestamp timestamp timestamp with time zone timestamp date timestamp 表8 DRS Oracle Avro和Spark数据类型映射关系(MRS 3.3.0及之后版本支持) Oracle Avro数据类型 Spark(Hudi)数据类型 number[p,s] decimal[p,s] float(p) float binary_double double char(p) string varchar2(p) string timestamp(p) timestamp date timestamp 表9 openGauss和Spark数据类型映射关系(MRS 3.3.0及之后版本支持) Opengauss数据类型 Spark(Hudi)数据类型 int1 int int2 int int4 int int8 bigint numeric(p,s) decimal[p,s] bool boolean char string bpchar string nvarchar2 string text string date date timestamp timestamp timestampz timestamp json string jsonb string float4 float float8 double real float 表10 Spark(Hudi)和DWS数据类型映射关系 Spark(Hudi)数据类型 DWS数据类型 int int long bigint float float double double decimal[p,s] decimal[p,s] boolean boolean string varchar date date timestamp timestamp 表11 Spark(Hudi)和ClickHouse数据类型映射关系 Spark(Hudi)数据类型 ClickHouse数据类型 int Int32 long Int64 (bigint) float Float32 (float) double Float64 (double) decimal[p,s] Decimal(P,S) boolean bool string String (LONGTEXT, MEDIUMTEXT, TINYTEXT, TEXT, LONGBLOB, MEDIUMBLOB, TINYBLOB, BLOB, VARCHAR, CHAR) date Date timestamp DateTime
  • 操作场景 FlinkServer支持对接JDBC。本示例以安全模式FlinkServer、Kafka为例,介绍JDBC的MySQL作为Source表、Sink表以及维表的DDL定义,以及创建表时使用的WITH参数和代码示例,指导如何在FlinkServer作业管理页面对接JDBC。 FlinkServer界面回显FlinkSQL时,SQL中的“password”字段将显示为空,在回显状态下需要将密码信息补齐后再提交作业。
  • Flink SQL与JDBC数据类型对应关系 Flink SQL数据类型 MySQL数据类型 Oracle数据类型 PostgreSQL数据类型 SQL Server数据类型 BOOLEAN BOOLEAN TINYINT(1) - BOOLEAN BIT TINYINT TINYINT - - TINYINT SMALLINT SMALLINT TINYINT UNSIGNED - SMALLINT INT2 SMALLSERIAL SERIAL2 SMALLINT INT INT MEDIUMINT SMALLINT UNSIGNED - INTEGER SERIAL INT BIGINT BIGINT INT UNSIGNED - BIGINT BIGSERIAL BIGINT FLOAT FLOAT BINARY_FLOAT REAL FLOAT4 REAL DOUBLE DOUBLE DOUBLE PRECISION BINARY_DOUBLE FLOAT8 DOUBLE PRECISION FLOAT STRING CHAR(n) VARCHAR(n) TEXT CHAR(n) VARCHAR(n) CLOB CHAR(n) CHARACTER(n) VARCHAR(n) CHARACTER VARYING(n) TEXT CHAR(n) NCHAR(n) VARCHAR(n) NVARCHAR(n) TEXT NTEXT BYTES BINARY VARBINARY BLOB RAW(s) BLOB BYTEA BINARY(n) VARBINARY(n) ARRAY - - ARRAY - DATE DATE DATE DATE DATE TIME [(p)] [WITHOUT TIMEZONE] TIME [(p)] DATE TIME [(p)] [WITHOUT TIMEZONE] TIME(0) TIMESTAMP [(p)] [WITHOUT TIMEZONE] DATETIME [(p)] TIMESTAMP [(p)] [WITHOUT TIMEZONE] TIMESTAMP [(p)] [WITHOUT TIMEZONE] DATETIME DATETIME2 DECIMAL(20, 0) BIGINT UNSIGNED - - - DECIMAL(p, s) NUMERIC(p, s) DECIMAL(p, s) SMALLINT FLOAT(s) DOUBLE PRECISION REAL NUMBER(p, s) NUMERIC(p, s) DECIMAL(p, s) DECIMAL(p, s)
  • 块副本位置选择 NodeLabel支持对各个副本的摆放采用不同的策略,如表达式“label-1,label-2,label-3”,表示3个副本分别放到含有label-1、label-2、label-3的DataNode中,不同的副本策略用逗号分隔。 如果label-1,希望放2个副本,可以这样设置表达式:“label-1[replica=2],label-2,label-3”。这种情况下,如果默认副本数是3,则会选择2个带有label-1和一个label-2的节点;如果默认副本数是4,会选择2个带有label-1、一个label-2以及一个label-3的节点。可以注意到,副本数是从左到右依次满足各个副本策略的,但也有副本数超过表达式表述的情况,当默认副本数为5时,多出来的一个副本会放到最后一个节点中,也就是label-3的节点里。 当启用ACLs功能并且用户无权访问表达式中使用的标签时,将不会为副本选择属于该标签的DataNode。
  • 多余块副本删除选择 如果块副本数超过参数“dfs.replication”值(即用户指定的文件副本数,可以参考修改集群服务配置参数进入HDFS服务全部配置页面,搜索对应参数查看),HDFS会删除多余块副本来保证集群资源利用率。 删除规则如下: 优先删除不满足任何表达式的副本。 示例:文件默认副本数为3 /test标签表达式为“LA[replica=1],LB[replica=1],LC[replica=1]”; /test文件副本分布的四个节点(D1~D4)以及对应标签(LA~LD): D1:LA D2:LB D3:LC D4:LD 则选择删除D4节点上的副本块。 如果所有副本都满足表达式,删除多于表达式指定的数量的副本。 示例:文件默认副本数为3 /test标签表达式为“LA[replica=1],LB[replica=1],LC[replica=1]”; /test文件副本分布的四个节点以及对应标签: D1:LA D2:LA D3:LB D4:LC 则选择删除D1或者D2上的副本块。 如果文件所有者或文件所有者的组不能访问某个标签,则优先删除映射到该标签的DataNode中的副本。
  • 基于标签的数据块摆放策略样例 例如某MRS集群有六个DataNode:dn-1,dn-2,dn-3,dn-4,dn-5以及dn-6,对应的IP为10.1.120.[1-6]。有六个目录需要配置标签表达式,Block默认备份数为3。 下面给出3种DataNode标签信息在“host2labels”文件中的表示方式,其作用是一样的。 主机名正则表达式 /dn-[1456]/ = label-1,label-2 /dn-[26]/ = label-1,label-3 /dn-[3456]/ = label-1,label-4 /dn-5/ = label-5 IP地址范围表示方式 10.1.120.[1-6] = label-1 10.1.120.1 = label-2 10.1.120.2 = label-3 10.1.120.[3-6] = label-4 10.1.120.[4-6] = label-2 10.1.120.5 = label-5 10.1.120.6 = label-3 普通的主机名表达式 /dn-1/ = label-1, label-2 /dn-2/ = label-1, label-3 /dn-3/ = label-1, label-4 /dn-4/ = label-1, label-2, label-4 /dn-5/ = label-1, label-2, label-4, label-5 /dn-6/ = label-1, label-2, label-3, label-4 目录的标签表达式设置结果如下: /dir1 = label-1 /dir2 = label-1 && label-3 /dir3 = label-2 || label-4[replica=2] /dir4 = (label-2 || label-3) && label-4 /dir5 = !label-1 /sdir2.txt = label-1 && label-3[replica=3,fallback=NONE] /dir6 = label-4[replica=2],label-2 标签表达式设置方式请参考hdfs nodelabel -setLabelExpression命令。 文件的数据块存放结果如下: “/dir1”目录下文件的数据块可存放在dn-1,dn-2,dn-3,dn-4,dn-5和dn-6六个节点中的任意一个。 “/dir2”目录下文件的数据块可存放在dn-2和dn-6节点上。Block默认备份数为3,表达式只匹配了两个DataNode节点,第三个副本会在集群上剩余的节点中选择一个DataNode节点存放。 “/dir3”目录下文件的数据块可存放在dn-1,dn-3,dn-4,dn-5和dn-6中的任意三个节点上。 “/dir4”目录下文件的数据块可存放在dn-4,dn-5和dn-6。 “/dir5”目录下文件的数据块没有匹配到任何一个DataNode,会从整个集群中任意选择三个节点存放(和默认选块策略行为一致)。 “/sdir2.txt”文件的数据块,两个副本存放在dn-2和dn-6节点上,虽然还缺失一个备份节点,但由于使用了fallback=NONE参数,所以只存放两个备份。 “/dir6”目录下文件的数据块在具备label-4的节点中选择2个节点(dn-3 -- dn-6),然后在label-2中选择一个节点,如果用户指定“/dir6”下文件副本数大于3,则多出来的副本均在label-2。
  • 配置描述 DataNode节点标签配置 请参考修改集群服务配置参数,进入HDFS的“全部配置”页面,在搜索框中输入参数名称。 表1 参数说明 参数 描述 默认值 dfs.block.replicator.classname 配置HDFS的DataNode原则策略。 如果需要开启NodeLabe功能,需要将该值设置为org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyWithNodeLabel。 org.apache.hadoop.hdfs.server.blockmanagement.AvailableSpaceBlockPlacementPolicy host2tags 配置DataNode主机与标签的对应关系。 主机名称支持配置IP扩展表达式(如192.168.1.[1-128]或者192.168.[2-3].[1-128],且IP必须为业务IP),或者为前后加上 / 的主机名的正则表达式(如/datanode-[123]/或者/datanode-\d{2}/)。标签配置名称不允许包含 = / \ 字符。【注意】配置IP时必须是业务IP。 - host2tags配置项内容详细说明: 例如某MRS集群有20个DataNode:dn-1到dn-20,对应的IP地址为10.1.120.1到10.1.120.20,host2tags配置文件内容可以使用如下的表示方式。 主机名正则表达式: “/dn-\d/ = label-1”表示dn-1到dn-9对应的标签为label-1,即dn-1 = label-1,dn-2 = label-1,...dn-9 = label-1。 “/dn-((1[0-9]$)|(20$))/ = label-2”表示dn-10到dn-20对应的标签为label-2,即dn-10 = label-2,dn-11 = label-2,...dn-20 = label-2。 IP地址范围表示方式: “10.1.120.[1-9] = label-1”表示10.1.120.1到10.1.120.9对应的标签为label-1,即10.1.120.1 = label-1,10.1.120.2 = label-1,...10.1.120.9 = label-1。 “10.1.120.[10-20] = label-2”表示10.1.120.10到10.1.120.20对应的标签为label-2,即10.1.120.10 = label-2,10.1.120.11 = label-2,...10.1.120.20 = label-2。 基于标签的数据块摆放策略支持扩容减容场景: 当集群中新增加DataNode节点时,如果该DataNode对应的IP匹配host2tags配置项中的IP地址范围,或者该DataNode的主机名匹配host2tags配置项中的主机名正则表达式,则该DataNode节点会被设置成对应的标签。 例如“host2tags”配置值为10.1.120.[1-9] = label-1,而当前集群只有10.1.120.1到10.1.120.3三个数据节点。进行扩容后,又添加了10.1.120.4这个数据节点,则该数据节点会被设置成label-1的标签;如果10.1.120.3这个数据节点被删除或者退出服务后,数据块不会再被分配到该节点上。 设置目录/文件的标签表达式 在HDFS参数配置页面配置“path2expression”,配置HDFS目录与标签的对应关系。当配置的HDFS目录不存在时,也可以配置成功,新建不存在的同名目录,已设置的标签对应关系将在30分钟之内被继承。设置了标签的目录被删除后,新增一个同名目录,原有的对应关系也将在30分钟之内被继承。 命令行设置方式请参考hdfs nodelabel -setLabelExpression命令。 Java API设置方式通过NodeLabelFileSystem实例化对象调用setLabelExpression(String src, String labelExpression)方法。src为HDFS上的目录或文件路径,“labelExpression”为标签表达式。 开启NodeLabel特性后,可以通过命令hdfs nodelabel -listNodeLabels查看每个DataNode的标签信息。
  • 操作场景 用户需要通过数据特征灵活配置HDFS文件数据块的存储节点。通过设置HDFS目录/文件对应一个标签表达式,同时设置每个DataNode对应一个或多个标签,从而给文件的数据块存储指定了特定范围的DataNode。 当使用基于标签的数据块摆放策略,为指定的文件选择DataNode节点进行存放时,会根据文件的标签表达式选择出DataNode节点范围,然后在这些DataNode节点范围内,选择出合适的存放节点。 场景1 DataNodes分区场景。 场景说明: 用户需要让不同的应用数据运行在不同的节点,分开管理,就可以通过标签表达式,来实现不同业务的分离,指定业务存放到对应的节点上。 通过配置NodeLabel特性使得: /HBase下的数据存储在DN1、DN2、DN3、DN4节点上。 /Spark下的数据存储在DN5、DN6、DN7、DN8节点上。 图1 DataNode分区场景 通过hdfs nodelabel -setLabelExpression -expression 'LabelA[fallback=NONE]' -path /Hbase命令,给Hbase目录设置表达式。从图1中可知,“/Hbase”文件的数据块副本会被放置在有LabelA标签的节点上,即DN1、DN2、DN3、DN4。 同理,通过hdfs nodelabel -setLabelExpression -expression 'LabelB[fallback=NONE]' -path /Spark命令,给Spark目录设置表达式。在“/Spark”目录下文件对应的数据块副本只能放置到LabelB标签上的节点,如DN5、DN6、DN7、DN8。 设置数据节点的标签参考配置描述。 如果同一个集群上存在多个机架,每个标签下可以有多个机架的DataNodes,以确保数据块摆放的可靠性。 场景2 多机架下指定副本位置场景 场景说明: 在异构集群中,需要分配一些特定的具有高可靠性的节点用以存放重要的商业数据,可以通过标签表达式指定副本位置,指定文件数据块的其中一个副本存放到高可靠性的节点上。 “/data”目录下的数据块,默认三副本情况下,其中至少有一个副本会被存放到RACK1或RACK2机架的节点上(RACK1和RACK2机架的节点为高可靠性节点),另外两个副本会被分别存放到RACK3和RACK4机架的节点上。 图2 场景样例 通过hdfs nodelabel -setLabelExpression -expression 'LabelA||LabelB[fallback=NONE],LabelC,LabelD' -path /data命令给“/data”目录设置表达式。 当向“/data”目录下写数据时,至少有一个数据块副本存放在LabelA或者LabelB标签的节点中,剩余的两个数据块副本会被存放在有LabelC和LabelD标签的节点上。
  • 加密 Spark支持Akka和HTTP(广播和文件服务器)协议的SSL,但WebUI和块转移服务仍不支持SSL。 SSL必须在每个节点上配置,并使用特殊协议为通信涉及到的每个组件进行配置。 表24 参数说明 参数 描述 取值示例 spark.ssl.enabled 是否在所有被支持协议上开启SSL连接。 与spark.ssl.xxx类似的所有SSL设置指示了所有被支持协议的全局配置。为了覆盖特殊协议的全局配置,在协议指定的命名空间中必须重写属性。 使用“spark.ssl.YYY.XXX”设置覆盖由YYY指示的特殊协议的全局配置。目前YYY可以是基于Akka连接的akka或广播与文件服务器的fs。 false spark.ssl.enabledAlgorithms 以逗号分隔的密码列表。指定的密码必须被JVM支持。 - spark.ssl.keyPassword key-store的私人密钥密码。 - spark.ssl.keyStore key-store文件的路径。该路径可以绝对或相对于开启组件的目录。 - spark.ssl.keyStorePassword key-store的密码。 - spark.ssl.protocol 协议名。该协议必须被JVM支持。本页所有协议的参考表。 - spark.ssl.trustStore trust-store文件的路径。该路径可以绝对或相对于开启组件的目录。 - spark.ssl.trustStorePassword trust-store的密码。 -
  • 安全性 Spark目前支持通过共享密钥认证。可以通过spark.authenticate配置参数配置认证。该参数控制Spark通信协议是否使用共享密钥执行认证。该认证是确保双边都有相同的共享密钥并被允许通信的基本握手。如果共享密钥不同,通信将不被允许。共享密钥通过如下方式创建: 对于YARN部署的Spark,将spark.authenticate配置为真会自动处理生成和分发共享密钥。每个应用程序会独占一个共享密钥。 对于其他类型部署的Spark,应该在每个节点上配置Spark参数spark.authenticate.secret。所有Master/Workers和应用程序都将使用该密钥。 表25 参数说明 参数 描述 取值示例 spark.acls.enable 是否开启Spark acls。如果开启,它将检查用户是否有访问和修改job的权限。请注意这要求用户可以被识别。如果用户被识别为无效,检查将不被执行。UI可以使用过滤器认证和设置用户。 true spark.admin.acls 逗号分隔的有权限访问和修改所有Spark job的用户/管理员列表。如果在共享集群上运行并且工作时有MRS集群管理员或开发人员帮助调试,可以使用该列表。 admin spark.authenticate 是否Spark认证其内部连接。如果不是运行在YARN上,请参见spark.authenticate.secret。 true spark.authenticate.secret 设置Spark各组件之间验证的密钥。如果不是运行在YARN上且认证未开启,需要设置该项。 - spark.modify.acls 逗号分隔的有权限修改Spark job的用户列表。默认情况下只有开启Spark job的用户才有修改列表的权限(例如删除列表)。 - spark.ui.view.acls 逗号分隔的有权限访问Spark web ui的用户列表。默认情况下只有开启Spark job的用户才有访问权限。 -
  • 开启Spark进程间的认证机制 目前Spark进程间支持共享密钥方式的认证机制,通过配置spark.authenticate可以控制Spark在通信过程中是否做认证。这种认证方式只是通过简单的握手来确定通信双方享有共同的密钥。 在Spark客户端的“spark-defaults.conf”文件中配置如下参数。 表26 参数说明 参数 描述 取值示例 spark.authenticate 在Spark on YARN模式下,将该参数配置成true即可。密钥的生成和分发过程是自动完成的,并且每个应用独占一个密钥。 true
  • Compression 数据压缩是一个以CPU换内存的优化策略,因此当Spark内存严重不足的时候(由于内存计算的特质,这种情况非常常见),使用压缩可以大幅提高性能。目前Spark支持三种压缩算法:snappy,lz4,lzf。Snappy为默认压缩算法,并且调用native方法进行压缩与解压缩,在Yarn模式下需要注意堆外内存对Container进程的影响。 表27 参数说明 参数 描述 取值示例 spark.io.compression.codec 用于压缩内部数据的codec,例如RDD分区、广播变量和shuffle输出。默认情况下,Spark支持三种压缩算法:lz4,lzf和snappy。可以使用完全合格的类名称指定算法,例如org.apache.spark.io.LZ4CompressionCodec、org.apache.spark.io.LZFCompressionCodec及org.apache.spark.io.SnappyCompressionCodec。 lz4 spark.io.compression.lz4.block.size 当使用LZ4压缩算法时LZ4压缩中使用的块大小(字节)。当使用LZ4时降低块大小同样也会降低shuffle内存使用。 32768 spark.io.compression.snappy.block.size 当使用Snappy压缩算法时Snappy压缩中使用的块大小(字节)。当使用Snappy时降低块大小同样也会降低shuffle内存使用。 32768 spark.shuffle.compress 是否压缩map任务输出文件。建议压缩。使用spark.io.compression.codec进行压缩。 true spark.shuffle.spill.compress 是否压缩在shuffle期间溢出的数据。使用spark.io.compression.codec进行压缩。 true spark.eventLog.compress 设置当spark.eventLog.enabled设置为true时是否压缩记录的事件。 false spark.broadcast.compress 在发送之前是否压缩广播变量。建议压缩。 true spark.rdd.compress 是否压缩序列化的RDD分区(例如StorageLevel.MEMORY_ONLY_SER的分区)。牺牲部分额外CPU的时间可以节省大量空间。 false
  • Broadcast Broadcast用于Spark进程间数据块的传输。Spark中无论Jar包、文件还是闭包以及返回的结果都会使用Broadcast。目前的Broadcast支持两种方式,Torrent与HTTP。前者将会把数据切成小片,分布到集群中,有需要时从远程获取;后者将文件存入到本地磁盘,有需要时通过HTTP方式将整个文件传输到远端。前者稳定性优于后者,因此Torrent为默认的Broadcast方式。 表19 参数说明 参数 描述 取值示例 spark.broadcast.factory 使用的广播方式。 org.apache.spark.broadcast.TorrentBroadcastFactory spark.broadcast.blockSize TorrentBroadcastFactory的块大小。该值过大会降低广播时的并行度(速度变慢),过小可能会影响BlockManager的性能。 4096 spark.broadcast.compress 在发送广播变量之前是否压缩。建议压缩。 true
  • TIMEOUT Spark默认配置能很好的处理中等数据规模的计算任务,但一旦数据量过大,会经常出现超时导致任务失败的场景。在大数据量场景下,需调大Spark中的超时参数。 表23 参数说明 参数 描述 取值示例 spark.files.fetchTimeout 获取通过驱动程序的SparkContext.addFile()添加的文件时的通信超时(秒)。 60s spark.network.timeout 所有网络交互的默认超时(秒)。如未配置,则使用该配置代替spark.core.connection.ack.wait.timeout, spark.akka.timeout, spark.storage.blockManagerSlaveTimeoutMs或spark.shuffle.io.connectionTimeout。 360s spark.core.connection.ack.wait.timeout 连接时应答的超时时间(单位:秒)。为了避免由于GC带来的长时间等待,可以设置更大的值。 60
  • PORT 表21 参数说明 参数 描述 取值示例 spark.ui.port 应用仪表盘的端口,显示内存和工作负载数据。 JD BCS erver2x:4040 SparkResource2x:0 spark.blockManager.port 所有BlockManager监测的端口。这些同时存在于Driver和Executor上。 随机端口范围 spark.driver.port Driver监测的端口,用于Driver与Executor进行通信。 随机端口范围
  • Storage 内存计算是Spark的最大亮点,Spark的Storage主要管理内存资源。Storage中主要存储RDD在Cache过程中产生的数据块。JVM中堆内存是整体的,因此在Spark的Storage管理中,“Storage Memory Size”变成了一个非常重要的概念。 表20 参数说明 参数 描述 取值示例 spark.storage.memoryMapThreshold 超过该块大小的Block,Spark会对该磁盘文件进行内存映射。这可以防止Spark在内存映射时映射过小的块。一般情况下,对接近或低于操作系统的页大小的块进行内存映射会有高开销。 2m
  • EventLog Spark应用在运行过程中,实时将运行状态以JSON格式写入文件系统,用于HistoryServer服务读取并重现应用运行时状态。 表16 参数说明 参数 描述 取值示例 spark.eventLog.enabled 是否记录Spark事件,用于应用程序在完成后重构webUI。 true spark.eventLog.dir 如果spark.eventLog.enabled为true,记录Spark事件的目录。在此目录下,Spark为每个应用程序创建文件,并将应用程序的事件记录到文件中。用户也可设置为统一的与HDFS目录相似的地址,这样History server就可以读取历史文件。 hdfs://hacluster/spark2xJobHistory2x spark.eventLog.compress spark.eventLog.enabled为true时,是否压缩记录的事件。 false
  • EventLog的周期清理 JobHistory上的Event log是随每次任务的提交而累积的,任务提交的次数多了之后会造成太多文件的存放。Spark提供了周期清理Event log的功能,用户可以通过配置开关和相应的清理周期参数来进行控制。 表17 参数说明 参数 描述 取值示例 spark.history.fs.cleaner.enabled 是否打开清理功能。 true spark.history.fs.cleaner.interval 清理功能的检查周期。 1d spark.history.fs.cleaner.maxAge 日志的最长保留时间。 4d
  • Kryo Kryo是一个非常高效的Java序列化框架,Spark中也默认集成了该框架。几乎所有的Spark性能调优都离不开将Spark默认的序列化器转化为Kryo序列化器的过程。目前Kryo序列化只支持Spark数据层面的序列化,还不支持闭包的序列化。设置Kryo序列元,需要将配置项“spark.serializer”设置为“org.apache.spark.serializer.KryoSerializer”,同时也搭配设置以下的配置项,优化Kryo序列化的性能。 表18 参数说明 参数 描述 取值示例 spark.kryo.classesToRegister 使用Kryo序列化时,需要注册到Kryo的类名,多个类之间用逗号分隔。 - spark.kryo.referenceTracking 当使用Kryo序列化数据时,是否跟踪对同一个对象的引用情况。适用于对象图有循环引用或同一对象有多个副本的情况。否则可以设置为关闭以提升性能。 true spark.kryo.registrationRequired 是否需要使用Kryo来注册对象。当设为“true”时,如果序列化一个未使用Kryo注册的对象则会发生异常。当设为“false”(默认值)时,Kryo会将未注册的类名称一同写到序列化对象中。该操作会带来大量性能开销,所以在用户还没有从注册队列中删除相应的类时应该开启该选项。 false spark.kryo.registrator 如果使用Kryo序列化,使用Kryo将该类注册至定制类。如果需要以定制方式注册类,例如指定一个自定义字段序列化器,可使用该属性。否则spark.kryo.classesToRegister会更简单。它应该设置为一个扩展KryoRegistrator的类。 - spark.kryoserializer.buffer.max Kryo序列化缓冲区允许的最大值,单位为兆字节。这个值必须大于尝试序列化的对象。当在Kryo中遇到“buffer limit exceeded”异常时可以适当增大该值。也可以通过配置项spark.kryoserializer.buffer.max配置。 64MB spark.kryoserializer.buffer Kryo序列化缓冲区的初始值,单位为兆字节。每个worker的每个核心都会有一个缓冲区。如果有需要,缓冲区会增大到spark.kryoserializer.buffer.max设置的值。也可以通过配置项spark.kryoserializer.buffer配置。 64KB
  • Executor配置 Executor也是单独一个Java进程,但不像Driver和AM只有一个,Executor可以有多个进程,而目前Spark只支持相同的配置,即所有Executor的进程参数都必然是一样的。 表12 参数说明 参数 描述 取值示例 spark.executor.extraJavaOptions 传递至Executor的额外JVM选项。例如,GC设置或其他日志记录。请注意不能通过此选项设置Spark属性或heap大小。Spark属性应该使用SparkConf对象或调用spark-submit脚本时指定的spark-defaults.conf文件来设置。Heap大小可以通过spark.executor.memory来设置。 参考快速配置Spark参数 spark.executor.extraClassPath 附加至Executor classpath的额外的classpath。这主要是为了向后兼容Spark的历史版本。用户一般不用设置此选项。 - spark.executor.extraLibraryPath 设置启动executor JVM时所使用的特殊的library path。 参考快速配置Spark参数 spark.executor.userClassPathFirst (试验性)与spark.driver.userClassPathFirst相同的功能,但应用于Executor实例。 false spark.executor.memory 每个Executor进程使用的内存数量,与JVM内存设置字符串的格式相同(例如:512M,2G)。 4G spark.executorEnv.[EnvironmentVariableName] 添加由EnvironmentVariableName指定的环境变量至executor进程。用户可以指定多个来设置多个环境变量。 - spark.executor.logs.rolling.maxRetainedFiles 设置系统即将保留的最新滚动日志文件的数量。旧的日志文件将被删除。默认关闭。 - spark.executor.logs.rolling.size.maxBytes 设置滚动Executor日志的文件的最大值。默认关闭。数值以字节为单位设置。如果要自动清除旧日志,请查看spark.executor.logs.rolling.maxRetainedFiles。 - spark.executor.logs.rolling.strategy 设置executor日志的滚动策略。默认滚动关闭。可以设置为“time”(基于时间的滚动)或“size”(基于大小的滚动)。当设置为“time”,使用spark.executor.logs.rolling.time.interval属性的值作为日志滚动的间隔。当设置为“size”,使用spark.executor.logs.rolling.size.maxBytes设置滚动的最大文件大小滚动。 - spark.executor.logs.rolling.time.interval 设置executor日志滚动的时间间隔。默认关闭。合法值为“daily”、“hourly”、“minutely”或任意秒。如果要自动清除旧日志,请查看spark.executor.logs.rolling.maxRetainedFiles。 daily
  • ExecutorLaucher配置 ExecutorLauncher只有在Yarn-Client模式下才会存在的角色,Yarn-Client模式下,ExecutorLauncher和Driver不在同一个进程中,需要对ExecutorLauncher的参数进行特殊的配置。 表11 参数说明 参数 描述 取值示例 spark.yarn.am.extraJavaOptions 在Client模式下传递至YARN Application Master的一系列额外JVM选项。在Cluster模式下使用spark.driver.extraJavaOptions。 参考快速配置Spark参数 spark.yarn.am.memory 针对Client模式下YARN Application Master使用的内存数量,与JVM内存设置字符串格式一致(例如:512m,2g)。在集群模式下,使用spark.driver.memory。 1G spark.yarn.am.memoryOverhead 和“spark.yarn.driver.memoryOverhead”一样,但只针对Client模式下的Application Master。 - spark.yarn.am.cores 针对Client模式下YARN Application Master使用的核数。在Cluster模式下,使用spark.driver.cores。 1
  • WebUI WebUI展示了Spark应用运行的过程和状态。 表13 参数说明 参数 描述 取值示例 spark.ui.killEnabled 允许停止Web UI中的stage和相应的job。 说明: 出于安全考虑,将此配置项的默认值设置成false,以避免用户发生误操作。如果需要开启此功能,则可以在spark-defaults.conf配置文件中将此配置项的值设为true。请谨慎操作。 true spark.ui.port 应用程序dashboard的端口,显示内存和工作量数据。 JDB CS erver2x:4040 SparkResource2x:0 IndexServer2x:22901 spark.ui.retainedJobs 在垃圾回收之前Spark UI和状态API记住的job数。 1000 spark.ui.retainedStages 在垃圾回收之前Spark UI和状态API记住的stage数。 1000
  • HistoryServer HistoryServer读取文件系统中的EventLog文件,展示已经运行完成的Spark应用在运行时的状态信息。 表14 参数说明 参数 描述 取值示例 spark.history.fs.logDirectory History server的日志目录 - spark.history.ui.port JobHistory侦听连接的端口。 18080 spark.history.fs.updateInterval History server所显示信息的更新周期,单位为秒。每次更新检查持久存储中针对事件日志进行的更改。 10s spark.history.fs.update.interval.seconds 每个事件日志更新检查的间隔。与spark.history.fs.updateInterval功能相同,推荐使用spark.history.fs.updateInterval。 10s spark.history.updateInterval 该配置项与spark.history.fs.update.interval.seconds和spark.history.fs.updateInterval功能相同,推荐使用spark.history.fs.updateInterval。 10s
  • Spark Streaming Kafka Receiver是Spark Streaming一个重要的组成部分,它负责接收外部数据,并将数据封装为Block,提供给Streaming消费。最常见的数据源是Kafka,Spark Streaming对Kafka的集成也是最完善的,不仅有可靠性的保障,而且也支持从Kafka直接作为RDD输入。 表7 参数说明 参数 描述 取值示例 spark.streaming.kafka.maxRatePerPartition 使用Kafka direct stream API时,从每个Kafka分区读取数据的最大速率(每秒记录数量)。 - spark.streaming.blockInterval 在被存入Spark之前Spark Streaming Receiver接收数据累积成数据块的间隔(毫秒)。推荐最小值为50毫秒。 200ms spark.streaming.receiver.maxRate 每个Receiver接收数据的最大速率(每秒记录数量)。配置设置为0或者负值将不会对速率设限。 - spark.streaming.receiver.writeAheadLog.enable 是否使用ReliableKafkaReceiver。该Receiver支持流式数据不丢失。 false
  • Netty/NIO及Hash/Sort配置 Shuffle是大数据处理中最重要的一个性能点,网络是整个Shuffle过程的性能点。目前Spark支持两种Shuffle方式,一种是Hash,另外一种是Sort。网络也有两种方式,Netty和NIO。 表8 参数说明 参数 描述 取值示例 spark.shuffle.manager 处理数据的方式。有两种实现方式可用:sort和hash。sort shuffle对内存的使用率更高,是Spark 1.2及后续版本的默认选项。Spark2.x及后续版本不支持hash。 SORT spark.shuffle.consolidateFiles (仅hash方式)如果要合并在shuffle过程中创建的中间文件,需要将该值设置为“true”。文件创建的少可以提高文件系统处理性能,降低风险。使用ext4或者xfs文件系统时,建议设置为“true”。由于文件系统限制,在ext3上该设置可能会降低8核以上机器的处理性能。 false spark.shuffle.sort.bypassMergeThreshold 该参数只适用于spark.shuffle.manager设置为sort时。在不做map端聚合并且reduce任务的partition数小于或等于该值时,避免对数据进行归并排序,防止系统处理不必要的排序引起性能下降。 200 spark.shuffle.io.maxRetries (仅Netty方式)如果设为非零值,由于IO相关的异常导致的fetch失败会自动重试。该重试逻辑有助于大型shuffle在发生长GC暂停或者网络闪断时保持稳定。 12 spark.shuffle.io.numConnectionsPerPeer (仅Netty方式)为了减少大型集群的连接创建,主机间的连接会被重新使用。对于拥有较多硬盘和少数主机的集群,此操作可能会导致并发性不足以占用所有磁盘,所以用户可以考虑增加此值。 1 spark.shuffle.io.preferDirectBufs (仅Netty方式)使用off-heap缓冲区减少shuffle和高速缓存块转移期间的垃圾回收。对于off-heap内存被严格限制的环境,用户可以将其关闭以强制所有来自Netty的申请使用堆内存。 true spark.shuffle.io.retryWait (仅Netty方式)等待fetch重试期间的时间(秒)。重试引起的最大延迟为maxRetries * retryWait,默认是15秒。 5
  • Spark Streaming Spark Streaming是在Spark批处理平台提供的流式数据的处理能力,以“mini-batch”的方式处理从外部输入的数据。 在Spark客户端的“{客户端安装目录}/Spark/spark/conf/spark-defaults.conf”文件中配置如下参数。 表6 参数说明 参数 描述 取值示例 spark.streaming.receiver.writeAheadLog.enable 启用预写日志(WAL)功能。所有通过Receiver接收的输入数据将被保存至预写日志,预写日志可以保证Driver程序出错后数据可以恢复。 false spark.streaming.unpersist 由Spark Streaming产生和保存的RDDs自动从Spark的内存中强制移除。Spark Streaming接收的原始输入数据也将自动清除。设置为false时原始输入数据和存留的RDDs不会自动清除,因此在streaming应用外部依然可以访问,但是这会占用更多的Spark内存。 true
  • 普通Shuffle配置 表9 参数说明 参数 描述 取值示例 spark.shuffle.spill 如果设为“true”,通过将数据溢出至磁盘来限制reduce任务期间内存的使用量。 true spark.shuffle.spill.compress 是否压缩shuffle期间溢出的数据。使用spark.io.compression.codec指定的算法进行数据压缩。 true spark.shuffle.file.buffer 每个shuffle文件输出流的内存缓冲区大小(单位:KB)。这些缓冲区可以减少创建中间shuffle文件流过程中产生的磁盘寻道和系统调用次数。也可以通过配置项spark.shuffle.file.buffer.kb设置。 32KB spark.shuffle.compress 是否压缩map任务输出文件。建议压缩。使用spark.io.compression.codec进行压缩。 true spark.reducer.maxSizeInFlight 从每个reduce任务同时fetch的map任务输出最大值(单位:MB)。由于每个输出要求创建一个缓冲区进行接收,这代表了每个reduce任务固定的内存开销,所以除非拥有大量内存,否则保持低值。也可以通过配置项spark.reducer.maxMbInFlight设置。 48MB
  • Driver配置 Spark Driver可以理解为Spark提交应用的客户端,所有的代码解析工作都在这个进程中完成,因此该进程的参数尤其重要。下面将以如下顺序介绍Spark中进程的参数设置: JavaOptions:Java命令中“-D”后面的参数,可以由System.getProperty获取。 ClassPath:包括Java类和Native的Lib加载路径。 Java Memory and Cores:Java进程的内存和CPU使用量。 Spark Configuration:Spark内部参数,与Java进程无关。 表10 参数说明 参数 描述 取值示例 spark.driver.extraJavaOptions 传递至driver(驱动程序)的一系列额外JVM选项。例如,GC设置或其他日志记录。 注意:在Client模式中,该配置禁止直接在应用程序中通过SparkConf设置,因为驱动程序JVM已经启动。请通过--driver-java-options命令行选项或默认property文件进行设置。 参考快速配置Spark参数 spark.driver.extraClassPath 附加至driver的classpath的额外classpath条目。 注意:在Client模式中,该配置禁止直接在应用程序中通过SparkConf设置,因为驱动程序JVM已经启动。请通过--driver-java-options命令行选项或默认property文件进行设置。 参考快速配置Spark参数 spark.driver.userClassPathFirst (试验性)当在驱动程序中加载类时,是否授权用户添加的jar优先于Spark自身的jar。这种特性可用于减缓Spark依赖和用户依赖之间的冲突。目前该特性仍处于试验阶段,仅用于Cluster模式中。 false spark.driver.extraLibraryPath 设置一个特殊的library path在启动驱动程序JVM时使用。 注意:在Client模式中,该配置禁止直接在应用程序中通过SparkConf设置,因为驱动程序JVM已经启动。请通过--driver-java-options命令行选项或默认property文件进行设置。 JDBCServer2x: ${SPARK_INSTALL_HOME}/spark/native SparkResource2x: ${DATA_NODE_INSTALL_HOME}/hadoop/lib/native spark.driver.cores 驱动程序进程使用的核数。仅适用于Cluster模式。 1 spark.driver.memory 驱动程序进程使用的内存数量,即SparkContext初始化的进程(例如:512M, 2G)。 注意:在Client模式中,该配置禁止直接在应用程序中通过SparkConf设置,因为驱动程序JVM已经启动。请通过--driver-java-options命令行选项或默认property文件进行设置。 4G spark.driver.maxResultSize 对每个Spark action操作(例如“collect”)的所有分区序列化结果的总量限制,至少1M,设置成0表示不限制。如果总量超过该限制,工作任务会中止。限制值设置过高可能会引起驱动程序的内存不足错误(取决于spark.driver.memory和JVM的对象内存开销)。设置合理的限制可以避免驱动程序出现内存不足的错误。 1G spark.driver.host Driver监测的主机名或IP地址,用于Driver与Executor进行通信。 (local hostname) spark.driver.port Driver监测的端口,用于Driver与Executor进行通信。 (random)
  • Python Spark Python Spark是Spark除了Scala、Java两种API之外的第三种编程语言。不同于Java和Scala都是在JVM平台上运行,Python Spark不仅会有JVM进程,还会有自身的Python进程。以下配置项只适用于Python Spark场景,而其他配置项也同样可以在Python Spark中生效。 表4 参数说明 参数 描述 取值示例 spark.python.profile 在Python worker中开启profiling。通过sc.show_profiles()展示分析结果。或者在driver退出前展示分析结果。可以通过sc.dump_profiles(path) 将结果转储到磁盘中。如果一些分析结果已经手动展示,那么在Driver退出前,它们将不会再自动展示。 默认使用pyspark.profiler.BasicProfiler,可以在初始化SparkContext时传入指定的profiler来覆盖默认的profiler。 false spark.python.worker.memory 聚合过程中每个python worker进程所能使用的内存大小,其值格式同指定JVM内存一致,如512m,2g。如果进程在聚集期间所用的内存超过了该值,数据将会被写入磁盘。 512m spark.python.worker.reuse 是否重用python worker。如是,它将使用固定数量的Python workers,那么下一批提交的task将重用这些Python workers,而不是为每个task重新fork一个Python进程。 该功能在大型广播下非常有用,因为此时对下一批提交的task不需要将数据从JVM再一次传输至Python worker。 true
  • Spark长时间任务安全认证配置 安全模式下,使用Spark CLI(如spark shell、spark sql、spark submit)时,如果使用kinit命令进行安全认证,当执行长时间运行任务时,会因为认证过期导致任务失败。 在客户端的“{客户端安装目录}/Spark/spark/conf/spark-defaults.conf”配置文件中设置如下参数,配置完成后,重新执行Spark CLI即可。 当参数值为“true”时,需要保证“spark-defaults.conf”和“hive-site.xml”中的Keytab和principal的值相同。 表3 参数说明 参数名称 含义 取值示例 spark.kerberos.principal 具有Spark操作权限的principal。请联系MRS集群管理员获取对应principal。 - spark.kerberos.keytab 具有Spark操作权限的Keytab文件名称和文件路径。请联系MRS集群管理员获取对应Keytab文件。 - spark.security.bigdata.loginOnce Principal用户是否只登录一次。true为单次登录;false为多次登录。 单次登录与多次登录的区别在于:Spark社区使用多次Kerberos用户登录多次的方案,但容易出现TGT过期或者Token过期异常导致应用无法长时间运行。MRS修改了Kerberos登录方式,只允许用户登录一次,可以有效的解决过期问题。限制在于,Hive相关的principal与keytab的配置项必须与Spark配置相同。 说明: 当参数值为true时,需要保证“spark-defaults.conf”和“hive-site.xml”中的Keytab和principal的值相同。 true
共100000条
提示

您即将访问非华为云网站,请注意账号财产安全