数据湖探索 DLI-MRS Kafka输入流:示例

时间:2023-11-10 09:20:51

示例

  • 从Kafka名称为test的topic中读取数据。
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    CREATE SOURCE STREAM kafka_source (
      name STRING, 
      age int
     )
      WITH (
        type = "kafka",
        kafka_bootstrap_servers = "ip1:port1,ip2:port2", 
        kafka_group_id = "sourcegroup1", 
        kafka_topic = "test",
        encode = "json"
    );
    
  • 从Kafka读取对象为test的topic,使用json_config将json数据和表字段对应。

    数据编码格式为json且不含嵌套,例如:

    {"attr1": "lilei", "attr2": 18}
    建表语句参考如下:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    CREATE SOURCE STREAM kafka_source (name STRING, age int)
    WITH (
      type = "kafka",
      kafka_bootstrap_servers = "ip1:port1,ip2:port2", 
      kafka_group_id = "sourcegroup1", 
      kafka_topic = "test",
      encode = "json",
      json_config = "name=attr1;age=attr2"
    );
    
support.huaweicloud.com/sqlref-flink-dli/dli_08_0238.html