华为云用户手册

  • 功能描述 本节介绍利用Flink来读写Hive的表。Hive源表的定义,以及创建源表时使用的参数和示例代码。详情可参考:Apache Flink Hive Read & Write Flink支持在BATCH 和 STREAMING模式下从Hive读取数据。当作为BATCH应用程序运行时,Flink将在执行查询的时间点对表的状态执行查询。STREAMING读取将持续监控表,并在新数据可用时以增量方式获取新数据。默认情况下,Flink会读取有界的表。 STREAMING读取支持同时使用分区表和非分区表。对于分区表,Flink将监控新分区的生成,并在可用时增量读取它们。对于未分区的表,Flink 会监控文件夹中新文件的生成情况,并增量读取新文件。
  • 简介 Apache Hive 已经成为了 数据仓库 生态系统中的核心。 它不仅仅是一个用于大数据分析和ETL场景的SQL引擎,同样它也是一个数据管理平台,可用于发现,定义,和演化数据。 Flink与Hive的集成包含两个层面,一是利用了Hive的MetaStore作为持久化的Catalog,二是利用Flink来读写Hive的表。Overview | Apache Flink 从Flink 1.11.0开始,在使用 Hive方言时,Flink允许用户用Hive语法来编写SQL语句。通过提供与Hive语法的兼容性,改善与Hive的互操作性,并减少用户需要在Flink和Hive之间切换来执行不同语句的情况。详情可参考:Apache Flink Hive 方言 使用HiveCatalog,Apache Flink可以用于统一处理Apache Hive表的BATCH和STREAM。Flink可以作为Hive批处理引擎的更高效的替代方案,或者用于连续读写Hive表,以支持实时数据仓库应用程序。Apache Flink Hive Read & Write
  • 注意事项 创建Flink OpenSource SQL作业时,在作业编辑界面的“运行参数”处,“Flink版本”需要选择“1.15”,勾选“保存作业日志”并设置保存作业日志的OBS桶,方便后续查看作业日志。 数据类型的使用,请参考Format章节。 Hive 方言支持的 DDL 语句,Flink 1.15 当前仅支持使用Hive语法创建OBS表和使用hive语法的 DLI Lakehouse表。 使用Hive语法创建OBS表 defalut方言: with 属性中需要设置hive.is-external为true。 使用hive 方言:建表语句需要使用EXTERNAL关键字。 使用hive语法的DLI Lakehouse表 使用hive 方言:表属性中需要添加'is_lakehouse'='true'。 开启checkpoint功能。 建议切换到Hive方言来创建Hive兼容表。如果你想用默认的方言创建Hive兼容表,确保在你的表属性中设置'connector'='hive',否则在HiveCatalog中一个表默认被认为是通用的。注意,如果你使用Hive方言,就不需要connector属性了。 监视策略是扫描当前位置路径中的所有目录/文件。许多分区可能会导致性能下降。 对未分区表进行流式读取时,要求将每个文件以原子方式写入目标目录。 分区表的流式读取要求在 hive 元存储的视图中以原子方式添加每个分区。否则,将使用添加到现有分区的新数据。 流式读取不支持 Flink DDL 中的水印语法。这些表不能用于窗口运算符。
  • 功能描述 Debezium是一个 CDC(Changelog Data Capture,变更数据捕获)的工具,可以把来自 MySQL、PostgreSQL、Oracle、Microsoft SQL Server 和许多其他数据库的更改实时流式传输到 Kafka 中。 Debezium 为变更日志提供了统一的格式结构,并支持使用 JSON 和 Apache Avro 序列化消息。 Flink 支持将 Debezium JSON 和 Avro 消息解析为 INSERT / UPDATE / DELETE 消息到 Flink SQL 系统中。在很多情况下,利用这个特性非常的有用,例如: 将增量数据从数据库同步到其他系统 日志审计 数据库的实时物化视图 关联维度数据库的变更历史 Flink 还支持将 Flink SQL 中的 INSERT / UPDATE / DELETE 消息编码为 Debezium 格式的 JSON 或 Avro 消息,输出到 Kafka 等存储中。 但需要注意的是,目前 Flink 还不支持将 UPDATE_BEFORE 和 UPDATE_AFTER 合并为一条 UPDATE 消息。因此,Flink 将 UPDATE_BEFORE 和 UPDATE_AFTER 分别编码为 DELETE 和 INSERT 类型的 Debezium 消息。 更多具体使用可参考开源社区文档:Debezium Format。
  • 功能描述 Apache Kafka是一个快速、可扩展的、高吞吐、可容错的分布式发布订阅消息系统,具有高吞吐量、内置分区、支持数据副本和容错的特性,适合在大规模消息处理场景中使用。Upsert Kafka 连接器支持以upsert方式从Kafka topic中读取数据并将数据写入Kafka topic。表类型支持源表和结果表。 作为source,upsert-kafka 连接器生产changelog流,其中每条数据记录代表一个更新或删除事件。 数据记录中的value被解释为同一key的最后一个value的UPDATE,如果有这个key(如果不存在相应的key,则该更新被视为INSERT)。用表来类比,changelog 流中的数据记录被解释为UPSERT,也称为INSERT/UPDATE,因为任何具有相同key的现有行都被覆盖。另外,value为空的消息将会被视作为DELETE消息。 作为sink,upsert-kafka连接器可以消费changelog流。它会将INSERT/UPDATE_AFTER数据作为正常的Kafka消息写入,并将DELETE数据以value为空的Kafka消息写入(表示对应 key 的消息被删除)。Flink将根据主键列的值对数据进行分区,从而保证主键上的消息有序,因此同一主键上的更新/删除消息将落在同一分区中。 表1 支持类别 类别 详情 支持表类型 源表、结果表
  • 注意事项 创建Flink OpenSource SQL作业时,在作业编辑界面的“运行参数”处,“Flink版本”需要选择“1.15”,勾选“保存作业日志”并设置保存作业日志的OBS桶,方便后续查看作业日志。 认证用的username和password等硬编码到代码中或者明文存储都有很大的安全风险,建议使用DEW管理凭证。配置文件或者环境变量中密文存放,使用时解密,确保安全。Flink Opensource SQL使用DEW管理访问凭据 Upsert Kafka 始终以upsert方式工作,并且需要在DDL中定义主键。在具有相同主键值的消息按序存储在同一个分区的前提下,在 changlog source 定义主键意味着 在物化后的 changelog 上主键具有唯一性。定义的主键将决定哪些字段出现在Kafka消息的key中。 由于该连接器以 upsert 的模式工作,该连接器作为 source 读入时,可以确保具有相同主键值下仅最后一条消息会生效。 数据类型的使用,请参考Format章节。
  • 语法格式 1 2 3 4 5 6 7 8 9 10 11 12 create table kafkaTable( attr_name attr_type (',' attr_name attr_type)* (','PRIMARY KEY (attr_name, ...) NOT ENFORCED) ) with ( 'connector' = 'upsert-kafka', 'topic' = '', 'properties.bootstrap.servers' = '', 'key.format' = '', 'value.format' = '' );
  • 示例 示例1:该示例是从DMS Kafka数据源中读取数据,并写入到Print结果表中。 参考增强型跨源连接,根据Kafka所在的虚拟私有云和子网创建相应的增强型跨源,并绑定所要使用的Flink弹性资源池。 设置Kafka的安全组,添加入向规则使其对Flink的队列网段放通。参考测试地址连通性根据Kafka的地址测试队列连通性。如果能连通,则表示跨源已经绑定成功,否则表示未成功。 参考创建Flink OpenSource作业,创建flink opensource sql作业,输入以下作业脚本,提交运行作业。 注意:创建作业时,在作业编辑界面的“运行参数”处,“Flink版本”选择“1.15”,勾选“保存作业日志”并设置保存作业日志的OBS桶,方便后续查看作业日志。如下脚本中的加粗参数请根据实际环境修改。 CREATE TABLE upsertKafkaSource ( order_id string, order_channel string, order_time string, pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string, PRIMARY KEY (order_id) NOT ENFORCED ) WITH ( 'connector' = 'upsert-kafka', 'topic' = 'KafkaTopic', 'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkAddress2:KafkaPort', 'key.format' = 'csv', 'value.format' = 'json' ); CREATE TABLE printSink ( order_id string, order_channel string, order_time string, pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string, PRIMARY KEY (order_id) NOT ENFORCED ) WITH ( 'connector' = 'print' ); INSERT INTO printSink SELECT * FROM upsertKafkaSource; 向Kafka中的指定topic中插入如下数据(注意:kafka插入数据时请指定key)。 {"order_id":"202303251202020001", "order_channel":"miniAppShop", "order_time":"2023-03-25 12:02:02", "pay_amount":"60.00", "real_pay":"60.00", "pay_time":"2023-03-25 12:03:00", "user_id":"0002", "user_name":"Bob", "area_id":"330110"} {"order_id":"202303251505050001", "order_channel":"appshop", "order_time":"2023-03-25 15:05:05", "pay_amount":"500.00", "real_pay":"400.00", "pay_time":"2023-03-25 15:10:00", "user_id":"0003", "user_name":"Cindy", "area_id":"330108"} {"order_id":"202303251202020001", "order_channel":"miniAppShop", "order_time":"2023-03-25 12:02:02", "pay_amount":"60.00", "real_pay":"60.00", "pay_time":"2023-03-25 12:03:00", "user_id":"0002", "user_name":"Bob", "area_id":"330111"} 查看taskmanager的out文件,数据结果参考如下: +I(202303251202020001,miniAppShop,2023-03-2512:02:02,60.0,60.0,2023-03-2512:03:00,0002,Bob,330110) +I(202303251505050001,appshop,2023-03-25 15:05:05,500.0,400.0,2023-03-2515:10:00,0003,Cindy,330108) -U(202303251202020001,miniAppShop,2023-03-2512:02:02,60.0,60.0,2023-03-2512:03:00,0002,Bob,330110) +U(202303251202020001,miniAppShop,2023-03-2512:02:02,60.0,60.0,2023-03-2512:03:00,0002,Bob,330111)
  • 参数说明 表2 参数说明 参数 是否必选 默认参数 数据类型 说明 connector 是 无 String connector类型,对于upsert kafka连接器,需配置为'upsert-kafka'。 topic 是 无 String Kafka topic名。 properties.bootstrap.servers 是 无 String Kafka brokers地址,以逗号分隔。 key.format 是 无 String 用于对Kafka消息中key部分序列化和反序列化的格式。key字段由PRIMARY KEY语法指定。支持的格式如下: csv json avro 请参考Format页面以获取更多详细信息和格式参数。 key.fields-prefix 否 无 String 为键格式的所有字段定义自定义前缀,以避免与值格式的字段发生名称冲突。 默认情况下,前缀为空。如果定义了自定义前缀,则表架构和'key.fields'都将使用前缀名称。在构造密钥格式的数据类型时,将删除前缀,并在密钥格式中使用无前缀的名称。请注意,此选项要求'value.fields-include' 必须设置为'EXCEPT_KEY'。 value.format 是 无 String 用于对 Kafka消息中 value 部分序列化和反序列化的格式。支持的格式: csv json avro 请参考Format页面以获取更多详细信息和格式参数。 value.fields-include 是 ALL String 控制哪些字段应该出现在值中。取值范围如下: ALL:消息的value部分将包含schema的所有字段,包括定义中键的字段。 EXCEPT_KEY:记录的value部分包含schema的所有内容,定义为主键的字段除外。 properties.* 否 无 String 该选项可以传递任意的Kafka参数。 “properties.”后的后缀名必须匹配定义在 kafka参数文档中的参数名。 Flink会自动移除选项名中的 "properties." 前缀,并将转换后的键名以及值传入KafkaClient。 例如:你可以通过 'properties.allow.auto.create.topics' = 'false' 来禁止自动创建 topic。 但是'key.deserializer' 和 'value.deserializer' 是不允许通过该方式传递参数,因为Flink会重写这些参数的值。 sink.parallelism 否 无 Integer 定义upsert-kafka sink 算子的并行度。默认情况下,由框架确定并行度,与上游链接算子的并行度保持一致。 sink.buffer-flush.max-rows 否 0 Integer 缓存刷新前,最多能缓存的记录条数。 当sink收到很多同key上的更新时,缓存将保留同 key 的最后一条记录,因此sink缓存能帮助减少发往Kafka topic的数据量,以及避免发送潜在的tombstone消息。可以通过设置为'0'来禁用它。 默认情况下,该选项是未开启的。如果要开启 sink 缓存,需要同时设置'sink.buffer-flush.max-rows'和'sink.buffer-flush.interval'两个选项为大于零的值。 sink.buffer-flush.interval 否 0 Duration 缓存刷新的间隔时间,超过该时间后异步线程将刷新缓存数据。单位可以为毫秒(ms)、秒(s)、分钟(min)或小时(h)。例如'sink.buffer-flush.interval'='10 ms'。 默认情况下,该选项是未开启的。如果要开启 sink 缓存,需要同时设置'sink.buffer-flush.max-rows'和'sink.buffer-flush.interval'两个选项为大于零的值。
  • 示例 RDS表用于与输入流连接。 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 CREATE TABLE car_infos ( car_id STRING, car_owner STRING, car_brand STRING, car_price INT, proctime as PROCTIME() ) WITH ( 'connector.type' = 'dis', 'connector.region' = 'cn-north-1', 'connector.channel' = 'disInput', 'format.type' = 'csv' ); CREATE TABLE db_info ( car_id STRING, car_owner STRING, car_brand STRING, car_price INT ) WITH ( 'connector.type' = 'gaussdb', 'connector.driver' = 'org.postgresql.Driver', 'connector.url' = 'jdbc:gaussdb://xx.xx.xx.xx:8000/xx', 'connector.table' = 'car_info', 'connector.username' = 'xx', 'connector.password' = 'xx', 'connector.lookup.cache.max-rows' = '10000', 'connector.lookup.cache.ttl' = '24h' ); CREATE TABLE audi_cheaper_than_30w ( car_id STRING, car_owner STRING, car_brand STRING, car_price INT ) WITH ( 'connector.type' = 'dis', 'connector.region' = 'cn-north-1', 'connector.channel' = 'disOutput', 'connector.partition-key' = 'car_id,car_owner', 'format.type' = 'csv' ); INSERT INTO audi_cheaper_than_30w SELECT a.car_id, b.car_owner, b.car_brand, b.car_price FROM car_infos as a join db_info FOR SYSTEM_TIME AS OF a.proctime AS b on a.car_id = b.car_id;
  • 语法格式 1 2 3 4 5 6 7 8 9 10 11 create table dwsSource ( attr_name attr_type (',' attr_name attr_type)* ) with ( 'connector.type' = 'gaussdb', 'connector.url' = '', 'connector.table' = '', 'connector.username' = '', 'connector.password' = '' );
  • 参数说明 表1 参数说明 参数 是否必选 说明 connector.type 是 connector类型,需配置为'gaussdb' connector.url 是 jdbc连接地址,格式为:jdbc:postgresql://${ip}:${port}/${dbName} 。 connector.table 是 读取数据库中的数据所在的表名 connector.driver 否 jdbc连接驱动,默认为: org.postgresql.Driver。 connector.username 否 数据库认证用户名,需要和'connector.password'一起配置 connector.password 否 数据库认证密码,需要和'connector.username'一起配置 connector.read.partition.column 否 用于对输入进行分区的列名 与connector.read.partition.lower-bound、connector.read.partition.upper-bound、 connector.read.partition.num必须同时存在或者同时不存在 connector.read.partition.lower-bound 否 第一个分区的最小值 与connector.read.partition.column、connector.read.partition.upper-bound、 connector.read.partition.num必须同时存在或者同时不存在 connector.read.partition.upper-bound 否 最后一个分区的最大值 与connector.read.partition.column、connector.read.partition.lower-bound、 connector.read.partition.num必须同时存在或者同时不存在 connector.read.partition.num 否 分区的个数 与connector.read.partition.column、connector.read.partition.upper-bound、 connector.read.partition.upper-bound必须同时存在或者同时不存在 connector.read.fetch-size 否 每次从数据库拉取数据的行数。默认值为0,表示忽略该提示 connector.lookup.cache.max-rows 否 维表配置,缓存的最大行数,超过该值时,最先添加的数据将被标记为过期。-1表示不使用缓存。 connector.lookup.cache.ttl 否 维表配置,缓存超时时间,超过该时间的数据会被剔除。格式为:{length value}{time unit label},如123ms, 321s,支持的时间单位包括: d,h,min,s,ms等,默认为ms。 connector.lookup.max-retries 否 维表配置,数据拉取最大重试次数,默认为3。
  • 示例 示例1:使用JDBC作为数据源,Print作为结果表,从RDS MySQL数据库中读取数据,并写入到Print结果表中。 参考增强型跨源连接,根据RDS MySQL所在的虚拟私有云和子网创建相应的增强型跨源,并绑定所要使用的Flink弹性资源池。 设置RDS MySQL的安全组,添加入向规则使其对Flink的队列网段放通。参考测试地址连通性根RDS的地址测试队列连通性。如果能连通,则表示跨源已经绑定成功,否则表示未成功。 登录RDS MySQL,并使用下述命令在flink库下创建orders表,并插入数据。创建数据库的操作可以参考创建RDS数据库。 在flink数据库库下创建orders表: CREATE TABLE `flink`.`orders` ( `order_id` VARCHAR(32) NOT NULL, `order_channel` VARCHAR(32) NULL, PRIMARY KEY (`order_id`) ) ENGINE = InnoDB DEFAULT CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci; 插入表数据: insert into orders( order_id, order_channel ) values ('1', 'webShop'), ('2', 'miniAppShop'); 参考创建Flink OpenSource作业,创建flink opensource sql作业,输入以下作业运行脚本,提交运行作业。 注意:创建作业时,在作业编辑界面的“运行参数”处,“Flink版本”选择“1.15”,勾选“保存作业日志”并设置保存作业日志的OBS桶,方便后续查看作业日志。如下脚本中的加粗参数请根据实际环境修改。 认证用的username和password硬编码到代码中或者明文存储都有很大的安全风险,建议使用DEW管理凭证。配置文件或者环境变量中密文存放,使用时解密,确保安全。Flink Opensource SQL使用DEW管理访问凭据 CREATE TABLE jdbcSource ( order_id string, order_channel string ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://MySQLAddress:MySQLPort/flink',--flink为RDS MySQL创建的数据库名 'table-name' = 'orders', 'username' = 'MySQLUsername', 'password' = 'MySQLPassword', 'scan.fetch-size' = '10', 'scan.auto-commit' = 'true' ); CREATE TABLE printSink ( order_id string, order_channel string ) WITH ( 'connector' = 'print' ); insert into printSink select * from jdbcSource; 查看taskmanager.out文件中的数据结果,数据结果参考如下: +I(1,webShop) +I(2,miniAppShop) 示例2:使用DataGen源表发送数据,通过JDBC结果表将数据输出到MySQL数据库中。 参考增强型跨源连接,根据RDS MySQL所在的虚拟私有云和子网创建相应的增强型跨源,并绑定所要使用的Flink弹性资源池。 设置RDS MySQL的安全组,添加入向规则使其对Flink的队列网段放通。参考测试地址连通性根RDS的地址测试队列连通性。如果能连通,则表示跨源已经绑定成功,否则表示未成功。 登录RDS MySQL,并使用下述命令在flink库下创建orders表,并插入数据。创建数据库的操作可以参考创建RDS数据库。 在flink数据库库下创建orders表: CREATE TABLE `flink`.`orders` ( `order_id` VARCHAR(32) NOT NULL, `order_channel` VARCHAR(32) NULL, PRIMARY KEY (`order_id`) ) ENGINE = InnoDB DEFAULT CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci; 参考创建Flink OpenSource作业,创建flink opensource sql作业,输入以下作业运行脚本,提交运行作业。 注意:创建作业时,在作业编辑界面的“运行参数”处,“Flink版本”选择“1.15”,勾选“保存作业日志”并设置保存作业日志的OBS桶,方便后续查看作业日志。如下脚本中的加粗参数请根据实际环境修改。 CREATE TABLE dataGenSource ( order_id string, order_channel string ) WITH ( 'connector' = 'datagen', 'fields.order_id.kind' = 'sequence', 'fields.order_id.start' = '1', 'fields.order_id.end' = '1000', 'fields.order_channel.kind' = 'random', 'fields.order_channel.length' = '5' ); CREATE TABLE jdbcSink ( order_id string, order_channel string, PRIMARY KEY(order_id) NOT ENFORCED ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://MySQLAddress:MySQLPort/flink',--其中url中的flink表示MySQL中orders表所在的数据库名 'table-name' = 'orders', 'username' = 'MySQLUsername', 'password' = 'MySQLPassword', 'sink.buffer-flush.max-rows' = '1' ); insert into jdbcSink select * from dataGenSource; 查看表中数据,在MySQL中执行sql查询语句 select * from orders; 示例3:从DataGen源表中读取数据,将JDBC表作为维表,并将二者生成的表信息写入Print结果表中。 参考增强型跨源连接,根据RDS MySQL所在的虚拟私有云和子网创建相应的增强型跨源,并绑定所要使用的Flink弹性资源池。 设置RDS MySQL的安全组,添加入向规则使其对Flink的队列网段放通。参考测试地址连通性根RDS的地址测试队列连通性。如果能连通,则表示跨源已经绑定成功,否则表示未成功。 登录RDS MySQL,并使用下述命令在flink库下创建orders表,并插入数据。创建数据库的操作可以参考创建RDS数据库。 在flink数据库库下创建orders表: CREATE TABLE `flink`.`orders` ( `order_id` VARCHAR(32) NOT NULL, `order_channel` VARCHAR(32) NULL, PRIMARY KEY (`order_id`) ) ENGINE = InnoDB DEFAULT CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci; 插入表数据: insert into orders( order_id, order_channel ) values ('1', 'webShop'), ('2', 'miniAppShop'); 参考创建Flink OpenSource作业,创建flink opensource sql作业,输入以下作业运行脚本,提交运行作业。该作业脚本将DataGen为数据源,JDBC作为维表,数据写入到Print结果表。 注意:创建作业时,在作业编辑界面的“运行参数”处,“Flink版本”选择“1.15”,勾选“保存作业日志”并设置保存作业日志的OBS桶,方便后续查看作业日志。如下脚本中的加粗参数请根据实际环境修改。 CREATE TABLE dataGenSource ( order_id string, order_time timestamp, proctime as Proctime() ) WITH ( 'connector' = 'datagen', 'fields.order_id.kind' = 'sequence', 'fields.order_id.start' = '1', 'fields.order_id.end' = '2' ); --创建维表 CREATE TABLE jdbcTable ( order_id string, order_channel string ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://JDBCAddress:JDBCPort/flink',--flink为RDS MySQL中orders表所在的数据库名 'table-name' = 'orders', 'username' = 'JDBCUserName', 'password' = 'JDBCPassWord', 'lookup.cache.max-rows' = '100', 'lookup.cache.ttl' = '1000', 'lookup.cache.caching-missing-key' = 'false', 'lookup.max-retries' = '5' ); CREATE TABLE printSink ( order_id string, order_time timestamp, order_channel string ) WITH ( 'connector' = 'print' ); insert into printSink SELECT dataGenSource.order_id, dataGenSource.order_time, jdbcTable.order_channel from dataGenSource left join jdbcTable for system_time as of dataGenSource.proctime on dataGenSource.order_id = jdbcTable.order_id; 查看taskmanager.out文件中的数据结果,数据结果参考如下: +I(1, xxx, webShop) +I(2, xxx, miniAppShop)
  • 分区扫描功能介绍 为了加速Source任务实例中的数据读取,Flink为JDBC表提供了分区扫描功能。以下参数定义了从多个任务并行读取时如何对表进行分区。 scan.partition.column:用于对输入进行分区的列名,该列的数据类型必须是数字,日期或时间戳。 scan.partition.num: 分区数。 scan.partition.lower-bound:第一个分区的最小值。 scan.partition.upper-bound:最后一个分区的最大值。 建表时以上扫描分区参数必须同时存在或者同时不存在。 scan.partition.lower-bound和scan.partition.upper-bound参数仅用于决定分区步长,而不是用于过滤表中的行,表中的所有行都会被分区并返回。
  • Lookup Cache功能介绍 JDBC连接器可以用在时态表关联中作为一个可lookup的维表,当前只支持同步的查找模式。 默认情况下,Lookup cache是未启用的,所有请求都会发送到外部数据库。你可以设置Lookup.cache.max-rows和Lookup.cache.ttl参数来启用。Lookup cache的主要目的是用于提高时态表关联JDBC连接器的性能。 当Lookup cache被启用时,每个进程(即TaskManager)将维护一个缓存。Flink将优先查找缓存,只有当缓存未查找到时才向外部数据库发送请求,并使用返回的数据更新缓存。当缓存命中最大缓存行Lookup.cache.max-rows或当行超过最大存活时间Lookup.cache.ttl时,缓存中最先添加的条目将被标记为过期。缓存中的记录可能不是最新的,用户可以将Lookup.cache.ttl设置为一个更小的值以获得更好的刷新数据,但这可能会增加发送到数据库的请求数。所以要做好吞吐量和正确性之间的平衡。 默认情况下,Flink会缓存主键的空查询结果,你可以通过将Lookup.cache.caching-missing-key设置为false来切换行为。
  • 数据类型映射 表3 数据类型映射 MySQL类型 PostgreSQL类型 Flink SQL类型 TINYINT - TINYINT SMALLINT TINYINT UNSIGNED SMALLINT INT2 SMALLSERIAL SERIAL2 SMALLINT INT MEDIUMINT SMALLINT UNSIGNED INTEGER SERIAL INT BIGINT INT UNSIGNED BIGINT BIGSERIAL BIGINT BIGINT UNSIGNED - DECIMAL(20, 0) BIGINT BIGINT BIGINT FLOAT REAL FLOAT4 FLOAT DOUBLE DOUBLE PRECISION FLOAT8 DOUBLE PRECISION DOUBLE NUMERIC(p, s) DECIMAL(p, s) NUMERIC(p, s) DECIMAL(p, s) DECIMAL(p, s) BOOLEAN TINYINT(1) BOOLEAN BOOLEAN DATE DATE DATE TIME [(p)] TIME [(p)] [WITHOUT TIMEZONE] TIME [(p)] [WITHOUT TIMEZONE] DATETIME [(p)] TIMESTAMP [(p)] [WITHOUT TIMEZONE] TIMESTAMP [(p)] [WITHOUT TIMEZONE] CHAR(n) VARCHAR(n) TEXT CHAR(n) CHARACTER(n) VARCHAR(n) CHARACTER VARYING(n) TEXT STRING BINARY VARBINARY BLOB BYTEA BYTES - ARRAY ARRAY
  • 注意事项 JDBC结果表如果定义了主键,将以upsert模式与外部系统交换UPDATE/DELETE消息;否则,它将以append模式与外部系统交换消息,不支持消费UPDATE/DELETE消息。 创建Flink OpenSource SQL作业时,在作业编辑界面的“运行参数”处,“Flink版本”需要选择“1.15”,勾选“保存作业日志”并设置保存作业日志的OBS桶,方便后续查看作业日志。 认证用的username和password等硬编码到代码中或者明文存储都有很大的安全风险,建议使用DEW管理凭证。配置文件或者环境变量中密文存放,使用时解密,确保安全。Flink Opensource SQL使用DEW管理访问凭据
  • 语法格式 create table jbdcTable ( attr_name attr_type (',' attr_name attr_type)* (','PRIMARY KEY (attr_name, ...) NOT ENFORCED) (',' watermark for rowtime_column_name as watermark-strategy_expression) ) with ( 'connector' = 'jdbc', 'url' = '', 'table-name' = '', 'username' = '', 'password' = '' );
  • 参数说明 表2 参数说明 参数 是否必选 默认值 类型 说明 connector 是 无 String 指定要使用的连接器,当前固定为'jdbc'。 url 是 无 String 数据库的URL。 连接MySQL数据库时,格式为:jdbc:mysql://MySQLAddress:MySQLPort/dbName 。 连接PostgreSQL数据库时,格式为:jdbc:postgresql://PostgreSQLAddress:PostgreSQLPort/dbName。 table-name 是 无 String 读取数据库中的数据所在的表名。 driver 否 无 String 连接数据库所需要的驱动。如果未配置,则会自动通过URL提取。 MySQL数据库默认驱动为com.mysql.jdbc.Driver。 PostgreSQL数据库默认驱动为org.postgresql.Driver。 username 否 无 String 数据库认证用户名,需要和'password'一起配置。 password 否 无 String 数据库认证密码,需要和'username'一起配置。 connection.max-retry-timeout 否 60s Duration 尝试连接数据库服务器最大重试超时时间,不应小于1s。 scan.partition.column 否 无 String 用于对输入进行分区的列名。分区扫描参数,具体请参考分区扫描功能介绍。 scan.partition.num 否 无 Integer 分区的个数。分区扫描参数,具体请参考分区扫描功能介绍。 scan.partition.lower-bound 否 无 Integer 第一个分区的最小值。分区扫描参数,具体请参考分区扫描功能介绍。 scan.partition.upper-bound 否 无 Integer 最后一个分区的最大值。分区扫描参数,具体请参考分区扫描功能介绍。 scan.fetch-size 否 0 Integer 每次从数据库拉取数据的行数。如果指定为0,则会忽略sql hint。 scan.auto-commit 否 true Boolean 是否设置自动提交,以确定事务中的每个statement是否自动提交 lookup.cache.max-rows 否 无 Integer lookup cache的最大行数,如果超过该值,缓存中最先添加的条目将被标记为过期。 默认情况下,lookup cache是未开启的。具体请参考Lookup Cache功能介绍。 lookup.cache.ttl 否 无 Duration lookup cache中每一行记录的最大存活时间,如果超过该时间,缓存中最先添加的条目将被标记为过期。 默认情况下,lookup cache是未开启的。具体请参考Lookup Cache功能介绍。 lookup.cache.caching-missing-key 否 true Boolean 是否缓存空查询结果,默认为true。具体请参考Lookup Cache功能介绍。 lookup.max-retries 否 3 Integer 查询数据库失败的最大重试次数。 sink.buffer-flush.max-rows 否 100 Integer flush前缓存记录的最大值,可以设置为 '0' 来禁用它。 sink.buffer-flush.interval 否 1s Duration flush间隔时间,超过该时间后异步线程将flush数据。可以设置为 '0' 来禁用它。如果想完全异步地处理缓存的flush事件,可以将 'sink.buffer-flush.max-rows' 设置为 '0' ,并配置适当的flush时间间隔。 sink.max-retries 否 3 Integer 写入到数据库失败后的最大重试次数。 sink.parallelism 否 无 Integer 用于定义JDBC sink算子的并行度。默认情况下,并行度是由框架决定,即与上游并行度一致。
  • 参数说明 表1 参数 是否必选 默认值 类型 描述 format 是 (none) String 指定要使用的格式, 这里应该是 'raw'。 raw.charset 否 UTF-8 String 指定字符集来编码文本字符串。 raw.endianness 否 big-endian String 指定字节序来编码数字值的字节。有效值为'big-endian'和'little-endian'。 更多细节可查阅 字节序。
  • 数据类型映射 下表详细说明了这种格式支持的 SQL 类型,包括用于编码和解码的序列化类和反序列化类的详细信息。 表2 数据类型映射 Flink SQL 类型 值 CHAR / VARCHAR / STRING UTF-8(默认)编码的文本字符串。编码字符集可以通过 'raw.charset' 进行配置。 BINARY / VARBINARY / BYTES 字节序列本身。 BOOLEAN 表示布尔值的单个字节,0表示 false, 1 表示 true。 TINYINT 有符号数字值的单个字节。 SMALLINT 采用big-endian(默认)编码的两个字节。字节序可以通过 'raw.endianness' 配置。 INT 采用 big-endian (默认)编码的四个字节。字节序可以通过 'raw.endianness' 配置。 BIGINT 采用 big-endian (默认)编码的八个字节。字节序可以通过 'raw.endianness' 配置。 FLOAT 采用 IEEE 754 格式和 big-endian (默认)编码的四个字节。字节序可以通过 'raw.endianness' 配置。 DOUBLE 采用 IEEE 754 格式和 big-endian (默认)编码的八个字节。字节序可以通过 'raw.endianness' 配置。 RAW 通过 RAW 类型的底层 TypeSerializer 序列化的字节序列。
  • 功能描述 Raw format 允许读写原始(基于字节)值作为单个列。 Raw Format将 null 值编码成 byte[] 类型的 null。这样在 upsert-kafka 中使用时可能会有限制,因为 upsert-kafka 将 null 值视为 墓碑消息(在键上删除)。因此,如果该字段可能具有 null 值,我们建议避免使用 upsert-kafka 连接器和 raw format 作为 value.format。 Raw format 连接器是内置的。更多具体使用可参考开源社区文档:Raw Format。
  • Spark 2.4.5 版本说明 表1列举了Spark 2.4.5 版本主要的功能特性。 更多版本新特性请参考Release Notes - Spark 3.1.1。 表1 Spark 2.4.5版本优势 特性 说明 支持配置小文件合并 使用SQL过程中,生成的小文件过多时,会导致作业执行时间过长,且查询对应表时耗时增大,建议对小文件进行合并。 参考如何合并小文件完成合并小文件。 支持修改非分区表或分区表的列注释 修改非分区表或分区表的列注释。 支持统计SQL作业的CPU消耗 支持在控制台查看“CPU累计使用量”。 支持容器集群Spark日志跳转查看 需要在容器查看日志。 支持动态加载UDF(公测) 无需重启队列UDF即可生效。 Spark UI支持火焰图 Spark UI支持绘制火焰图。 优化SQL作业NOT IN语句查询性能 NOT IN语句查询性能提升。 优化Multi-INSERT语句查询性能 Multi-INSERT语句查询性能提升。
  • 影响 DLI Flink 1.7版本停止服务(EOS)后,不再提供该软件版本相关的任何技术服务支持。建议您在执行作业时选择新版本的Flink引擎,推荐使用DLI Flink 1.12版本。 正在使用Flink 1.7版本的作业也请您尽快切换至新版本的Flink引擎,否则作业执行过程中出现的错误,不再提供该版本的任何技术服务支持。 如您有任何问题,可随时通过工单或者服务热线(4000-955-988或950808)与我们联系。
  • 影响 DLI Spark 2.3.2版本停止服务(EOS)后,不再提供该软件版本的任何技术服务支持。建议您在执行作业时选择新版本的Spark引擎,推荐使用DLI Spark 2.4.5版本。 正在使用DLI Spark 2.3.2版本的作业也请您尽快切换至新版本的Spark引擎,否则作业执行过程中出现的错误,不再提供该版本的任何技术服务支持。 如您有任何问题,可随时通过工单或者服务热线(4000-955-988或950808)与我们联系。
  • 影响 EOM后,不再支持新新购队列资源,包括不支持新购“包年/包月”和“按需计费”计费模式队列和“队列CU时套餐包”。 为了满足部分业务的使用需求,在2024年6月30日 00:00:00(北京时间)前,您可以最大续订1年队列或者变更队列。 2024年6月30日 00:00:00(北京时间)之后将无法续订、变更队列。 EOL后,队列资源将无法继续使用,请务必在下线前更换使用“弹性资源池”或“default队列”。推荐您购买弹性资源池,并在弹性资源池中创建队列,体验更丰富的DLI产品能力。 如您有任何问题,可随时通过工单或者服务热线(4000-955-988或950808)与我们联系。
  • 公告说明 为了更好的实现资源共享,提高计算资源利用率,DLI将“包年/包月”和“按需计费”计费模式队列升级为“弹性资源池队列”。即使用DLI计算资源需先购买弹性资源池,并在弹性资源池中创建队列。 DLI“包年/包月”和“按需计费”计费模式队列和和“队列CU时套餐包”计划于2024年3月31日 00:00(北京时间)停止销售(EOM)。 DLI“包年/包月”和“按需计费”计费模式队列和和“队列CU时套餐包”计划于2025年6月30日 00:00(北京时间)下线(EOL)。
  • 影响 DLI Flink 1.10、Flink1.11版本停止服务(EOS)后,不再提供该软件版本的任何技术服务支持。建议您在执行作业时选择新版本的Flink引擎,推荐使用DLI Flink 1.12版本。 正在使用Flink 1.10、Flink1.11版本的作业也请您尽快切换至新版本的Flink引擎,否则作业执行过程中出现的错误,不再提供该版本的任何技术服务支持。 如您有任何问题,可随时通过工单或者服务热线(4000-955-988或950808)与我们联系。
  • Spark 3.1.1版本说明 下表列举了Spark 3.1.1 版本主要的功能特性。 更多版本新特性请参考Release Notes - Spark 3.1.1。 【SPARK-33050】:Apache ORC 升级到1.5.12。 【SPARK-33092】:增强子表达式消减。 【SPARK-33480】:支持char/varchar数据类型。 【SPARK-32302】: 部分谓词下推优化。 【SPARK-30648】:支持JSON datasource表谓词下推。 【SPARK-32346】:支持avro datasource表谓词下推 。 【SPARK-32461】:shuffle hash join优化。 【SPARK-32272】:添加SQL标准命令SET TIME ZONE。 【SPARK-21492】:修复排序合并加入中的内存泄漏。 【SPARK-27812】:K8S客户端版本提升到4.6.1。 DLI从Spark 3.x版本开始不支持内置地理空间查询函数。
  • Spark 3.3.1版本说明 表1列举了Spark 3.3.1 版本主要的功能特性。 更多版本新特性及性能优化请参考Release Notes - Spark 3.3.1。 表1 Spark 3.3.1版本优势 特性 说明 Native性能加速 Spark查询语句性能提升。 元数据访问性能提升 提升Spark在处理大数据时的元数据访问性能,提高数据处理流程效率。 提升OBS committer小文件写性能 提升 对象存储服务 (OBS)在处理小文件写入时的性能,提高数据传输效率。 动态Executor shuffle数据优化 提升资源扩缩容的稳定性,当shuffle文件不需要时清理Executor。 支持配置小文件合并 使用SQL过程中,生成的小文件过多时,会导致作业执行时间过长,且查询对应表时耗时增大,建议对小文件进行合并。 参考如何合并小文件完成合并小文件。 支持修改非分区表或分区表的列注释 修改非分区表或分区表的列注释。 支持统计SQL作业的CPU消耗 支持在控制台查看“CPU累计使用量”。 支持容器集群Spark日志跳转查看 需要在容器查看日志。 支持动态加载UDF(公测) 无需重启队列UDF即可生效。 Spark UI支持火焰图 Spark UI支持绘制火焰图。 优化SQL作业NOT IN语句查询性能 NOT IN语句查询性能提升。 优化Multi-INSERT语句查询性能 Multi-INSERT语句查询性能提升。
共100000条