Data Warehouse Service GaussDB(DWS) - Used as a Result Table: Writing Data Directly to DNs Using Flink SQL
Time: 2025-04-30 16:21:09
Writing Data Directly to DNs Using Flink SQL
This capability depends on the Flink SQL DISTRIBUTEBY capability, which is provided by MRS. For details, see Flink SQL Syntax Enhancement.
The connector provides a UDF that computes the target downstream parallel subtask from the value of the distribution column. Combined with the Flink SQL DISTRIBUTEBY capability, this partitions data by DN (a conceptual sketch of the mapping follows the example below). Example:
- Register the UDF in the SQL.
CREATE TEMPORARY FUNCTION dn_hash AS 'com.huaweicloud.dws.connectors.flink.partition.DnHashFunction';
- Write the source SQL as usual.
CREATE TABLE users (
    id BIGINT,
    name STRING,
    age INT,
    text STRING,
    created_at TIMESTAMP(3),
    updated_at TIMESTAMP(3)
) WITH (
    'connector' = 'datagen',
    'fields.id.kind' = 'sequence',
    'fields.id.start' = '1',
    'fields.id.end' = '1000',
    'fields.name.length' = '10',
    'fields.age.min' = '18',
    'fields.age.max' = '60',
    'fields.text.length' = '5'
);
- The sink table DDL must include one additional column of type INT to receive the value computed by the UDF; in this example it is named dn_hash.
CREATE TABLE dws_users (
    dn_hash INT,
    id BIGINT,
    name STRING,
    age INT,
    text STRING,
    created_at TIMESTAMP(3),
    updated_at TIMESTAMP(3),
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'dws',
    'url' = '%s',
    'tableName' = 'test.users',
    'username' = '%s',
    'autoFlushBatchSize' = '50000',
    'password' = '%s'
);
- The INSERT INTO SQL uses the UDF to compute the target downstream subtask for each row, and DISTRIBUTEBY partitions the data on the returned value, so rows are routed to the downstream parallel subtask indicated by the UDF (a complete Java submission sketch follows at the end of this section).
INSERT INTO dws_users
SELECT /*+ DISTRIBUTEBY('dn_hash') */
    dn_hash('test.users', 10, 1024, id) AS dn_hash, *
FROM users;
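The internals of DnHashFunction are not shown in this example. Conceptually, a mapping UDF of this kind hashes the distribution-column value into a bucket and maps that bucket to one of the sink's parallel subtasks, so that DISTRIBUTEBY on the returned value keeps all rows belonging to the same DN on the same writer. The Java sketch below only illustrates that idea; the class name, the argument semantics (table name, sink parallelism, bucket count), and the hash scheme are assumptions, not the connector's actual implementation.

import org.apache.flink.table.functions.ScalarFunction;

/**
 * Hypothetical illustration only: NOT the real
 * com.huaweicloud.dws.connectors.flink.partition.DnHashFunction.
 * It hashes the distribution-column value into a fixed number of
 * buckets and maps each bucket to a sink subtask index, so that
 * DISTRIBUTEBY on the returned value routes rows of the same bucket
 * to the same parallel writer.
 */
public class DemoDnHashFunction extends ScalarFunction {

    // tableName is unused in this sketch; a real implementation would
    // presumably use it to look up the table's DN distribution metadata.
    public int eval(String tableName, int sinkParallelism, int bucketCount, Long distributionKey) {
        long key = distributionKey == null ? 0L : distributionKey;
        int bucket = Math.floorMod(Long.hashCode(key), bucketCount);
        // Map the bucket to one of the downstream parallel subtasks.
        return bucket % sinkParallelism;
    }
}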
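For reference, the statements above can be submitted together from a Java Table API program. The following is a minimal sketch, assuming the MRS Flink distribution (which provides the DISTRIBUTEBY syntax enhancement) and the DWS connector are on the job classpath; the '%s' placeholders must be replaced with the real connection URL, username, and password before running.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

// Minimal sketch of submitting the statements in this section as one job.
public class DnDirectWriteJob {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // 1. Register the connector-provided mapping UDF.
        tEnv.executeSql(
                "CREATE TEMPORARY FUNCTION dn_hash AS "
                        + "'com.huaweicloud.dws.connectors.flink.partition.DnHashFunction'");

        // 2. Source table (datagen), as defined above.
        tEnv.executeSql(
                "CREATE TABLE users ("
                        + "  id BIGINT, name STRING, age INT, text STRING,"
                        + "  created_at TIMESTAMP(3), updated_at TIMESTAMP(3)"
                        + ") WITH ("
                        + "  'connector' = 'datagen',"
                        + "  'fields.id.kind' = 'sequence',"
                        + "  'fields.id.start' = '1',"
                        + "  'fields.id.end' = '1000',"
                        + "  'fields.name.length' = '10',"
                        + "  'fields.age.min' = '18',"
                        + "  'fields.age.max' = '60',"
                        + "  'fields.text.length' = '5'"
                        + ")");

        // 3. Sink table with the extra dn_hash INT column, as defined above.
        //    Replace the %s placeholders with the real connection values.
        tEnv.executeSql(
                "CREATE TABLE dws_users ("
                        + "  dn_hash INT, id BIGINT, name STRING, age INT, text STRING,"
                        + "  created_at TIMESTAMP(3), updated_at TIMESTAMP(3),"
                        + "  PRIMARY KEY (id) NOT ENFORCED"
                        + ") WITH ("
                        + "  'connector' = 'dws',"
                        + "  'url' = '%s',"
                        + "  'tableName' = 'test.users',"
                        + "  'username' = '%s',"
                        + "  'autoFlushBatchSize' = '50000',"
                        + "  'password' = '%s'"
                        + ")");

        // 4. Submit the INSERT with the DISTRIBUTEBY hint so rows are routed
        //    to the subtask index returned by dn_hash.
        tEnv.executeSql(
                "INSERT INTO dws_users "
                        + "SELECT /*+ DISTRIBUTEBY('dn_hash') */ "
                        + "dn_hash('test.users', 10, 1024, id) AS dn_hash, * FROM users");
    }
}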