Data Warehouse Service GAUSSDB(DWS) - As a Result Table: Writing Directly to DNs with Flink SQL

Time: 2025-04-30 16:21:09

Writing Directly to DNs with Flink SQL

This capability depends on the Flink SQL DISTRIBUTEBY feature, which MRS provides. For details, see "Flink SQL Syntax Enhancement".

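The connector provides a UDF that computes the downstream parallel instance from the values of the distribution columns. Combined with the Flink SQL DISTRIBUTEBY feature, this partitions the data by DN. Example: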

  1. Register the UDF in SQL.
    CREATE TEMPORARY FUNCTION dn_hash AS 'com.huaweicloud.dws.connectors.flink.partition.DnHashFunction';
  2. Write the source SQL as usual.
    CREATE TABLE users
    (
        id         BIGINT,
        name       STRING,
        age        INT,
        text       STRING,
        created_at TIMESTAMP(3),
        updated_at TIMESTAMP(3)
    ) WITH (
          'connector' = 'datagen',
          'fields.id.kind' = 'sequence',
          'fields.id.start' = '1',
          'fields.id.end' = '1000',
          'fields.name.length' = '10',
          'fields.age.min' = '18',
          'fields.age.max' = '60',
          'fields.text.length' = '5'
          );
  3. In the sink table DDL, add one extra column of type INT to receive the result computed by the UDF. In this example it is named dn_hash.
    CREATE TABLE dws_users
    (
        dn_hash int,
        id         BIGINT,
        name       STRING,
        age        INT,
        text       STRING,
        created_at TIMESTAMP(3),
        updated_at TIMESTAMP(3),
        PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
          'connector' = 'dws',
          'url' = '%s',
          'tableName' = 'test.users',
          'username' = '%s',
          'autoFlushBatchSize' = '50000',
          'password' = '%s'
          );
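  4. In the INSERT INTO statement, call the UDF to determine the downstream operator instance for each row, and use DISTRIBUTEBY to partition the data on the returned value. Each row is then sent to the downstream parallel instance indicated by the UDF.
    INSERT INTO dws_users SELECT
    /*+ DISTRIBUTEBY('dn_hash') */
    dn_hash('test.users', 10, 1024, id) AS dn_hash, * FROM users;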
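
Flink SQL maps the columns of an INSERT INTO ... SELECT to the sink columns by position, so the UDF result has to come first in the SELECT list to land in the dn_hash column of dws_users. As a sketch, the same statement can be written with an explicit column list instead of *, which makes that mapping visible. The column names are taken from the users and dws_users tables defined above; the dn_hash arguments are copied unchanged from the original example and are not interpreted here.

```sql
-- Same insert as in step 4, with the source columns listed explicitly.
-- Assumes the UDF from step 1 and the tables from steps 2 and 3 already exist.
INSERT INTO dws_users
SELECT /*+ DISTRIBUTEBY('dn_hash') */
    dn_hash('test.users', 10, 1024, id) AS dn_hash,  -- first column: matches dws_users.dn_hash (INT)
    id,
    name,
    age,
    text,
    created_at,
    updated_at
FROM users;
```

Listing the columns keeps the statement valid if columns are later added to the source table, since * would then no longer line up with the sink schema.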
support.huaweicloud.com/tg-dws/dws_07_0184.html