云服务器内容精选

  • Spark写Hudi各种写入模式参数规范说明 类型 说明 开启参数 场景选择 特点 upsert update + insert Hudi默认写入类型,写入具有更新能力。 默认,无需参数开启。 SparkSQL: set hoodie.datasource.write.operation=upsert; DataSource Api: df.write .format("hudi") .options(xxx) .option("hoodie.datasource.write.operation", "upsert") .mode("append") .save("/tmp/tablePath") 默认选择。 优点: 支持小文件合并。 支持更新。 缺点: 写入速度中规中矩。 append 数据无更新直接写入 Spark:Spark侧没有纯append模式可使用bulk insert模式替代。 SparkSQL: set hoodie.datasource.write.operation = bulk_insert; set hoodie.datasource.write.row.writer.enable = true; DataSource Api: df.write .format("hudi") .options(xxx) .option("hoodie.datasource.write.operation", "bulk_insert") .option("hoodie.datasource.write.row.writer.enable", "true") .mode("append") .save("/tmp/tablePath") 追求高吞吐,无数据更新场景。 优点: 写入速度最快。 缺点: 无小文件合并能力。 无更新能力。 需要clustering合并小文件。 delete 删除操作 无需参数,直接使用delete语法即可: delete from tableName where primaryKey='id1'; SQL删除数据数据场景。 和upsert类型一样。 Insert overwrite 覆写分区 无需参数,直接使用insert overwrite语法即可: insert overwrite table tableName partition(dt ='2021-01-04') select * from srcTable; 分区级别重新。 覆写分区。 Insert overwrite table 覆写全表 无需参数,直接使用insert overwrite语法即可: insert overwrite table tableName select * from srcTable; 全部重写。 覆写全表。 Bulk_insert 批量导入 SparkSQL: set hoodie.datasource.write.operation = bulk_insert; set hoodie.datasource.write.row.writer.enable = true; DataSource Api: df.write .format("hudi") .options(xxx) .option("hoodie.datasource.write.operation", "bulk_insert") .option("hoodie.datasource.write.row.writer.enable", "true") .mode("append") .save("/tmp/tablePath") 建议表初始化搬迁的时候使用。 和append模式一样。
  • 优化Spark Shuffle参数提升Hudi写入效率 开启spark.shuffle.readHostLocalDisk=true,本地磁盘读取shuffle数据,减少网络传输的开销。 开启spark.io.encryption.enabled=false,关闭shuffle过程写加密磁盘,提升shuffle效率。 开启spark.shuffle.service.enabled=true,启动shuffle服务,提升任务shuffle的稳定性。 配置项 集群默认值 调整后 --conf spark.shuffle.readHostLocalDisk false true --conf spark.io.encryption.enabled true false --conf spark.shuffle.service.enabled false true
  • Spark加工Hudi表时其他参数优化 设置spark.sql.enableToString=false,降低Spark解析复杂SQL时候内存使用,提升解析效率。 设置spark.speculation=false,关闭推测执行,开启该参数会带来额外的cpu消耗,同时Hudi不支持启动该参数,启用该参数写Hudi有概率导致文件损坏。 配置项 集群默认值 调整后 --conf spark.sql.enableToString true false --conf spark.speculation false false
  • 优化shuffle并行度,提升Spark加工效率 所谓的shuffle并发度如下图所示: 集群默认是200,作业可以单独设置。如果发现瓶颈stage(执行时间长),且分配给当前作业的核数大于当前的并发数,说明并发度不足。通过以下配置优化。 场景 配置项 集群默认值 调整后 Jar作业 spark.default.parallelism 200 按实际作业可用资源2倍设置 SQL作业 spark.sql.shuffle.partitions 200 按实际作业可用资源2倍设置 hudi入库作业 hoodie.upsert.shuffle.parallelism 200 非bucket表使用,按实际作业可用资源2倍设置 动态资源调度情况下(spark.dynamicAllocation.enabled= true)时,资源按照spark.dynamicAllocation.maxExecutors评估。
  • 调整Spark调度参数优化OBS场景下Spark调度时延 开启对于OBS存储,可以关闭Spark的本地性进行优化,尽可能提升Spark调度效率 配置项 集群默认值 调整后 --conf spark.locality.wait 3s 0s --conf spark.locality.wait.process 3s 0s --conf spark.locality.wait.node 3s 0s --conf spark.locality.wait.rack 3s 0s
  • 初始化Hudi表时,可以使用BulkInsert方式快速写入数据 示例: set hoodie.combine.before.insert=true; // 入库前去重,如果数据没有重复 该参数无需设置 set hoodie.datasource.write.operation = bulk_insert; // 指定写入方式为bulk insert方式。 set hoodie.bulkinsert.shuffle.parallelism = 4; // 指定bulk_insert写入时的并行度,等于写入完成后保存的分区parquet文件数 insert into dsrTable select * from srcTabble
  • 初始化Hudi表时,可以使用BulkInsert方式快速写入数据 示例: set hoodie.combine.before.insert=true; // 入库前去重,如果数据没有重复 该参数无需设置 set hoodie.datasource.write.operation = bulk_insert; // 指定写入方式为bulk insert方式。 set hoodie.bulkinsert.shuffle.parallelism = 4; // 指定bulk_insert写入时的并行度,等于写入完成后保存的分区parquet文件数 insert into dsrTable select * from srcTabble
  • Spark加工Hudi表时其他参数优化 设置spark.sql.enableToString=false,降低Spark解析复杂SQL时候内存使用,提升解析效率。 设置spark.speculation=false,关闭推测执行,开启该参数会带来额外的cpu消耗,同时Hudi不支持启动该参数,启用该参数写Hudi有概率导致文件损坏。 配置项 集群默认值 调整后 --conf spark.sql.enableToString true false --conf spark.speculation false false
  • 调整Spark调度参数优化OBS场景下Spark调度时延 开启对于OBS存储,可以关闭Spark的本地性进行优化,尽可能提升Spark调度效率 配置项 集群默认值 调整后 --conf spark.locality.wait 3s 0s --conf spark.locality.wait.process 3s 0s --conf spark.locality.wait.node 3s 0s --conf spark.locality.wait.rack 3s 0s
  • 优化Spark Shuffle参数提升Hudi写入效率 开启spark.shuffle.readHostLocalDisk=true,本地磁盘读取shuffle数据,减少网络传输的开销。 开启spark.io.encryption.enabled=false,关闭shuffle过程写加密磁盘,提升shuffle效率。 开启spark.shuffle.service.enabled=true,启动shuffle服务,提升任务shuffle的稳定性。 配置项 集群默认值 调整后 --conf spark.shuffle.readHostLocalDisk false true --conf spark.io.encryption.enabled true false --conf spark.shuffle.service.enabled false true
  • 优化shuffle并行度,提升Spark加工效率 所谓的shuffle并发度如下图所示: 集群默认是200,作业可以单独设置。如果发现瓶颈stage(执行时间长),且分配给当前作业的核数大于当前的并发数,说明并发度不足。通过以下配置优化。 场景 配置项 集群默认值 调整后 Jar作业 spark.default.parallelism 200 按实际作业可用资源2倍设置 SQL作业 spark.sql.shuffle.partitions 200 按实际作业可用资源2倍设置 hudi入库作业 hoodie.upsert.shuffle.parallelism 200 非bucket表使用,按实际作业可用资源2倍设置 动态资源调度情况下(spark.dynamicAllocation.enabled= true)时,资源按照spark.dynamicAllocation.maxExecutors评估。