数据湖探索 DLI-Kafka:示例3:将DMS Kafka作为源表,Print作为结果表(适用于Kafka集群已开启SASL_SSL场景)

时间:2024-04-23 20:19:33

示例3:将DMS Kafka作为源表,Print作为结果表(适用于Kafka集群已开启SASL_SSL场景)

创建DMS的kafka集群,开启SASL_SSL,并下载SSL证书,将下载的证书client.jks上传到OBS桶中。

其中,properties.sasl.jaas.config字段包含账号密码,使用DEW进行加密。

CREATE TABLE ordersSource (
  order_id string,
  order_channel string,
  order_time timestamp(3),
  pay_amount double,
  real_pay double,
  pay_time string,
  user_id string,
  user_name string,
  area_id string
) WITH (
  'connector' = 'kafka',
  'topic' = 'KafkaTopic',
  'properties.bootstrap.servers' = 'KafkaAddress1:9093,KafkaAddress2:9093',
  'properties.group.id' = 'GroupId',
  'scan.startup.mode' = 'latest-offset',
  'properties.connector.auth.open' = 'true',
  'properties.ssl.truststore.location' = 'obs://xx/client.jks',  -- 用户上传证书的位置
  'properties.sasl.mechanism' = 'PLAIN', 
  'properties.security.protocol' = 'SASL_SSL',
  'properties.sasl.jaas.config' = 'xx',  -- dew凭据管理中的key,其值如:org.apache.kafka.common.security.plain.PlainLoginModule required username=xx password=xx;
  'format' = 'json',
  'dew.endpoint' = 'kms.xx.com', --使用的DEW服务所在的endpoint信息
  'dew.csms.secretName' = 'xx', --DEW服务通用凭据的凭据名称
  'dew.csms.decrypt.fields' = 'properties.sasl.jaas.config', --其中properties.sasl.jaas.config字段值,需要利用DEW凭证管理,进行解密替换
  'dew.csms.version' = 'v1'
);

CREATE TABLE ordersSink (
  order_id string,
  order_channel string,
  order_time timestamp(3),
  pay_amount double,
  real_pay double,
  pay_time string,
  user_id string,
  user_name string,
  area_id string
) WITH (
  'connector' = 'print'
);
 insert into ordersSink select * from ordersSource;
support.huaweicloud.com/sqlref-flink-dli/dli_08_15058.html