华为云用户手册

  • 语法说明 ROW_NUMBER(): 从第一行开始,依次为每一行分配一个唯一且连续的号码。 PARTITION BY col1[, col2...]: 指定分区的列,例如去重的键。 ORDER BY time_attr [asc|desc]: 指定排序的列。所指定的列必须为时间属性。目前仅支持proctime。升序( ASC )排列指只保留第一行,而降序排列( DESC )则只保留最后一行。 WHERE rownum = 1: Flink 需要 rownum = 1 以确定该查询是否为去重查询。
  • Join表函数(UDTF) 功能描述 将表与表函数的结果进行 join 操作。左表(outer)中的每一行将会与调用表函数所产生的所有结果中相关联行进行 join 。 注意事项 针对横向表的左外部连接当前仅支持文本常量 TRUE 作为谓词。 示例 若表函数返回了空结果,左表(outer)的行将会被删除 SELECT users, tag FROM Orders, LATERAL TABLE(unnest_udtf(tags)) t AS tag; 若表函数返回了空结果,将会保留相对应的外部行并用空值填充 SELECT users, tag FROM Orders LEFT JOIN LATERAL TABLE(unnest_udtf(tags)) t AS tag ON TRUE;
  • Join Temporal Table Function 功能描述 注意事项 目前仅支持在 Temporal Tables 上的 inner join 示例 假如Rates是一个 Temporal Table Function, join 可以使用 SQL 进行如下的表达: SELECT o_amount, r_rate FROM Orders, LATERAL TABLE (Rates(o_proctime)) WHERE r_currency = o_currency;
  • OVER WINDOW Over Window与Group Window区别在于Over window每一行都会输出一条记录。 语法格式 1 2 3 4 5 6 7 8 9 10 11 SELECT agg1(attr1) OVER ( [PARTITION BY partition_name] ORDER BY proctime|rowtime ROWS BETWEEN (UNBOUNDED|rowCOUNT) PRECEDING AND CURRENT ROW FROM TABLENAME SELECT agg1(attr1) OVER ( [PARTITION BY partition_name] ORDER BY proctime|rowtime RANGE BETWEEN (UNBOUNDED|timeInterval) PRECEDING AND CURRENT ROW FROM TABLENAME 语法说明 表5 参数说明 参数 参数说明 PARTITION BY 指定分组的主键,每个分组各自进行计算。 ORDER BY 指定数据按processing time或event time作为时间戳。 ROWS 个数窗口。 RANGE 时间窗口。 注意事项 所有的聚合必须定义到同一个窗口中,即相同的分区、排序和区间。 当前仅支持 PRECEDING (无界或有界) 到 CURRENT ROW 范围内的窗口、FOLLOWING 所描述的区间并未支持。 ORDER BY 必须指定于单个的时间属性。 示例 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 // 计算从规则启动到目前为止的计数及总和(in proctime) insert into temp SELECT name, count(amount) OVER (PARTITION BY name ORDER BY proctime RANGE UNBOUNDED preceding) as cnt1, sum(amount) OVER (PARTITION BY name ORDER BY proctime RANGE UNBOUNDED preceding) as cnt2 FROM Orders; // 计算最近四条记录的计数及总和(in proctime) insert into temp SELECT name, count(amount) OVER (PARTITION BY name ORDER BY proctime ROWS BETWEEN 4 PRECEDING AND CURRENT ROW) as cnt1, sum(amount) OVER (PARTITION BY name ORDER BY proctime ROWS BETWEEN 4 PRECEDING AND CURRENT ROW) as cnt2 FROM Orders; // 计算最近60s的计数及总和(in eventtime),基于事件时间处理,事件时间为Orders中的timeattr字段。 insert into temp SELECT name, count(amount) OVER (PARTITION BY name ORDER BY timeattr RANGE BETWEEN INTERVAL '60' SECOND PRECEDING AND CURRENT ROW) as cnt1, sum(amount) OVER (PARTITION BY name ORDER BY timeattr RANGE BETWEEN INTERVAL '60' SECOND PRECEDING AND CURRENT ROW) as cnt2 FROM Orders;
  • GROUP WINDOW 语法说明 Group Window定义在GROUP BY里,每个分组只输出一条记录,包括以下几种: 分组函数 表1 分组函数表 分组窗口函数 说明 TUMBLE(time_attr, interval) 定义一个滚动窗口。滚动窗口把行分配到有固定持续时间( interval )的不重叠的连续窗口。比如,5 分钟的滚动窗口以 5 分钟为间隔对行进行分组。滚动窗口可以定义在事件时间(批处理、流处理)或处理时间(流处理)上。 HOP(time_attr, interval, interval) 定义一个跳跃的时间窗口(在 Table API 中称为滑动窗口)。滑动窗口有一个固定的持续时间( 第二个 interval 参数 )以及一个滑动的间隔(第一个 interval 参数 )。若滑动间隔小于窗口的持续时间,滑动窗口则会出现重叠;因此,行将会被分配到多个窗口中。比如,一个大小为 15 分钟的滑动窗口,其滑动间隔为 5 分钟,将会把每一行数据分配到 3 个 15 分钟的窗口中。滑动窗口可以定义在事件时间(批处理、流处理)或处理时间(流处理)上。 SESSION(time_attr, interval) 定义一个会话时间窗口。会话时间窗口没有一个固定的持续时间,但是它们的边界会根据 interval 所定义的不活跃时间所确定;即一个会话时间窗口在定义的间隔时间内没有事件出现,该窗口会被关闭。例如时间窗口的间隔时间是 30 分钟,当其不活跃的时间达到30分钟后,若观测到新的记录,则会启动一个新的会话时间窗口(否则该行数据会被添加到当前的窗口),且若在 30 分钟内没有观测到新纪录,这个窗口将会被关闭。会话时间窗口可以使用事件时间(批处理、流处理)或处理时间(流处理)。 在流处理表中的 SQL 查询中,分组窗口函数的 time_attr 参数必须引用一个合法的时间属性,且该属性需要指定行的处理时间或事件时间。 time_attr设置为event-time时参数类型为timestamp(3)类型。 time_attr设置为processing-time时无需指定类型。 对于批处理的 SQL 查询,分组窗口函数的 time_attr 参数必须是一个timestamp类型的属性。 窗口辅助函数 可以使用以下辅助函数选择组窗口的开始和结束时间戳以及时间属性 表2 窗口辅助函数表 辅助函数 说明 TUMBLE_START(time_attr, interval) HOP_START(time_attr, interval, interval) SESSION_START(time_attr, interval) 返回相对应的滚动、滑动和会话窗口范围内的下界时间戳。 TUMBLE_END(time_attr, interval) HOP_END(time_attr, interval, interval) SESSION_END(time_attr, interval) 返回相对应的滚动、滑动和会话窗口范围以外的上界时间戳。 注意: 范围以外的上界时间戳不可以 在随后基于时间的操作中,作为行时间属性使用,比如基于时间窗口的join以及分组窗口或分组窗口上的聚合。 TUMBLE_ROWTIME(time_attr, interval) HOP_ROWTIME(time_attr, interval, interval) SESSION_ROWTIME(time_attr, interval) 返回的是一个可用于后续需要基于时间的操作的时间属性(rowtime attribute),比如基于时间窗口的join以及 分组窗口或分组窗口上的聚合。 TUMBLE_PROCTIME(time_attr, interval) HOP_PROCTIME(time_attr, interval, interval) SESSION_PROCTIME(time_attr, interval) 返回一个可用于后续需要基于时间的操作的 处理时间参数,比如基于时间窗口的join以及分组窗口或分组窗口上的聚合. 注意:辅助函数必须使用与GROUP BY 子句中的分组窗口函数完全相同的参数来调用. 示例 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 // 每天计算SUM(金额)(事件时间)。 insert into temp SELECT name, TUMBLE_START(ts, INTERVAL '1' DAY) as wStart, SUM(amount) FROM Orders GROUP BY TUMBLE(ts, INTERVAL '1' DAY), name; // 每天计算SUM(金额)(处理时间)。 insert into temp SELECT name, SUM(amount) FROM Orders GROUP BY TUMBLE(proctime, INTERVAL '1' DAY), name; // 每个小时计算事件时间中最近24小时的SUM(数量)。 insert into temp SELECT product, SUM(amount) FROM Orders GROUP BY HOP(ts, INTERVAL '1' HOUR, INTERVAL '1' DAY), product; // 计算每个会话的SUM(数量),间隔12小时的不活动间隙(事件时间)。 insert into temp SELECT name, SESSION_START(ts, INTERVAL '12' HOUR) AS sStart, SESSION_END(ts, INTERVAL '12' HOUR) AS sEnd, SUM(amount) FROM Orders GROUP BY SESSION(ts, INTERVAL '12' HOUR), name;
  • Union/Union ALL/Intersect/Except 语法格式 1 query UNION [ ALL ] | Intersect | Except query 语法说明 UNION返回多个查询结果的并集。 Intersect返回多个查询结果的交集。 Except返回多个查询结果的差集。 注意事项 集合运算是以一定条件将表首尾相接,所以其中每一个SELECT语句返回的列数必须相同,列的类型一定要相同,列名不一定要相同。 UNION默认是去重的,UNION ALL是不去重的。 示例 输出Orders1和Orders2的并集,不包含重复记录。 1 2 insert into temp SELECT * FROM Orders1 UNION SELECT * FROM Orders2;
  • IN 语法格式 1 2 3 SELECT [ ALL | DISTINCT ] { * | projectItem [, projectItem ]* } FROM tableExpression WHERE column_name IN (value (, value)* ) | query 语法说明 IN操作符允许在where子句中规定多个值。若表达式在给定的表子查询中存在,则返回 true 。 注意事项 子查询表必须由单个列构成,且该列的数据类型需与表达式保持一致。 示例 输出Orders中NewProducts中product的user和amount信息。 1 2 3 4 5 insert into temp SELECT user, amount FROM Orders WHERE product IN ( SELECT product FROM NewProducts );
  • 按列GROUP BY 功能描述 按列进行分组操作。 语法格式 1 2 3 4 SELECT [ ALL | DISTINCT ] { * | projectItem [, projectItem ]* } FROM tableExpression [ WHERE booleanExpression ] [ GROUP BY { groupItem [, groupItem ]* } ] 语法说明 GROUP BY:按列可分为单列GROUP BY与多列GROUP BY。 单列GROUP BY:指GROUP BY子句中仅包含一列。 多列GROUP BY:指GROUP BY子句中不止一列,查询语句将按照GROUP BY的所有字段分组,所有字段都相同的记录将被放在同一组中。 注意事项 GroupBy在流处理表中会产生更新结果 示例 根据score及name两个字段对表student进行分组,并返回分组结果。 1 2 insert into temp SELECT name,score, max(score) FROM student GROUP BY name,score;
  • 表达式GROUP BY 功能描述 按表达式对流进行分组操作。 语法格式 1 2 3 4 SELECT [ ALL | DISTINCT ] { * | projectItem [, projectItem ]* } FROM tableExpression [ WHERE booleanExpression ] [ GROUP BY { groupItem [, groupItem ]* } ] 语法说明 groupItem:可以是单字段,多字段,也可以是字符串函数等调用,不能是聚合函数。 注意事项 无 示例 先利用substring函数取字段name的子字符串,并按照该子字符串进行分组,返回每个子字符串及对应的记录数。 1 2 insert into temp SELECT substring(name,6),count(name) FROM student GROUP BY substring(name,6);
  • Grouping sets, Rollup, Cube 功能描述 GROUPING SETS 的 GROUP BY 子句可以生成一个等效于由多个简单 GROUP BY 子句的 UNION ALL 生成的结果集,并且其效率比 GROUP BY 要高。 ROLLUP与CUBE按一定的规则产生多种分组,然后按各种分组统计数据。 CUBE生成的结果集显示了所选列中值的所有组合的聚合。 Rollup生成的结果集显示了所选列中值的某一层次结构的聚合。 语法格式 SELECT [ ALL | DISTINCT ] { * | projectItem [, projectItem ]* } FROM tableExpression [ WHERE booleanExpression ] [ GROUP BY groupingItem] 语法说明 groupingItem:是Grouping sets(columnName [, columnName]*)、Rollup(columnName [, columnName]*)、Cube(columnName [, columnName]*) 注意事项 无 示例 分别产生基于user和product的结果 INSERT INTO temp SELECT SUM(amount) FROM Orders GROUP BY GROUPING SETS ((user), (product));
  • 功能描述 Raw format 允许读写原始(基于字节)值作为单个列。 注意: 这种格式将 null 值编码成 byte[] 类型的 null。这样在 upsert-kafka 中使用时可能会有限制,因为 upsert-kafka 将 null 值视为 墓碑消息(在键上删除)。因此,如果该字段可能具有 null 值,我们建议避免使用 upsert-kafka 连接器和 raw format 作为 value.format。 Raw format 连接器是内置的。
  • 参数说明 表1 参数 是否必选 默认值 类型 描述 format 是 (none) String 指定要使用的格式, 这里应该是 'raw'。 raw.charset 否 UTF-8 String 指定字符集来编码文本字符串。 raw.endianness 否 big-endian String 指定字节序来编码数字值的字节。有效值为'big-endian'和'little-endian'。 更多细节可查阅字节序。
  • 参数说明 参数 是否必选 默认值 类型 说明 format 是 (none) String 指定使用格式,此处使用'maxwell-json'。 maxwell-json.ignore-parse-errors 否 false Boolean 跳过解析错误而不是失败的字段和行。出现错误时,字段设置为空。 maxwell-json.timestamp-format.standard 否 'SQL' String 指定输入和输出时间戳格式。当前支持的值为“SQL”和“ISO-8601”:选项“SQL”将以“yyyy-MM-dd HH:mm:ss.s{precision}”格式解析输入时间戳,例如“2020-12-30 12” :13:14.123' 并以相同格式输出时间戳。选项'ISO-8601'将以“yyyy-MM-ddTHH:mm:ss.s{precision}”格式解析输入时间戳,例如'2020-12-30T12: 13:14.123' 并以相同格式输出时间戳。 maxwell-json.map-null-key.mode 否 'FAIL' String 在序列化地图数据的空键时指定处理模式。当前支持的值为“FAIL”、“DROP”和“LITERAL”:选项“FAIL”将在遇到带有空键的地图时抛出异常。选项“DROP”将删除地图数据的空键条目。选项“LITERAL”将替换空带字符串文字的键。字符串文字由 maxwell-json.map-null-key.literal 选项定义。 maxwell-json.map-null-key.literal 否 'null' String 当 'maxwell-json.map-null-key.mode' 为 LITERAL 时,指定字符串文字以替换空键。
  • 参数说明 表1 参数 是否必选 默认值 类型 说明 format 是 (none) String 声明使用的格式,这里应为'json'。 json.fail-on-missing-field 否 false Boolean 当解析字段缺失时,是跳过当前字段或行,还是抛出错误失败(默认为 false,不抛出错误失败)。 json.ignore-parse-errors 否 false Boolean 当解析异常时,是跳过当前字段或行,还是抛出错误失败(默认为 false,即抛出错误失败)。如果忽略字段的解析异常,则会将该字段值设置为null。 json.timestamp-format.standard 否 'SQL' String 声明输入和输出的TIMESTAMP和TIMESTAMP WITH LOCAL TIME ZONE 的格式。 当前支持的格式为'SQL'和'ISO-8601': 可选参数 'SQL' 将会以 "yyyy-MM-dd HH:mm:ss.s{precision}" 的格式解析 TIMESTAMP, 例如 "2020-12-30 12:13:14.123", 以 "yyyy-MM-dd HH:mm:ss.s{precision}'Z'" 的格式解析 TIMESTAMP WITH LOCAL TIME ZONE, 例如 "2020-12-30 12:13:14.123Z" 且会以相同的格式输出。 可选参数 'ISO-8601' 将会以 "yyyy-MM-ddTHH:mm:ss.s{precision}" 的格式解析输入 TIMESTAMP, 例如 "2020-12-30T12:13:14.123" , 以 "yyyy-MM-ddTHH:mm:ss.s{precision}'Z'" 的格式解析 TIMESTAMP WITH LOCAL TIME ZONE, 例如 "2020-12-30T12:13:14.123Z" 且会以相同的格式输出。 json.map-null-key.mode 否 'FALL' String 指定处理 Map 中 key 值为空的方法。当前支持的值有:'FAIL','DROP'和'LITERAL'。 Option 'FAIL' 将抛出异常,如果遇到 Map 中 key 值为空的数据。 Option 'DROP' 将丢弃 Map 中 key 值为空的数据项。 Option 'LITERAL' 将使用字符串常量来替换 Map 中的空 key 值。字符串常量的值由 'json.map-null-key.literal' 定义。 json.map-null-key.literal 否 'null' String 当 'json.map-null-key.mode' 是LITERAL的时候,指定字符串常量替换 Map 中的空 key 值。
  • 参数说明 表1 参数 是否必选 默认值 是否必选 描述 format 是 (none) String 指定要使用的格式,此处应为 'debezium-json'。 debezium-json.schema-include 否 false Boolean 设置 Debezium Kafka Connect 时,用户可以启用 Kafka 配置 'value.converter.schemas.enable' 以在消息中包含 schema。此选项表明 Debezium JSON 消息是否包含 schema。 debezium-json.ignore-parse-errors 否 false Boolean 当解析异常时,是跳过当前字段或行,还是抛出错误失败(默认为 false,即抛出错误失败)。如果忽略字段的解析异常,则会将该字段值设置为null。 debezium-json.timestamp-format.standard 否 'SQL' String 声明输入和输出的时间戳格式。当前支持的格式为'SQL'和'ISO-8601'。 可选参数 'SQL' 将会以 "yyyy-MM-dd HH:mm:ss.s{precision}" 的格式解析时间戳, 例如 '2020-12-30 12:13:14.123',且会以相同的格式输出。 可选参数 'ISO-8601' 将会以 "yyyy-MM-ddTHH:mm:ss.s{precision}" 的格式解析输入时间戳, 例如 '2020-12-30T12:13:14.123' ,且会以相同的格式输出。 debezium-json.map-null-key.mode 否 'FAIL' String 指定处理 Map 中 key 值为空的方法。 当前支持的值有FAIL、DROP和LITERAL。 Option 'FAIL' 将抛出异常,如果遇到 Map 中 key 值为空的数据。 Option 'DROP' 将丢弃 Map 中 key 值为空的数据项。 Option 'LITERAL' 将使用字符串常量来替换 Map 中的空 key 值。字符串常量的值由 'debezium-json.map-null-key.literal' 定义。 debezium-json.map-null-key.literal 否 'null' String 当 'debezium-json.map-null-key.mode' 是 LITERAL 的时候,指定字符串常量替换 Map 中的空 key 值。
  • 功能描述 Debezium是一个 CDC(Changelog Data Capture,变更数据捕获)的工具,可以把其他数据库的更改实时流式传输到 Kafka 中。 Debezium 为变更日志提供了统一的格式结构,并支持使用 JSON消息。 Flink 支持将 Debezium JSON解析为 INSERT / UPDATE / DELETE 消息到 Flink SQL 系统中。在很多情况下,利用这个特性非常的有用,例如 将增量数据从数据库同步到其他系统 日志审计 数据库的实时物化视图 关联维度数据库的变更历史,等等。
  • 参数说明 表1 参数 是否必选 默认值 类型 说明 format 是 (none) String 指定要使用的格式,这里应该是 'csv'。 csv.field-delimiter 否 , String 字段分隔符 (默认','),必须为单字符。您可以使用反斜杠字符指定一些特殊字符,例如 '\t' 代表制表符。 您也可以通过 unicode 编码在纯 SQL 文本中指定一些特殊字符,例如 'csv.field-delimiter' = '\u0001' 代表 0x01 字符。 csv.disable-quote-character 否 false Boolean 是否禁止对引用的值使用引号 (默认是 false). 如果禁止,选项 'csv.quote-character' 不能设置。 csv.quote-character 否 ‘’ String 用于围住字段值的引号字符 (默认"). csv.allow-comments 否 false Boolean 是否允许忽略注释行(默认不允许),注释行以 '#' 作为起始字符。 如果允许注释行,请确保 csv.ignore-parse-errors 也开启了从而允许空行。 csv.ignore-parse-errors 否 false Boolean 当解析异常时,是跳过当前字段或行,还是抛出错误失败(默认为 false,即抛出错误失败)。如果忽略字段的解析异常,则会将该字段值设置为null。 csv.array-element-delimiter 否 ; String 分隔数组和行元素的字符串(默认';'). csv.escape-character 否 (none) String 转义字符(默认关闭). csv.null-literal 否 (none) String 是否将 "null" 字符串转化为 null 值。
  • 功能描述 Avro Schema Registry (avro-confluent) 格式能让您读取被 io.confluent.kafka.serializers.KafkaAvroSerializer序列化的记录,以及可以写入成能被 io.confluent.kafka.serializers.KafkaAvroDeserializer反序列化的记录。 当以这种格式读取(反序列化)记录时,将根据记录中编码的 schema 版本 id 从配置的 Confluent Schema Registry 中获取 Avro writer schema ,而从 table schema 中推断出 reader schema。 当以这种格式写入(序列化)记录时,Avro schema 是从 table schema 中推断出来的,并会用来检索要与数据一起编码的 schema id。我们会在配置的 Confluent Schema Registry 中配置的 subject 下,检索 schema id。subject 通过 avro-confluent.schema-registry.subject 参数来指定。
  • 参数说明 表1 参数说明 参数 是否必选 默认值 类型 说明 format 是 (none) String 指定要使用的格式,此处应为 'canal-json'. canal-json.ignore-parse-errors 否 false Boolean 当解析异常时,是跳过当前字段或行,还是抛出错误失败(默认为 false,即抛出错误失败)。如果忽略字段的解析异常,则会将该字段值设置为null。 canal-json.timestamp-format.standard 否 'SQL' String 指定输入和输出时间戳格式。当前支持的值是:'SQL'和'ISO-8601'。 选项 'SQL' 将解析 "yyyy-MM-dd HH:mm:ss.s{precision}" 格式的输入时间戳,例如 '2020-12-30 12:13:14.123',并以相同格式输出时间戳。 选项 'ISO-8601' 将解析 "yyyy-MM-ddTHH:mm:ss.s{precision}" 格式的输入时间戳,例如 '2020-12-30T12:13:14.123',并以相同的格式输出时间戳。 canal-json.map-null-key.mode 否 'FALL' String 指定处理 Map 中 key 值为空的方法. 当前支持的值有'FAIL', 'DROP'和 'LITERAL'。 Option 'FAIL' 将抛出异常,如果遇到 Map 中 key 值为空的数据。 Option 'DROP' 将丢弃 Map 中 key 值为空的数据项。 Option 'LITERAL' 将使用字符串常量来替换 Map 中的空 key 值。字符串常量的值由 'canal-json.map-null-key.literal' 定义。 canal-json.map-null-key.literal 否 'null' String 当 'canal-json.map-null-key.mode' 是 LITERAL 的时候,指定字符串常量替换 Map 中的空 key 值。 canal-json.database.include 否 (none) String 仅读取指定数据库的 changelog 记录(通过对比 Canal 记录中的 "database" 元数据字段)。 canal-json.table.include 否 (none) String 仅读取指定表的 changelog 记录(通过对比 Canal 记录中的 "table" 元数据字段)。
  • 功能描述 Canal是一个 CDC(ChangeLog Data Capture,变更日志数据捕获)工具,可以实时地将 MySQL 变更传输到其他系统。Canal 为变更日志提供了统一的数据格式,并支持使用 JSON 或 protobuf序列化消息(Canal 默认使用 protobuf)。 Flink 支持将 Canal 的 JSON 消息解析为 INSERT / UPDATE / DELETE 消息到 Flink SQL 系统中。在很多情况下,利用这个特性非常的有用,例如 将增量数据从数据库同步到其他系统 日志审计 数据库的实时物化视图 关联维度数据库的变更历史,等等。 Flink 还支持将 Flink SQL 中的 INSERT / UPDATE / DELETE 消息编码为 Canal 格式的 JSON 消息,输出到 Kafka 等存储中。 但需要注意的是,目前 Flink 还不支持将 UPDATE_BEFORE 和 UPDATE_AFTER 合并为一条 UPDATE 消息。因此,Flink 将 UPDATE_BEFORE 和 UPDATE_AFTER 分别编码为 DELETE 和 INSERT 类型的 Canal 消息。
  • 数据类型映射 目前,Avro schema 通常是从 table schema 中推导而来。尚不支持显式定义 Avro schema。因此,下表列出了从 Flink 类型到 Avro 类型的类型映射。 除了下面列出的类型,Flink 支持读取/写入 nullable 的类型。Flink 将 nullable 的类型映射到 Avro union(something, null),其中 something 是从 Flink 类型转换的 Avro 类型。 表2 数据类型映射 Flink SQL类型 Avro类型 Avro逻辑类型 CHAR / VARCHAR / STRING string - BOOLEAN boolean - BINARY / VARBINARY bytes - DECIMAL fixed decimal TINYINT int - SMALLINT int - INT int - BIGINT long - FLOAT float - DOUBLE double - DATE int date TIME int time-millis TIMESTAMP long timestamp-millis ARRAY array - MAP(key 必须是 string/char/varchar 类型) map - MULTISET(元素必须是 string/char/varchar 类型) map - ROW record -
  • 参数说明 表1 参数说明 参数 是否必选 默认值 数据类型 说明 connector 是 无 String connector类型,需配置为'redis'。 host 是 无 String redis连接地址。 port 否 6379 Integer redis连接端口。 password 否 无 String redis认证密码。 namespace 否 无 String redis key的namespace delimiter 否 : String redis的key和namespace之间的分隔符。 data-type 否 hash String redis的数据类型,有下列选项 hash list set sorted-set string data-type取值约束详见data-type取值约束说明。 schema-syntax 否 fields String redis的schema语义,包含以下值: fields:适用于所有数据类型 fields-scores:适用于sorted set数据类型 array:适用于list、set、sorted set数据类型 array-scores:适用于sorted set数据类型 map:适用于hash、sorted set数据类型 schema-syntax取值约束详见schema-syntax取值约束说明。 deploy-mode 否 standalone String redis集群的部署模式,支持standalone、master-replica、cluster,默认standalone。 retry-count 是 5 Integer 设置每个连接请求的队列大小。如果超过队列大小,则命令调用将导致RedisException。将requestQueueSize设置为较低的值将导致在过载期间或连接处于断开状态时更早出现异常。更高的值意味着达到边界需要更长的时间,但可能会有更多的请求排队,并使用更多的堆空间。默认请设置为2147483647。 connection-timeout-millis 否 10000 Integer 尝试连接redis集群时的最大超时时间。 commands-timeout-millis 否 2000 Integer 等待操作完成响应的最大时间。 rebalancing-timeout-millis 否 15000 Integer redis集群失败时的休眠时间。 scan-keys-count 否 1000 Integer 每次扫描时读取的数量。 default-score 否 0 Double 当data-type设置为“sorted-set”数据类型的默认score。 deserialize-error-policy 否 fail-job Enum 数据解析失败时的处理方式。 枚举类型,包含以下值: fail-job:作业失败 skip-row:跳过当前数据 null-field:设置当前数据为null skip-null-values 否 true Boolean 是否跳过null。 lookup.async 否 false Boolean 作为redis维表时,是否使用异步 I/O。 pwd_auth_name 否 无 String DLI 侧创建的Password类型的跨源认证名称。 使用跨源认证则无需在作业中配置和账号密码。
  • 示例 从Kafka源表中读取数据,将Redis表作为维表,并将二者生成的宽表信息写入Kafka结果表中,其具体步骤如下: 参考增强型跨源连接,根据Redis和Kafka所在的虚拟私有云和子网创建相应的增强型跨源,并绑定所要使用的Flink弹性资源池。 设置Redis和Kafka的安全组,添加入向规则使其对Flink的队列网段放通。参考测试地址连通性根据Redis的地址测试队列连通性。若能连通,则表示跨源已经绑定成功,否则表示未成功。 登录Redis客户端,通过如下命令向Redis发送如下数据: HMSET 330102 area_province_name a1 area_province_name b1 area_county_name c1 area_street_name d1 region_name e1 HMSET 330106 area_province_name a1 area_province_name b1 area_county_name c2 area_street_name d2 region_name e1 HMSET 330108 area_province_name a1 area_province_name b1 area_county_name c3 area_street_name d3 region_name e1 HMSET 330110 area_province_name a1 area_province_name b1 area_county_name c4 area_street_name d4 region_name e1 参考创建Flink OpenSource作业,创建flink opensource sql作业,输入以下作业脚本,提交运行作业。该作业脚本将Kafka为数据源,Redis作为维表,数据写入到Kafka结果表中。 注意:创建作业时,在作业编辑界面的“运行参数”处,“Flink版本”选择“1.12”,勾选“保存作业日志”并设置保存作业日志的OBS桶,方便后续查看作业日志。如下脚本中的加粗参数请根据实际环境修改。 CREATE TABLE orders ( order_id string, order_channel string, order_time string, pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string, proctime as Proctime() ) WITH ( 'connector' = 'kafka', 'topic' = 'kafkaSourceTopic', 'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort', 'properties.group.id' = 'GroupId', 'scan.startup.mode' = 'latest-offset', 'format' = 'json' ); --创建地址维表 create table area_info ( area_id string, area_province_name string, area_city_name string, area_county_name string, area_street_name string, region_name string, primary key (area_id) not enforced -- redis的key ) WITH ( 'connector' = 'redis', 'host' = 'RedisIP', 'password' = 'RedisPassword', 'data-type' = 'hash', 'deploy-mode' = 'master-replica' ); --根据地址维表生成详细的包含地址的订单信息宽表 create table order_detail( order_id string, order_channel string, order_time string, pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string, area_province_name string, area_city_name string, area_county_name string, area_street_name string, region_name string ) with ( 'connector' = 'kafka', 'topic' = 'kafkaSinkTopic', 'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort', 'format' = 'json' ); insert into order_detail select orders.order_id, orders.order_channel, orders.order_time, orders.pay_amount, orders.real_pay, orders.pay_time, orders.user_id, orders.user_name, area.area_id, area.area_province_name, area.area_city_name, area.area_county_name, area.area_street_name, area.region_name from orders left join area_info for system_time as of orders.proctime as area on orders.area_id = area.area_id; 连接Kafka集群,向Kafka的source topic中插入如下测试数据: {"order_id":"202103241606060001", "order_channel":"appShop", "order_time":"2021-03-24 16:06:06", "pay_amount":"200.00", "real_pay":"180.00", "pay_time":"2021-03-24 16:10:06", "user_id":"0001", "user_name":"Alice", "area_id":"330106"} {"order_id":"202103251202020001", "order_channel":"miniAppShop", "order_time":"2021-03-25 12:02:02", "pay_amount":"60.00", "real_pay":"60.00", "pay_time":"2021-03-25 12:03:00", "user_id":"0002", "user_name":"Bob", "area_id":"330110"} {"order_id":"202103251505050001", "order_channel":"qqShop", "order_time":"2021-03-25 15:05:05", "pay_amount":"500.00", "real_pay":"400.00", "pay_time":"2021-03-25 15:10:00", "user_id":"0003", "user_name":"Cindy", "area_id":"330108"} 连接Kafka集群,在Kafka的sink topic读取数据,结果数据参考如下: {"order_id":"202103241606060001","order_channel":"appShop","order_time":"2021-03-24 16:06:06","pay_amount":200.0,"real_pay":180.0,"pay_time":"2021-03-24 16:10:06","user_id":"0001","user_name":"Alice","area_id":"330106","area_province_name":"a1","area_city_name":"b1","area_county_name":"c2","area_street_name":"d2","region_name":"e1"} {"order_id":"202103251202020001","order_channel":"miniAppShop","order_time":"2021-03-25 12:02:02","pay_amount":60.0,"real_pay":60.0,"pay_time":"2021-03-25 12:03:00","user_id":"0002","user_name":"Bob","area_id":"330110","area_province_name":"a1","area_city_name":"b1","area_county_name":"c4","area_street_name":"d4","region_name":"e1"} {"order_id":"202103251505050001","order_channel":"qqShop","order_time":"2021-03-25 15:05:05","pay_amount":500.0,"real_pay":400.0,"pay_time":"2021-03-25 15:10:00","user_id":"0003","user_name":"Cindy","area_id":"330108","area_province_name":"a1","area_city_name":"b1","area_county_name":"c3","area_street_name":"d3","region_name":"e1"}
  • 语法格式 create table dwsSource ( attr_name attr_type (',' attr_name attr_type)* (',' watermark for rowtime_column_name as watermark-strategy_expression) ,PRIMARY KEY (attr_name, ...) NOT ENFORCED ) with ( 'connector' = 'redis', 'host' = '' );
  • 示例 从Kafka源表中读取数据,将JDBC表作为维表,并将二者生成的表信息写入Kafka结果表中,其具体步骤如下: 参考增强型跨源连接,在DLI上根据MySQL和Kafka所在的虚拟私有云和子网分别创建相应的增强型跨源连接,并绑定所要使用的Flink弹性资源池。 设置MySQL和Kafka的安全组,添加入向规则使其对Flink的队列网段放通。参考测试地址连通性分别根据MySQL和Kafka的地址测试队列连通性。若能连通,则表示跨源已经绑定成功,否则表示未成功。 连接MySQL数据库实例,在flink数据库中创建相应的表,作为维表,表名为area_info,SQL语句如下: CREATE TABLE `flink`.`area_info` ( `area_id` VARCHAR(32) NOT NULL, `area_province_name` VARCHAR(32) NOT NULL, `area_city_name` VARCHAR(32) NOT NULL, `area_county_name` VARCHAR(32) NOT NULL, `area_street_name` VARCHAR(32) NOT NULL, `region_name` VARCHAR(32) NOT NULL, PRIMARY KEY (`area_id`) ) ENGINE = InnoDB DEFAULT CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci; 连接MySQL数据库实例,向JDBC维表area_info中插入测试数据,其语句如下: insert into flink.area_info (area_id, area_province_name, area_city_name, area_county_name, area_street_name, region_name) values ('330102', 'a1', 'b1', 'c1', 'd1', 'e1'), ('330106', 'a1', 'b1', 'c2', 'd2', 'e1'), ('330108', 'a1', 'b1', 'c3', 'd3', 'e1'), ('330110', 'a1', 'b1', 'c4', 'd4', 'e1'); 参考创建Flink OpenSource作业,创建flink opensource sql作业,输入以下作业运行脚本,提交运行作业。该作业脚本将Kafka为数据源,JDBC作为维表,数据写入到Kafka结果表。 注意:创建作业时,在作业编辑界面的“运行参数”处,“Flink版本”选择“1.12”,勾选“保存作业日志”并设置保存作业日志的OBS桶,方便后续查看作业日志。如下脚本中的加粗参数请根据实际环境修改。 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 CREATE TABLE orders ( order_id string, order_channel string, order_time string, pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string, proctime as Proctime() ) WITH ( 'connector' = 'kafka', 'topic' = 'KafkaSourceTopic', 'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort', 'properties.group.id' = 'jdbc-order', 'scan.startup.mode' = 'latest-offset', 'format' = 'json' ); --创建地址维表 create table area_info ( area_id string, area_province_name string, area_city_name string, area_county_name string, area_street_name string, region_name string ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://JDBCAddress:JDBCPort/flink',--其中url中的flink表示MySQL中area_info表所在的数据库名 'table-name' = 'area_info', 'username' = 'JDBCUserName', 'password' = 'JDBCPassWord' ); --根据地址维表生成详细的包含地址的订单信息宽表 create table order_detail( order_id string, order_channel string, order_time string, pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string, area_province_name string, area_city_name string, area_county_name string, area_street_name string, region_name string ) with ( 'connector' = 'kafka', 'topic' = 'KafkaSinkTopic', 'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort', 'format' = 'json' ); insert into order_detail select orders.order_id, orders.order_channel, orders.order_time, orders.pay_amount, orders.real_pay, orders.pay_time, orders.user_id, orders.user_name, area.area_id, area.area_province_name, area.area_city_name, area.area_county_name, area.area_street_name, area.region_name from orders left join area_info for system_time as of orders.proctime as area on orders.area_id = area.area_id; 连接Kafka集群,向Kafka的source topic中插入如下测试数据: {"order_id":"202103241606060001", "order_channel":"appShop", "order_time":"2021-03-24 16:06:06", "pay_amount":"200.00", "real_pay":"180.00", "pay_time":"2021-03-24 16:10:06", "user_id":"0001", "user_name":"Alice", "area_id":"330106"} {"order_id":"202103251202020001", "order_channel":"miniAppShop", "order_time":"2021-03-25 12:02:02", "pay_amount":"60.00", "real_pay":"60.00", "pay_time":"2021-03-25 12:03:00", "user_id":"0002", "user_name":"Bob", "area_id":"330110"} {"order_id":"202103251505050001", "order_channel":"qqShop", "order_time":"2021-03-25 15:05:05", "pay_amount":"500.00", "real_pay":"400.00", "pay_time":"2021-03-25 15:10:00", "user_id":"0003", "user_name":"Cindy", "area_id":"330108"} 连接Kafka集群,在Kafka的sink topic读取数据,结果参考如下: {"order_id":"202103241606060001","order_channel":"appShop","order_time":"2021-03-24 16:06:06","pay_amount":200.0,"real_pay":180.0,"pay_time":"2021-03-24 16:10:06","user_id":"0001","user_name":"Alice","area_id":"330106","area_province_name":"a1","area_city_name":"b1","area_county_name":"c2","area_street_name":"d2","region_name":"e1"} {"order_id":"202103251202020001","order_channel":"miniAppShop","order_time":"2021-03-25 12:02:02","pay_amount":60.0,"real_pay":60.0,"pay_time":"2021-03-25 12:03:00","user_id":"0002","user_name":"Bob","area_id":"330110","area_province_name":"a1","area_city_name":"b1","area_county_name":"c4","area_street_name":"d4","region_name":"e1"} {"order_id":"202103251505050001","order_channel":"qqShop","order_time":"2021-03-25 15:05:05","pay_amount":500.0,"real_pay":400.0,"pay_time":"2021-03-25 15:10:00","user_id":"0003","user_name":"Cindy","area_id":"330108","area_province_name":"a1","area_city_name":"b1","area_county_name":"c3","area_street_name":"d3","region_name":"e1"}
  • 数据类型映射 表2 数据类型映射 MySQL类型 PostgreSQL类型 Flink SQL类型 TINYINT - TINYINT SMALLINT TINYINT UNSIGNED SMALLINT INT2 SMALLSERIAL SERIAL2 SMALLINT INT MEDIUMINT SMALLINT UNSIGNED INTEGER SERIAL INT BIGINT INT UNSIGNED BIGINT BIGSERIAL BIGINT BIGINT UNSIGNED - DECIMAL(20, 0) BIGINT BIGINT BIGINT FLOAT REAL FLOAT4 FLOAT DOUBLE DOUBLE PRECISION FLOAT8 DOUBLE PRECISION DOUBLE NUMERIC(p, s) DECIMAL(p, s) NUMERIC(p, s) DECIMAL(p, s) DECIMAL(p, s) BOOLEAN TINYINT(1) BOOLEAN BOOLEAN DATE DATE DATE TIME [(p)] TIME [(p)] [WITHOUT TIMEZONE] TIME [(p)] [WITHOUT TIMEZONE] DATETIME [(p)] TIMESTAMP [(p)] [WITHOUT TIMEZONE] TIMESTAMP [(p)] [WITHOUT TIMEZONE] CHAR(n) VARCHAR(n) TEXT CHAR(n) CHARACTER(n) VARCHAR(n) CHARACTER VARYING(n) TEXT STRING BINARY VARBINARY BLOB BYTEA BYTES - ARRAY ARRAY
  • 参数说明 表1 参数说明 参数 是否必选 说明 connector 是 数据源类型,固定为:jdbc。 url 是 数据库的URL。 table-name 是 读取数据库中的数据所在的表名。 driver 否 连接数据库所需要的驱动。若未配置,则会自动通过URL提取。 username 否 数据库认证用户名,需要和'password'一起配置。 password 否 数据库认证密码,需要和'username'一起配置。 scan.partition.column 否 用于对输入进行分区的列名。 与scan.partition.lower-bound、scan.partition.upper-bound、scan.partition.num必须同时存在或者同时不存在。 scan.partition.lower-bound 否 第一个分区的最小值。 与scan.partition.column、scan.partition.upper-bound、scan.partition.num必须同时存在或者同时不存在 scan.partition.upper-bound 否 最后一个分区的最大值。 与scan.partition.column、scan.partition.lower-bound、scan.partition.num必须同时存在或者同时不存在 scan.partition.num 否 分区的个数。 与scan.partition.column、scan.partition.upper-bound、scan.partition.upper-bound必须同时存在或者同时不存在。 scan.fetch-size 否 每次从数据库拉取数据的行数。默认值为0,表示忽略该提示。 lookup.cache.max-rows 否 维表配置,缓存的最大行数,超过该值时,最先添加的数据将被标记为过期。-1表示不使用缓存。 lookup.cache.ttl 否 维表配置,缓存超时时间,超过该时间的数据会被剔除。格式为:{length value}{time unit label},如123ms, 321s,支持的时间单位包括: d,h,min,s,ms等,默认为ms。 lookup.max-retries 否 维表配置,数据拉取最大重试次数,默认为3。 pwd_auth_name 否 DLI侧创建的Password类型的跨源认证名称。
  • 语法格式 1 2 3 4 5 6 7 8 9 10 11 12 CREATE TABLE table_id ( attr_name attr_type (',' attr_name attr_type)* ) WITH ( 'connector' = 'jdbc', 'url' = '', 'table-name' = '', 'driver' = '', 'username' = '', 'password' = '' );
  • 常见问题 Q:Flink作业日志中有如下报错信息应该怎么解决? org.apache.zookeeper.ClientCnxn$SessionTimeoutException: Client session timed out, have not heard from server in 90069ms for connection id 0x0 A:可能是跨源连接未绑定或跨源绑定失败。参考增强型跨源连接重新配置跨源,Kafka集群安全组放通DLI队列的网段地址。
  • 前提条件 该场景作业需要运行在DLI的独享队列上,因此要与HBase建立增强型跨源连接,且用户可以根据实际所需设置相应安全组规则。 如何建立增强型跨源连接,请参考《 数据湖探索 用户指南》中增强型跨源连接章节。 如何设置安全组规则,请参见《虚拟私有云用户指南》中“安全组”章节。 若使用 MRS HBase,请在增强型跨源的主机信息中添加MRS集群所有节点的主机IP信息。 详细操作请参考《 数据湖 探索用户指南》中的“修改主机信息”章节描述。 Flink跨源开发场景中直接配置跨源认证信息存在密码泄露的风险,优先推荐您使用DLI提供的跨源认证。 跨源认证简介及操作方法请参考跨源认证简介。
共100000条
提示

您即将访问非华为云网站,请注意账号财产安全