-
分析性能瓶颈 通过查看内存、反压等监控性能指标,确定性能下降的原因,以及瓶颈位于源端还是目的端。 图3 通过监控指标分析性能瓶颈 内存占用100% 当作业的内存占用达到100%时,意味着内存资源不足,需要扩大作业的内存配置或降低对内存的需求。 内存使用率达到100%时的实时作业监控指标: 图4 作业内存耗尽时的监控指标状态 图5 作业内存排查与调优策略 针对内存问题,优先通过增加任务配置中的并发数来解决,增加并发可减轻每个taskmanager上的内存压力。 增大并发后如果内存占用率仍比较大,可以根据不同的链路做组件级的参数调优,详情请参考各个链路的参数调优介绍,包括: 减小数据缓存的大小和时间。 加快缓存数据的flush速度。 对目的端的表结构进行优化,以提高写入性能。 增加单个taskmanager的处理内存,注意不要造成Migration资源组的资源使用率统计不准确。 对读写速率限流,适合数据量不大对作业稳定性需求较高的场景。 特殊场景: 监控指标显示内存使用率正常,但是日志中搜索到关键字OutOfMemery或OOMKilled,同样可以判断为内存不足场景。 虽然监控指标显示内存使用率正常,但日志中发现OutOfMemory或OOMKilled关键字,同样认为是内存不足。 分库分表场景下,如果源端数据库实例或分表数据过大,可能会导致jobmanager内存占用满,通过设置参数jobmanager.memory.process.size = 6G来即时扩展jobmanager节点内存。 作业持续反压100 长时间作业反压100%,表明可能是目的端写入性能瓶颈,原因可能为: 作业目的端配置或者建表不合理,性能仍有优化空间。 目的端集群压力过大。 图6 作业反压调优策略 图7 正常反压监控图 图8 作业反压持续100监控图 针对作业配置或建表不合理的场景,可以参考各个链路参数调优指导中关于目的端的介绍。 针对目的端集群压力较大的场景需要及时联系目的端数据库运维或开发者调整集群状态。 作业反压正常(binlog激增) 作业反压正常可能为源端抽取性能瓶颈,大部分场景为业务量上涨导致源端binlog激增。部分作业会存在网络问题导致数据抽取速率不足,时延上升。 源端binlog激增时的MySQ
L实例 监控: 图9 binlog激增MySQL监控图-1 图10 binlog激增MySQL监控图-2 单小时内binlog文件个数增长破百。 源端binlog文件激增时,需要及时进行以下操作: 调大实时作业并发,避免目的端写入成为瓶颈,导致源端抽取压力。 调大taskmanager堆内存,避免数据量过大作业异常。 在任务配置中添加参数: taskmanager.memory.managed.fraction = 0(默认0.2) taskmanager.memory.process.size = 4G(默认不足4G) 适时将时延较大的表从实时作业中拆分出去,通过离线迁移完成大表的数据迁移。
-
初步分析与调试 图1 性能调优初步分析与调试 由于全量阶段的数据量较大,需要更高的并发资源。当全量阶段同步速率较低时,请优先通过增加任务并发来解决。全量阶段完成后,可以暂停作业回调资源。如果上述措施对提升同步效率效果不明显,可进一步分析性能瓶颈,详情请参见分析性能瓶颈部分。 当增量阶段同步速率慢或时延增加时,首先观察监控指标或作业告警中是否有作业重试。作业重试可能阻塞数据同步进程,导致时延上升,需通过查看作业日志找到并解决作业重试问题。 图2 作业监控指标显示作业近期存在重试 增量阶段如果没有作业重试但同步效率仍低,则需进一步分析性能瓶颈,详情请参见分析性能瓶颈部分。
-
调优流程 调优流程如图1所示。 图1 Fabric SQL性能调优流程 调优各阶段说明,如表1所示。 表1 Fabric SQL性能调优流程说明 阶段 描述 系统调优 进行系统级的调优,更充分地利用机器的CPU、内存、I/O和网络资源,提升查询的吞吐量。 SQL调优 审视业务所用SQL语句是否存在可优化空间,包括: 通过ANALYZE语句生成表统计信息:ANALYZE语句可收集与数据库中表内容相关的统计信息,统计结果存储在系统表PG_STATISTIC中。执行计划生成器会使用这些统计数据,以确定最有效的执行计划。 分析执行计划:EXPLAIN语句可显示SQL语句的执行计划,EXPLAIN PERFORMANCE语句可显示SQL语句中各算子的执行时间。 查找问题根因并进行调优:通过分析执行计划,找到可能存在的原因,进行针对性的调优,通常为调整数据库级SQL调优参数。 编写更优的SQL:介绍一些复杂查询中的中间临时数据缓存、结果集缓存、结果集合并等场景中的更优SQL语法。
-
问题排查步骤 磁盘或其他存储介质问题导致merge过慢或者中止。 登录Manager页面,检查是否存在磁盘容量不足或其他磁盘告警,如果存在,请按照告警指导处理。 如果是磁盘容量不足,也可以联系客户删除部分过期数据,释放空间,快速恢复业务。 Zookeeper异常导致merge无法正常执行。 登录Manager页面,检查ZooKeeper是否存在服务不可用、ClickHouse服务在ZooKeeper的数量配额使用率超过阈值等相关告警,如果存在,请按照告警指导处理。 执行如下SQL排查是否存在副本同步队列任务积压: select FQDN() as node,type,count() from clusterAllReplicas(default_cluster, system.replication_queue) group by node,type; 如果存在积压,请查看副本队列中的任务是否报错,并根据报错信息处理。 执行如下SQL排查是否存在节点间表结构不一致。 select FQDN(), create_table_query from clusterAllReplicas(default_cluster,system.tables) where name = '${table_name}' group by FQDN(),create_table_query; 如果存在,请将不一致的表结构修改一致。 执行如下SQL排查是否存在mutation任务异常: select FQDN(), database, table, mutation_id, create_time, command from clusterAllReplicas(default_cluster, system.mutations) where is_done = '0' order by create_time asc; 如果mutation任务正常,等待mutation任务完成,如果mutation任务异常,清理异常的mutation任务。 业务写入压力过大导致merge速度小于insert速度。 可以用以下SQL语句检查报错节点最近一小时的写入条数和频次: select tables,written_rows,count() from system.query_log where type='QueryFinish' and query_start_time between (toUnixTimestamp(now()) - 3600) AND toUnixTimestamp(now()) and query_kind = 'Insert' group by tables,written_rows order by written_rows limit 10; 业务上建议一次写入一个分区,写入频率不要太快,不要小批量数据的插入,适当增大每次插入的时间间隔。 如果没有触发Merge,或者Merge较慢,需要调整参数加快Merge。 加速Merge,需要调整如下参数,请参考加速Merge操作: 配置项 参考值 max_threads CPU核数*2 background_pool_size CPU核数 merge_max_block_size 8192的整数倍,根据CPU内存资源大小调整 cleanup_delay_period 适当小于默认值30
-
操作场景 如果多个Hadoop集群由于NameNode超负荷运行并失去响应而发生故障,这种阻塞现象是由于Hadoop的初始设计造成的。在Hadoop中,NameNode作为单独的机器,在其namespace内协调HDFS的各种操作,这些操作包括获取数据块位置、列出目录及创建文件。NameNode接受HDFS的操作,将其视作RPC调用并置入FIFO调用队列,供读取线程处理。虽然FIFO在先到先服务的情况下足够公平,但如果用户执行的I/O操作较多,相比I/O操作较少的用户,将获得更多的服务。在这种情况下,FIFO有失公平并且会导致延迟增加。 图1 基于FIFO调用队列的NameNode请求处理 如果将FIFO队列替换为一种被称作FairCallQueue的新型队列,这种情况就能够得到改善。按照这种方法,FAIR队列会根据调用者的调用规模将传入的RPC调用分配至多个队列中。调度模块会跟踪最新的调用,并为调用量较小的用户分配更高的优先级。 图2 基于FAIRCallQueue的NameNode请求处理
-
源端优化 SQLServer抽取优化。 可通过在作业任务配置参数单击中“添加自定义属性”来新增SQLServer同步参数。 图1 添加自定义属性 可使用的调优参数具体如下: 表1 全量阶段优化参数 参数名 类型 默认值 说明 scan.incremental.snapshot.backfill.skip boolean true 全量阶段是否跳过读取Binlog数据,默认为true。跳过读取Binlog数据可以有效降低内存使用。需要注意的是,跳过读取Binlog功能只提供at-least-once保证。 表2 增量阶段优化参数 参数名 类型 默认值 说明 debezium.max.iteration.transactions int 1000 每张表在重演数据时每次抽取的数据条数,值较大时,会使得内存升高并阻塞增量同步任务。
-
操作场景 Spark系统在运行含shuffle过程的应用时,Executor进程除了运行task,还要负责写shuffle数据以及给其他Executor提供shuffle数据。当Executor进程任务过重,导致触发GC(Garbage Collection)而不能为其他Executor提供shuffle数据时,会影响任务运行。 External shuffle Service是长期存在于NodeManager进程中的一个辅助服务。通过该服务来抓取shuffle数据,减少了Executor的压力,在Executor GC的时候也不会影响其他Executor的任务运行。
-
约束与限制 只支持两表Join的场景。 不支持FULL OUTER JOIN的数据倾斜处理。 示例:执行下面SQL语句,a表倾斜或b表倾斜都无法触发该优化。 select aid FROM a FULL OUTER JOIN b ON aid=bid; 不支持LEFT OUTER JOIN的右表倾斜处理。 示例:执行下面SQL语句,b表倾斜无法触发该优化。 select aid FROM a LEFT OUTER JOIN b ON aid=bid; 不支持RIGHT OUTER JOIN的左表倾斜处理。 示例:执行下面SQL语句,a表倾斜无法触发该优化。 select aid FROM a RIGHT OUTER JOIN b ON aid=bid;
-
操作场景 在Spark SQL多表Join的场景下,会存在关联键严重倾斜的情况,导致Hash分桶后,部分桶中的数据远高于其他分桶。最终导致部分Task过重,运行很慢;其他Task过轻,运行很快。一方面,数据量大Task运行慢,使得计算性能低;另一方面,数据量少的Task在运行完成后,导致很多CPU空闲,造成CPU资源浪费。 通过如下配置项可开启自动进行数据倾斜处理功能,通过将Hash分桶后数据量很大的、且超过数据倾斜阈值的分桶拆散,变成多个task处理一个桶的数据机制,提高CPU资源利用率,提高系统性能。未产生倾斜的数据,将采用原有方式进行分桶并运行。
-
配置描述 安装Spark客户端。 详细操作请参考安装
MRS 客户端。 使用客户端安装用户登录Spark客户端节点。 在Spark客户端的“{客户端安装目录}/Spark/spark/conf/spark-defaults.conf”配置文件中添加如下表格中的参数。 表1 参数说明 参数 参数说明 取值示例 spark.sql.adaptive.enabled 用于启用或禁用自适应查询执行(Adaptive Query Execution,AQE)功能。该功能允许Spark在运行时动态调整查询计划,以应对数据倾斜、统计信息不准确等问题,从而提升查询性能。 注意:AQE特性与DPP(动态分区裁剪)特性同时开启时,SparkSQL任务执行中会优先执行DPP特性,从而使得AQE特性不生效。集群中DPP特性是默认开启的,因此开启AQE特性的同时,需要将DPP特性关闭。 false spark.sql.optimizer.dynamicPartitionPruning.enabled 用于控制动态分区裁剪(Dynamic Partition Pruning,DPP) 的启用状态。该功能允许Spark在查询执行时根据运行时数据动态过滤分区,从而减少数据扫描量,提升查询性能。 true spark.sql.adaptive.skewJoin.enabled 用于控制自适应倾斜Join优化的启用状态。该功能允许Spark在运行时检测并自动处理数据倾斜问题,避免因个别Task处理大量数据而导致的长尾性能问题。 当此配置为true且spark.sql.adaptive.enabled设置为true时,启用运行时自动处理join运算中的数据倾斜功能。 true spark.sql.adaptive.skewJoin.skewedPartitionFactor 此配置为一个倍数因子,用于判定分区是否为数据倾斜分区。单个分区被判定为数据倾斜分区的条件为:当一个分区的数据大小超过除此分区外其他所有分区大小的中值与该配置的乘积,并且大小超过spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes配置值时,此分区被判定为数据倾斜分区。 5 spark.sql.adaptive.skewjoin.skewedPartitionThresholdInBytes 分区大小(单位:字节)大于该阈值且大于spark.sql.adaptive.skewJoin.skewedPartitionFactor与分区中值的乘积,则认为该分区存在倾斜。理想情况下,此配置应大于spark.sql.adaptive.advisoryPartitionSizeInBytes。 256MB spark.sql.adaptive.shuffle.targetPostShuffleInputSize 每个Task处理的shuffle数据的最小数据量。单位:Byte。 67108864
-
注意事项 Join数据倾斜问题。执行任务的时候,任务进度长时间维持在99%,这种现象叫数据倾斜。 数据倾斜是经常存在的,因为有少量的Reduce任务分配到的数据量和其他Reduce差异过大,导致大部分Reduce都已完成任务,但少量Reduce任务还没完成的情况。 解决数据倾斜的问题,可通过设置“set hive.optimize.skewjoin=true”并调整“hive.skewjoin.key”的大小。“hive.skewjoin.key”是指Reduce端接收到多少个key即认为数据是倾斜的,并自动分发到多个Reduce。
-
Sort Merge Bucket Map Join 使用Sort Merge Bucket Map Join必须满足以下2个条件: join的两张表都很大,内存中无法存放。 两张表都按照join key进行分桶(clustered by (column))和排序(sorted by(column)),且两张表的分桶数正好是倍数关系。 通过如下设置,启用Sort Merge Bucket Map Join: 设置“hive.optimize.bucketmapjoin”参数值为“true”: set hive.optimize.bucketmapjoin=true; 设置“hive.optimize.bucketmapjoin.sortedmerge”参数值为“true”: set hive.optimize.bucketmapjoin.sortedmerge=true; 这种Map Join也没有Reduce任务,是在Map任务前启动MapReduce Local Task,将小表内容按桶读取到本地,在本机保存多个桶的HashTable备份并写入HDFS,并保存在Distributed Cache中,在Map Task中从本地磁盘或者Distributed Cache中按桶一个一个读取小表内容,然后与大表做匹配直接得到结果并输出。
-
Map Join Hive的Map Join适用于能够在内存中存放下的小表(指表大小小于25000000 BYTES),通过“hive.mapjoin.smalltable.filesize”定义小表的大小,默认为25000000 BYTES。 Map Join的方法有两种: 使用/*+ MAPJOIN(join_table) */。 执行语句前设置如下参数,当前版本中该值默认为“true”。 set hive.auto.convert.join=true; 使用Map Join时没有Reduce任务,而是在Map任务前起了一个MapReduce Local Task,这个Task通过TableScan读取小表内容到本机,在本机以HashTable的形式保存并写入硬盘上传到DFS,并在Distributed Cache中保存,在Map Task中从本地磁盘或者Distributed Cache中读取小表内容直接与大表join得到结果并输出。 使用Map Join时需要注意小表不能过大,如果小表将内存基本用尽,会使整个系统性能下降甚至出现内存溢出的异常。
-
操作场景 如果多个Hadoop集群由于NameNode超负荷运行并失去响应而发生故障,这种阻塞现象是由于Hadoop的初始设计造成的。在Hadoop中,NameNode作为单独的机器,在其namespace内协调HDFS的各种操作,这些操作包括获取数据块位置、列出目录及创建文件。NameNode接受HDFS的操作,将其视作RPC调用并置入FIFO调用队列,供读取线程处理。虽然FIFO在先到先服务的情况下足够公平,但如果用户执行的I/O操作较多,相比I/O操作较少的用户,将获得更多的服务。在这种情况下,FIFO有失公平并且会导致延迟增加。 图1 基于FIFO调用队列的NameNode请求处理 如果将FIFO队列替换为一种被称作FairCallQueue的新型队列,这种情况就能够得到改善。按照这种方法,FAIR队列会根据调用者的调用规模将传入的RPC调用分配至多个队列中。调度模块会跟踪最新的调用,并为调用量较小的用户分配更高的优先级。 图2 基于FAIRCallQueue的NameNode请求处理
-
参考信息 被广播的表执行超时,导致任务结束。 默认情况下,BroadCastJoin只允许被广播的表计算5分钟,超过5分钟该任务会出现超时异常,而这个时候被广播的表的broadcast任务依然在执行,造成资源浪费。 这种情况下,有两种方式处理: 调整“spark.sql.broadcastTimeout”的数值,加大超时的时间限制。 降低“spark.sql.autoBroadcastJoinThreshold”的数值,不使用BroadCastJoin的优化。