华为云用户手册

  • 设置分区 多级分区 配置项 说明 hoodie.datasource.write.partitionpath.field 配置为多个业务字段,用逗号分隔。 hoodie.datasource.hive_sync.partition_fields 和hoodie.datasource.write.partitionpath.field的分区字段保持一致。 hoodie.datasource.write.keygenerator.class 配置为org.apache.hudi.keygen.ComplexKeyGenerator。 hoodie.datasource.hive_sync.partition_extractor_class 配置为org.apache.hudi.hive.MultiPartKeysValueExtractor。 例:创建分区为p1/p2/p3的多级分区COW表 df.write.format("org.apache.hudi").option("hoodie.datasource.write.table.type", COW_TABLE_TYPE_OPT_VAL).option("hoodie.datasource.write.precombine.field", "update_time").option("hoodie.datasource.write.recordkey.field", "id").option("hoodie.datasource.write.partitionpath.field", "year,month,day").option("hoodie.datasource.write.operation", "bulk_insert").option("hoodie.table.name", tableName).option("hoodie.write.lock.provider", "com.huawei.luxor.hudi.util.DliCatalogBasedLockProvider").option("hoodie.datasource.write.keygenerator.class", "org.apache.hudi.keygen.ComplexKeyGenerator").option("hoodie.datasource.hive_sync.enable", "true").option("hoodie.datasource.hive_sync.partition_fields", "year,month,day").option("hoodie.datasource.hive_sync.partition_extractor_class", "org.apache.hudi.hive.MultiPartKeysValueExtractor").option("hoodie.datasource.hive_sync.database", databaseName).option("hoodie.datasource.hive_sync.table", tableName).option("hoodie.datasource.hive_sync.use_jdbc", "false").mode(SaveMode.Overwrite).save(basePath) 单分区 配置项 说明 hoodie.datasource.write.partitionpath.field 配置为一个业务字段。 hoodie.datasource.hive_sync.partition_fields 和hoodie.datasource.write.partitionpath.field分区字段保持一致。 hoodie.datasource.write.keygenerator.class 默认可以配置为org.apache.hudi.keygen.SimpleKeyGenerator 和org.apache.hudi.keygen.ComplexKeyGenerator hoodie.datasource.hive_sync.partition_extractor_class 配置为org.apache.hudi.hive.MultiPartKeysValueExtractor。 例:创建分区为p1的单分区的MOR表 df.write.format("org.apache.hudi").option("hoodie.datasource.write.table.type", MOR_TABLE_TYPE_OPT_VAL).option("hoodie.datasource.write.precombine.field", "update_time").option("hoodie.datasource.write.recordkey.field", "id").option("hoodie.datasource.write.partitionpath.field", "create_time").option("hoodie.datasource.write.operation", "bulk_insert").option("hoodie.table.name", tableName).option("hoodie.write.lock.provider", "com.huawei.luxor.hudi.util.DliCatalogBasedLockProvider").option("hoodie.datasource.write.keygenerator.class", "org.apache.hudi.keygen.ComplexKeyGenerator").option("hoodie.datasource.hive_sync.enable", "true").option("hoodie.datasource.hive_sync.partition_fields", "create_time").option("hoodie.datasource.hive_sync.partition_extractor_class", "org.apache.hudi.hive.MultiPartKeysValueExtractor").option("hoodie.datasource.hive_sync.database", databaseName).option("hoodie.datasource.hive_sync.table", tableName).option("hoodie.datasource.hive_sync.use_jdbc", "false").mode(SaveMode.Overwrite).save(basePath) 无分区 配置项 说明 hoodie.datasource.write.partitionpath.field 配置为空字符串。 hoodie.datasource.hive_sync.partition_fields 配置为空字符串。 hoodie.datasource.write.keygenerator.class 配置为org.apache.hudi.keygen.NonpartitionedKeyGenerator。 hoodie.datasource.hive_sync.partition_extractor_class 配置为org.apache.hudi.hive.NonPartitionedExtractor。 例:创建无分区COW表 df.write.format("org.apache.hudi").option("hoodie.datasource.write.table.type", COW_TABLE_TYPE_OPT_VAL).option("hoodie.datasource.write.precombine.field", "update_time").option("hoodie.datasource.write.recordkey.field", "id").option("hoodie.datasource.write.partitionpath.field", "").option("hoodie.datasource.write.operation", "bulk_insert").option("hoodie.table.name", tableName).option("hoodie.write.lock.provider", "com.huawei.luxor.hudi.util.DliCatalogBasedLockProvider").option("hoodie.datasource.write.keygenerator.class", "org.apache.hudi.keygen.NonpartitionedKeyGenerator").option("hoodie.datasource.hive_sync.enable", "true").option("hoodie.datasource.hive_sync.partition_fields", "").option("hoodie.datasource.hive_sync.partition_extractor_class", "org.apache.hudi.hive.NonPartitionedExtractor").option("hoodie.datasource.hive_sync.database", databaseName).option("hoodie.datasource.hive_sync.table", tableName).option("hoodie.datasource.hive_sync.use_jdbc", "false").mode(SaveMode.Overwrite).save(basePath)
  • 使用约束 新增列在设置默认值前,如果数据已经进行了重写,则查询历史数据不支持返回列的默认值,返回NULL。数据入库、更新、执行Compaction、Clustering都会导致部分或全部数据重写。 列的默认值设置要与列的类型一致,如不一致会进行类型强转,导致默认值精度丢失或者默认值为NULL。 历史数据的默认值与列第一次设置的默认值一致,多次修改列的默认值不会影响历史数据的查询结果。 设置默认值后rollback不能回滚默认值配置。 Spark SQL暂不支持查看列默认值信息,可以通过执行show create table SQL查看。 不支持默认缺省列的写入方式,写入时必须指定列名。
  • 示例 SQL语法具体参考 DLI Hudi SQL语法参考章节。 示例: 建表指定列默认值 create table if not exists h3(id bigint,name string,price double default 12.34) using hudioptions (primaryKey = 'id',type = 'mor',preCombineField = 'name'); 添加列指定列默认值 alter table h3 add columns(col1 string default 'col1_value');alter table h3 add columns(col2 string default 'col2_value', col3 int default 1); 修改列默认值 alter table h3 alter column price set default 14.56; 插入数据使用列默认值,需要指定写入的列名,和插入的数据一一对应 insert into h3(id, name) values(1, 'aaa');insert into h3(id, name, price) select 2, 'bbb', 12.5;
  • 参数描述 表1 参数描述 参数 描述 是否必填 table 需要查询表的表名,支持database.tablename格式 是 clean_policy 清理老版本数据文件的策略,默认KEEP_LATEST_COMMITS 否 retain_commits 仅对KEEP_LATEST_COMMITS策略有效 否 hours_retained 仅对KEEP_LATEST_BY_HOURS策略有效 否 file_version_retained 仅对KEEP_LATEST_FILE_VERSIONS策略有效 否
  • 如何执行Compaction 仅执行Schedule Spark SQL(设置如下参数,写数据时触发) hoodie.compact.inline=truehoodie.schedule.compact.only.inline=truehoodie.run.compact.only.inline=falsehoodie.compact.inline.max.delta.commits=5 // 默认值为5,根据业务场景指定 随后执行任意写入SQL时,在满足条件后(同一个file slice下存在5个 delta log文件),会触发compaction。 Spark SQL(设置如下参数,手动触发1次) hoodie.compact.inline=truehoodie.schedule.compact.only.inline=truehoodie.run.compact.only.inline=falsehoodie.compact.inline.max.delta.commits=5 // 默认值为5,根据业务场景指定 随后手动执行SQL: schedule compaction on ${table_name} SparkDataSource(option里设置如下参数,写数据时触发) hoodie.compact.inline=true hoodie.schedule.compact.only.inline=true hoodie.run.compact.only.inline=false hoodie.compact.inline.max.delta.commits=5 // 默认值为5,根据业务场景指定 Flink(with属性里设置如下参数,写数据时触发) compaction.async.enabled=false compaction.schedule.enabled=true compaction.delta_commits=5 // 默认值为5,根据业务场景指定 仅执行Run Spark SQL(设置如下参数,手动触发1次) hoodie.compact.inline=truehoodie.schedule.compact.only.inline=falsehoodie.run.compact.only.inline=true 随后执行如下SQL run compaction on ${table_name} Schedule+Run同时执行 如果TimeLine中没有Compaction Plan,就尝试生成一个Compaction Plan去执行。 Spark SQL(设置如下参数,随后执行任意写入SQL时,在满足条件时触发) hoodie.compact.inline=truehoodie.schedule.compact.only.inline=falsehoodie.run.compact.only.inline=falsehoodie.compact.inline.max.delta.commits=5 // 默认值为5,根据业务场景指定 SparkDataSource(option里设置如下参数,写数据时触发) hoodie.compact.inline=true hoodie.schedule.compact.only.inline=false hoodie.run.compact.only.inline=false hoodie.compact.inline.max.delta.commits=5 // 默认值为5,根据业务场景指 Flink(with属性里设置如下参数,写数据时触发) compaction.async.enabled=true compaction.schedule.enabled=false compaction.delta_commits=5 // 默认值为5,根据业务场景指定 推荐方案 Spark/Flink流任务仅执行Schedule,然后另起一个Spark SQL任务定时仅执行Run。 Spark批任务可以直接同时执行Schedule + Run。 为了保证入湖的最高效率,推荐使用同步产生compaction调度计划,异步执行compaction调度计划。
  • 什么是Compaction Compaction用于合并mor表Base和Log文件,Compaction包含两个过程Schedule和Run。Schedule过程会在TimeLine里生成一个Compaction Plan,这个Compaction Plan会记录哪些parquet文件将会与哪些log文件进行合并,但是仅仅是一个Plan,没有去合并。Run过程会将TimeLine里的所有Compaction Plan一个一个去执行,一直到全部都执行完。 对于Merge-On-Read表,数据使用列式Parquet文件和行式Avro文件存储,更新被记录到增量文件,然后进行同步/异步compaction生成新版本的列式文件。Merge-On-Read表可减少数据摄入延迟,因而进行不阻塞摄入的异步Compaction很有意义。
  • 如何执行Archive 写完数据后archive Spark SQL(set设置如下参数,写数据时触发) hoodie.archive.automatic=truehoodie.keep.max.commits=30 // 默认值为30,根据业务场景指定hoodie.keep.min.commits=20 // 默认值为20,根据业务场景指定 SparkDataSource(option里设置如下参数,写数据时触发) hoodie.archive.automatic=true hoodie.keep.max.commits=30 // 默认值为30,根据业务场景指定 hoodie.keep.min.commits=20 // 默认值为20,根据业务场景指定 Flink(with属性里设置如下参数,写数据时触发) hoodie.archive.automatic=true archive.max_commits=30 // 默认值为30,根据业务场景指定 archive.min_commits=20 // 默认值为20,根据业务场景指定 手动触发1次archive Spark SQL(设置如下参数,手动触发1次) hoodie.archive.automatic=truehoodie.keep.max.commits=30 // 默认值为30,根据业务场景指定hoodie.keep.min.commits=20 // 默认值为20,根据业务场景指定 随后执行SQL,当执行过clean,Timeline中存在数据文件已经被清理的Instant,且总Instant数量超过30时,会触发archive。 run archivelog on ${table_name}
  • 返回结果 参数 描述 path_num 指定目录的子目录数量 file_num 指定目录的文件数量 storage_size 该目录的Size(bytes) storage_size(unit) 该目录的Size(KB) storage_path 指定目录的完整FS绝对路径 space_consumed 返回文件/目录在集群中占用的实际空间,即它考虑了为集群设置的复制因子 quota 名称配额(名称配额是对当前目录树中的文件和目录名称数量的硬性限制) space_quota 空间配额(空间配额是对当前目录树中的文件所使用的字节数量的硬性限制)
  • Hudi锁配置说明 提交Spark jar作业时需要手动配置Hudi锁。 当使用DLI托管的元数据服务时,必须配置Hudi锁开启,且配置使用DLI提供的Hudi锁实现类: 配置项 配置值 hoodie.write.lock.provider com.huawei.luxor.hudi.util.DliCatalogBasedLockProvider 同样在使用LakeFormation服务提供的元数据服务时,需要配置并使用LakeFormation提供的Hudi锁实现类。 配置项 配置值 hoodie.write.lock.provider org.apache.hudi.lakeformation.LakeCatMetastoreBasedLockProvider 关闭Hudi锁,或者使用其他的锁实现类时,存在数据丢失/异常的风险。 在任何情况下,DLI不对因关闭Hudi锁,或者使用与元数据服务不匹配的锁实现类,而直接或间接导致的任何形式的损失或损害承担责任,包括但不限于商业利润损失、业务中断、数据丢失或其他财务损失。 父主题: Spark datasource API语法参考
  • 参数描述 表1 CREATE TABLE参数描述 参数 描述 database_name Database名称,由字母、数字和下划线(_)组成。 table_name Database中的表名,由字母、数字和下划线(_)组成。 columnTypeList 以逗号分隔的带数据类型的列表。列名由字母、数字和下划线(_)组成。 using 参数hudi,定义和创建Hudi table。 table_comment 表的描述信息。 location_path OBS路径,指定该路径Hudi 表会创建为外表。 options_list Hudi table属性列表。
  • 注意事项 Hudi当前不支持使用char、varchar、tinyint、smallint类型,建议使用string或int类型。 Hudi当前只有int、bigint、float、double、decimal、string、date、timestamp、boolean、binary类型支持设置默认值。 Hudi表必须指定primaryKey与preCombineField。 在指定路径下创建表时,如果路径下已存在Hudi表,则建表时不需要指定列,且不能修改表的原有属性。
  • 参数描述 表1 参数描述 参数 描述 table_name 需要清理无效数据文件的Hudi表的表名,必选。 op_type 命令运行模式,可选,默认值为dry_run,取值:dry_run、repair、undo、query。 dry_run:显示需要清理的无效数据文件。 repair:显示并清理无效的数据文件。 undo:恢复已清理的数据文件 query:显示已执行清零操作的备份目录。 backup_path 运行模式为undo时有效,需要恢复数据文件的备份目录,必选。 start_time 运行模式为dry_run、repair时有效,产生无效数据文件的开始时间,可选,默认不限制开始时间。 end_time 运行模式为dry_run、repair时有效,产生无效数据文件的结束时间,可选,默认不限制结束时间。
  • 示例 创建非分区表 create table if not exists hudi_table0 (id int,name string,price double) using hudioptions (type = 'cow',primaryKey = 'id',preCombineField = 'price'); 创建分区表 create table if not exists hudi_table_p0 (id bigint,name string,ts bigint,dt string,hh string) using hudioptions (type = 'cow',primaryKey = 'id',preCombineField = 'ts')partitioned by (dt, hh); 在指定路径下创建表 create table if not exists h3(id bigint,name string,price double) using hudioptions (primaryKey = 'id',preCombineField = 'price')location 'obs://bucket/path/to/hudi/h3';
  • 规则 有数据持续写入的表,24小时内至少执行一次compaction。 对于MOR表,不管是流式写入还是批量写入,需要保证每天至少完成1次Compaction操作。如果长时间不做compaction,Hudi表的log将会越来越大,这必将会出现以下问题: Hudi表读取很慢,且需要很大的资源。 这是由于读MOR表涉及到log合并,大log合并需要消耗大量的资源并且速度很慢。 长时间进行一次Compaction需要耗费很多资源才能完成,且容易出现OOM。 阻塞Clean,如果没有Compaction操作来产生新版本的Parquet文件,那旧版本的文件就不能被Clean清理,增加存储压力。 提交Spark jar作业时,CPU与内存比例为1:4~1:8。 Compaction作业是将存量的parquet文件内的数据与新增的log中的数据进行合并,需要消耗较高的内存资源,按照之前的表设计规范以及实际流量的波动结合考虑,建议Compaction作业CPU与内存的比例按照1:4~1:8配置,保证Compaction作业稳定运行。当Compaction出现OOM问题,可以通过调大内存占比解决。
  • Spark表数据维护规范 禁止通过Alter命令修改表关键属性信息:type/primaryKey/preCombineField/hoodie.index.type 错误示例,执行如下语句修改表关键属性: alter table dsrTable set tblproperties('type'='xx');alter table dsrTable set tblproperties('primaryKey'='xx');alter table dsrTable set tblproperties('preCombineField'='xx');alter table dsrTable set tblproperties('hoodie.index.type'='xx'); 除Spark以外,其他引擎也可以修改Hudi表元数据,但是这种修改会导致整个Hudi表出现数据重复,甚至数据损坏;因此禁止修改上述属性。 父主题: Spark on Hudi开发规范
  • 示例 示例1: delete from h0 where column1 = 'country'; 示例2: delete from h0 where column1 IN ('country1', 'country2'); 示例3: delete from h0 where column1 IN (select column11 from sourceTable2); 示例4: delete from h0 where column1 IN (select column11 from sourceTable2 where column1 = 'xxx'); 示例5: delete from h0;
  • 判断使用分区表还是非分区表 根据表的使用场景一般将表分为事实表和维度表: 事实表通常整表数据规模较大,以新增数据为主,更新数据占比小,且更新数据大多落在近一段时间范围内(年或月或天),下游读取该表进行ETL计算时通常会使用时间范围进行裁剪(例如最近一天、一月、一年),这种表通常可以通过数据的创建时间来做分区以保证最佳读写性能。 维度表数据量一般整表数据规模较小,以更新数据为主,新增较少,表数据量比较稳定,且读取时通常需要全量读取做join之类的ETL计算,因此通常使用非分区表性能更好。 分区表的分区键不允许更新,否则会产生重复数据。 例外场景:超大维度表和超小事实表 特殊情况如存在持续大量新增数据的维度表(表数据量在200G以上或日增长量超过60M)或数据量非常小的事实表(表数据量小于10G且未来三至五年增长后也不会超过10G)需要针对具体场景来进行例外处理: 持续大量新增数据的维度表 方法一:预留桶数,如使用非分区表则需通过预估较长一段时间内的数据增量来预先增加桶数,缺点是随着数据的增长,文件依然会持续膨胀; 方法二:大粒度分区(推荐),如果使用分区表则需要根据数据增长情况来计算,例如使用年分区,这种方式相对麻烦些但是多年后表无需重新导入。 方法三:数据老化,按照业务逻辑分析大的维度表是否可以通过数据老化清理无效的维度数据从而降低数据规模。 数据量非常小的事实表 这种可以在预估很长一段时间的数据增长量的前提下使用非分区表预留稍宽裕一些的桶数来提升读写性能。
  • 确认表内桶数 Hudi表的桶数设置,关系到表的性能,需要格外引起注意。 以下几点,是设置桶数的关键信息,需要建表前确认。 非分区表 单表数据总条数 = select count(1) from tablename(入湖时需提供); 单条数据大小 = 平均 1KB(华为建议通过select * from tablename limit 100,得出100条数据的大小,再除以100得到单条平均大小) 单表数据量大小(G) = 单表数据总条数*单条数据大小/1024/1024 非分区表桶数 = 单表数据量大小(G)/2G*2,再向上取整,如果小于4就设置桶数为4 分区表 最近一个月最大数据量分区数据总条数 = 入湖前咨询产品线 单条数据大小 = 平均 1KB(华为建议通过select * from tablename limit 100,得出100条数据的大小,再除以100得到单条平均大小) 单分区数据量大小(G) = 最近一个月最大数据量分区数据总条数*单条数据大小/1024/1024 分区表桶数 = 单分区数据量大小(G)/2G,再后向上取整,最小设置1个桶 需要使用的是表的总数据大小,而不是压缩以后的文件大小 桶的设置以偶数最佳,非分区表最小桶数请设置4个,分区表最小桶数请设置1个。
  • 参数描述 表1 CREATE TABLE As SELECT参数描述 参数 描述 database_name Database名称,由字母、数字和下划线(_)组成。 table_name Database中的表名,由字母、数字和下划线(_)组成。 using 参数hudi,定义和创建Hudi table。 table_comment 表的描述信息。 location_path OBS路径,指定该路径Hudi表会创建为外表。 options_list Hudi table属性列表。 query_statement select查询表达式
  • 示例 创建分区表 create table h2 using hudioptions (type = 'cow', primaryKey = 'id', preCombineField = 'dt')partitioned by (dt)asselect 1 as id, 'a1' as name, 10 as price, 1000 as dt; 创建非分区表 create table h3 using hudioptions (type = 'cow', primaryKey = 'id', preCombineField = 'dt')asselect 1 as id, 'a1' as name, 10 as price, 1000 as dt;从parquet表加载数据到hudi表# 创建parquet表create table parquet_mngd using parquet options(path=’obs://bucket/path/parquet_dataset/*.parquet’);# CTAS创建hudi表create table hudi_tbl using hudi location 'obs://bucket/path/hudi_tbl/' options (type = 'cow',primaryKey = 'id',preCombineField = 'ts')partitioned by (datestr) as select * from parquet_mngd;
  • 建表示例 create table data_partition(id int, comb int, col0 int,yy int, mm int, dd int) using hudi --指定hudi 数据源partitioned by(yy,mm,dd) --指定分区, 支持多级分区location 'obs://bucket/path/data_partition' --指定路径,使用DLI提供的元数据服务时只支持创建OBS表options(type='mor', --表类型 mor 或者 cowprimaryKey='id', --主键,可以是复合主键但是必须全局唯一preCombineField='comb' --预合并字段,相同主键的数据会按该字段合并,当前不能指定多个字段 )
  • 规则 Hudi表必须执行Archive。 对于Hudi的MOR类型和COW类型的表,都需要开启Archive。 Hudi表在写入数据时会自动判断是否需要执行Archive,因为Archive的开关默认打开(hoodie.archive.automatic默认为true)。 Archive操作并不是每次写数据时都会触发,至少需要满足以下两个条件: Hudi表满足hoodie.keep.max.commits设置的阈值。如果是Flink写hudi至少提交的checkpoint要超过这个阈值;如果是Spark写hudi,写Hudi的次数要超过这个阈值。 Hudi表做过Clean,如果没有做过Clean就不会执行Archive。
  • 读优化查询 读优化查询(Read Optimized Queries)是针对MOR表进行的优化,只会读取最新的commit/compaction产生的快照(不包含delta log文件)。 表1 实时查询和读优化查询的trade-off Trade-off 实时查询 读优化查询 Data Latency(数据时延) 低 高 Query Latency(查询时延) 只对于MOR表,高(合并parquet + delta log) 低 (读取parquet文件性能)
  • MOR表查询 在Spark SQL作业中使用元数据服务,或者配置了HMS同步参数,在创建MOR表后,会额外同步创建:“表名_rt”和“表名_ro”两张表。查询后缀为rt的表等同于实时查询,查询后缀为ro的表代表读优化查询。例如:通过Spark SQL创建hudi表名为${table_name}, 同步元数据服务后,数据库中多出两张表分别为${table_name}_rt和${table_name}_ro。 实时视图读取(SparkSQL为例):直接读取相同数据库中后缀为_rt的hudi表即可。 select count(*) from ${table_name}_rt; 实时视图读取(Spark jar作业为例):与COW表操作一致,请参考COW表相关操作。 增量视图读取(Spark SQL作业为例):与COW表操作一致,请参考COW表相关操作。 增量视图读取(Spark jar作业为例):与COW表操作一致,请参考COW表相关操作。 读优化视图读取(Spark jar作业为例): 配置项 hoodie.datasource.query.type 需要配置为 read_optimized。 object HudiDemoScala { def main(args: Array[String]): Unit = { val spark = SparkSession.builder .enableHiveSupport .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension") .appName("HudiIncrementalReadDemo") .getOrCreate spark.read.format("hudi") .option("hoodie.datasource.query.type", "read_optimized") // 指定查询类型为读优化视图 .load("obs://bucket/to_your_table") // 指定读取的hudi表路径 .createTempView("hudi_read_optimized_temp_view") spark.sql("select * from hudi_read_optimized_temp_view").show(100) }}
  • 规则 建表必须指定primaryKey和preCombineField。 Hudi表提供了数据更新的能力和幂等写入的能力,该能力要求数据记录必须设置主键用来识别重复数据和更新操作。不指定主键会导致表丢失数据更新能力,不指定preCombineField会导致主键重复。 参数名称 参数描述 输入值 说明 primaryKey hudi主键 按需 必须指定,可以是复合主键但是必须全局唯一。 preCombineField 预合并键,相同主键的多条数据按该字段进行合并 按需 必须指定,相同主键的数据会按该字段合并,不能指定多个字段。 禁止建表时将hoodie.datasource.hive_sync.enable指定为false。 指定为false将导致新写入的分区无法同步到Hive Metastore中。由于缺失新写入的分区信息,查询引擎读取该时会丢数。 禁止指定Hudi的索引类型为INMEMORY类型。 该索引仅是为了测试使用。生产环境上使用该索引将导致数据重复。
  • 使用HetuEngine on Hudi HetuEngine是高性能的交互式SQL分析及数据虚拟化引擎,它与大数据生态无缝融合,实现海量数据秒级交互式查询,并支持跨源跨域统一访问,使能 数据湖 内、湖间、湖仓一站式SQL融合分析。 HetuEngine对Hudi仅支持select操作,即支持SELECT语法来查询Hudi表中的数据。 HetuEngine暂不支持查询Hudi的增量视图。 详细语法说明请参考《HetuEngine SQL语法参考》中“SELECT”语法说明。 父主题: DLI中使用Hudi开发作业
  • 示例 insert into h0 select 1, 'a1', 20;-- insert static partitioninsert into h_p0 partition(dt = '2021-01-02') select 1, 'a1';-- insert dynamic partitioninsert into h_p0 select 1, 'a1', dt;-- insert dynamic partitioninsert into h_p1 select 1 as id, 'a1', '2021-01-03' as dt, '19' as hh;-- insert overwrite tableinsert overwrite table h0 select 1, 'a1', 20;-- insert overwrite table with static partitioninsert overwrite h_p0 partition(dt = '2021-01-02') select 1, 'a1';-- insert overwrite table with dynamic partitioninsert overwrite table h_p1 select 2 as id, 'a2', '2021-01-03' as dt, '19' as hh;
  • 命令格式 按文件容量进行清理,需要配置参数: hoodie.archive.file.cleaner.policy = KEEP_ARCHIVED_FILES_BY_SIZE;hoodie.archive.file.cleaner.size.retained = 5368709120; 提交SQL run cleanarchive on tableIdentifier/tablelocation; 按保留时间进行清理,需要配置参数: hoodie.archive.file.cleaner.policy = KEEP_ARCHIVED_FILES_BY_DAYS;hoodie.archive.file.cleaner.days.retained = 30; 提交SQL run cleanarchive on tableIdentifier/tablelocation;
  • 注意事项 写入模式:Hudi对于设置了主键的表支持三种写入模式,用户可以设置参数hoodie.sql.insert.mode来指定Insert模式,默认为upsert。 hoodie.sql.insert.mode = upsert strict模式,Insert 语句将保留 COW 表的主键唯一性约束,不允许重复记录。如果在插入过程中已经存在记录,则会为COW表执行 HoodieDuplicateKeyException;对于MOR表,该模式与upsert模式行为一致。 non-strict模式,对主键表采用insert处理。 upsert模式,对于主键表的重复值进行更新操作。 在提交Spark SQL作业时,用户可以在设置中配置以下参数,切换bulk insert作为Insert语句的写入方式。 hoodie.sql.bulk.insert.enable = truehoodie.sql.insert.mode = non-strict 也可以设置hoodie.datasource.write.operation的来控制insert语句的写入方式,可选包括bulk_insert、insert、upsert。(注意:会覆盖配置的hoodie.sql.insert.mode的结果) hoodie.datasource.write.operation = upsert
  • 参数描述 表1 参数描述 参数 描述 tableIdentifier Hudi表的名称。 tablelocation Hudi表的存储路径。 hoodie.archive.file.cleaner.policy 清理归档文件的策略:目前仅支持KEEP_ARCHIVED_FILES_BY_SIZE和KEEP_ARCHIVED_FILES_BY_DAYS两种策略,默认策略为KEEP_ARCHIVED_FILES_BY_DAYS。 KEEP_ARCHIVED_FILES_BY_SIZE策略可以设置归档文件占用的存储空间大小 KEEP_ARCHIVED_FILES_BY_DAYS策略可以清理超过某个时间点之外的归档文件 hoodie.archive.file.cleaner.size.retained 当清理策略为KEEP_ARCHIVED_FILES_BY_SIZE时,该参数可以设置保留多少字节大小的归档文件,默认值5368709120字节(5G)。 hoodie.archive.file.cleaner.days.retained 当清理策略为KEEP_ARCHIVED_FILES_BY_DAYS时,该参数可以设置保留多少天以内的归档文件,默认值30(天)。
共100000条
提示

您即将访问非华为云网站,请注意账号财产安全