华为云用户手册

  • Executor配置 Executor也是单独一个Java进程,但不像Driver和AM只有一个,Executor可以有多个进程,而目前Spark只支持相同的配置,即所有Executor的进程参数都必然是一样的。 表12 参数说明 参数 描述 取值示例 spark.executor.extraJavaOptions 传递至Executor的额外JVM选项。例如,GC设置或其他日志记录。请注意不能通过此选项设置Spark属性或heap大小。Spark属性应该使用SparkConf对象或调用spark-submit脚本时指定的spark-defaults.conf文件来设置。Heap大小可以通过spark.executor.memory来设置。 参考快速配置Spark参数 spark.executor.extraClassPath 附加至Executor classpath的额外的classpath。这主要是为了向后兼容Spark的历史版本。用户一般不用设置此选项。 - spark.executor.extraLibraryPath 设置启动executor JVM时所使用的特殊的library path。 参考快速配置Spark参数 spark.executor.userClassPathFirst (试验性)与spark.driver.userClassPathFirst相同的功能,但应用于Executor实例。 false spark.executor.memory 每个Executor进程使用的内存数量,与JVM内存设置字符串的格式相同(例如:512M,2G)。 4G spark.executorEnv.[EnvironmentVariableName] 添加由EnvironmentVariableName指定的环境变量至executor进程。用户可以指定多个来设置多个环境变量。 - spark.executor.logs.rolling.maxRetainedFiles 设置系统即将保留的最新滚动日志文件的数量。旧的日志文件将被删除。默认关闭。 - spark.executor.logs.rolling.size.maxBytes 设置滚动Executor日志的文件的最大值。默认关闭。数值以字节为单位设置。如果要自动清除旧日志,请查看spark.executor.logs.rolling.maxRetainedFiles。 - spark.executor.logs.rolling.strategy 设置executor日志的滚动策略。默认滚动关闭。可以设置为“time”(基于时间的滚动)或“size”(基于大小的滚动)。当设置为“time”,使用spark.executor.logs.rolling.time.interval属性的值作为日志滚动的间隔。当设置为“size”,使用spark.executor.logs.rolling.size.maxBytes设置滚动的最大文件大小滚动。 - spark.executor.logs.rolling.time.interval 设置executor日志滚动的时间间隔。默认关闭。合法值为“daily”、“hourly”、“minutely”或任意秒。如果要自动清除旧日志,请查看spark.executor.logs.rolling.maxRetainedFiles。 daily
  • HistoryServer HistoryServer读取文件系统中的EventLog文件,展示已经运行完成的Spark应用在运行时的状态信息。 表14 参数说明 参数 描述 取值示例 spark.history.fs.logDirectory History server的日志目录 - spark.history.ui.port JobHistory侦听连接的端口。 18080 spark.history.fs.updateInterval History server所显示信息的更新周期,单位为秒。每次更新检查持久存储中针对事件日志进行的更改。 10s spark.history.fs.update.interval.seconds 每个事件日志更新检查的间隔。与spark.history.fs.updateInterval功能相同,推荐使用spark.history.fs.updateInterval。 10s spark.history.updateInterval 该配置项与spark.history.fs.update.interval.seconds和spark.history.fs.updateInterval功能相同,推荐使用spark.history.fs.updateInterval。 10s
  • ExecutorLaucher配置 ExecutorLauncher只有在Yarn-Client模式下才会存在的角色,Yarn-Client模式下,ExecutorLauncher和Driver不在同一个进程中,需要对ExecutorLauncher的参数进行特殊的配置。 表11 参数说明 参数 描述 取值示例 spark.yarn.am.extraJavaOptions 在Client模式下传递至YARN Application Master的一系列额外JVM选项。在Cluster模式下使用spark.driver.extraJavaOptions。 参考快速配置Spark参数 spark.yarn.am.memory 针对Client模式下YARN Application Master使用的内存数量,与JVM内存设置字符串格式一致(例如:512m,2g)。在集群模式下,使用spark.driver.memory。 1G spark.yarn.am.memoryOverhead 和“spark.yarn.driver.memoryOverhead”一样,但只针对Client模式下的Application Master。 - spark.yarn.am.cores 针对Client模式下YARN Application Master使用的核数。在Cluster模式下,使用spark.driver.cores。 1
  • WebUI WebUI展示了Spark应用运行的过程和状态。 表13 参数说明 参数 描述 取值示例 spark.ui.killEnabled 允许停止Web UI中的stage和相应的job。 说明: 出于安全考虑,将此配置项的默认值设置成false,以避免用户发生误操作。如果需要开启此功能,则可以在spark-defaults.conf配置文件中将此配置项的值设为true。请谨慎操作。 true spark.ui.port 应用程序dashboard的端口,显示内存和工作量数据。 JD BCS erver2x:4040 SparkResource2x:0 IndexServer2x:22901 spark.ui.retainedJobs 在垃圾回收之前Spark UI和状态API记住的job数。 1000 spark.ui.retainedStages 在垃圾回收之前Spark UI和状态API记住的stage数。 1000
  • Spark Streaming Kafka Receiver是Spark Streaming一个重要的组成部分,它负责接收外部数据,并将数据封装为Block,提供给Streaming消费。最常见的数据源是Kafka,Spark Streaming对Kafka的集成也是最完善的,不仅有可靠性的保障,而且也支持从Kafka直接作为RDD输入。 表7 参数说明 参数 描述 取值示例 spark.streaming.kafka.maxRatePerPartition 使用Kafka direct stream API时,从每个Kafka分区读取数据的最大速率(每秒记录数量)。 - spark.streaming.blockInterval 在被存入Spark之前Spark Streaming Receiver接收数据累积成数据块的间隔(毫秒)。推荐最小值为50毫秒。 200ms spark.streaming.receiver.maxRate 每个Receiver接收数据的最大速率(每秒记录数量)。配置设置为0或者负值将不会对速率设限。 - spark.streaming.receiver.writeAheadLog.enable 是否使用ReliableKafkaReceiver。该Receiver支持流式数据不丢失。 false
  • 普通Shuffle配置 表9 参数说明 参数 描述 取值示例 spark.shuffle.spill 如果设为“true”,通过将数据溢出至磁盘来限制reduce任务期间内存的使用量。 true spark.shuffle.spill.compress 是否压缩shuffle期间溢出的数据。使用spark.io.compression.codec指定的算法进行数据压缩。 true spark.shuffle.file.buffer 每个shuffle文件输出流的内存缓冲区大小(单位:KB)。这些缓冲区可以减少创建中间shuffle文件流过程中产生的磁盘寻道和系统调用次数。也可以通过配置项spark.shuffle.file.buffer.kb设置。 32KB spark.shuffle.compress 是否压缩map任务输出文件。建议压缩。使用spark.io.compression.codec进行压缩。 true spark.reducer.maxSizeInFlight 从每个reduce任务同时fetch的map任务输出最大值(单位:MB)。由于每个输出要求创建一个缓冲区进行接收,这代表了每个reduce任务固定的内存开销,所以除非拥有大量内存,否则保持低值。也可以通过配置项spark.reducer.maxMbInFlight设置。 48MB
  • Driver配置 Spark Driver可以理解为Spark提交应用的客户端,所有的代码解析工作都在这个进程中完成,因此该进程的参数尤其重要。下面将以如下顺序介绍Spark中进程的参数设置: JavaOptions:Java命令中“-D”后面的参数,可以由System.getProperty获取。 ClassPath:包括Java类和Native的Lib加载路径。 Java Memory and Cores:Java进程的内存和CPU使用量。 Spark Configuration:Spark内部参数,与Java进程无关。 表10 参数说明 参数 描述 取值示例 spark.driver.extraJavaOptions 传递至driver(驱动程序)的一系列额外JVM选项。例如,GC设置或其他日志记录。 注意:在Client模式中,该配置禁止直接在应用程序中通过SparkConf设置,因为驱动程序JVM已经启动。请通过--driver-java-options命令行选项或默认property文件进行设置。 参考快速配置Spark参数 spark.driver.extraClassPath 附加至driver的classpath的额外classpath条目。 注意:在Client模式中,该配置禁止直接在应用程序中通过SparkConf设置,因为驱动程序JVM已经启动。请通过--driver-java-options命令行选项或默认property文件进行设置。 参考快速配置Spark参数 spark.driver.userClassPathFirst (试验性)当在驱动程序中加载类时,是否授权用户添加的jar优先于Spark自身的jar。这种特性可用于减缓Spark依赖和用户依赖之间的冲突。目前该特性仍处于试验阶段,仅用于Cluster模式中。 false spark.driver.extraLibraryPath 设置一个特殊的library path在启动驱动程序JVM时使用。 注意:在Client模式中,该配置禁止直接在应用程序中通过SparkConf设置,因为驱动程序JVM已经启动。请通过--driver-java-options命令行选项或默认property文件进行设置。 JDB CS erver2x: ${SPARK_INSTALL_HOME}/spark/native SparkResource2x: ${DATA_NODE_INSTALL_HOME}/hadoop/lib/native spark.driver.cores 驱动程序进程使用的核数。仅适用于Cluster模式。 1 spark.driver.memory 驱动程序进程使用的内存数量,即SparkContext初始化的进程(例如:512M, 2G)。 注意:在Client模式中,该配置禁止直接在应用程序中通过SparkConf设置,因为驱动程序JVM已经启动。请通过--driver-java-options命令行选项或默认property文件进行设置。 4G spark.driver.maxResultSize 对每个Spark action操作(例如“collect”)的所有分区序列化结果的总量限制,至少1M,设置成0表示不限制。如果总量超过该限制,工作任务会中止。限制值设置过高可能会引起驱动程序的内存不足错误(取决于spark.driver.memory和JVM的对象内存开销)。设置合理的限制可以避免驱动程序出现内存不足的错误。 1G spark.driver.host Driver监测的主机名或IP地址,用于Driver与Executor进行通信。 (local hostname) spark.driver.port Driver监测的端口,用于Driver与Executor进行通信。 (random)
  • Spark Streaming Spark Streaming是在Spark批处理平台提供的流式数据的处理能力,以“mini-batch”的方式处理从外部输入的数据。 在Spark客户端的“{客户端安装目录}/Spark/spark/conf/spark-defaults.conf”文件中配置如下参数。 表6 参数说明 参数 描述 取值示例 spark.streaming.receiver.writeAheadLog.enable 启用预写日志(WAL)功能。所有通过Receiver接收的输入数据将被保存至预写日志,预写日志可以保证Driver程序出错后数据可以恢复。 false spark.streaming.unpersist 由Spark Streaming产生和保存的RDDs自动从Spark的内存中强制移除。Spark Streaming接收的原始输入数据也将自动清除。设置为false时原始输入数据和存留的RDDs不会自动清除,因此在streaming应用外部依然可以访问,但是这会占用更多的Spark内存。 true
  • Netty/NIO及Hash/Sort配置 Shuffle是大数据处理中最重要的一个性能点,网络是整个Shuffle过程的性能点。目前Spark支持两种Shuffle方式,一种是Hash,另外一种是Sort。网络也有两种方式,Netty和NIO。 表8 参数说明 参数 描述 取值示例 spark.shuffle.manager 处理数据的方式。有两种实现方式可用:sort和hash。sort shuffle对内存的使用率更高,是Spark 1.2及后续版本的默认选项。Spark2.x及后续版本不支持hash。 SORT spark.shuffle.consolidateFiles (仅hash方式)如果要合并在shuffle过程中创建的中间文件,需要将该值设置为“true”。文件创建的少可以提高文件系统处理性能,降低风险。使用ext4或者xfs文件系统时,建议设置为“true”。由于文件系统限制,在ext3上该设置可能会降低8核以上机器的处理性能。 false spark.shuffle.sort.bypassMergeThreshold 该参数只适用于spark.shuffle.manager设置为sort时。在不做map端聚合并且reduce任务的partition数小于或等于该值时,避免对数据进行归并排序,防止系统处理不必要的排序引起性能下降。 200 spark.shuffle.io.maxRetries (仅Netty方式)如果设为非零值,由于IO相关的异常导致的fetch失败会自动重试。该重试逻辑有助于大型shuffle在发生长GC暂停或者网络闪断时保持稳定。 12 spark.shuffle.io.numConnectionsPerPeer (仅Netty方式)为了减少大型集群的连接创建,主机间的连接会被重新使用。对于拥有较多硬盘和少数主机的集群,此操作可能会导致并发性不足以占用所有磁盘,所以用户可以考虑增加此值。 1 spark.shuffle.io.preferDirectBufs (仅Netty方式)使用off-heap缓冲区减少shuffle和高速缓存块转移期间的垃圾回收。对于off-heap内存被严格限制的环境,用户可以将其关闭以强制所有来自Netty的申请使用堆内存。 true spark.shuffle.io.retryWait (仅Netty方式)等待fetch重试期间的时间(秒)。重试引起的最大延迟为maxRetries * retryWait,默认是15秒。 5
  • Python Spark Python Spark是Spark除了Scala、Java两种API之外的第三种编程语言。不同于Java和Scala都是在JVM平台上运行,Python Spark不仅会有JVM进程,还会有自身的Python进程。以下配置项只适用于Python Spark场景,而其他配置项也同样可以在Python Spark中生效。 表4 参数说明 参数 描述 取值示例 spark.python.profile 在Python worker中开启profiling。通过sc.show_profiles()展示分析结果。或者在driver退出前展示分析结果。可以通过sc.dump_profiles(path) 将结果转储到磁盘中。如果一些分析结果已经手动展示,那么在Driver退出前,它们将不会再自动展示。 默认使用pyspark.profiler.BasicProfiler,可以在初始化SparkContext时传入指定的profiler来覆盖默认的profiler。 false spark.python.worker.memory 聚合过程中每个python worker进程所能使用的内存大小,其值格式同指定JVM内存一致,如512m,2g。如果进程在聚集期间所用的内存超过了该值,数据将会被写入磁盘。 512m spark.python.worker.reuse 是否重用python worker。如是,它将使用固定数量的Python workers,那么下一批提交的task将重用这些Python workers,而不是为每个task重新fork一个Python进程。 该功能在大型广播下非常有用,因为此时对下一批提交的task不需要将数据从JVM再一次传输至Python worker。 true
  • Spark长时间任务安全认证配置 安全模式下,使用Spark CLI(如spark shell、spark sql、spark submit)时,如果使用kinit命令进行安全认证,当执行长时间运行任务时,会因为认证过期导致任务失败。 在客户端的“{客户端安装目录}/Spark/spark/conf/spark-defaults.conf”配置文件中设置如下参数,配置完成后,重新执行Spark CLI即可。 当参数值为“true”时,需要保证“spark-defaults.conf”和“hive-site.xml”中的Keytab和principal的值相同。 表3 参数说明 参数名称 含义 取值示例 spark.kerberos.principal 具有Spark操作权限的principal。请联系 MRS 集群管理员获取对应principal。 - spark.kerberos.keytab 具有Spark操作权限的Keytab文件名称和文件路径。请联系MRS集群管理员获取对应Keytab文件。 - spark.security.bigdata.loginOnce Principal用户是否只登录一次。true为单次登录;false为多次登录。 单次登录与多次登录的区别在于:Spark社区使用多次Kerberos用户登录多次的方案,但容易出现TGT过期或者Token过期异常导致应用无法长时间运行。MRS修改了Kerberos登录方式,只允许用户登录一次,可以有效的解决过期问题。限制在于,Hive相关的principal与keytab的配置项必须与Spark配置相同。 说明: 当参数值为true时,需要保证“spark-defaults.conf”和“hive-site.xml”中的Keytab和principal的值相同。 true
  • 配置Stage失败重试次数 Spark任务在遇到FetchFailedException时会触发Stage重试。为了防止Stage无限重试,对Stage重试次数进行限制。重试次数可以根据实际需要进行调整。 在Spark客户端的“{客户端安装目录}/Spark/spark/conf/spark-defaults.conf”配置文件中配置如下参数。 表1 参数说明 参数 参数说明 取值示例 spark.stage.maxConsecutiveAttempts 用于控制一个Stage在失败后可以连续重新尝试的最大次数。 4
  • Dynamic Allocation 动态资源调度是On Yarn模式特有的特性,并且必须开启Yarn External Shuffle才能使用这个功能。在使用Spark作为一个常驻的服务时候,动态资源调度将大大的提高资源的利用率。例如JDBCServer服务,大多数时间该进程并不接受JDBC请求,因此将这段空闲时间的资源释放出来,将极大的节约集群的资源。 表5 参数说明 参数 描述 取值示例 spark.dynamicAllocation.enabled 是否使用动态资源调度,用于根据规模调整注册于该应用的executor的数量。注意目前仅在YARN模式下有效。 启用动态资源调度必须将spark.shuffle.service.enabled设置为true。以下配置也与此相关:spark.dynamicAllocation.minExecutors、spark.dynamicAllocation.maxExecutors和spark.dynamicAllocation.initialExecutors。 JDBCServer2x: true SparkResource2x: false spark.dynamicAllocation.minExecutors 最小Executor个数。 0 spark.dynamicAllocation.initialExecutors 初始Executor个数。 spark.dynamicAllocation.minExecutors spark.dynamicAllocation.maxExecutors 最大executor个数。 2048 spark.dynamicAllocation.schedulerBacklogTimeout 调度第一次超时时间。单位为秒。 1s spark.dynamicAllocation.sustainedSchedulerBacklogTimeout 调度第二次及之后超时时间。 1s spark.dynamicAllocation.executorIdleTimeout 普通Executor空闲超时时间。单位为秒。 60 spark.dynamicAllocation.cachedExecutorIdleTimeout 含有cached blocks的Executor空闲超时时间。 JDBCServer2x:2147483647s IndexServer2x:2147483647s SparkResource2x:120
  • 配置是否使用笛卡尔积功能 要启动使用笛卡尔积功能,需要在Spark的“spark-defaults.conf”配置文件中进行如下设置。 JDBC应用在服务端的“spark-defaults.conf”配置文件中设置该参数。 Spark客户端提交的任务在客户端配的“spark-defaults.conf”配置文件中设置该参数。 表2 笛卡尔积参数说明 参数 说明 取值示例 spark.sql.crossJoin.enabled 是否允许隐性执行笛卡尔积。 “true”表示允许 “false”表示不允许,此时只允许query中显式包含CROSS JOIN语法。 true
  • 参数说明 表1 MemArtsCC参数说明 参数 参数说明 默认值 access_token_enable Access token认证的开关。 开启后,SDK通过worker读取缓存需要经过token校验。SDK首次向worker发送读请求时,worker会做一次Kerberos认证,生成一个密钥,保存在本地和ZooKeeper,然后用这个密钥生成一个token,返回给SDK,SDK向worker发送读请求时,会将该token传入,和密钥进行校验,校验通过才允许读取缓存。 安全集群为true,普通集群为false cache_cap_max_available_rate 每块盘的最大可用容量比率。 设置范围为0.01~1.0,间隔为0.01。本参数决定使用MemArtsCC磁盘最大容量百分比,默认值是30%,比如3TB的磁盘,MemArtsCC最大可使用的缓存空间为900GB,缓存超过900GB,MemArtsCC动态淘汰缓存。 0.3 cache_reserved_space 每块盘需要动态预留的空间。 cache_reserved_space决定磁盘预留空间,默认值为512MB,建议设置为磁盘容量的10%以上。比如3TB的磁盘,cache_reserved_space设置为300GB,cache_cap_max_available_rate设置为30%,如果磁盘空间小于300GB,尽管MemArtsCC的缓存没有达到最大可使用容量900GB,MemArtsCC也会动态淘汰缓存。 512MB auto_isolate_broken_disk 自动隔离故障磁盘开关。 true broken_disk_list 故障磁盘列表。 -
  • 前提条件 集群中已安装Doris、HDFS、Yarn、Flink和Kafka等服务。 待连接Doris数据库的节点与MRS集群网络互通。 创建具有Doris管理权限的用户。 集群已启用Kerberos认证(安全模式) 在 FusionInsight Manager中创建一个人机用户,例如“dorisuser”,创建一个拥有“Doris管理员权限”的角色绑定给该用户。 使用新建的用户dorisuser重新登录FusionInsight Manager,修改该用户初始密码。 集群未启用Kerberos认证(普通模式) 使用admin用户连接Doris后,创建具有管理员权限的角色并绑定给用户。 已安装MySQL客户端,相关操作可参考使用MySQL客户端连接Doris。 已安装Flink客户端。
  • 相关表参数配置说明 表1 创建Doris Sink表时可选配置参数 参数名称 参数默认值 是否必须配置 参数描述 doris.request.retries 3 否 向Doris发送请求的重试次数。 doris.request.connect.timeout.ms 30000 否 向Doris发送请求的连接超时时间。 doris.request.read.timeout.ms 30000 否 向Doris发送请求的读取超时时间。 sink.max-retries 3 否 Commit失败后的最大重试次数,默认为3次。 sink.enable.batch-mode false 否 是否使用攒批模式写入Doris,开启后写入时机不依赖CheckPoint,通过“sink.buffer-flush.max-rows”、“sink.buffer-flush.max-bytes”或“sink.buffer-flush.interval”参数来控制写入时机。 sink.buffer-flush.max-rows 50000 否 攒批模式下,单个批次最多写入的数据行数。 sink.buffer-flush.max-bytes 10MB 否 攒批模式下,单个批次最多写入的字节数。 sink.buffer-flush.interval 10s 否 攒批模式下,异步刷新缓存的间隔。 表2 Lookup Join场景下创建Flink表时的可选配置参数 参数名称 参数默认值 是否必须配置 参数描述 lookup.cache.max-rows -1 否 Lookup缓存的最大行数,默认值为:-1,表示不开启缓存。 lookup.cache.ttl 10s 否 Lookup缓存的最大时间,默认值为10s。 lookup.max-retries 1 否 Lookup查询失败后的重试次数。 lookup.jdbc.async false 否 是否开启异步Lookup,默认值为:false。 lookup.jdbc.read.batch.size 128 否 启用异步Lookup时,每次查询的最大批次大小。 lookup.jdbc.read.batch.queue-size 256 否 启用异步Lookup时,中间缓冲队列的大小。 lookup.jdbc.read.thread-size 3 否 每个Task中Lookup的JDBC线程数。
  • 约束与限制 如果当前组件使用了Ranger进行权限控制,须基于Ranger配置相关策略进行权限管理,具体操作可参考添加Spark2x的Ranger访问权限策略。 Spark2x开启或关闭Ranger鉴权后,需要重启Spark2x服务,并重新下载客户端,或刷新客户端配置文件spark/conf/spark-defaults.conf: 开启Ranger鉴权:spark.ranger.plugin.authorization.enable=true 关闭Ranger鉴权:spark.ranger.plugin.authorization.enable=false
  • 操作场景 该任务指导MRS集群管理员在Manager创建并设置SparkSQL的角色。SparkSQL角色可设置Spark管理员权限以及数据表的数据操作权限。 用户使用Hive并创建数据库需要加入hive组,不需要角色授权。用户在Hive和HDFS中对自己创建的数据库或表拥有完整权限,可直接创建表、查询数据、删除数据、插入数据、更新数据以及授权他人访问表与对应HDFS目录与文件。默认创建的数据库或表保存在HDFS目录“/user/hive/warehouse”。
  • 批量构建全局二级索引数据 只有处于INACTIVE状态的索引才能进行批量构建,如需重建索引数据,请先修改索引状态。 数据表中存在大量数据时,构建耗时较长,建议将nohup命令放在后台执行,避免操作被意外中断。 在HBase客户端执行以下命令可批量构建已有数据的索引数据: hbase org.apache.hadoop.hbase.hindex.global.mapreduce.GlobalTableIndexer -Dtablename.to.index='table' -Dindexnames.to.build='idx1' 相关参数介绍如下: tablename.to.index:表示需修改索引状态的数据表的名称。 indexnames.to.build:指定的需要批量生成数据的索引名,可以同时指定多个,用#号分隔。 hbase.gsi.cleandata.enabled(可选):表示构建索引数据前是否需要清空索引表,默认值为“false”。 hbase.gsi.cleandata.timeout(可选):表示构建索引数据前等待清空索引表超时时间,默认值为“1800”,单位为:秒。
  • 操作步骤 使用安装客户端的用户登录客户端所在节点,具体操作请参见使用客户端运行Loader作业。 执行以下命令,进入“backup.properties”文件所在目录。例如,Loader客户端安装目录为 “/opt/client/Loader/”。 cd /opt/client/Loader/loader-tools-1.99.3/loader-backup/conf 执行以下命令,修改“backup.properties”文件的配置参数,参数具体说明如表1所示。 vi backup.properties server.url = 10.0.0.1:21351,10.0.0.2:12000 authentication.type = kerberos authentication.user = authentication.password= job.jobId = 1 use.keytab = true client.principal = loader/hadoop client.keytab = /opt/client/conf/loader.keytab 表1 配置参数说明 配置参数 说明 示例 server.url Loader服务的浮动IP地址和端口(21351)。 为了兼容性,此处支持配置多个IP地址和端口,并以“,”进行分隔。其中第一个必须是Loader服务的浮动IP地址和端口(21351),其余的可根据业务需求配置。 10.0.0.1:21351,10.0.0.2:12000 authentication.type 登录认证的方式。 “kerberos”,表示使用安全模式,进行Kerberos认证。Kerberos认证提供两种认证方式:密码和keytab文件。 “simple”,表示使用普通模式,不进行Kerberos认证。 kerberos authentication.user 普通模式或者使用密码认证方式时,登录使用的用户。 keytab登录方式,则不需要设置该参数。 bar authentication.password 使用密码认证方式时,登录使用的用户密码。 普通模式或者keytab登录方式,则不需要设置该参数。 用户需要对密码加密,加密方法: 进入“encrypt_tool”所在目录。例如,Loader客户端安装目录为“/opt/hadoopclient/Loader”,则执行如下命令。 cd /opt/hadoopclient/Loader/loader-tools-1.99.3 执行以下命令,对非加密密码进行加密。命令中如果携带认证密码信息可能存在安全风险,在执行命令前建议关闭系统的history命令记录功能,避免信息泄露。 ./encrypt_tool 未加密的密码 得到加密后的密文,作为“authentication.password”的取值。 说明: 非加密密码中含有特殊字符时需要转义。例如,$符号属于特殊字符,可使用单引号进行转义;非加密密码中含有单引号时可用双引号进行转义,非加密密码中含有双引号应使用反斜杠\进行转义。可参考Shell的转义字符规则。 - job.jobId 需要执行数据备份的作业ID。 作业ID可通过登录Loader webUI在已创建的作业查看。 1 use.keytab 是否使用keytab方式登录。 true,表示使用keytab文件登录 false,表示使用密码登录。 true client.principal 使用keytab认证方式时,访问Loader服务的用户规则。 普通模式或者密码登录方式,则不需要设置该参数。 loader/hadoop client.keytab 使用keytab认证方式登录时,使用的keytab文件所在目录。 普通模式或者密码登录方式,则不需要设置该参数。 /opt/client/conf/loader.keytab 执行以下命令,进入备份脚本“run.sh”所在目录。例如,Loader客户端安装目录为“/opt/hadoopclient/Loader”。 cd /opt/hadoopclient/Loader/loader-tools-1.99.3/loader-backup 执行以下命令,运行备份脚本“run.sh”,进行Loader作业数据备份。系统将数据备份到作业的输出路径同一层目录。 ./run.sh 备份数据的输入目录 例如,备份数据的输入目录为“/user/hbase/”,作业的输出路径为/opt/client/sftp/sftp1,其中sftp1只起到一个占位符的作用。执行如下命令,数据将备份到/opt/client/sftp/hbase目录。 ./run.sh /user/hbase/
  • Flink On Hudi同步元数据到Hive 启动此特性后,Flink写数据至Hudi表将自动在Hive上创建出Hudi表并同步添加分区,然后供SparkSQL、Hive等服务读取Hudi表数据。 如下是支持的两种同步元数据方式,后续操作步骤以JDBC方式为示例: 适用于MRS 3.2.0及之后版本。 使用JDBC方式同步元数据到Hive CREATE TABLE stream_mor( uuid VARCHAR(20), name VARCHAR(10), age INT, ts INT, `p` VARCHAR(20) ) PARTITIONED BY (`p`) WITH ( 'connector' = 'hudi', 'path' = 'hdfs://hacluster/tmp/hudi/stream_mor', 'table.type' = 'MERGE_ON_READ', 'hive_sync.enable' = 'true', 'hive_sync.table' = '要同步到Hive的表名', 'hive_sync.db' = '要同步到Hive的数据库名', 'hive_sync.metastore.uris' = 'Hive客户端hive-site.xml文件中hive.metastore.uris的值', 'hive_sync.jdbc_url' = 'Hive客户端component_env文件中CLIENT_HIVE_URI的值' ); hive_sync.jdbc_url:Hive客户端component_env文件中CLIENT_HIVE_URI的值,如果该值中存在“\”需将其删除。 如果需要使用Hive风格分区,需同时配置如下参数: 'hoodie.datasource.write.hive_style_partitioning' = 'true' 'hive_sync.partition_extractor_class' = 'org.apache.hudi.hive.MultiPartKeysValueExtractor' Flink on Hudi并同步数据至Hive的任务,因为Hudi对大小写敏感,Hive对大小写不敏感,所以在Hudi表中的字段不建议使用大写字母,否则可能会造成数据无法正常读写。 使用HMS方式同步元数据到Hive CREATE TABLE stream_mor( uuid VARCHAR(20), name VARCHAR(10), age INT, ts INT, `p` VARCHAR(20) ) PARTITIONED BY (`p`) WITH ( 'connector' = 'hudi', 'path' = 'hdfs://hacluster/tmp/hudi/stream_mor', 'table.type' = 'MERGE_ON_READ', 'hive_sync.enable' = 'true', 'hive_sync.table' = '要同步到Hive的表名', 'hive_sync.db' = '要同步到Hive的数据库名', 'hive_sync.mode' = 'hms', 'hive_sync.metastore.uris' = 'Hive客户端hive-site.xml文件中hive.metastore.uris的值', 'properties.hive.metastore.kerberos.principal' = 'Hive客户端hive-site.xml文件中hive.metastore.kerberos.principal的值' );
  • 前提条件 集群已安装HDFS、Yarn、Flink和Hudi等服务。 包含Hudi服务的客户端已安装,例如安装路径为:/opt/client。 Flink要求1.12.2及以后版本,Hudi要求0.9.0及以后版本。 参考创建FlinkServer权限角色创建一个具有FlinkServer管理员权限的用户用于访问Flink WebUI,如:flink_admin。并且用户需要添加hadoop、hive、kafkaadmin用户组,以及Manager_administrator角色。
  • FlinkSQL Lookup Join Hudi使用须知 适用于MRS 3.5.0及以后版本。 使用lookup.join.cache.ttl参数来控制维表数据的加载周期,默认值为60min。 Hudi维表数据会被加载到Flink TaskManager Heap中,所以不推荐大于10万行记录的Hudi表作为维表。 维表的新增、更新数据需要等到下一次加载周期后,才能被加载进来参与计算。 SQL示例如下: CREATE TABLE hudimor( uuid VARCHAR(20), name VARCHAR(10), age INT, ts INT, `p` VARCHAR(20), PRIMARY KEY (uuid) NOT ENFORCED ) PARTITIONED BY (`p`) WITH ( 'connector' = 'hudi', 'path' = 'hdfs://hacluster/tmp/hudimor', 'table.type' = 'MERGE_ON_READ', 'hoodie.datasource.write.recordkey.field' = 'uuid', 'write.precombine.field' = 'ts', 'lookup.join.cache.ttl' = '60min' ); CREATE TABLE datagen(uuid varchar(20), proctime as PROCTIME()) WITH ( 'connector' = 'datagen', 'rows-per-second' = '1' ); CREATE TABLE blackhole ( uuid VARCHAR(20), name VARCHAR(10), age INT, ts INT, `p` VARCHAR(20) ) WITH ('connector' = 'blackhole'); insert into blackhole select t1.uuid as uuid, t2.name as name, t2.age as age, t2.ts as ts, t2.p as p FROM datagen AS t1 left JOIN hudimor FOR SYSTEM_TIME AS OF t1.proctime AS t2 ON t1.uuid = t2.uuid;
  • WITH主要参数说明 表2 WITH主要参数说明 方式 配置项 是否必选 默认值 描述 读取 read.tasks 否 4 读Hudi表task并行度。 read.streaming.enabled 否 false 是否开启流读模式。 read.streaming.start-commit 否 默认从最新commit Stream和Batch增量消费,指定“yyyyMMddHHmmss”格式时间的开始消费位置(闭区间)。 read.end-commit 否 默认到最新commit Stream和Batch增量消费,指定“yyyyMMddHHmmss”格式时间的结束消费位置(闭区间)。 写入 write.tasks 否 4 写Hudi表task并行度。 index.bootstrap.enabled 否 false 是否开启索引加载,开启后会将已存表的最新数据一次性加载到state中。 如果有全量数据接增量的需求,且已经有全量的离线Hoodie表,需要接上实时写入,同时保证数据不重复,可以开启索引加载功能。 write.index_bootstrap.tasks 否 4 如果启动作业时索引加载缓慢,可以调大该值,调大该值后可以加快bootstrap阶段的效率,但bootstrap阶段会阻塞CheckPoint。 compaction.async.enabled 否 true 是否开启在线压缩。 compaction.schedule.enabled 否 true 是否阶段性生成压缩plan,即使关闭在线压缩的情况下也建议开启。 compaction.tasks 否 10 压缩Hudi表task并行度。 index.state.ttl 否 7D 索引保存的时间,默认为7天(单位:天),小于“0”表示永久保存 索引是判断数据重复的核心数据结构,对于长时间的更新,比如更新一个月前的数据,需要将该值调大。
  • WITH主要参数说明 配置项 是否必选 类型 描述 connector 必选 String 指定要使用的连接器,Kafka使用“kafka”。 topic kafka作为sink,必选 kafka作为source,可选 String 主题名称 当表用作source时,要从中读取数据的主题名称。支持主题列表,通过按分号分隔主题,如“主题-1;主题-2”。 当表用作sink时,主题名称为写入数据的主题。sink不支持主题列表。 topic-pattern kafka作为source时可选 String 主题模式 当表用作source时可设置该参数,主题名称需使用正则表达式。 不能同时设置“topic-pattern” 和“topic” 。 properties.bootstrap.servers 必选 String Kafka broker列表,以逗号分隔。 properties.group.id kafka作为source时必选 String Kafka的使用者组ID。 format 必选 String 用于反序列化和序列化Kafka消息的值部分的格式。 properties.* 可选 String 安全模式下需增加认证相关的参数。 scan.topic-partition-discovery.interval 可选 Duration 消费者定时动态发现创建的Partition的时间间隔。默认值:5min。
  • Concatenation Operator : || || 操作符用于将相同类型的数组或数值串联起来。 SELECT ARRAY[1] || ARRAY[2]; _col0 -------- [1, 2] (1 row) SELECT ARRAY[1] || 2; _col0 -------- [1, 2] (1 row) SELECT 2 || ARRAY[1]; _col0 -------- [2, 1] (1 row)
  • 快速配置常用参数 其他参数在安装集群时已进行了适配,以下参数需要根据使用场景进行调整。以下参数除特别指出外,一般在Spark2x客户端的“{客户端安装目录}/Spark/spark/conf/spark-defaults.conf”文件中配置。 表1 快速配置常用参数 配置项 说明 取值示例 spark.sql.parquet.compression.codec 对于非分区parquet表,设置其存储文件的压缩格式。 在JDBCServer服务端的“spark-defaults.conf”配置文件中进行设置。 snappy spark.dynamicAllocation.enabled 是否使用动态资源调度,用于根据规模调整注册于该应用的executor的数量。目前仅在YARN模式下有效。 JDBCServer默认值为true,client默认值为false。 false spark.executor.memory 每个Executor进程使用的内存数量,与JVM内存设置字符串的格式相同(例如:512m,2g)。 4G spark.sql.autoBroadcastJoinThreshold 当进行join操作时,配置广播的最大值。 当SQL语句中涉及的表中相应字段的大小小于该值时,进行广播。 配置为-1时,将不进行广播。 10485760 spark.yarn.queue JDBCServer服务所在的Yarn队列。 在JDBCServer服务端的“spark-defaults.conf”配置文件中进行设置。 default spark.driver.memory 大集群下推荐配置32~64g驱动程序进程使用的内存数量,即SparkContext初始化的进程(例如:512m, 2g)。 4G spark.yarn.security.credentials.hbase.enabled 是否打开获取HBase token的功能。如果需要Spark-on-HBase功能,并且配置了安全集群,参数值设置为“true”。否则设置为“false”。 false spark.serializer 用于串行化将通过网络发送或需要缓存的对象的类以序列化形式展现。 Java序列化的默认值适用于任何Serializable Java对象,但运行速度相当慢,所以建议使用org.apache.spark.serializer.KryoSerializer并配置Kryo序列化。可以是org.apache.spark.serializer.Serializer的任何子类。 org.apache.spark.serializer.JavaSerializer spark.executor.cores 每个执行者使用的内核个数。 在独立模式和Mesos粗粒度模式下设置此参数。当有足够多的内核时,允许应用程序在同样的worker上执行多个执行程序;否则,在每个worker上,每个应用程序只能运行一个执行程序。 1 spark.shuffle.service.enabled NodeManager中一个长期运行的辅助服务,用于提升Shuffle计算性能。 false spark.sql.adaptive.enabled 是否开启自适应执行框架。 false spark.executor.memoryOverhead 每个执行器要分配的堆内存量(单位为兆字节)。 这是占用虚拟机开销的内存,类似于内部字符串,其他内置开销等等。会随着执行器大小(通常为6-10%)而增长。 1GB spark.streaming.kafka.direct.lifo 配置是否开启Kafka后进先出功能。 false
  • 操作步骤 以客户端安装用户,登录安装客户端的节点。 执行以下命令切换到客户端目录。 cd 客户端安装目录 执行以下命令配置环境变量。 source bigdata_env 如果当前集群已启用Kerberos认证,执行以下命令认证当前用户 kinit 组件业务用户 如果当前集群未启用Kerberos认证,则执行以下命令设置Hadoop用户名: export HADOOP_USER_NAME=hbase 进入Spark客户端目录,执行如下命令,同步数据到HBase目标表中。 cd Spark/spark/bin 例如,执行以下命令同步test.orc_table表的所有数据到HBase的test:orc_table表中,使用id+uuid组合作为rowkey列,输出路径指定为“/tmp/orc_table”: spark-submit --master yarn --deploy-mode cluster --jars 客户端安装目录/HBase/hbase/lib/protobuf-java-2.5.0.jar,客户端安装目录/HBase/hbase/conf/* --conf spark.yarn.user.classpath.first=true --class com.huawei.hadoop.hbase.tools.bulkload.SparkBulkLoadTool 客户端安装目录/HBase/hbase/lib/hbase-it-bulk-load-*.jar -sql "select * from test.orc_table" -tb "test:orc_table" -rc "id,uuid" -op "/tmp/orc_table"
  • 前提条件 集群安装了Spark及Hive服务。 执行数据导入的用户需要同时具有Spark(对应源表的SELECT权限)、HBase权限(对应HBase NameSpace的RWXA权限)和HDFS权限(对应HFile输出目录的读写权限)。 如果集群已启用Kerberos认证(安全模式),需修改Spark“客户端安装目录/Spark/spark/conf/spark-defaults.conf”配置文件中的“spark.yarn.security.credentials.hbase.enabled”参数值为“true”。
共100000条
提示

您即将访问非华为云网站,请注意账号财产安全