数据湖探索 DLI-JDBC:示例

时间:2024-04-23 20:19:32

示例

  • 示例1:使用JDBC作为数据源,Print作为结果表,从RDS MySQL数据库中读取数据,并写入到Print结果表中。
    1. 参考增强型跨源连接,根据RDS MySQL所在的虚拟私有云和子网创建相应的增强型跨源,并绑定所要使用的Flink弹性资源池。
    2. 设置RDS MySQL的安全组,添加入向规则使其对Flink的队列网段放通。参考测试地址连通性根RDS的地址测试队列连通性。如果能连通,则表示跨源已经绑定成功,否则表示未成功。
    3. 登录RDS MySQL,并使用下述命令在flink库下创建orders表,并插入数据。创建数据库的操作可以参考创建RDS数据库

      在flink数据库库下创建orders表:

      CREATE TABLE `flink`.`orders` (
      	`order_id` VARCHAR(32) NOT NULL,
      	`order_channel` VARCHAR(32) NULL,
      	PRIMARY KEY (`order_id`)
      )	ENGINE = InnoDB
      	DEFAULT CHARACTER SET = utf8mb4
      	COLLATE = utf8mb4_general_ci;

      插入表数据:

      insert into orders(
        order_id,
        order_channel
      ) values
        ('1', 'webShop'),  
        ('2', 'miniAppShop');
    4. 参考创建Flink OpenSource作业,创建flink opensource sql作业,输入以下作业运行脚本,提交运行作业。

      注意:创建作业时,在作业编辑界面的“运行参数”处,“Flink版本”选择“1.15”,勾选“保存作业日志”并设置保存作业日志的OBS桶,方便后续查看作业日志。如下脚本中的加粗参数请根据实际环境修改

      认证用的username和password硬编码到代码中或者明文存储都有很大的安全风险,建议使用DEW管理凭证。配置文件或者环境变量中密文存放,使用时解密,确保安全。Flink Opensource SQL使用DEW管理访问凭据

      CREATE TABLE jdbcSource (
        order_id string,
        order_channel string
      ) WITH (
        'connector' = 'jdbc',
        'url' = 'jdbc:mysql://MySQLAddress:MySQLPort/flink',--flink为RDS MySQL创建的数据库名
        'table-name' = 'orders',
        'username' = 'MySQLUsername',
        'password' = 'MySQLPassword',
        'scan.fetch-size' = '10',
        'scan.auto-commit' = 'true'
      );
      
      CREATE TABLE printSink (
        order_id string,
        order_channel string
      ) WITH (
        'connector' = 'print'
      );
      
      insert into printSink select * from jdbcSource;
    5. 查看taskmanager.out文件中的数据结果,数据结果参考如下:
      +I(1,webShop)
      +I(2,miniAppShop)
  • 示例2:使用DataGen源表发送数据,通过JDBC结果表将数据输出到MySQL数据库中。
    1. 参考增强型跨源连接,根据RDS MySQL所在的虚拟私有云和子网创建相应的增强型跨源,并绑定所要使用的Flink弹性资源池。
    2. 设置RDS MySQL的安全组,添加入向规则使其对Flink的队列网段放通。参考测试地址连通性根RDS的地址测试队列连通性。如果能连通,则表示跨源已经绑定成功,否则表示未成功。
    3. 登录RDS MySQL,并使用下述命令在flink库下创建orders表,并插入数据。创建数据库的操作可以参考创建RDS数据库
      在flink数据库库下创建orders表:
      CREATE TABLE `flink`.`orders` (
      	`order_id` VARCHAR(32) NOT NULL,
      	`order_channel` VARCHAR(32) NULL,
      	PRIMARY KEY (`order_id`)
      )	ENGINE = InnoDB
      	DEFAULT CHARACTER SET = utf8mb4
      	COLLATE = utf8mb4_general_ci;
    4. 参考创建Flink OpenSource作业,创建flink opensource sql作业,输入以下作业运行脚本,提交运行作业。

      注意:创建作业时,在作业编辑界面的“运行参数”处,“Flink版本”选择“1.15”,勾选“保存作业日志”并设置保存作业日志的OBS桶,方便后续查看作业日志。如下脚本中的加粗参数请根据实际环境修改

      CREATE TABLE dataGenSource (
        order_id string,
        order_channel string
      ) WITH (
        'connector' = 'datagen',
        'fields.order_id.kind' = 'sequence',
        'fields.order_id.start' = '1',
        'fields.order_id.end' = '1000',
        'fields.order_channel.kind' = 'random',
        'fields.order_channel.length' = '5'
      );
      
      CREATE TABLE jdbcSink (
        order_id string,
        order_channel string,
        PRIMARY KEY(order_id) NOT ENFORCED
      ) WITH (
        'connector' = 'jdbc',
        'url' = 'jdbc:mysql://MySQLAddress:MySQLPort/flink',--其中url中的flink表示MySQL中orders表所在的数据库名
        'table-name' = 'orders',
        'username' = 'MySQLUsername',
        'password' = 'MySQLPassword',
        'sink.buffer-flush.max-rows' = '1'
      );
      
      insert into jdbcSink select * from dataGenSource;
    5. 查看表中数据,在MySQL中执行sql查询语句
      select * from orders;
  • 示例3:从DataGen源表中读取数据,将JDBC表作为维表,并将二者生成的表信息写入Print结果表中。
    1. 参考增强型跨源连接,根据RDS MySQL所在的虚拟私有云和子网创建相应的增强型跨源,并绑定所要使用的Flink弹性资源池。
    2. 设置RDS MySQL的安全组,添加入向规则使其对Flink的队列网段放通。参考测试地址连通性根RDS的地址测试队列连通性。如果能连通,则表示跨源已经绑定成功,否则表示未成功。
    3. 登录RDS MySQL,并使用下述命令在flink库下创建orders表,并插入数据。创建数据库的操作可以参考创建RDS数据库

      在flink数据库库下创建orders表:

      CREATE TABLE `flink`.`orders` (
      	`order_id` VARCHAR(32) NOT NULL,
      	`order_channel` VARCHAR(32) NULL,
      	PRIMARY KEY (`order_id`)
      )	ENGINE = InnoDB
      	DEFAULT CHARACTER SET = utf8mb4
      	COLLATE = utf8mb4_general_ci;

      插入表数据:

      insert into orders(
        order_id,
        order_channel
      ) values
        ('1', 'webShop'),  
        ('2', 'miniAppShop');
    4. 参考创建Flink OpenSource作业,创建flink opensource sql作业,输入以下作业运行脚本,提交运行作业。该作业脚本将DataGen为数据源,JDBC作为维表,数据写入到Print结果表。

      注意:创建作业时,在作业编辑界面的“运行参数”处,“Flink版本”选择“1.15”,勾选“保存作业日志”并设置保存作业日志的OBS桶,方便后续查看作业日志。如下脚本中的加粗参数请根据实际环境修改

      CREATE TABLE dataGenSource (
        order_id string,
        order_time timestamp,
        proctime as Proctime()
      ) WITH (
        'connector' = 'datagen',
        'fields.order_id.kind' = 'sequence',
        'fields.order_id.start' = '1',
        'fields.order_id.end' = '2'
      );
      
      --创建维表
      CREATE TABLE jdbcTable (
        order_id string,
        order_channel string
      ) WITH (
        'connector' = 'jdbc',
        'url' = 'jdbc:mysql://JDBCAddress:JDBCPort/flink',--flink为RDS MySQL中orders表所在的数据库名
        'table-name' = 'orders',
        'username' = 'JDBCUserName',
        'password' = 'JDBCPassWord',
        'lookup.cache.max-rows' = '100',
        'lookup.cache.ttl' = '1000',
        'lookup.cache.caching-missing-key' = 'false',
        'lookup.max-retries' = '5'
      );
      
      CREATE TABLE printSink (
        order_id string,
        order_time timestamp,
        order_channel string
      ) WITH (
        'connector' = 'print'
      );
      
      insert into
        printSink
      SELECT
        dataGenSource.order_id, dataGenSource.order_time, jdbcTable.order_channel
      from
        dataGenSource
        left join jdbcTable for system_time as of dataGenSource.proctime on dataGenSource.order_id = jdbcTable.order_id;
    5. 查看taskmanager.out文件中的数据结果,数据结果参考如下:
      +I(1, xxx, webShop)
      +I(2, xxx, miniAppShop)
support.huaweicloud.com/sqlref-flink-dli/dli_08_15057.html