华为云用户手册

  • Storage 内存计算是Spark的最大亮点,Spark的Storage主要管理内存资源。Storage中主要存储RDD在Cache过程中产生的数据块。JVM中堆内存是整体的,因此在Spark的Storage管理中,“Storage Memory Size”变成了一个非常重要的概念。 表20 参数说明 参数 描述 默认值 spark.storage.memoryMapThreshold 超过该块大小的Block,Spark会对该磁盘文件进行内存映射。这可以防止Spark在内存映射时映射过小的块。一般情况下,对接近或低于操作系统的页大小的块进行内存映射会有高开销。 2m
  • PORT 表21 参数说明 参数 描述 默认值 spark.ui.port 应用仪表盘的端口,显示内存和工作负载数据。 JD BCS erver2x:4040 SparkResource2x:0 spark.blockManager.port 所有BlockManager监测的端口。这些同时存在于Driver和Executor上。 随机端口范围 spark.driver.port Driver监测的端口,用于Driver与Executor进行通信。 随机端口范围
  • Executor配置 Executor也是单独一个Java进程,但不像Driver和AM只有一个,Executor可以有多个进程,而目前Spark只支持相同的配置,即所有Executor的进程参数都必然是一样的。 表12 参数说明 参数 描述 默认值 spark.executor.extraJavaOptions 传递至Executor的额外JVM选项。例如,GC设置或其他日志记录。请注意不能通过此选项设置Spark属性或heap大小。Spark属性应该使用SparkConf对象或调用spark-submit脚本时指定的spark-defaults.conf文件来设置。Heap大小可以通过spark.executor.memory来设置。 参考快速配置参数 spark.executor.extraClassPath 附加至Executor classpath的额外的classpath。这主要是为了向后兼容Spark的历史版本。用户一般不用设置此选项。 - spark.executor.extraLibraryPath 设置启动executor JVM时所使用的特殊的library path。 参考快速配置参数 spark.executor.userClassPathFirst (试验性)与spark.driver.userClassPathFirst相同的功能,但应用于Executor实例。 false spark.executor.memory 每个Executor进程使用的内存数量,与JVM内存设置字符串的格式相同(例如:512M,2G)。 4G spark.executorEnv.[EnvironmentVariableName] 添加由EnvironmentVariableName指定的环境变量至executor进程。用户可以指定多个来设置多个环境变量。 - spark.executor.logs.rolling.maxRetainedFiles 设置系统即将保留的最新滚动日志文件的数量。旧的日志文件将被删除。默认关闭。 - spark.executor.logs.rolling.size.maxBytes 设置滚动Executor日志的文件的最大值。默认关闭。数值以字节为单位设置。如果要自动清除旧日志,请查看spark.executor.logs.rolling.maxRetainedFiles。 - spark.executor.logs.rolling.strategy 设置executor日志的滚动策略。默认滚动关闭。可以设置为“time”(基于时间的滚动)或“size”(基于大小的滚动)。当设置为“time”,使用spark.executor.logs.rolling.time.interval属性的值作为日志滚动的间隔。当设置为“size”,使用spark.executor.logs.rolling.size.maxBytes设置滚动的最大文件大小滚动。 - spark.executor.logs.rolling.time.interval 设置executor日志滚动的时间间隔。默认关闭。合法值为“daily”、“hourly”、“minutely”或任意秒。如果要自动清除旧日志,请查看spark.executor.logs.rolling.maxRetainedFiles。 daily
  • HistoryServer HistoryServer读取文件系统中的EventLog文件,展示已经运行完成的Spark应用在运行时的状态信息。 表14 参数说明 参数 描述 默认值 spark.history.fs.logDirectory History server的日志目录 - spark.history.ui.port JobHistory侦听连接的端口。 18080 spark.history.fs.updateInterval History server所显示信息的更新周期,单位为秒。每次更新检查持久存储中针对事件日志进行的更改。 10s spark.history.fs.update.interval.seconds 每个事件日志更新检查的间隔。与spark.history.fs.updateInterval功能相同,推荐使用spark.history.fs.updateInterval。 10s spark.history.updateInterval 该配置项与spark.history.fs.update.interval.seconds和spark.history.fs.updateInterval功能相同,推荐使用spark.history.fs.updateInterval。 10s
  • EventLog Spark应用在运行过程中,实时将运行状态以JSON格式写入文件系统,用于HistoryServer服务读取并重现应用运行时状态。 表16 参数说明 参数 描述 默认值 spark.eventLog.enabled 是否记录Spark事件,用于应用程序在完成后重构webUI。 true spark.eventLog.dir 如果spark.eventLog.enabled为true,记录Spark事件的目录。在此目录下,Spark为每个应用程序创建文件,并将应用程序的事件记录到文件中。用户也可设置为统一的与HDFS目录相似的地址,这样History server就可以读取历史文件。 hdfs://hacluster/spark2xJobHistory2x spark.eventLog.compress spark.eventLog.enabled为true时,是否压缩记录的事件。 false
  • ExecutorLaucher配置 ExecutorLauncher只有在Yarn-Client模式下才会存在的角色,Yarn-Client模式下,ExecutorLauncher和Driver不在同一个进程中,需要对ExecutorLauncher的参数进行特殊的配置。 表11 参数说明 参数 描述 默认值 spark.yarn.am.extraJavaOptions 在Client模式下传递至YARN Application Master的一系列额外JVM选项。在Cluster模式下使用spark.driver.extraJavaOptions。 参考快速配置参数 spark.yarn.am.memory 针对Client模式下YARN Application Master使用的内存数量,与JVM内存设置字符串格式一致(例如:512m,2g)。在集群模式下,使用spark.driver.memory。 1G spark.yarn.am.memoryOverhead 和“spark.yarn.driver.memoryOverhead”一样,但只针对Client模式下的Application Master。 - spark.yarn.am.cores 针对Client模式下YARN Application Master使用的核数。在Cluster模式下,使用spark.driver.cores。 1
  • WebUI WebUI展示了Spark应用运行的过程和状态。 表13 参数说明 参数 描述 默认值 spark.ui.killEnabled 允许停止Web UI中的stage和相应的job。 说明: 出于安全考虑,将此配置项的默认值设置成false,以避免用户发生误操作。如果需要开启此功能,则可以在spark-defaults.conf配置文件中将此配置项的值设为true。请谨慎操作。 true spark.ui.port 应用程序dashboard的端口,显示内存和工作量数据。 JDB CS erver2x:4040 SparkResource2x:0 IndexServer2x:22901 spark.ui.retainedJobs 在垃圾回收之前Spark UI和状态API记住的job数。 1000 spark.ui.retainedStages 在垃圾回收之前Spark UI和状态API记住的stage数。 1000
  • Spark Streaming Kafka Receiver是Spark Streaming一个重要的组成部分,它负责接收外部数据,并将数据封装为Block,提供给Streaming消费。最常见的数据源是Kafka,Spark Streaming对Kafka的集成也是最完善的,不仅有可靠性的保障,而且也支持从Kafka直接作为RDD输入。 表7 参数说明 参数 描述 默认值 spark.streaming.kafka.maxRatePerPartition 使用Kafka direct stream API时,从每个Kafka分区读取数据的最大速率(每秒记录数量)。 - spark.streaming.blockInterval 在被存入Spark之前Spark Streaming Receiver接收数据累积成数据块的间隔(毫秒)。推荐最小值为50毫秒。 200ms spark.streaming.receiver.maxRate 每个Receiver接收数据的最大速率(每秒记录数量)。配置设置为0或者负值将不会对速率设限。 - spark.streaming.receiver.writeAheadLog.enable 是否使用ReliableKafkaReceiver。该Receiver支持流式数据不丢失。 false
  • 普通Shuffle配置 表9 参数说明 参数 描述 默认值 spark.shuffle.spill 如果设为“true”,通过将数据溢出至磁盘来限制reduce任务期间内存的使用量。 true spark.shuffle.spill.compress 是否压缩shuffle期间溢出的数据。使用spark.io.compression.codec指定的算法进行数据压缩。 true spark.shuffle.file.buffer 每个shuffle文件输出流的内存缓冲区大小(单位:KB)。这些缓冲区可以减少创建中间shuffle文件流过程中产生的磁盘寻道和系统调用次数。也可以通过配置项spark.shuffle.file.buffer.kb设置。 32KB spark.shuffle.compress 是否压缩map任务输出文件。建议压缩。使用spark.io.compression.codec进行压缩。 true spark.reducer.maxSizeInFlight 从每个reduce任务同时fetch的map任务输出最大值(单位:MB)。由于每个输出要求创建一个缓冲区进行接收,这代表了每个reduce任务固定的内存开销,所以除非拥有大量内存,否则保持低值。也可以通过配置项spark.reducer.maxMbInFlight设置。 48MB
  • Driver配置 Spark Driver可以理解为Spark提交应用的客户端,所有的代码解析工作都在这个进程中完成,因此该进程的参数尤其重要。下面将以如下顺序介绍Spark中进程的参数设置: JavaOptions:Java命令中“-D”后面的参数,可以由System.getProperty获取。 ClassPath:包括Java类和Native的Lib加载路径。 Java Memory and Cores:Java进程的内存和CPU使用量。 Spark Configuration:Spark内部参数,与Java进程无关。 表10 参数说明 参数 描述 默认值 spark.driver.extraJavaOptions 传递至driver(驱动程序)的一系列额外JVM选项。例如,GC设置或其他日志记录。 注意:在Client模式中,该配置禁止直接在应用程序中通过SparkConf设置,因为驱动程序JVM已经启动。请通过--driver-java-options命令行选项或默认property文件进行设置。 参考快速配置参数 spark.driver.extraClassPath 附加至driver的classpath的额外classpath条目。 注意:在Client模式中,该配置禁止直接在应用程序中通过SparkConf设置,因为驱动程序JVM已经启动。请通过--driver-java-options命令行选项或默认property文件进行设置。 参考快速配置参数 spark.driver.userClassPathFirst (试验性)当在驱动程序中加载类时,是否授权用户添加的jar优先于Spark自身的jar。这种特性可用于减缓Spark依赖和用户依赖之间的冲突。目前该特性仍处于试验阶段,仅用于Cluster模式中。 false spark.driver.extraLibraryPath 设置一个特殊的library path在启动驱动程序JVM时使用。 注意:在Client模式中,该配置禁止直接在应用程序中通过SparkConf设置,因为驱动程序JVM已经启动。请通过--driver-java-options命令行选项或默认property文件进行设置。 JDBCServer2x: ${SPARK_INSTALL_HOME}/spark/native SparkResource2x: ${DATA_NODE_INSTALL_HOME}/hadoop/lib/native spark.driver.cores 驱动程序进程使用的核数。仅适用于Cluster模式。 1 spark.driver.memory 驱动程序进程使用的内存数量,即SparkContext初始化的进程(例如:512M, 2G)。 注意:在Client模式中,该配置禁止直接在应用程序中通过SparkConf设置,因为驱动程序JVM已经启动。请通过--driver-java-options命令行选项或默认property文件进行设置。 4G spark.driver.maxResultSize 对每个Spark action操作(例如“collect”)的所有分区序列化结果的总量限制,至少1M,设置成0表示不限制。如果总量超过该限制,工作任务会中止。限制值设置过高可能会引起驱动程序的内存不足错误(取决于spark.driver.memory和JVM的对象内存开销)。设置合理的限制可以避免驱动程序出现内存不足的错误。 1G spark.driver.host Driver监测的主机名或IP地址,用于Driver与Executor进行通信。 (local hostname) spark.driver.port Driver监测的端口,用于Driver与Executor进行通信。 (random)
  • Spark Streaming Spark Streaming是在Spark批处理平台提供的流式数据的处理能力,以“mini-batch”的方式处理从外部输入的数据。 在Spark客户端的“spark-defaults.conf”文件中配置如下参数。 表6 参数说明 参数 描述 默认值 spark.streaming.receiver.writeAheadLog.enable 启用预写日志(WAL)功能。所有通过Receiver接收的输入数据将被保存至预写日志,预写日志可以保证Driver程序出错后数据可以恢复。 false spark.streaming.unpersist 由Spark Streaming产生和保存的RDDs自动从Spark的内存中强制移除。Spark Streaming接收的原始输入数据也将自动清除。设置为false时原始输入数据和存留的RDDs不会自动清除,因此在streaming应用外部依然可以访问,但是这会占用更多的Spark内存。 true
  • Netty/NIO及Hash/Sort配置 Shuffle是大数据处理中最重要的一个性能点,网络是整个Shuffle过程的性能点。目前Spark支持两种Shuffle方式,一种是Hash,另外一种是Sort。网络也有两种方式,Netty和NIO。 表8 参数说明 参数 描述 默认值 spark.shuffle.manager 处理数据的方式。有两种实现方式可用:sort和hash。sort shuffle对内存的使用率更高,是Spark 1.2及后续版本的默认选项。Spark2.x及后续版本不支持hash。 SORT spark.shuffle.consolidateFiles (仅hash方式)如果要合并在shuffle过程中创建的中间文件,需要将该值设置为“true”。文件创建的少可以提高文件系统处理性能,降低风险。使用ext4或者xfs文件系统时,建议设置为“true”。由于文件系统限制,在ext3上该设置可能会降低8核以上机器的处理性能。 false spark.shuffle.sort.bypassMergeThreshold 该参数只适用于spark.shuffle.manager设置为sort时。在不做map端聚合并且reduce任务的partition数小于或等于该值时,避免对数据进行归并排序,防止系统处理不必要的排序引起性能下降。 200 spark.shuffle.io.maxRetries (仅Netty方式)如果设为非零值,由于IO相关的异常导致的fetch失败会自动重试。该重试逻辑有助于大型shuffle在发生长GC暂停或者网络闪断时保持稳定。 12 spark.shuffle.io.numConnectionsPerPeer (仅Netty方式)为了减少大型集群的连接创建,主机间的连接会被重新使用。对于拥有较多硬盘和少数主机的集群,此操作可能会导致并发性不足以占用所有磁盘,所以用户可以考虑增加此值。 1 spark.shuffle.io.preferDirectBufs (仅Netty方式)使用off-heap缓冲区减少shuffle和高速缓存块转移期间的垃圾回收。对于off-heap内存被严格限制的环境,用户可以将其关闭以强制所有来自Netty的申请使用堆内内存。 true spark.shuffle.io.retryWait (仅Netty方式)等待fetch重试期间的时间(秒)。重试引起的最大延迟为maxRetries * retryWait,默认是15秒。 5
  • Python Spark Python Spark是Spark除了Scala、Java两种API之外的第三种编程语言。不同于Java和Scala都是在JVM平台上运行,Python Spark不仅会有JVM进程,还会有自身的Python进程。以下配置项只适用于Python Spark场景,而其他配置项也同样可以在Python Spark中生效。 表4 参数说明 参数 描述 默认值 spark.python.profile 在Python worker中开启profiling。通过sc.show_profiles()展示分析结果。或者在driver退出前展示分析结果。可以通过sc.dump_profiles(path) 将结果转储到磁盘中。如果一些分析结果已经手动展示,那么在Driver退出前,它们将不会再自动展示。 默认使用pyspark.profiler.BasicProfiler,可以在初始化SparkContext时传入指定的profiler来覆盖默认的profiler。 false spark.python.worker.memory 聚合过程中每个python worker进程所能使用的内存大小,其值格式同指定JVM内存一致,如512m,2g。如果进程在聚集期间所用的内存超过了该值,数据将会被写入磁盘。 512m spark.python.worker.reuse 是否重用python worker。如是,它将使用固定数量的Python workers,那么下一批提交的task将重用这些Python workers,而不是为每个task重新fork一个Python进程。 该功能在大型广播下非常有用,因为此时对下一批提交的task不需要将数据从JVM再一次传输至Python worker。 true
  • Spark长时间任务安全认证配置 安全模式下,使用Spark CLI(如spark shell、spark sql、spark submit)时,如果使用kinit命令进行安全认证,当执行长时间运行任务时,会因为认证过期导致任务失败。 在客户端的“spark-defaults.conf”配置文件中设置如下参数,配置完成后,重新执行Spark CLI即可。 当参数值为“true”时,需要保证“spark-defaults.conf”和“hive-site.xml”中的Keytab和principal的值相同。 表3 参数说明 参数名称 含义 默认值 spark.kerberos.principal 具有Spark操作权限的principal。请联系 MRS 集群管理员获取对应principal。 - spark.kerberos.keytab 具有Spark操作权限的Keytab文件名称和文件路径。请联系MRS集群管理员获取对应Keytab文件。 - spark.security.bigdata.loginOnce Principal用户是否只登录一次。true为单次登录;false为多次登录。 单次登录与多次登录的区别在于:Spark社区使用多次Kerberos用户登录多次的方案,但容易出现TGT过期或者Token过期异常导致应用无法长时间运行。DataSight修改了Kerberos登录方式,只允许用户登录一次,可以有效的解决过期问题。限制在于,Hive相关的principal与keytab的配置项必须与Spark配置相同。 说明: 当参数值为true时,需要保证“spark-defaults.conf”和“hive-site.xml”中的Keytab和principal的值相同。 true
  • 配置Stage失败重试次数 Spark任务在遇到FetchFailedException时会触发Stage重试。为了防止Stage无限重试,对Stage重试次数进行限制。重试次数可以根据实际需要进行调整。 在Spark客户端的“spark-defaults.conf”文件中配置如下参数。 表1 参数说明 参数 说明 默认值 spark.stage.maxConsecutiveAttempts Stage失败重试最大次数。 4
  • Dynamic Allocation 动态资源调度是On Yarn模式特有的特性,并且必须开启Yarn External Shuffle才能使用这个功能。在使用Spark作为一个常驻的服务时候,动态资源调度将大大的提高资源的利用率。例如JDBCServer服务,大多数时间该进程并不接受JDBC请求,因此将这段空闲时间的资源释放出来,将极大的节约集群的资源。 表5 参数说明 参数 描述 默认值 spark.dynamicAllocation.enabled 是否使用动态资源调度,用于根据规模调整注册于该应用的executor的数量。注意目前仅在YARN模式下有效。 启用动态资源调度必须将spark.shuffle.service.enabled设置为true。以下配置也与此相关:spark.dynamicAllocation.minExecutors、spark.dynamicAllocation.maxExecutors和spark.dynamicAllocation.initialExecutors。 JDBCServer2x: true SparkResource2x: false spark.dynamicAllocation.minExecutors 最小Executor个数。 0 spark.dynamicAllocation.initialExecutors 初始Executor个数。 spark.dynamicAllocation.minExecutors spark.dynamicAllocation.maxExecutors 最大executor个数。 2048 spark.dynamicAllocation.schedulerBacklogTimeout 调度第一次超时时间。单位为秒。 1s spark.dynamicAllocation.sustainedSchedulerBacklogTimeout 调度第二次及之后超时时间。 1s spark.dynamicAllocation.executorIdleTimeout 普通Executor空闲超时时间。单位为秒。 60 spark.dynamicAllocation.cachedExecutorIdleTimeout 含有cached blocks的Executor空闲超时时间。 JDBCServer2x:2147483647s IndexServer2x:2147483647s SparkResource2x:120
  • 配置是否使用笛卡尔积功能 要启动使用笛卡尔积功能,需要在Spark的“spark-defaults.conf”配置文件中进行如下设置。 表2 笛卡尔积参数说明 参数 说明 默认值 spark.sql.crossJoin.enabled 是否允许隐性执行笛卡尔积。 “true”表示允许 “false”表示不允许,此时只允许query中显式包含CROSS JOIN语法。 true JDBC应用在服务端的“spark-defaults.conf”配置文件中设置该参数。 Spark客户端提交的任务在客户端配的“spark-defaults.conf”配置文件中设置该参数。
  • 参数说明 IN | FROM schema_name 指定schema名称,未指定时默认使用当前的schema。 LIKE 'identifier_with_wildcards' identifier_with_wildcards只支持包含“*”和“|”的规则匹配表达式。 其中“*”可以匹配单个或多个字符,“|”适用于匹配多种规则匹配表达式中的任意一种的情况,它用于分隔这些规则匹配表达式。 规则匹配表达式首尾的空格,不会参与匹配计算。 partition_spec 一个可选参数,使用键值对来指定分区列表,键值对之间通过逗号分隔。需要注意,指定分区时,表名不支持模糊匹配。
  • 示例 -- 演示数据准备 create schema show_schema; use show_schema; create table show_table1(a int,b string); create table show_table2(a int,b string); create table from_table1(a int,b string); create table in_table1(a int,b string); --查询表名以"show"开始的表的详细信息 show table extended like 'show*'; tab_name -------------------------------------------------------------------------- tableName:show_table1 owner:admintest location:hdfs://hacluster/user/hive/warehouse/show_schema.db/show_table1 InputFormat:org.apache.hadoop.hive.ql.io.orc.OrcInputFormat OutputFormat:org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat columns:struct columns {int a,string b} partitioned:false partitionColumns: totalNumberFiles:0 totalFileSize:0 tableName:show_table2 owner:admintest location:hdfs://hacluster/user/hive/warehouse/show_schema.db/show_table2 InputFormat:org.apache.hadoop.hive.ql.io.orc.OrcInputFormat OutputFormat:org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat columns:struct columns {int a,string b} partitioned:false partitionColumns: totalNumberFiles:0 totalFileSize:0 (1 row) -- 查询表名以"from"或者"show"开头的表的详细信息 show table extended like 'from*|show*'; tab_name ---------------------------------------------------------------------- tableName show_table1 owner admintest location hdfs://hacluster/user/hive/warehouse/show_table1 InputFormat org.apache.hadoop.hive.ql.io.orc.OrcInputFormat OutputFormat org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat columns struct columns {int a,string b} partitioned false partitionColumns totalNumberFiles 0 totalFileSize null tableName from_table1 owner admintest location hdfs://hacluster/user/hive/warehouse/from_table1 InputFormat org.apache.hadoop.hive.ql.io.orc.OrcInputFormat OutputFormat org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat columns struct columns {int a,string b} partitioned false partitionColumns totalNumberFiles 0 totalFileSize null tableName show_table2 owner admintest location hdfs://hacluster/user/hive/warehouse/show_table2 InputFormat org.apache.hadoop.hive.ql.io.orc.OrcInputFormat OutputFormat org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat columns struct columns {int a,string b} partitioned false partitionColumns totalNumberFiles 0 totalFileSize null (1 row) -- 查询web schema下的page_views表扩展信息 show table extended from web like 'page*'; tab_name ----------------------------------------------------------------------------- tableName:page_views owner:admintest location:hdfs://hacluster/user/web.db/page_views InputFormat:org.apache.hadoop.hive.ql.io.orc.OrcInputFormat OutputFormat:org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat columns:struct columns {timestamp view_time,bigint user_id,string page_url} partitioned:true partitionColumns: struct partition_columns {date ds,string country} totalNumberFiles:0 totalFileSize:0 (1 row)
  • 示例 显示能够创建orders表的SQL 语句: CREATE TABLE orders ( orderkey bigint, orderstatus varchar, totalprice double, orderdate date ) WITH (format = 'ORC', location='/user',orc_compress='ZLIB',external=true, "auto.purge"=false); show create table orders; Create Table ------------------------------------------------- CREATE TABLE hive.default.orders ( orderkey bigint, orderstatus varchar, totalprice double, orderdate date ) WITH ( external_location = 'hdfs://hacluster/user', format = 'ORC', orc_compress = 'ZLIB', orc_compress_size = 262144, orc_row_index_stride = 10000, orc_stripe_size = 67108864 ) (1 row)
  • 配置描述 在客户端的“mapred-site.xml”配置文件中进行如下配置。“mapred-site.xml”配置文件在客户端安装路径的config目录下,例如“/opt/client/Yarn/config”。 表1 参数说明 参数 描述 默认值 mapreduce.app-submission.cross-platform 支持在Windows上提交到Linux上运行MR任务的配置项。当该参数的值设为“true”时,表示支持。当该参数的值设为“false”时,表示不支持。 true
  • 前提条件 已安装Oozie、ZooKeeper服务,且服务正常运行。 没有任务正在运行。 如果当前集群不是安装最新的版本包,需要从“$BIGDATA_HOME/ FusionInsight _Porter_x.x.x/install/FusionInsight-Oozie-x.x.x/oozie-x.x.x/embedded-oozie-server/webapp/WEB-INF/lib”路径复制“curator-x-discovery-x.x.x.jar”包到“$BIGDATA_HOME/FusionInsight_Porter_x.x.x/install/FusionInsight-Oozie-x.x.x/oozie-x.x.x/lib”目录下。
  • 配置场景 Spark SQL Adaptive Execution特性用于使Spark SQL在运行过程中,根据中间结果优化后续执行流程,提高整体执行效率。当前已实现的特性如下: 自动设置shuffle partition数 在启用Adaptive Execution特性前,Spark SQL根据spark.sql.shuffle.partitions配置指定shuffle时的partition个数。此种方法在一个应用中执行多种SQL查询时缺乏灵活性,无法保证所有场景下的性能更优。开启Adaptive Execution后,Spark SQL将自动为每个shuffle过程动态设置partition个数,而不是使用通用配置,使每次shuffle过程自动使用最合理的partition数。 动态调整执行计划 在启用Adaptive Execution特性前,Spark SQL根据RBO和CBO的优化结果创建执行计划,此种方法忽略了数据在运行过程中的结果集变化。比如基于某个大表创建的视图,与其他大表join时,即便视图的结果集很小,也无法将执行计划调整为BroadcastJoin。启用Adaptive Execution特性后,Spark SQL能够在运行过程中根据前面stage的运行结果动态调整后续的执行计划,从而获得更好的执行性能。 自动处理数据倾斜 在执行SQL语句时,如果存在数据倾斜,可能导致单个executor内存溢出、任务执行缓慢等问题。启动Adaptive Execution特性后,Spark SQL能自动处理数据倾斜场景,对倾斜的分区,启动多个task进行处理,每个task读取部分shuffle输出文件,再对这部分任务的Join结果进行Union操作,以达到消除数据倾斜的效果
  • 字符串函数 这些函数假定输入字符串包含有效的UTF-8编码的Unicode代码点。不会显式检查UTF-8数据是否有效,对于无效的UTF-8数据,函数可能会返回错误的结果。可以使用from_utf8来更正无效的UTF-8数据。 此外,这些函数对Unicode代码点进行运算,而不是对用户可见的字符(或字形群集)进行运算。某些语言将多个代码点组合成单个用户感观字符(这是语言书写系统的基本单位),但是函数会将每个代码点视为单独的单位。 lower和upper函数不执行某些语言所需的区域设置相关、上下文相关或一对多映射。 chr(n) → varchar 描述:返回Unicode编码值为n的字符值。 select chr(100); --d char_length(string) → bigint 参考length(string) character_length(string) → bigint 参考length(string) codepoint(string) → integer 描述:返回单个字符对应的Unicode编码。 select codepoint('d'); --100 concat(string1, string2) → varchar 描述:字符串连接。 select concat('hello','world'); -- helloworld concat_ws(string0, string1, ..., stringN) → varchar 描述:将string1、string2、...,stringN,以string0作为分隔符串联成一个字符串。如果string0为null,则返回值为null。分隔符后的参数如果是NULL值,将会被跳过。 select concat_ws(',','hello','world'); -- hello,world select concat_ws(NULL,'def'); --NULL select concat_ws(',','hello',NULL,'world'); -- hello,world select concat_ws(',','hello','','world'); -- hello,,world concat_ws(string0, array(varchar)) → varchar 描述:将数组中的元素以string0为分隔符进行串联。如果string0为null,则返回值为null。数组中的任何null值都将被跳过。 select concat_ws(NULL,ARRAY['abc']);--NULL select concat_ws(',',ARRAY['abc',NULL,NULL,'xyz']); -- abc,xyz select concat_ws(',',ARRAY['hello','world']); -- hello,world decode(binary bin, string charset) →varchar 描述:根据给定的字符集将第一个参数编码为字符串,支持的字符集包括('UTF-8', 'UTF-16BE', 'UTF-16LE', 'UTF-16'),当第一个参数为null,将返回null。 select decode(X'70 61 6e 64 61','UTF-8'); _col0 ------- panda (1 row) select decode(X'00 70 00 61 00 6e 00 64 00 61','UTF-16BE'); _col0 ------- panda (1 row) encode(string str, string charset) →binary 描述:字符串按照给定的字符集进行编码。 select encode('panda','UTF-8'); _col0 ---------------- 70 61 6e 64 61 (1 row) find_in_set (string str, string strList) →int 描述:返回str在逗号分隔的strList中第一次出现的位置。当有参数为null时,返回值也为null。 select find_in_set('ab', 'abc,b,ab,c,def'); -- 3 format_number(number x, int d) →string 描述:将数字x格式化为'#,###,###.##',保留d位小数,以字符串的形式返回结果。 select format_number(541211.212,2); -- 541,211.21 format(format,args...) → varchar 描述:参见Format。 locate(string substr, string str, int pos]) →int 描述:返回子串在字符串的第pos位后第一次出现的位置。没有满足条件的返回0。 select locate('aaa','bbaaaaa',6);-- 0 select locate('aaa','bbaaaaa',1);-- 3 select locate('aaa','bbaaaaa',4);-- 4 length(string) → bigint 描述:返回字符串的长度。 select length('hello');-- 5 levenshtein_distance(string1, string2) → bigint 描述:计算string1和string2的Levenshtein距离,即将string转为string2所需要的单字符编辑(包括插入、删除或替换)最少次数。 select levenshtein_distance('helo word','hello,world'); -- 3 hamming_distance(string1, string2) → bigint 描述:返回字符串1和字符串2的汉明距离,即对应位置字符不同的数量。 请注意,两个字符串的长度必须相同。 select hamming_distance('abcde','edcba');-- 4 instr(string,substring) → bigint 描述:查找substring 在string中首次出现的位置。 select instr('abcde', 'cd');--3 levenshtein(string1, string2) → bigint 参考levenshtein_distance(string1, string2) levenshtein_distance(string1, string2) → bigint 描述:返回字符串1和字符串2的Levenshtein编辑距离,即将字符串1更改为字符串2所需的最小单字符编辑(插入,删除或替换)次数。 select levenshtein_distance('apple','epplea');-- 2 lower(string) → varchar 描述:将字符转换为小写。 select lower('HELLo!');-- hello! lcase(string A) → varchar 描述:同lower(string)。 ltrim(string) → varchar 描述:去掉字符串开头的空格。 select ltrim(' hello');-- hello lpad(string, size, padstring) → varchar 描述:右填充字符串以使用padstring调整字符大小。如果size小于字符串的长度,则结果将被截断为size个字符。大小不能为负,并且填充字符串必须为非空。 select lpad('myk',5,'dog'); -- domyk
  • 操作步骤 要使用CBO优化,可以按照以下步骤进行优化。 需要先执行特定的SQL语句来收集所需的表和列的统计信息。 SQL命令如下(根据具体情况选择需要执行的SQL命令): 生成表级别统计信息(扫表): ANALYZE TABLE src COMPUTE STATIS TICS 生成sizeInBytes和rowCount。 使用ANALYZE语句收集统计信息时,无法计算非HDFS数据源的表的文件大小。 生成表级别统计信息(不扫表): ANALYZE TABLE src COMPUTE STATISTICS NOSCAN 只生成sizeInBytes,如果原来已经生成过sizeInBytes和rowCount,而本次生成的sizeInBytes和原来的大小一样,则保留rowCount(如果存在),否则清除rowCount。 生成列级别统计信息 ANALYZE TABLE src COMPUTE STATISTICS FOR COLUMNS a, b, c 生成列统计信息,为保证一致性,会同步更新表统计信息。目前不支持复杂数据类型(如Seq, Map等)和HiveStringType的统计信息生成。 显示统计信息 DESC FORMATTED src 在Statistics中会显示“xxx bytes, xxx rows”分别表示表级别的统计信息。也可以通过如下命令显示列统计信息: DESC FORMATTED src a 使用限制:当前统计信息收集不支持针对分区表的分区级别的统计信息。 在Spark客户端的“spark-defaults.conf”配置文件中进行表1设置。 表1 参数介绍 参数 描述 默认值 spark.sql.cbo.enabled CBO总开关。 true表示打开, false表示关闭。 要使用该功能,需确保相关表和列的统计信息已经生成。 false spark.sql.cbo.joinReorder.enabled 使用CBO来自动调整连续的inner join的顺序。 true:表示打开 false:表示关闭 要使用该功能,需确保相关表和列的统计信息已经生成,且CBO总开关打开。 false spark.sql.cbo.joinReorder.dp.threshold 使用CBO来自动调整连续inner join的表的个数阈值。 如果超出该阈值,则不会调整join顺序。 12
  • CASE 标准的SQL CASE表达式有两种模式。 “简单模式”从左向右查找表达式的每个value,直到找出相等的expression: CASE expression WHEN value THEN result [ WHEN ... ] [ ELSE result ] END 返回匹配value的result。如果没有匹配到任何值,则返回ELSE子句的result;如果没有ELSE子句,则返回空。示例: select a, case a when 1 then 'one' when 2 then 'two' else 'many' end from (values (1),(2),(3),(4)) as t(a); a | _col1 ---|------- 1 | one 2 | two 3 | many 4 | many (4 rows) “查找模式”从左向右判断每个condition的布尔值,直到判断为真,返回匹配result: CASE WHEN condition THEN result [ WHEN ... ] [ ELSE result ] END 如果判断条件都不成立,则返回ELSE子句的result;如果没有ELSE子句,则返回空。示例: select a,b, case when a=1 then 'one' when b=2 then 'tow' else 'many' end from (values (1,2),(3,4),(1,3),(4,2)) as t(a,b); a | b | _col2 ---|---|------- 1 | 2 | one 3 | 4 | many 1 | 3 | one 4 | 2 | tow (4 rows)
  • TRY 评估一个表达式,如果出错,则返回Null。类似于编程语言中的try catch。try函数一般结合COALESCE使用,COALESCE可以将异常的空值转为0或者空,以下情况会被try捕获: 分母为0 错误的cast操作或者函数入参 数字超过了定义长度 不推荐使用,应该明确以上异常,做数据预处理 示例: 假设有以下表,字段origin_zip中包含了一些无效数据: -- 创建表 create table shipping (origin_state varchar,origin_zip varchar,packages int ,total_cost int); -- 插入数据 insert into shipping values ('California','94131',25,100), ('California','P332a',5,72), ('California','94025',0,155), ('New Jersey','08544',225,490); -- 查询数据 SELECT * FROM shipping; origin_state | origin_zip | packages | total_cost --------------+------------+----------+------------ California | 94131 | 25 | 100 California | P332a | 5 | 72 California | 94025 | 0 | 155 New Jersey | 08544 | 225 | 490 (4 rows) 不使用Try查询失败: SELECT CAST(origin_zip AS BIGINT) FROM shipping; Query failed: Cannot cast 'P332a' to BIGINT 使用Try返回NULL: SELECT TRY(CAST(origin_zip AS BIGINT)) FROM shipping; origin_zip ------------ 94131 NULL 94025 08544 (4 rows) 不使用try查询失败: SELECT total_cost/packages AS per_package FROM shipping; Query failed: Division by zero 使用TRY和COALESCE返回默认值: SELECT COALESCE(TRY(total_cost/packages),0) AS per_package FROM shipping; per_package ------------- 4 14 0 19 (4 rows)
  • NULLIF nullif(value1, value2) 如果value1与value2相等,返回NULL;否则返回value1 。 select nullif(a,b) from (values (1,1),(1,2)) as t(a,b); -- _col0 ------- NULL 1 (2 rows) ZEROIFNULL(value) 如果value为null,返回0,否则返回原值。目前支持数值类型还有varchar类型。 select zeroifnull(a),zeroifnull(b),zeroifnull(c) from (values (null,13.11,bigint '157'),(88,null,bigint '188'),(55,14.11,null)) as t(a,b,c); _col0 | _col1 | _col2 -------|-------|------- 0 | 13.11 | 157 88 | 0.00 | 188 55 | 14.11 | 0 (3 rows) NVL(value1,value2) 如果value1为NULL,返回value2,否则,返回value1。 select nvl(NULL,3); -- 3 select nvl(2,3); --2 ISNULL(value) 如果value1为NULL,返回true,否则返回false。 Create table nulltest(col1 int,col2 int); insert into nulltest values(null,3); select isnull(col1),isnull(col2) from nulltest; _col0 | _col1 -------|------- true | false (1 row) ISNOTNULL(value) 如果value1为NULL,返回false,否则返回true。 select isnotnull(col1),isnotnull(col2) from nulltest; _col0 | _col1 -------|------- false | true (1 row)
  • IF IF函数是语言结构,它与下面的CASE表达式功能相同: CASE WHEN condition THEN true_value [ ELSE false_value ] END if(condition, true_value) 如果condition为真,返回true_value;否则返回NULL,true_value不进行计算。 select if(a=1,8) from (values (1),(1),(2)) as t(a); -- 8 8 NULL select if(a=1,'value') from (values (1),(1),(2)) as t(a); -- value value NULL if(condition, true_value, false_value) 如果condition为真,返回true_value;否则计算并返回false_value 。 select if(a=1,'on','off') from (values (1),(1),(2)) as t(a); _col0 ------- on on off (3 rows)
  • Flink On Hudi同步元数据到Hive 适用于MRS 3.2.0及之后版本。 使用JDBC方式同步元数据到Hive CREATE TABLE stream_mor( uuid VARCHAR(20), name VARCHAR(10), age INT, ts INT, `p` VARCHAR(20) ) PARTITIONED BY (`p`) WITH ( 'connector' = 'hudi', 'path' = 'hdfs://hacluster/tmp/hudi/stream_mor', 'table.type' = 'MERGE_ON_READ', 'hive_sync.enable' = 'true', 'hive_sync.table' = '要同步到Hive的表名', 'hive_sync.db' = '要同步到Hive的数据库名', 'hive_sync.metastore.uris' = 'Hive客户端hive-site.xml文件中hive.metastore.uris的值', 'hive_sync.jdbc_url' = 'Hive客户端component_env文件中CLIENT_HIVE_URI的值' ); hive_sync.jdbc_url:Hive客户端component_env文件中CLIENT_HIVE_URI的值,如果该值中存在“\”需将其删除。 如果需要使用Hive风格分区,需同时配置如下参数: 'hoodie.datasource.write.hive_style_partitioning' = 'true' 'hive_sync.partition_extractor_class' = 'org.apache.hudi.hive.MultiPartKeysValueExtractor' Flink on Hudi并同步数据至Hive的任务,因为Hudi对大小写敏感,Hive对大小写不敏感,所以在Hudi表中的字段不建议使用大写字母,否则可能会造成数据无法正常读写。 使用HMS方式同步元数据到Hive CREATE TABLE stream_mor( uuid VARCHAR(20), name VARCHAR(10), age INT, ts INT, `p` VARCHAR(20) ) PARTITIONED BY (`p`) WITH ( 'connector' = 'hudi', 'path' = 'hdfs://hacluster/tmp/hudi/stream_mor', 'table.type' = 'MERGE_ON_READ', 'hive_sync.enable' = 'true', 'hive_sync.table' = '要同步到Hive的表名', 'hive_sync.db' = '要同步到Hive的数据库名', 'hive_sync.mode' = 'hms', 'hive_sync.metastore.uris' = 'Hive客户端hive-site.xml文件中hive.metastore.uris的值', 'properties.hive.metastore.kerberos.principal' = 'Hive客户端hive-site.xml文件中hive.metastore.kerberos.principal的值' );
共100000条