华为云用户手册

  • 示例 根据order_id对数据进行去重,其中proctime为事件时间属性列 SELECT order_id, user, product, number FROM ( SELECT *, ROW_NUMBER() OVER (PARTITION BY order_id ORDER BY proctime ASC) as row_num FROM Orders) WHERE row_num = 1;
  • Join表函数(UDTF) 功能描述 将表与表函数的结果进行 join 操作。左表(outer)中的每一行将会与调用表函数所产生的所有结果中相关联行进行 join 。 注意事项 针对横向表的左外部连接当前仅支持文本常量 TRUE 作为谓词。 示例 如果表函数返回了空结果,左表(outer)的行将会被删除 SELECT users, tag FROM Orders, LATERAL TABLE(unnest_udtf(tags)) t AS tag; 如果表函数返回了空结果,将会保留相对应的外部行并用空值填充 SELECT users, tag FROM Orders LEFT JOIN LATERAL TABLE(unnest_udtf(tags)) t AS tag ON TRUE;
  • Join Temporal Table Function 功能描述 注意事项 目前仅支持在 Temporal Tables 上的 inner join 示例 假如Rates是一个 Temporal Table Function, join 可以使用 SQL 进行如下的表达: SELECT o_amount, r_rate FROM Orders, LATERAL TABLE (Rates(o_proctime)) WHERE r_currency = o_currency;
  • 示例 查询为每个订单计算前一个小时之内接收到的同一产品所有订单的总金额。 1 2 3 4 5 6 7 SELECT order_id, order_time, amount, SUM(amount) OVER ( PARTITION BY product ORDER BY order_time RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW ) AS one_hour_prod_amount_sum FROM Orders
  • 注意事项 当前仅支持 PRECEDING (无界或有界) 到 CURRENT ROW 范围内的窗口、FOLLOWING 所描述的区间并未支持。 ORDER BY 必须指定于单个的时间属性。 可以在一个 SELECT 子句中定义多个 OVER 窗口聚合。然而,对于流式查询,由于目前的限制,所有聚合的 OVER 窗口必须是相同的。 OVER 窗口需要数据是有序的。因为表没有固定的排序,所以 ORDER BY 子句是强制的。对于流式查询,Flink 目前只支持 OVER 窗口定义在升序(asc)的时间属性上。其他的排序不支持。
  • 语法说明 SELECT order_id, order_time, amount, SUM(amount) OVER w AS sum_amount, AVG(amount) OVER w AS avg_amount FROM Orders WINDOW w AS ( PARTITION BY product ORDER BY order_time RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW) ORDER BY:OVER 窗口需要数据是有序的。因为表没有固定的排序,所以 ORDER BY 子句是强制的。对于流式查询,Flink 目前只支持 OVER 窗口定义在升序(asc)的时间属性上。其他的排序不支持。 PARTITION BY:OVER 窗口可以定义在一个分区表上。PARTITION BY 子句代表着每行数据只在其所属的数据分区进行聚合。 范围(RANGE)定义:范围(RANGE)定义指定了聚合中包含了多少行数据。范围通过 BETWEEN 子句定义上下边界,其内的所有行都会聚合。Flink 只支持 CURRENT ROW 作为上边界。有两种方法可以定义范围:ROWS 间隔 和 RANGE 间隔: RANGE 间隔 RANGE 间隔是定义在排序列值上的,在 Flink 里,排序列总是一个时间属性。下面的 RANG 间隔定义了聚合会在比当前行的时间属性小 30 分钟的所有行上进行。 RANGE BETWEEN INTERVAL '30' MINUTE PRECEDING AND CURRENT ROW ROW 间隔 ROWS 间隔基于计数。它定义了聚合操作包含的精确行数。下面的 ROWS 间隔定义了当前行 + 之前的 10 行(也就是11行)都会被聚合。 ROWS BETWEEN 10 PRECEDING AND CURRENT ROW WINDOW:WINDOW 子句可用于在 SELECT 子句之外定义 OVER 窗口。它让查询可读性更好,也允许多个聚合共用一个窗口定义。
  • GROUPING SETS Grouping Sets 可以通过一个标准的 GROUP BY 语句来描述更复杂的分组操作。数据按每个指定的 Grouping Sets 分别分组,并像简单的 group by 子句一样为每个组进行聚合。 GROUPING SETS 的每个子列表可以是:空的,多列或表达式,它们的解释方式和直接使用 GROUP BY 子句是一样的。一个空的 Grouping Sets 表示所有行都聚合在一个分组下,即使没有数据,也会输出结果。 对于 Grouping Sets 中的空子列表,结果数据中的分组或表达式列会用NULL代替。 SELECT supplier_id, rating, COUNT(*) AS total FROM (VALUES ('supplier1', 'product1', 4), ('supplier1', 'product2', 3), ('supplier2', 'product3', 3), ('supplier2', 'product4', 4)) AS Products(supplier_id, product_id, rating) GROUP BY GROUPING SETS ((supplier_id, rating), (supplier_id), ())
  • CUBE CUBE 是一种特定通用类型 Grouping Sets 的简写。代表着指定列表以及所有可能的子集和幂集。 例如:下面两个查询是等效的。 SELECT supplier_id, rating, product_id, COUNT(*) FROM (VALUES ('supplier1', 'product1', 4), ('supplier1', 'product2', 3), ('supplier2', 'product3', 3), ('supplier2', 'product4', 4)) AS Products(supplier_id, product_id, rating) GROUP BY CUBE (supplier_id, rating, product_id) SELECT supplier_id, rating, product_id, COUNT(*) FROM (VALUES ('supplier1', 'product1', 4), ('supplier1', 'product2', 3), ('supplier2', 'product3', 3), ('supplier2', 'product4', 4)) AS Products(supplier_id, product_id, rating) GROUP BY GROUPING SET ( ( supplier_id, product_id, rating ), ( supplier_id, product_id ), ( supplier_id, rating ), ( supplier_id ), ( product_id, rating ), ( product_id ), ( rating ), ( ) )
  • ROLLUP ROLLUP 是一种特定通用类型 Grouping Sets 的简写。代表着指定表达式和所有前缀的列表,包括空列表。 SELECT supplier_id, rating, COUNT(*) FROM (VALUES ('supplier1', 'product1', 4), ('supplier1', 'product2', 3), ('supplier2', 'product3', 3), ('supplier2', 'product4', 4)) AS Products(supplier_id, product_id, rating) GROUP BY ROLLUP (supplier_id, rating)
  • INNER/LEFT/RIGHT/FULL OUTER INNER/LEFT/RIGHT/FULL OUTER 这几种窗口关联的语法非常相似,我们在这里只举一个 FULL OUTER JOIN 的例子。 当执行窗口关联时,所有具有相同 key 和相同滚动窗口的数据会被关联在一起。这里给出一个基于 TUMBLE Window TVF 的窗口连接的例子。 在下面的例子中,通过将 join 的时间区域限定为固定的 5 分钟,数据集被分成两个不同的时间窗口:[12:00,12:05) 和 [12:05,12:10)。L2 和 R2 不能 join 在一起是因为它们不在一个窗口中。 语法格式 SELECT ... FROM L [LEFT|RIGHT|FULL OUTER] JOIN R -- L and R are relations applied windowing TVF ON L.window_start = R.window_start AND L.window_end = R.window_end AND ...
  • 功能描述 窗口去重是一种特殊的去重,它根据指定的多个列来删除重复的行,保留每个窗口和分区键的第一个或最后一个数据。 对于流式查询,与普通去重不同,窗口去重只在窗口的最后返回结果数据,不会产生中间结果。它会清除不需要的中间状态。 因此,窗口去重查询在用户不需要更新结果时,性能较好。通常,窗口去重直接用于窗口表值函数上。另外,它可以用于基于窗口表值函数的操作。比如窗口聚合,窗口TopN和窗口关联。 窗口Top-N的语法和普通的Top-N相同。 除此之外,窗口去重需要 PARTITION BY 子句包含表的 window_start 和 window_end 列。 否则优化器无法翻译。 Flink 使用 ROW_NUMBER() 移除重复数据,就像窗口TopN一样。理论上,窗口是一种特殊的窗口 Top-N:N是1并且是根据处理时间或事件时间排序的。 更多介绍和使用请参考开源社区文档:窗口去重。
  • 功能描述 窗口 Top-N 是特殊的 Top-N,它返回每个分区键的每个窗口的N个最小或最大值。 与普通Top-N不同,窗口Top-N只在窗口最后返回汇总的Top-N数据,不会产生中间结果。窗口 Top-N 会在窗口结束后清除不需要的中间状态。 窗口 Top-N 适用于用户不需要每条数据都更新Top-N结果的场景,相对普通Top-N来说性能更好。通常,窗口 Top-N 直接用于窗口表值函数(Windowing TVFs)窗口 Top-N 可以用于基于窗口表值函数(Windowing TVFs)的操作之上,比如窗口聚合,窗口Top-N和 窗口关联。 窗口 Top-N 的语法和普通的 Top-N 相同。 除此之外,窗口 Top-N 需要 PARTITION BY 子句包含窗口表值函数或窗口聚合产生的 window_start 和 window_end。 否则优化器无法翻译。 更多介绍和使用请参考开源社区文档:窗口Top-N。
  • 窗口表值函数(Windowing TVFs) 窗口是处理无限流的核心。窗口把流分割为有限大小的 “桶”,这样就可以在其之上进行计算。 Apache Flink 提供了如下 窗口表值函数(table-valued function, 缩写TVF)把表的数据划分到窗口中: 滚动窗口 滑动窗口 累积窗口 逻辑上,每个元素可以应用于一个或多个窗口,这取决于所使用的窗口表值函数的类型。例如:滑动窗口可以把单个元素分配给多个窗口。 窗口表值函数 是 Flink 定义的多态表函数(Polymorphic Table Function,缩写PTF),PTF 是 SQL 2016 标准中的一种特殊的表函数,它可以把表作为一个参数。 窗口表值函数是分组函数(已废弃)的替代方案。窗口表值函数 更符合 SQL 标准,在支持基于窗口的复杂计算上也更强大。例如:窗口 TopN、窗口 Join。而分组窗口函数只支持窗口聚合。 更多介绍和使用请参考开源社区文档:窗口函数。
  • 窗口函数简介 Apache Flink 提供3个内置的窗口表值函数:TUMBLE,HOP 和 CUMULATE。 窗口表值函数的返回值包括原生列和附加的三个指定窗口的列,分别是:“window_start”,“window_end”,“window_time”。 在批计算模式,window_time 是 TIMESTAMP 或者 TIMESTAMP_LTZ 类型(具体哪种类型取决于输入的时间字段类型)的字段。 window_time 字段用于后续基于时间的操作,例如:其他的窗口表值函数,或者interval joins,over aggregations。 它的值总是等于 window_end - 1ms。
  • 累积窗口(CUMULATE) 功能描述 累积窗口在某些场景中非常有用,比如说提前触发的滚动窗口。例如:每日仪表盘从 00:00 开始每分钟绘制累积 UV,10:00 时 UV 就是从 00:00 到 10:00 的UV 总数。累积窗口可以简单且有效地实现它。 CUMULATE 函数指定元素到多个窗口,从初始的窗口开始,直到达到最大的窗口大小的窗口,所有的窗口都包含其区间内的元素,另外,窗口的开始时间是固定的。 您可以将 CUMULATE 函数视为首先应用具有最大窗口大小的 TUMBLE 窗口,然后将每个滚动窗口拆分为具有相同窗口开始但窗口结束步长不同的几个窗口。 所以累积窗口会产生重叠并且没有固定大小。 例如:1小时步长,24小时大小的累计窗口,每天可以获得如下这些窗口:[00:00, 01:00),[00:00, 02:00),[00:00, 03:00), …, [00:00, 24:00) 图3 累积窗口示例图
  • OVER WINDOW Over Window与Group Window区别在于Over window每一行都会输出一条记录。 语法格式 1 2 3 4 5 6 7 8 9 10 11 SELECT agg1(attr1) OVER ( [PARTITION BY partition_name] ORDER BY proctime|rowtime ROWS BETWEEN (UNBOUNDED|rowCOUNT) PRECEDING AND CURRENT ROW FROM TABLENAME SELECT agg1(attr1) OVER ( [PARTITION BY partition_name] ORDER BY proctime|rowtime RANGE BETWEEN (UNBOUNDED|timeInterval) PRECEDING AND CURRENT ROW FROM TABLENAME 语法说明 表5 参数说明 参数 参数说明 PARTITION BY 指定分组的主键,每个分组各自进行计算。 ORDER BY 指定数据按processing time或event time作为时间戳。 ROWS 个数窗口。 RANGE 时间窗口。 注意事项 所有的聚合必须定义到同一个窗口中,即相同的分区、排序和区间。 当前仅支持 PRECEDING (无界或有界) 到 CURRENT ROW 范围内的窗口、FOLLOWING 所描述的区间并未支持。 ORDER BY 必须指定于单个的时间属性。 示例 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 // 计算从规则启动到目前为止的计数及总和(in proctime) insert into temp SELECT name, count(amount) OVER (PARTITION BY name ORDER BY proctime RANGE UNBOUNDED preceding) as cnt1, sum(amount) OVER (PARTITION BY name ORDER BY proctime RANGE UNBOUNDED preceding) as cnt2 FROM Orders; // 计算最近四条记录的计数及总和(in proctime) insert into temp SELECT name, count(amount) OVER (PARTITION BY name ORDER BY proctime ROWS BETWEEN 4 PRECEDING AND CURRENT ROW) as cnt1, sum(amount) OVER (PARTITION BY name ORDER BY proctime ROWS BETWEEN 4 PRECEDING AND CURRENT ROW) as cnt2 FROM Orders; // 计算最近60s的计数及总和(in eventtime),基于事件时间处理,事件时间为Orders中的timeattr字段。 insert into temp SELECT name, count(amount) OVER (PARTITION BY name ORDER BY timeattr RANGE BETWEEN INTERVAL '60' SECOND PRECEDING AND CURRENT ROW) as cnt1, sum(amount) OVER (PARTITION BY name ORDER BY timeattr RANGE BETWEEN INTERVAL '60' SECOND PRECEDING AND CURRENT ROW) as cnt2 FROM Orders;
  • TUMBLE WINDOW扩展 功能描述 DLI TUMBLE函数功能增强主要包括以下功能: TUMBLE窗口周期性触发,控制延迟 TUMBLE窗口结束之前,可以根据设置的触发频率周期性地触发窗口,输出从窗口开始时间到当前周期时间窗口内的计算结果值,但不影响最终窗口输出值,从而在窗口结束前的每个周期都可以看到最新的结果。 提高数据的精确性 在窗口结束后,允许设置延迟时间。根据设置的延迟时间,每到达一个迟到数据,则更新窗口的输出结果 注意事项 如果使用insert语句将结果写入sink中,则sink需要支持upsert模式,所以结果表需要支持upsert操作,且定义主键。 延迟时间设置仅用于事件时间,在处理时间中不生效。 辅助函数必须使用与 GROUP BY 子句中的分组窗口函数完全相同的参数来调用。 如果使用事件时间,则需要使用watermark标识,代码如下(其中order_time被标识为事件时间列,watermark时间设置为3秒): CREATE TABLE orders ( order_id string, order_channel string, order_time timestamp(3), pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string, watermark for order_time as order_time - INTERVAL '3' SECOND ) WITH ( 'connector' = 'kafka', 'topic' = 'kafkaTopic', 'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort', 'properties.group.id' = 'GroupId', 'scan.startup.mode' = 'latest-offset', 'format' = 'json' ); 如果使用处理时间,则需要使用计算列设置,其代码如下(其中proc即为处理时间列): CREATE TABLE orders ( order_id string, order_channel string, order_time timestamp(3), pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string, proc as proctime() ) WITH ( 'connector' = 'kafka', 'topic' = 'kafkaTopic', 'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort', 'properties.group.id' = 'GroupId', 'scan.startup.mode' = 'latest-offset', 'format' = 'json' ); 语法格式 TUMBLE(time_attr, window_interval, period_interval, lateness_interval) 语法示例 例如当前time_attr属性列为:testtime,窗口时间间隔为10秒,设置延迟时间为10秒语法示例为: TUMBLE(testtime, INTERVAL '10' SECOND, INTERVAL '10' SECOND, INTERVAL '10' SECOND)
  • GROUP WINDOW 语法说明 Group Window定义在GROUP BY里,每个分组只输出一条记录,包括以下几种: 分组函数 在流处理表中的 SQL 查询中,分组窗口函数的 time_attr 参数必须引用一个合法的时间属性,且该属性需要指定行的处理时间或事件时间。 time_attr设置为event-time时参数类型为timestamp(3)类型。 time_attr设置为processing-time时无需指定类型。 对于批处理的 SQL 查询,分组窗口函数的 time_attr 参数必须是一个timestamp类型的属性。 表1 分组函数表 分组窗口函数 说明 TUMBLE(time_attr, interval) 定义一个滚动窗口。 滚动窗口把行分配到有固定持续时间( interval )的不重叠的连续窗口。 例如,5 分钟的滚动窗口以 5 分钟为间隔对行进行分组。 滚动窗口可以定义在事件时间(批处理、流处理)或处理时间(流处理)上。 HOP(time_attr, interval, interval) 定义一个跳跃的时间窗口(在 Table API 中称为滑动窗口)。 滑动窗口有一个固定的持续时间( 第二个 interval 参数 )以及一个滑动的间隔(第一个 interval 参数 )。 如果滑动间隔小于窗口的持续时间,滑动窗口则会出现重叠;因此,行将会被分配到多个窗口中。 例如,一个大小为 15 分钟的滑动窗口,其滑动间隔为 5 分钟,将会把每一行数据分配到 3 个 15 分钟的窗口中。滑动窗口可以定义在事件时间(批处理、流处理)或处理时间(流处理)上。 SESSION(time_attr, interval) 定义一个会话时间窗口。 会话时间窗口没有一个固定的持续时间,但是它们的边界会根据 interval 所定义的不活跃时间所确定;即一个会话时间窗口在定义的间隔时间内没有事件出现,该窗口会被关闭。 例如时间窗口的间隔时间是 30 分钟,当其不活跃的时间达到30分钟后,如果观测到新的记录,则会启动一个新的会话时间窗口(否则该行数据会被添加到当前的窗口),且如果在 30 分钟内没有观测到新纪录,这个窗口将会被关闭。会话时间窗口可以使用事件时间(批处理、流处理)或处理时间(流处理)。 窗口辅助函数 可以使用以下辅助函数选择组窗口的开始和结束时间戳以及时间属性。 辅助函数必须使用与GROUP BY 子句中的分组窗口函数完全相同的参数来调用。 表2 窗口辅助函数表 辅助函数 说明 TUMBLE_START(time_attr, interval) HOP_START(time_attr, interval, interval) SESSION_START(time_attr, interval) 返回相对应的滚动、滑动和会话窗口范围内的下界时间戳。 TUMBLE_END(time_attr, interval) HOP_END(time_attr, interval, interval) SESSION_END(time_attr, interval) 返回相对应的滚动、滑动和会话窗口范围以外的上界时间戳。 范围以外的上界时间戳不可以 在随后基于时间的操作中,作为行时间属性使用,比如基于时间窗口的join以及分组窗口或分组窗口上的聚合。 TUMBLE_ROWTIME(time_attr, interval) HOP_ROWTIME(time_attr, interval, interval) SESSION_ROWTIME(time_attr, interval) 返回的是一个可用于后续需要基于时间的操作的时间属性(rowtime attribute),比如基于时间窗口的join以及 分组窗口或分组窗口上的聚合。 TUMBLE_PROCTIME(time_attr, interval) HOP_PROCTIME(time_attr, interval, interval) SESSION_PROCTIME(time_attr, interval) 返回一个可用于后续需要基于时间的操作的 处理时间参数,比如基于时间窗口的join以及分组窗口或分组窗口上的聚合. 示例 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 // 每天计算SUM(金额)(事件时间)。 insert into temp SELECT name, TUMBLE_START(ts, INTERVAL '1' DAY) as wStart, SUM(amount) FROM Orders GROUP BY TUMBLE(ts, INTERVAL '1' DAY), name; // 每天计算SUM(金额)(处理时间)。 insert into temp SELECT name, SUM(amount) FROM Orders GROUP BY TUMBLE(proctime, INTERVAL '1' DAY), name; // 每个小时计算事件时间中最近24小时的SUM(数量)。 insert into temp SELECT product, SUM(amount) FROM Orders GROUP BY HOP(ts, INTERVAL '1' HOUR, INTERVAL '1' DAY), product; // 计算每个会话的SUM(数量),间隔12小时的不活动间隙(事件时间)。 insert into temp SELECT name, SESSION_START(ts, INTERVAL '12' HOUR) AS sStart, SESSION_END(ts, INTERVAL '12' HOUR) AS sEnd, SUM(amount) FROM Orders GROUP BY SESSION(ts, INTERVAL '12' HOUR), name;
  • Union/Union ALL/Intersect/Except 语法格式 1 query UNION [ ALL ] | Intersect | Except query 语法说明 UNION返回多个查询结果的并集。 Intersect返回多个查询结果的交集。 Except返回多个查询结果的差集。 注意事项 集合运算是以一定条件将表首尾相接,所以其中每一个SELECT语句返回的列数必须相同,列的类型一定要相同,列名不一定要相同。 UNION默认是去重的,UNION ALL是不去重的。 示例 输出Orders1和Orders2的并集,不包含重复记录。 1 2 insert into temp SELECT * FROM Orders1 UNION SELECT * FROM Orders2;
  • IN 语法格式 1 2 3 SELECT [ ALL | DISTINCT ] { * | projectItem [, projectItem ]* } FROM tableExpression WHERE column_name IN (value (, value)* ) | query 语法说明 IN操作符允许在where子句中规定多个值。如果表达式在给定的表子查询中存在,则返回 true 。 注意事项 子查询表必须由单个列构成,且该列的数据类型需与表达式保持一致。 示例 输出Orders中NewProducts中product的user和amount信息。 1 2 3 4 5 insert into temp SELECT user, amount FROM Orders WHERE product IN ( SELECT product FROM NewProducts );
  • Grouping sets, Rollup, Cube 功能描述 GROUPING SETS 的 GROUP BY 子句可以生成一个等效于由多个简单 GROUP BY 子句的 UNION ALL 生成的结果集,并且其效率比 GROUP BY 要高。 ROLLUP与CUBE按一定的规则产生多种分组,然后按各种分组统计数据。 CUBE生成的结果集显示了所选列中值的所有组合的聚合。 Rollup生成的结果集显示了所选列中值的某一层次结构的聚合。 语法格式 SELECT [ ALL | DISTINCT ] { * | projectItem [, projectItem ]* } FROM tableExpression [ WHERE booleanExpression ] [ GROUP BY groupingItem] 语法说明 groupingItem:是Grouping sets(columnName [, columnName]*)、Rollup(columnName [, columnName]*)、Cube(columnName [, columnName]*) 注意事项 无 示例 分别产生基于user和product的结果 INSERT INTO temp SELECT SUM(amount) FROM Orders GROUP BY GROUPING SETS ((user), (product));
  • 表达式GROUP BY 功能描述 按表达式对流进行分组操作。 语法格式 1 2 3 4 SELECT [ ALL | DISTINCT ] { * | projectItem [, projectItem ]* } FROM tableExpression [ WHERE booleanExpression ] [ GROUP BY { groupItem [, groupItem ]* } ] 语法说明 groupItem:可以是单字段,多字段,也可以是字符串函数等调用,不能是聚合函数。 注意事项 无 示例 先利用substring函数取字段name的子字符串,并按照该子字符串进行分组,返回每个子字符串及对应的记录数。 1 2 insert into temp SELECT substring(name,6),count(name) FROM student GROUP BY substring(name,6);
  • 按列GROUP BY 功能描述 按列进行分组操作。 语法格式 1 2 3 4 SELECT [ ALL | DISTINCT ] { * | projectItem [, projectItem ]* } FROM tableExpression [ WHERE booleanExpression ] [ GROUP BY { groupItem [, groupItem ]* } ] 语法说明 GROUP BY:按列可分为单列GROUP BY与多列GROUP BY。 单列GROUP BY:指GROUP BY子句中仅包含一列。 多列GROUP BY:指GROUP BY子句中不止一列,查询语句将按照GROUP BY的所有字段分组,所有字段都相同的记录将被放在同一组中。 注意事项 GroupBy在流处理表中会产生更新结果 示例 根据score及name两个字段对表student进行分组,并返回分组结果。 1 2 insert into temp SELECT name,score, max(score) FROM student GROUP BY name,score;
  • SPLIT_INDEX 函数说明 SPLIT_INDEX(string1, string2, integer1) 参数说明 string1: 类型:STRING 说明:需要被分割的原始字符串。 示例:'a,b,c,d' 或 'a\bc\bd' string2: 类型:STRING 说明:分割字符串的分隔符。 特殊字符处理: 如果分隔符中包含特殊字符(如 \\、* 等),需要使用双反斜杠 \\ 进行转义。 分隔符为 . 时不需要转义。 如果分隔符为 \\ 本身,应写为 '\\\\'。 integer1: 类型:INT 说明:指定提取的分割后的部分索引,从 0 开始计数。 示例:0 表示提取第一个部分,1 表示提取第二个部分,依此类推。 返回值 返回分割后的字符串中指定索引位置的部分。 如果索引超出范围或输入参数为 NULL,返回 NULL。 示例1 SELECT SPLIT_INDEX('a,b,c,d', ',', 1); -- 返回 'b' 示例2: 点号(.)作为分隔符,不需要转义: SELECT SPLIT_INDEX('a.b.c.d', '.', 2); -- 返回 'c' 示例3: 反斜杠(\)作为分隔符: 反斜杠本身也需要转义,使用 '\\\\': SELECT SPLIT_INDEX('a\\bc\\bd', '\\\\', 1); -- 返回 'bc'
  • 字符串函数 数据湖探索 (DLI)提供了丰富的字符串函数,用于处理和转换字符串数据。这些函数包括拼接、大小写转换、截取子串、替换、正则匹配、编码解码、格式转换等。此外,还支持字符串长度计算、位置查找、填充、反转等功能,以及从JSON字符串中提取值的JSON_VAL函数。这些功能广泛应用于数据清洗、文本处理和数据分析场景,为开发者提供强大的工具支持。 字符串函数简介请参考表1,更多内容参考Apache Flink。 表1 字符串函数 函数 返回类型 描述 string1 || string2 STRING 返回两个字符串的拼接 CHAR_LENGTH(string) CHARACTER_LENGTH(string) INT 返回字符串中的字符数量 UPPER(string) STRING 返回字符串的大写形式 LOWER(string) STRING 返回字符串的小写形式 POSITION(string1 IN string2) INT 返回第一个字符串在第二个字符串中首次出现的位置。若第一个字符串不存在与第二个字符串,则返回0 TRIM([ BOTH | LEADING | TRAILING ] string1 FROM string2) STRING 去除string2字符串的首尾(或首部、或尾部)的string1字符串 LTRIM(string) STRING 返回去除首部空格后的字符串 例如LTRIM(' This is a test String.') 返回"This is a test String." RTRIM(string) STRING 返回去除尾部空格后的字符串 例如RTRIM('This is a test String. ') 返回"This is a test String." REPEAT(string, integer) STRING 返回integer个string连接后的字符串 例如REPEAT('This is a test String.', 2) 返回"This is a test String.This is a test String." REGEXP_REPLACE(string1, string2, string3) STRING 用string3代替string1中的符合正则表达式string2的字符串,并返回替换后的string1字符串 例如REGEXP_REPLACE('foobar', 'oo|ar', '') 返回"fb" REGEXP_REPLACE('ab\ab', '\\', 'e')返回"abeab" OVERLAY(string1 PLACING string2 FROM integer1 [ FOR integer2 ]) STRING 用string2代替string1中的字符串,从integer1开始,替换长度为integer2,并返回替换后的string1字符串 integer2默认为string2的长度 例如OVERLAY('This is an old string' PLACING ' new' FROM 10 FOR 5)返回"This is a new string" SUBSTRING(string FROM integer1 [ FOR integer2 ]) STRING 返回string中从integer1位置开始的长度为integer2的子字符串。若integer2未配置,则默认返回从integer1开始到末尾的子字符串 REPLACE(string1, string2, string3) STRING 用string3代替string1中的string2后的字符串,并返回替换后的string1字符串 例如:REPLACE('hello world', 'world', 'flink') 返回"hello flink" REPLACE('ababab', 'abab', 'z') 返回"zab" REPLACE('ab\\ab', '\\', 'e')返回"abeab" REGEXP_EXTRACT(string1, string2[, integer]) STRING 使用正则表达式string2匹配抽取字符串string1中的第integer个字串,integer从1开始,正则匹配提取。 若参数为 NULL或者正则不合法,则返回NULL。 例如REGEXP_EXTRACT('foothebar', 'foo(.*?)(bar)', 2)" 返回"bar" INITCAP(string) STRING 返回将字符串的首字符大写其余字符转为小写后的字符串 CONCAT(string1, string2,...) STRING 返回将两个或多个字符串拼接后的新字符串。 例如 CONCAT('AA', 'BB', 'CC') 返回"AABBCC" CONCAT_WS(string1, string2, string3,...) STRING 返回将每个参数和第一个参数指定的分隔符依次连接到一起组成的字符串。若string1是null,则返回null。若其他参数为null,在执行拼接过程中跳过取值为null的参数 例如CONCAT_WS('~', 'AA', NULL, 'BB', '', 'CC') 返回"AA~BB~~CC" LPAD(string1, integer, string2) STRING 将string2字符串拼接到string1字符串的左端,直到新的字符串达到指定长度integer为止 任意参数为null时,返回null 若integer为负数,则返回null 若integer不大于string1的长度,则返回string1裁剪为integer长度的字符串 例如LPAD('hi',4,'??') 返回"??hi" LPAD('hi',1,'??') 返回"h" RPAD(string1, integer, string2) STRING 将string2字符串拼接到string1字符串的右端,直到新的字符串达到指定长度integer为止 任意参数为null时,返回null 若integer为负数,则返回null 若integer不大于string1的长度,则返回string1裁剪为integer长度的字符串 例如RPAD('hi',4,'??') 返回 "hi??" RPAD('hi',1,'??') 返回"h" FROM_BASE64(string) STRING 将base64编码的字符串str解析成对应字符串 若字符串为null,则返回null 例如FROM_BASE64('aGVsbG8gd29ybGQ=') 返回"hello world" TO_BASE64(string) STRING 将字符串基于base64编码 若字符串为null,则返回null 例如TO_BASE64('hello world') 返回"aGVsbG8gd29ybGQ=" ASCII(string) INT 返回字符串的第一个字符的ASCII值 若字符串为null,则返回null 例如ascii('abc') 返回97 ascii(CAST(NULL AS VARCHAR)) 返回NULL CHR(integer) STRING 将ASCII码转换为字符 若integer大于255,则计算出integer除以255的余数,并将余数作为ASCII码值 若integer为null,则返回null chr(97) 返回a chr(353) 返回a DECODE(binary, string) STRING 使用提供的字符集string解码参数binary,字符集可以为'US-ASCII', 'ISO-8859-1', 'UTF-8', 'UTF-16BE', 'UTF-16LE', 'UTF-16' 若任意参数为null,则返回null ENCODE(strinh1, string2) STRING 使用提供的字符集string2编码字符串string1,字符集可以为'US-ASCII', 'ISO-8859-1', 'UTF-8', 'UTF-16BE', 'UTF-16LE', 'UTF-16' 若任意参数为null,则返回null INSTR(string1, string2) INT 返回string2在string1中首次出现的位置 若有参数为null,则返回null LEFT(string, integer) STRING 返回最左边的integer个字符 若integer为负数,则返回空 若存在参数为null,则返回null RIGHT(string, integer) STRING 返回最右侧的integer个字符 若integer为负数,则返回空 若存在参数为null,则返回null LOCATE(string1, string2[, integer]) INT 返回string1在string2的位置integer之后首次出现的位置 若string1在string2的位置integer之后不存在,则返回0 若integer不存在,则默认为0 若存在参数为null,则返回null PARSE_URL(string1, string2[, string3]) STRING 返回URL string1中指定的部分解析后的值 string2为'HOST'、'PATH'、'QUERY'、'REF'、'PROTOCOL'、'AUTHORITY'、'FILE'或'USERINFO' 若存在参数为null,则返回null 若string2为QUERY,也可以指定QUERY中的key为string3 例如: parse_url('http://facebook.com/path1/p.php?k1=v1&k2=v2#Ref1', 'HOST')返回 'facebook.com' parse_url('http://facebook.com/path1/p.php?k1=v1&k2=v2#Ref1', 'QUERY', 'k1') 返回'v1' REGEXP(string1, string2) BOOLEAN 对指定的字符串执行一个正则表达式搜索,并返回一个BOOLEAN值表示是否找到指定的匹配模式。若找到,则返回TRUE。其中string1表示指定的字符串,string2表示正则表达式 若存在参数为null,则返回null REVERSE(string) STRING 反转字符串,返回字符串值的相反顺序。 若存在参数为null,则返回null 说明: 使用时请注意需在函数上加反引号 ` REVERSE ` 。 SPLIT_INDEX(string1, string2, integer1) STRING 以string2作为分隔符,将字符串string1分割成若干段,取其中的第integer1段。integer1从0开始 若integer1为负数,则返回null 如果任一参数为null,则返回null 具体函数说明请参考SPLIT_INDEX 函数说明。 STR_TO_MAP(string1[, string2, string3]]) MAP 使用string2分隔符将string1分割成K-V对,并使用string3分隔每个K-V对,组装成MAP返回 string2默认为',' string3默认为'=' SUBSTR(string[, integer1[, integer2]]) STRING 截取从位置integer1开始,长度为integer2的子串,并返回 若为指定integer2,翻截取到字符串结尾 JSON_VAL(STRING json_string, STRING json_path) STRING 从json形式的字符串json_string中提取指定json_path的值。具体函数使用可以参考JSON_VAL函数说明说明。 说明: 以下规则优先级按照顺序从高到低。 不允许json_string和json_path为NULL json_string格式必须为合法的json串,否则函数返回NULL json_string为空字符串,则函数返回空字符串 json_path为空字串或路径不存在,则函数返回NULL
  • JSON_VAL函数使用说明 语法 STRING JSON_VAL(STRING json_string, STRING json_path) 表2 参数说明 参数 数据类型 说明 json_string STRING 需要解析的JSON对象,使用字符串表示。 json_path STRING 解析JSON的路径表达式,使用字符串表示。 目前path支持如下表达式参考下表表3。 表3 json_path参数支持的表达式 表达式 说明 $ 根对象 [] 数组下标 * 数组通配符 . 取子元素 示例 测试输入数据。 测试数据源kafka,具体消息内容参考如下: "{name:James,age:24,gender:male,grade:{math:95,science:[80,85],english:100}}" "{name:James,age:24,gender:male,grade:{math:95,science:[80,85],english:100}]" 使用JSON_VAL编写SQL create table kafkaSource( message STRING ) with ( 'connector.type' = 'kafka', 'connector.version' = '0.11', 'connector.topic' = 'topic-swq', 'connector.properties.bootstrap.servers' = 'xxx.xxx.xxx.xxx:9092,yyy.yyy.yyy:9092,zzz.zzz.zzz.zzz:9092', 'connector.startup-mode' = 'earliest-offset', 'format.field-delimiter' = '|', 'format.type' = 'csv' ); create table kafkaSink( message1 STRING, message2 STRING, message3 STRING, message4 STRING, message5 STRING, message6 STRING ) with ( 'connector.type' = 'kafka', 'connector.version' = '0.11', 'connector.topic' = 'topic-swq-out', 'connector.properties.bootstrap.servers' = 'xxx.xxx.xxx.xxx:9092,yyy.yyy.yyy:9092,zzz.zzz.zzz.zzz:9092', 'format.type' = 'json' ); INSERT INTO kafkaSink SELECT JSON_VAL(message,""), JSON_VAL(message,"$.name"), JSON_VAL(message,"$.grade.science"), JSON_VAL(message,"$.grade.science[*]"), JSON_VAL(message,"$.grade.science[1]"), JSON_VAL(message,"$.grade.dddd") FROM kafkaSource; 查看输出结果 {"message1":null,"message2":"swq","message3":"[80,85]","message4":"[80,85]","message5":"85","message6":null} {"message1":null,"message2":null,"message3":null,"message4":null,"message5":null,"message6":null}
  • 字符串函数 数据湖 探索(DLI)提供了丰富的字符串函数,用于处理和转换字符串数据。这些函数包括拼接、大小写转换、截取子串、替换、正则匹配、编码解码、格式转换等。此外,还支持字符串长度计算、位置查找、填充、反转等功能,以及从JSON字符串中提取值的JSON_VAL函数。这些功能广泛应用于数据清洗、文本处理和数据分析场景,为开发者提供强大的工具支持。 字符串函数简介请参考表1,更多内容参考Apache Flink。 表1 字符串函数 函数 返回类型 描述 string1 || string2 STRING 返回两个字符串的拼接 CHAR_LENGTH(string) CHARACTER_LENGTH(string) INT 返回字符串中的字符数量 UPPER(string) STRING 返回字符串的大写形式 LOWER(string) STRING 返回字符串的小写形式 POSITION(string1 IN string2) INT 返回第一个字符串在第二个字符串中首次出现的位置。若第一个字符串不存在与第二个字符串,则返回0 TRIM([ BOTH | LEADING | TRAILING ] string1 FROM string2) STRING 去除string2字符串的首尾(或首部、或尾部)的string1字符串 LTRIM(string) STRING 返回去除首部空格后的字符串 例如LTRIM(' This is a test String.') 返回"This is a test String." RTRIM(string) STRING 返回去除尾部空格后的字符串 例如RTRIM('This is a test String. ') 返回"This is a test String." REPEAT(string, integer) STRING 返回integer个string连接后的字符串 例如REPEAT('This is a test String.', 2) 返回"This is a test String.This is a test String." REGEXP_REPLACE(string1, string2, string3) STRING 用string3代替string1中的符合正则表达式string2的字符串,并返回替换后的string1字符串 例如REGEXP_REPLACE('foobar', 'oo|ar', '') 返回"fb" REGEXP_REPLACE('ab\ab', '\\', 'e')返回"abeab" OVERLAY(string1 PLACING string2 FROM integer1 [ FOR integer2 ]) STRING 用string2代替string1中的字符串,从integer1开始,替换长度为integer2,并返回替换后的string1字符串 integer2默认为string2的长度 例如OVERLAY('This is an old string' PLACING ' new' FROM 10 FOR 5)返回"This is a new string" SUBSTRING(string FROM integer1 [ FOR integer2 ]) STRING 返回string中从integer1位置开始的长度为integer2的子字符串。若integer2未配置,则默认返回从integer1开始到末尾的子字符串 REPLACE(string1, string2, string3) STRING 用string3代替string1中的string2后的字符串,并返回替换后的string1字符串 例如:REPLACE('hello world', 'world', 'flink') 返回"hello flink" REPLACE('ababab', 'abab', 'z') 返回"zab" REPLACE('ab\\ab', '\\', 'e')返回"abeab" REGEXP_EXTRACT(string1, string2[, integer]) STRING 使用正则表达式string2匹配抽取字符串string1中的第integer个字串,integer从1开始,正则匹配提取。 若参数为 NULL或者正则不合法,则返回NULL。 例如REGEXP_EXTRACT('foothebar', 'foo(.*?)(bar)', 2)" 返回"bar" INITCAP(string) STRING 返回将字符串的首字符大写其余字符转为小写后的字符串 CONCAT(string1, string2,...) STRING 返回将两个或多个字符串拼接后的新字符串。 例如 CONCAT('AA', 'BB', 'CC') 返回"AABBCC" CONCAT_WS(string1, string2, string3,...) STRING 返回将每个参数和第一个参数指定的分隔符依次连接到一起组成的字符串。若string1是null,则返回null。若其他参数为null,在执行拼接过程中跳过取值为null的参数 例如CONCAT_WS('~', 'AA', NULL, 'BB', '', 'CC') 返回"AA~BB~~CC" LPAD(string1, integer, string2) STRING 将string2字符串拼接到string1字符串的左端,直到新的字符串达到指定长度integer为止 任意参数为null时,返回null 若integer为负数,则返回null 若integer不大于string1的长度,则返回string1裁剪为integer长度的字符串 例如LPAD('hi',4,'??') 返回"??hi" LPAD('hi',1,'??') 返回"h" RPAD(string1, integer, string2) STRING 将string2字符串拼接到string1字符串的右端,直到新的字符串达到指定长度integer为止 任意参数为null时,返回null 若integer为负数,则返回null 若integer不大于string1的长度,则返回string1裁剪为integer长度的字符串 例如RPAD('hi',4,'??') 返回 "hi??" RPAD('hi',1,'??') 返回"h" FROM_BASE64(string) STRING 将base64编码的字符串str解析成对应字符串 若字符串为null,则返回null 例如FROM_BASE64('aGVsbG8gd29ybGQ=') 返回"hello world" TO_BASE64(string) STRING 将字符串基于base64编码 若字符串为null,则返回null 例如TO_BASE64('hello world') 返回"aGVsbG8gd29ybGQ=" ASCII(string) INT 返回字符串的第一个字符的ASCII值 若字符串为null,则返回null 例如ascii('abc') 返回97 ascii(CAST(NULL AS VARCHAR)) 返回NULL CHR(integer) STRING 将ASCII码转换为字符 若integer大于255,则计算出integer除以255的余数,并将余数作为ASCII码值 若integer为null,则返回null chr(97) 返回a chr(353) 返回a DECODE(binary, string) STRING 使用提供的字符集string解码参数binary,字符集可以为'US-ASCII', 'ISO-8859-1', 'UTF-8', 'UTF-16BE', 'UTF-16LE', 'UTF-16' 若任意参数为null,则返回null ENCODE(strinh1, string2) STRING 使用提供的字符集string2编码字符串string1,字符集可以为'US-ASCII', 'ISO-8859-1', 'UTF-8', 'UTF-16BE', 'UTF-16LE', 'UTF-16' 若任意参数为null,则返回null INSTR(string1, string2) INT 返回string2在string1中首次出现的位置 若有参数为null,则返回null LEFT(string, integer) STRING 返回最左边的integer个字符 若integer为负数,则返回空 若存在参数为null,则返回null RIGHT(string, integer) STRING 返回最右侧的integer个字符 若integer为负数,则返回空 若存在参数为null,则返回null LOCATE(string1, string2[, integer]) INT 返回string1在string2的位置integer之后首次出现的位置 若string1在string2的位置integer之后不存在,则返回0 若integer不存在,则默认为0 若存在参数为null,则返回null PARSE_URL(string1, string2[, string3]) STRING 返回URL string1中指定的部分解析后的值 string2为'HOST'、'PATH'、'QUERY'、'REF'、'PROTOCOL'、'AUTHORITY'、'FILE'或'USERINFO' 若存在参数为null,则返回null 若string2为QUERY,也可以指定QUERY中的key为string3 例如: parse_url('http://facebook.com/path1/p.php?k1=v1&k2=v2#Ref1', 'HOST')返回 'facebook.com' parse_url('http://facebook.com/path1/p.php?k1=v1&k2=v2#Ref1', 'QUERY', 'k1') 返回'v1' REGEXP(string1, string2) BOOLEAN 对指定的字符串执行一个正则表达式搜索,并返回一个BOOLEAN值表示是否找到指定的匹配模式。若找到,则返回TRUE。其中string1表示指定的字符串,string2表示正则表达式 若存在参数为null,则返回null REVERSE(string) STRING 反转字符串,返回字符串值的相反顺序。 若存在参数为null,则返回null SPLIT_INDEX(string1, string2, integer1) STRING 以string2作为分隔符,将字符串string1分割成若干段,取其中的第integer1段。integer1从0开始 若integer1为负数,则返回null 如果任一参数为null,则返回null STR_TO_MAP(string1[, string2, string3]]) MAP 使用string2分隔符将string1分割成K-V对,并使用string3分隔每个K-V对,组装成MAP返回 string2默认为',' string3默认为'=' SUBSTR(string[, integer1[, integer2]]) STRING 截取从位置integer1开始,长度为integer2的子串,并返回 若为指定integer2,翻截取到字符串结尾 JSON_VAL(STRING json_string, STRING json_path) STRING 从json形式的字符串json_string中提取指定json_path的值。具体函数使用可以参考JSON_VAL函数使用说明说明。 说明: 以下规则优先级按照顺序从高到低。 不允许json_string和json_path为NULL json_string格式必须为合法的json串,否则函数返回NULL json_string为空字符串,则函数返回空字符串 json_path为空字串或路径不存在,则函数返回NULL
  • 语法格式 create table dorisSource ( attr_name attr_type (',' attr_name attr_type)* ) with ( 'connector' = 'doris', 'fenodes' = 'FE_IP:PORT,FE_IP:PORT,FE_IP:PORT', 'table.identifier' = 'database.table', 'username' = 'dorisUsername', 'password' = 'dorisPassword' );
  • 前提条件 该场景作业需要运行在DLI的独享队列上,因此要与Doris建立增强型跨源连接,且用户可以根据实际所需设置相应安全组规则。 如何建立增强型跨源连接,请参考《数据湖探索用户指南》中增强型跨源连接章节。 如何设置安全组规则,请参见《虚拟私有云用户指南》中“安全组”章节。 如果使用 MRS Doris,请在增强型跨源的主机信息中添加MRS集群所有节点的主机ip信息。 详细操作请参考《数据湖探索用户指南》中的“修改主机信息”章节描述。 集群未启用Kerberos认证(普通模式) 使用admin用户连接Doris后,创建具有管理员权限的角色并绑定给用户。
  • 注意事项 创建Flink OpenSource SQL作业时,在作业编辑界面的“运行参数”处,“Flink版本”需要选择“1.15”,勾选“保存作业日志”并设置保存作业日志的OBS桶,方便后续查看作业日志。 认证用的username和password等硬编码到代码中或者明文存储都有很大的安全风险,建议使用DEW管理凭证。配置文件或者环境变量中密文存放,使用时解密,确保安全。Flink Opensource SQL使用DEW管理访问凭据 集群未启用Kerberos认证(普通模式) Doris的表名是区分大小写。 使用cloudTable的doris时,'fenodes'字段值的端口请用8030,如'xx:8030'。同时安全组请放开端口8030,8040,9030。 开启HTTPS后,需要在创建表的with子句中添加如下配置参数: 'doris.enable.https' = 'true' 'doris.ignore.https.ca' = 'true' 请在Flink“作业编辑”页面选择“运行参数配置”,选择“开启Checkpoint”,否则会导致Doris结果表无法写入数据,且写入Doris的延时取决于设置的Checkpoint的间隔时间。
共100000条
提示

您即将访问非华为云网站,请注意账号财产安全