云服务器内容精选

  • 约束与限制 只支持两表Join的场景。 不支持FULL OUTER JOIN的数据倾斜处理。 示例:执行下面SQL语句,a表倾斜或b表倾斜都无法触发该优化。 select aid FROM a FULL OUTER JOIN b ON aid=bid; 不支持LEFT OUTER JOIN的右表倾斜处理。 示例:执行下面SQL语句,b表倾斜无法触发该优化。 select aid FROM a LEFT OUTER JOIN b ON aid=bid; 不支持RIGHT OUTER JOIN的左表倾斜处理。 示例:执行下面SQL语句,a表倾斜无法触发该优化。 select aid FROM a RIGHT OUTER JOIN b ON aid=bid;
  • 操作场景 在Spark SQL多表Join的场景下,会存在关联键严重倾斜的情况,导致Hash分桶后,部分桶中的数据远高于其他分桶。最终导致部分Task过重,运行很慢;其他Task过轻,运行很快。一方面,数据量大Task运行慢,使得计算性能低;另一方面,数据量少的Task在运行完成后,导致很多CPU空闲,造成CPU资源浪费。 通过如下配置项可开启自动进行数据倾斜处理功能,通过将Hash分桶后数据量很大的、且超过数据倾斜阈值的分桶拆散,变成多个task处理一个桶的数据机制,提高CPU资源利用率,提高系统性能。未产生倾斜的数据,将采用原有方式进行分桶并运行。
  • 配置描述 安装Spark客户端。 详细操作请参考安装 MRS 客户端。 使用客户端安装用户登录Spark客户端节点。 在Spark客户端的“{客户端安装目录}/Spark/spark/conf/spark-defaults.conf”配置文件中添加如下表格中的参数。 表1 参数说明 参数 参数说明 取值示例 spark.sql.adaptive.enabled 用于启用或禁用自适应查询执行(Adaptive Query Execution,AQE)功能。该功能允许Spark在运行时动态调整查询计划,以应对数据倾斜、统计信息不准确等问题,从而提升查询性能。 注意:AQE特性与DPP(动态分区裁剪)特性同时开启时,SparkSQL任务执行中会优先执行DPP特性,从而使得AQE特性不生效。集群中DPP特性是默认开启的,因此开启AQE特性的同时,需要将DPP特性关闭。 false spark.sql.optimizer.dynamicPartitionPruning.enabled 用于控制动态分区裁剪(Dynamic Partition Pruning,DPP) 的启用状态。该功能允许Spark在查询执行时根据运行时数据动态过滤分区,从而减少数据扫描量,提升查询性能。 true spark.sql.adaptive.skewJoin.enabled 用于控制自适应倾斜Join优化的启用状态。该功能允许Spark在运行时检测并自动处理数据倾斜问题,避免因个别Task处理大量数据而导致的长尾性能问题。 当此配置为true且spark.sql.adaptive.enabled设置为true时,启用运行时自动处理join运算中的数据倾斜功能。 true spark.sql.adaptive.skewJoin.skewedPartitionFactor 此配置为一个倍数因子,用于判定分区是否为数据倾斜分区。单个分区被判定为数据倾斜分区的条件为:当一个分区的数据大小超过除此分区外其他所有分区大小的中值与该配置的乘积,并且大小超过spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes配置值时,此分区被判定为数据倾斜分区。 5 spark.sql.adaptive.skewjoin.skewedPartitionThresholdInBytes 分区大小(单位:字节)大于该阈值且大于spark.sql.adaptive.skewJoin.skewedPartitionFactor与分区中值的乘积,则认为该分区存在倾斜。理想情况下,此配置应大于spark.sql.adaptive.advisoryPartitionSizeInBytes。 256MB spark.sql.adaptive.shuffle.targetPostShuffleInputSize 每个Task处理的shuffle数据的最小数据量。单位:Byte。 67108864
  • 参考信息 被广播的表执行超时,导致任务结束。 默认情况下,BroadCastJoin只允许被广播的表计算5分钟,超过5分钟该任务会出现超时异常,而这个时候被广播的表的broadcast任务依然在执行,造成资源浪费。 这种情况下,有两种方式处理: 调整“spark.sql.broadcastTimeout”的数值,加大超时的时间限制。 降低“spark.sql.autoBroadcastJoinThreshold”的数值,不使用BroadCastJoin的优化。
  • 操作步骤 在动态分区语句中加入distribute by,by值为分区字段。 示例如下: insert into table store_returns partition (sr_returned_date_sk) select sr_return_time_sk,sr_item_sk,sr_customer_sk,sr_cdemo_sk,sr_hdemo_sk,sr_addr_sk,sr_store_sk,sr_reason_sk,sr_ticket_number,sr_return_quantity,sr_return_amt,sr_return_tax,sr_return_amt_inc_tax,sr_fee,sr_return_ship_cost,sr_refunded_cash,sr_reversed_charge,sr_store_credit,sr_net_loss,sr_returned_date_sk from ${SOURCE}.store_returns distribute by sr_returned_date_sk;
  • 操作场景 SparkSQL在往动态分区表中插入数据时,分区数越多,单个Task生成的HDFS文件越多,则元数据占用的内存也越多。这就导致程序GC(Garbage Collection)严重,甚至发生OOM(Out of Memory)。 经测试证明:10240个Task,2000个分区,在执行HDFS文件从临时目录rename到目标目录动作前,FileStatus元数据大小约29G。为避免以上问题,可修改SQL语句对数据进行重分区,以减少HDFS文件个数。
  • 操作步骤 要使用CBO优化,可以按照以下步骤进行优化。 需要先执行特定的SQL语句来收集所需的表和列的统计信息。 SQL命令如下(根据具体情况选择需要执行的SQL命令): 生成表级别统计信息(扫表): ANALYZE TABLE src COMPUTE STATIS TICS 生成sizeInBytes和rowCount。 使用ANALYZE语句收集统计信息时,无法计算非HDFS数据源的表的文件大小。 生成表级别统计信息(不扫表): ANALYZE TABLE src COMPUTE STATISTI CS NOSCAN 只生成sizeInBytes,如果原来已经生成过sizeInBytes和rowCount,而本次生成的sizeInBytes和原来的大小一样,则保留rowCount(若存在),否则清除rowCount。 生成列级别统计信息 ANALYZE TABLE src COMPUTE STATISTICS FOR COLUMNS a, b, c 生成列统计信息,为保证一致性,会同步更新表统计信息。目前不支持复杂数据类型(如Seq, Map等)和HiveStringType的统计信息生成。 显示统计信息 DESC FORMATTED src 在Statistics中会显示“xxx bytes, xxx rows”分别表示表级别的统计信息。也可以通过如下命令显示列统计信息: DESC FORMATTED src a 使用限制:当前统计信息收集不支持针对分区表的分区级别的统计信息。 在Spark客户端的“spark-defaults.conf”配置文件中进行表1设置。 表1 参数介绍 参数 描述 默认值 spark.sql.cbo.enabled CBO总开关。 true表示打开, false表示关闭。 要使用该功能,需确保相关表和列的统计信息已经生成。 false spark.sql.cbo.joinReorder.enabled 使用CBO来自动调整连续的inner join的顺序。 true:表示打开 false:表示关闭 要使用该功能,需确保相关表和列的统计信息已经生成,且CBO总开关打开。 false spark.sql.cbo.joinReorder.dp.threshold 使用CBO来自动调整连续inner join的表的个数阈值。 如果超出该阈值,则不会调整join顺序。 12