数据湖探索 DLI-Doris结果表:示例

时间:2024-04-23 20:19:33

示例

该示例是从Datagen数据源中生成数据,并将结果写入到Doris结果表中。

  1. 参考增强型跨源连接,在DLI上根据Doris所在的虚拟私有云和子网分别创建相应的增强型跨源连接,并绑定所要使用的Flink弹性资源池。参考“修改主机信息”章节描述,在增强型跨源中增加MRS的主机信息。
  2. 设置Doris的安全组,添加入向规则使其对Flink的队列网段放通。分别根据Doris的地址测试队列连通性。如果能连通,则表示跨源已经绑定成功,否则表示未成功。

    参考测试地址连通性

  3. 参考MRS Doris使用指南,创建doris表,创建语句如下:
    CREATE TABLE IF NOT EXISTS dorisdemo
    (
      `user_id` varchar(10) NOT NULL,
      `city` varchar(10),
      `age` int,
      `gender` int
    )
    DISTRIBUTED BY HASH(`user_id`) BUCKETS 10
    
  4. 参考创建Flink OpenSource作业,创建flink opensource sql作业,输入以下作业脚本,并提交运行。该作业脚本将Datagen作为数据源,将数据写入到Doris作为结果表中。
    create table student_datagen_source(
      `user_id` String NOT NULL,
      `city` String,
      `age` int,
      `gender` int
    ) with (
      'connector' = 'datagen',
      'rows-per-second' = '1',
      'fields.user_id.kind' = 'random',
      'fields.user_id.length' = '7', 
      'fields.city.kind' = 'random',
      'fields.city.length' = '7'
    );
    
    
    CREATE TABLE dorisDemo (
      `user_id` String NOT NULL,
      `city` String,
      `age` int,
      `gender` int
    ) with (
      'connector' = 'doris',
      'fenodes' = 'FE_IP:PORT',
      'table.identifier' = 'demo.dorisdemo',
      'username' = 'dorisUser',
      'password' = 'dorisPassword',
      'sink.label-prefix' = 'demo',
      'sink.enable-2pc' = 'true',
      'sink.buffer-count' = '10'
    );
    
    insert into dorisDemo select * from student_datagen_source
  1. 查看doris结果表是否已成功写入数据。

    user_id

    city

    age

    gender

    50aff04

    93406c5

    12

    1

    681a230

    1f27d06

    16

    1

    006eff4

    3521ded

    18

    0

support.huaweicloud.com/sqlref-flink-dli/dli_08_15035.html