云服务器内容精选

  • 操作场景 Hudi提供多种写入方式,具体见hoodie.datasource.write.operation配置项,这里主要介绍UPSERT、INSERT和BULK_INSERT。 INSERT(插入): 该操作流程和UPSERT基本一致,但是不需要通过索引去查询具体更新的文件分区,因此它的速度比UPSERT快。当数据源不包含更新数据时建议使用该操作,若数据源中存在更新数据,则在数据湖中会出现重复数据。 BULK_INSERT(批量插入):用于初始数据集加载, 该操作会对主键进行排序后直接以写普通parquet表的方式插入Hudi表,该操作性能是最高的,但是无法控制小文件,而UPSERT和INSERT操作使用启发式方法可以很好的控制小文件。 UPSERT(插入更新): 默认操作类型。Hudi会根据主键进行判断,如果历史数据存在则update如果不存在则insert。因此在对于CDC之类几乎肯定包括更新的数据源,建议使用该操作。 由于INSERT时不会对主键进行排序,所以初始化数据集不建议使用INSERT。 在确定数据都为新数据时建议使用INSERT,当存在更新数据时建议使用UPSERT,当初始化数据集时建议使用BULK_INSERT。
  • 批量写入Hudi表 引入Hudi包生成测试数据,参考快速入门章节的2到4。 写入Hudi表,写入命令中加入参数:option("hoodie.datasource.write.operation", "bulk_insert"),指定写入方式为bulk_insert,如下所示: df.write.format("org.apache.hudi"). options(getQuickstartWriteConfigs). option("hoodie.datasource.write.precombine.field", "ts"). option("hoodie.datasource.write.recordkey.field", "uuid"). option("hoodie.datasource.write.partitionpath.field", ""). option("hoodie.datasource.write.operation", "bulk_insert"). option("hoodie.table.name", tableName). option("hoodie.datasource.write.keygenerator.class", "org.apache.hudi.keygen.NonpartitionedKeyGenerator"). option("hoodie.datasource.hive_sync.enable", "true"). option("hoodie.datasource.hive_sync.partition_fields", ""). option("hoodie.datasource.hive_sync.partition_extractor_class", "org.apache.hudi.hive.NonPartitionedExtractor"). option("hoodie.datasource.hive_sync.table", tableName). option("hoodie.datasource.hive_sync.use_jdbc", "false"). option("hoodie.bulkinsert.shuffle.parallelism", 4). mode(Overwrite). save(basePath) 示例中各参数介绍请参考表1。 使用spark datasource接口更新Mor表,Upsert写入小数据量时可能触发更新数据的小文件合并,使在Mor表的读优化视图中能查到部分更新数据。 当update的数据对应的base文件是小文件时,insert中的数据和update中的数据会被合在一起和base文件直接做合并产生新的base文件,而不是写log。