Data Lake Insight DLI - Upsert Kafka: Example

Time: 2024-04-23 20:19:32

Example

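  • Example 1: This example reads data from a DMS Kafka data source and writes it to a Print result table.
    1. See Enhanced Datasource Connections. Create an enhanced datasource connection for the VPC and subnet where Kafka resides, and bind it to the Flink elastic resource pool you intend to use.
    2. Configure the Kafka security group by adding an inbound rule that allows traffic from the CIDR block of the Flink queue. Then see Testing Address Connectivity and test the queue's connectivity against the Kafka address. If the address is reachable, the datasource connection has been bound successfully; otherwise, the binding has failed.
    3. See Creating a Flink OpenSource SQL Job. Create a Flink OpenSource SQL job, enter the following job script, and submit the job.
      Note: When creating the job, in the "Running Parameters" area of the job editing page, set "Flink Version" to "1.15", select "Save Job Log", and specify the OBS bucket for storing job logs so that the logs can be viewed later. Replace the placeholder values in the script below (KafkaTopic, KafkaAddress1, KafkaAddress2, and KafkaPort) with the values for your environment.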
      CREATE TABLE upsertKafkaSource (
        order_id string,
        order_channel string,
        order_time string,
        pay_amount double,
        real_pay double,
        pay_time string,
        user_id string,
        user_name string,  
        area_id string,
        PRIMARY KEY (order_id) NOT ENFORCED
      ) WITH (
        'connector' = 'upsert-kafka',
        'topic' = 'KafkaTopic',
        'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort',
        'key.format' = 'csv',
        'value.format' = 'json'
      );
      
      CREATE TABLE printSink (
        order_id string,
        order_channel string,
        order_time string,
        pay_amount double,
        real_pay double,
        pay_time string,
        user_id string,
        user_name string,  
        area_id string,
        PRIMARY KEY (order_id) NOT ENFORCED
      ) WITH (
        'connector' = 'print'
      );
      
      INSERT INTO printSink SELECT * FROM upsertKafkaSource;
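    4. Insert the following data into the specified Kafka topic. (Note: specify a key for each message when writing to Kafka. Because order_id is declared as the primary key, the key would normally be the order_id value.)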
      {"order_id":"202303251202020001", "order_channel":"miniAppShop", "order_time":"2023-03-25 12:02:02", "pay_amount":"60.00", "real_pay":"60.00", "pay_time":"2023-03-25 12:03:00", "user_id":"0002", "user_name":"Bob", "area_id":"330110"}
      
      {"order_id":"202303251505050001", "order_channel":"appshop", "order_time":"2023-03-25 15:05:05", "pay_amount":"500.00", "real_pay":"400.00", "pay_time":"2023-03-25 15:10:00", "user_id":"0003", "user_name":"Cindy", "area_id":"330108"}
      
      {"order_id":"202303251202020001", "order_channel":"miniAppShop", "order_time":"2023-03-25 12:02:02", "pay_amount":"60.00", "real_pay":"60.00", "pay_time":"2023-03-25 12:03:00", "user_id":"0002", "user_name":"Bob", "area_id":"330111"}
    5. View the out file of the taskmanager. The output is similar to the following:
      +I(202303251202020001,miniAppShop,2023-03-25 12:02:02,60.0,60.0,2023-03-25 12:03:00,0002,Bob,330110)
      +I(202303251505050001,appshop,2023-03-25 15:05:05,500.0,400.0,2023-03-25 15:10:00,0003,Cindy,330108)
      -U(202303251202020001,miniAppShop,2023-03-25 12:02:02,60.0,60.0,2023-03-25 12:03:00,0002,Bob,330110)
      +U(202303251202020001,miniAppShop,2023-03-25 12:02:02,60.0,60.0,2023-03-25 12:03:00,0002,Bob,330111)
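
The +I, -U, and +U markers in step 5 follow from how a keyed upsert stream is interpreted: the first record seen for a key is an insert, and a later record with the same key retracts the previous row and emits the new one. The Python sketch below only illustrates that rule on the three messages from step 4. It is not how Flink implements it; the names `to_changelog` and `format_row` and the in-memory dictionary are assumptions made for this illustration, and delete (tombstone) messages are not modelled.

```python
import json

# The three messages from step 4, copied as JSON strings.
MESSAGES = [
    '{"order_id":"202303251202020001", "order_channel":"miniAppShop", "order_time":"2023-03-25 12:02:02", "pay_amount":"60.00", "real_pay":"60.00", "pay_time":"2023-03-25 12:03:00", "user_id":"0002", "user_name":"Bob", "area_id":"330110"}',
    '{"order_id":"202303251505050001", "order_channel":"appshop", "order_time":"2023-03-25 15:05:05", "pay_amount":"500.00", "real_pay":"400.00", "pay_time":"2023-03-25 15:10:00", "user_id":"0003", "user_name":"Cindy", "area_id":"330108"}',
    '{"order_id":"202303251202020001", "order_channel":"miniAppShop", "order_time":"2023-03-25 12:02:02", "pay_amount":"60.00", "real_pay":"60.00", "pay_time":"2023-03-25 12:03:00", "user_id":"0002", "user_name":"Bob", "area_id":"330111"}',
]

# Column order of the tables declared in step 3.
FIELDS = ["order_id", "order_channel", "order_time", "pay_amount", "real_pay",
          "pay_time", "user_id", "user_name", "area_id"]
DOUBLE_FIELDS = {"pay_amount", "real_pay"}


def format_row(record):
    """Render one record in column order, converting the double columns."""
    values = []
    for name in FIELDS:
        value = record[name]
        if name in DOUBLE_FIELDS:
            value = float(value)  # "60.00" becomes 60.0
        values.append(str(value))
    return "(" + ",".join(values) + ")"


def to_changelog(messages):
    """Turn keyed messages into changelog lines (hypothetical helper)."""
    latest = {}  # last row seen per key (order_id, the primary key)
    lines = []
    for raw in messages:
        record = json.loads(raw)
        key = record["order_id"]
        row = format_row(record)
        if key in latest:
            lines.append("-U" + latest[key])  # retract the previous row
            lines.append("+U" + row)          # emit the updated row
        else:
            lines.append("+I" + row)          # first record for this key
        latest[key] = row
    return lines


if __name__ == "__main__":
    for line in to_changelog(MESSAGES):
        print(line)
```

Running the sketch prints four lines in the order +I, +I, -U, +U. The third message reuses the key 202303251202020001, so it produces the -U/+U pair in which only area_id changes from 330110 to 330111.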
support.huaweicloud.com/sqlref-flink-dli/dli_08_15065.html