华为云用户手册

  • 示例二 使用datagen随机生成数据写入obs的bucketName桶下的fileName目录中。文件生成时间与checkpoint有关,达到checkpoint间隔或达到100MB时,生成新文件。 create table orders( name string, num INT ) with ( 'connector' = 'datagen', 'rows-per-second' = '100', 'fields.name.kind' = 'random', 'fields.name.length' = '5' ); CREATE TABLE sink_table ( name string, num INT ) WITH ( 'connector' = 'filesystem', 'path' = 'obs://bucketName/fileName', 'format' = 'csv', 'sink.rolling-policy.file-size'='128m', 'sink.rolling-policy.rollover-interval'='30 min', 'auto-compaction'='true', 'compaction.file-size'='100m' ); INSERT into sink_table SELECT * from orders;
  • 参数说明 表1 参数说明 参数 是否必选 默认值 类型 说明 connector 是 无 String 固定位filesystem。 path 是 无 String OBS路径。 format 是 无 String 文件格式。 支持csv、parquet格式。 sink.rolling-policy.file-size 否 128MB MemorySize 单个part文件最大大小,超过该数值会滚动产生新文件。 说明: RollingPolicy 定义了何时关闭给定的In-progress Part文件,并将其转换为Pending状态,然后在转换为Finished状态。 Finished状态的文件,可供查看并且可以保证数据的有效性,在出现故障时不会恢复。 在STREAMING模式下,滚动策略结合Checkpoint间隔(到下一个Checkpoint成功时,文件的Pending状态才转换为Finished状态)共同控制Part文件对下游readers是否可见以及这些文件的大小和数量。 sink.rolling-policy.rollover-interval 否 30 min Duration 单个Part文件处于打开状态的最长时间,超过该时间会滚动产生新文件(默认值30分钟,以避免产生大量小文件)。检查频率是通过sink.rolling-policy.check-interval参数控制的。 说明: 该参数数字与单位之间必须要有空格。 支持的时间单位包括: d,h,min,s,ms等。 对于bulk格式的文件(parquet、orc、avro),checkpoint的时间间隔也会控制单个part文件打开的最长时间。 sink.rolling-policy.check-interval 否 1 min Duration 基于时间的滚动策略的检查间隔。 该属性控制了基于sink.rolling-policy.rollover-interval属性检查文件是否该被滚动的检查频率。 auto-compaction 否 false Boolean 在流式 sink 中是否开启自动合并功能。数据首先会被写入临时文件。当checkpoint完成后,该checkpoint产生的临时文件会被合并。 compaction.file-size 否 `sink.rolling-policy.file-size`的大小 MemorySize 合并目标文件大小,默认值为滚动文件大小。 说明: 只有在同个checkpoint内的文件会被合并,因此最终文件的数量至少等于checkpoint的数量。 如果合并时间较长,可能会引起反压,延长checkpoint所需时间。 开启该功能后,checkpoint时会产生最终文件,并打开新的文件接收下个checkpoint产生的数据。
  • 功能描述 FileSystem sink用于将数据输出到分布式文件系统HDFS或者 对象存储服务 OBS等文件系统。适用于数据转储、大数据分析、备份或活跃归档、深度或冷归档等场景。 考虑到输入流可以是无界的,每个桶中的数据被组织成有限大小的Part文件。完全可以配置为基于时间的方式往桶中写入数据,比如可以设置每个小时的数据写入一个新桶中。即桶中将包含一个小时间隔内接收到的记录。 桶目录中的数据被拆分成多个Part文件。对于相应的接收数据的桶的Sink的每个Subtask,每个桶将至少包含一个Part文件。将根据配置的滚动策略来创建其他Part文件。对于Row Formats默认的策略是根据Part文件大小进行滚动,需要指定文件打开状态最长时间的超时以及文件关闭后的非活动状态的超时时间。对于Bulk Formats在每次创建Checkpoint时进行滚动,并且用户也可以添加基于大小或者时间等的其他条件。 在STREAMING模式下使用FileSink需要开启Checkpoint功能。Part文件只在Checkpoint成功时生成。如果没有开启Checkpoint功能,文件将永远停留在in-progress或者pending的状态,并且下游系统将不能安全读取该文件数据。 sink end算子的接受记录数为checkpoint的个数,非实际的发送数据,实际发送数据量请参考streaming-writer或StreamingFileWriter算子的记录数。
  • 示例 图片分类预测我们采用Mnist数据集作为流的输入,通过加载预训练的deeplearning4j模型或者keras模型,可以实时预测每张图片代表的数字。 1 2 3 4 5 6 CREATE SOURCE STREAM Mnist( image Array[TINYINT] ) SELECT DL_IMAGE_MAX_PREDICTION_INDEX(image, 'your_dl4j_model_path', false) FROM Mnist SELECT DL_IMAGE_MAX_PREDICTION_INDEX(image, 'your_keras_model_path', true) FROM Mnist SELECT DL_IMAGE_MAX_PREDICTION_INDEX(image, 'your_keras_model_config_path', 'keras_weights_path') FROM Mnist 文本分类预测我们采用一组新闻标题数据作为流的输入,通过加载预训练的deeplearning4j模型或者keras模型,可以实时预测每个新闻标题所属的类别,比如经济,体育,娱乐等。 1 2 3 4 5 6 7 CREATE SOURCE STREAM News( title String ) SELECT DL_TEXT_MAX_PREDICTION_INDEX(title, 'your_dl4j_word2vec_model_path','your_dl4j_model_path', false) FROM News SELECT DL_TEXT_MAX_PREDICTION_INDEX(title, 'your_keras_word2vec_model_path','your_keras_model_path', true) FROM News SELECT DL_TEXT_MAX_PREDICTION_INDEX(title, 'your_dl4j_model_path', false) FROM New SELECT DL_TEXT_MAX_PREDICTION_INDEX(title, 'your_keras_model_path', true) FROM New
  • 参数说明 表1 参数说明 参数 是否必选 说明 field_name 是 数据在数据流中的字段名。 图像分类中field_name类型需声明为ARRAY[TINYINT]。 文本分类中field_name类型需声明为String。 model_path 是 模型存放在OBS上的完整路径,包括模型结构和模型权值。 is_dl4j_model 是 是否是deeplearning4j的模型。 true代表是deeplearning4j,false代表是keras模型。 keras_model_config_path 是 模型结构存放在OBS上的完整路径。在keras中通过model.to_json()可得到模型结构。 keras_weights_path 是 模型权值存放在OBS上的完整路径。在keras中通过model.save_weights(filepath)可得到模型权值。 word2vec_path 是 word2vec模型存放在OBS上的完整路径。
  • 语法格式 1 2 3 4 5 6 7 -- 图像分类, 返回预测图像分类的类别id DL_IMAGE_MAX_PREDICTION_INDEX(field_name, model_path, is_dl4j_model) DL_IMAGE_MAX_PREDICTION_INDEX(field_name, keras_model_config_path, keras_weights_path) -- 适用于Keras模型 -- 文本分类,返回预测文本分类的类别id DL_TEXT_MAX_PREDICTION_INDEX(field_name, model_path, is_dl4j_model) -- 采用默认word2vec模型 DL_TEXT_MAX_PREDICTION_INDEX(field_name, word2vec_path, model_path, is_dl4j_model) 模型及配置文件等需存储在用户的OBS中,路径格式为"obs://your_ak:your_sk@obs.your_obs_region.xxx.com:443/your_model_path"。例如你的模型存放在OBS上,桶名为dl_model,文件名为model.h5,则路径填写为"obs://your_ak:your_sk@obs.xxx.com:443/dl_model/model.h5"。
  • 注意事项 暂不支持通过python写UDF、UDTF、UDAF自定义函数。 如果使用IntelliJ IDEA工具对创建的自定义函数进行调试,则需要在IDEA上勾选:include dependencies with "Provided" scope,否则本地调试运行时会加载不到pom文件中的依赖包。 具体操作以IntelliJ IDEA版本2020.2为例,参考如下: 在IntelliJ IDEA界面,选择调试的配置文件,单击“Edit Configurations”。 在“Run/Debug Configurations”界面,勾选:include dependencies with "Provided" scope。 单击“OK”完成应用配置。
  • 按列GROUP BY 功能描述 按列进行分组操作。 语法格式 1 2 3 4 SELECT [ ALL | DISTINCT ] { * | projectItem [, projectItem ]* } FROM tableExpression [ WHERE booleanExpression ] [ GROUP BY { groupItem [, groupItem ]* } ] 语法说明 GROUP BY:按列可分为单列GROUP BY与多列GROUP BY。 单列GROUP BY:指GROUP BY子句中仅包含一列。 多列GROUP BY:指GROUP BY子句中不止一列,查询语句将按照GROUP BY的所有字段分组,所有字段都相同的记录将被放在同一组中。 注意事项 GroupBy在流处理表中会产生更新结果 示例 根据score及name两个字段对表student进行分组,并返回分组结果。 1 2 insert into temp SELECT name,score, max(score) FROM student GROUP BY name,score;
  • 表达式GROUP BY 功能描述 按表达式对流进行分组操作。 语法格式 1 2 3 4 SELECT [ ALL | DISTINCT ] { * | projectItem [, projectItem ]* } FROM tableExpression [ WHERE booleanExpression ] [ GROUP BY { groupItem [, groupItem ]* } ] 语法说明 groupItem:可以是单字段,多字段,也可以是字符串函数等调用,不能是聚合函数。 注意事项 无 示例 先利用substring函数取字段name的子字符串,并按照该子字符串进行分组,返回每个子字符串及对应的记录数。 1 2 insert into temp SELECT substring(name,6),count(name) FROM student GROUP BY substring(name,6);
  • Grouping sets, Rollup, Cube 功能描述 GROUPING SETS 的 GROUP BY 子句可以生成一个等效于由多个简单 GROUP BY 子句的 UNION ALL 生成的结果集,并且其效率比 GROUP BY 要高。 ROLLUP与CUBE按一定的规则产生多种分组,然后按各种分组统计数据。 CUBE生成的结果集显示了所选列中值的所有组合的聚合。 Rollup生成的结果集显示了所选列中值的某一层次结构的聚合。 语法格式 SELECT [ ALL | DISTINCT ] { * | projectItem [, projectItem ]* } FROM tableExpression [ WHERE booleanExpression ] [ GROUP BY groupingItem] 语法说明 groupingItem:是Grouping sets(columnName [, columnName]*)、Rollup(columnName [, columnName]*)、Cube(columnName [, columnName]*) 注意事项 无 示例 分别产生基于user和product的结果 INSERT INTO temp SELECT SUM(amount) FROM Orders GROUP BY GROUPING SETS ((user), (product));
  • 示例 偏航检测样例: 1 2 3 4 INSERT INTO yaw_warning SELECT "The car is yawing" FROM driver_behavior WHERE NOT ST_WITHIN(ST_POINT(cast(Longitude as DOUBLE), cast(Latitude as DOUBLE)), ST_BUFFER(ST_LINE(ARRAY[ST_POINT(34.585555,105.725221),ST_POINT(34.586729,105.735974),ST_POINT(34.586492,105.740538),ST_POINT(34.586388,105.741651),ST_POINT(34.586135,105.748712),ST_POINT(34.588691,105.74997)]),0.001));
  • IP地理函数 当前仅支持IPV4的IP地址。 表6 IP地理函数表 函数 返回值 说明 IP_TO_COUNTRY STRING 获取IP地址所在的国家名称。 IP_TO_PROVINCE STRING 获取IP地址所在的省份。 用法说明: IP_TO_PROVINCE(STRING ip):返回IP地址所在的省份。 IP_TO_PROVINCE(STRING ip, STRING lang):以指定语言返回IP地址所在的省份。 说明: 当IP无法被解析到省份时,返回该IP所属的国家。当IP无法被解析时,返回“未知”。 函数返回的省份名称均为简称。 中文参考如下链接:http://www.gov.cn/guoqing/2005-09/13/content_5043917.htm IP_TO_CITY STRING 获取IP地址所在的城市名称。 说明: 当IP无法被解析到城市时,返回该IP所属的省份或者国家。当IP无法被解析时,返回“未知”。 IP_TO_CITY_GEO STRING 获取IP地址所在城市的经纬度,格式为“纬度,经度”。 用法说明: IP_TO_CITY_GEO(STRING ip):返回IP所在城市的经纬度。
  • 函数说明 基本地理空间几何元素介绍说明如表1所示。 表1 基本地理空间几何元素表 地理空间几何元素(统称geometry) 说明 举例 ST_POINT(latitude, longitude) 地理点,包含经度和维度两个信息。 ST_POINT(1.12012, 1.23401) ST_LINE(array[point1...pointN]) 地理线,由多个地理点(ST_POINT)按顺序连接成的折线或直线。 ST_LINE(ARRAY[ST_POINT(1.12, 2.23), ST_POINT(1.13, 2.44), ST_POINT(1.13, 2.44)]) ST_POLYGON(array[point1...point1]) 地理多边形,由首尾相同的多个地理点(ST_POINT)按顺序连线围成的封闭多边形区域。 ST_POLYGON(ARRAY[ST_POINT(1.0, 1.0), ST_POINT(2.0, 1.0), ST_POINT(2.0, 2.0), ST_POINT(1.0, 1.0)]) ST_CIRCLE(point, radius) 地理圆形,由圆心地理点(ST_POINT)和半径构成的地理圆形区域。 ST_CIRCLE(ST_POINT(1.0, 1.0), 1.234) 用户可以以基本地理空间几何元素为基础,构造复杂的地理空间几何元素,具体的变换方法见表2。 表2 基于基本地理空间几何元素构造复杂几何元素的变换表 变换方法 说明 举例 ST_BUFFER(geometry, distance) 创建一个环绕包含给定地理空间几何元素的多边形,并以给定距离作为环绕距离,通常使用该函数构造一定宽度的公路范围用于偏航检测。 ST_BUFFER(ST_LINE(ARRAY[ST_POINT(1.12, 2.23), ST_POINT(1.13, 2.44), ST_POINT(1.13, 2.44)]),1.0) ST_INTERSECTION(geometry, geometry) 创建一个多边形,其范围为给定的两个地理空间几何元素的交叠区域。 ST_INTERSECTION(ST_CIRCLE(ST_POINT(1.0, 1.0), 2.0), ST_CIRCLE(ST_POINT(3.0, 1.0), 1.234)) ST_ENVELOPE(geometry) 创建一个包含给定的地理空间几何元素的最小矩形。 ST_ENVELOPE(ST_CIRCLE(ST_POINT(1.0, 1.0), 2.0)) DLI 提供丰富的对地理空间几何元素的操作和位置判断函数,具体的SQL标量函数介绍说明见表3。 表3 SQL标量函数表 函数 返回值 说明 ST_DISTANCE(point_1, point_2) DOUBLE 计算两个地理点之间的欧几里得距离。 示例如下: Select ST_DISTANCE(ST_POINT(x1, y1), ST_POINT(x2, y2)) FROM input ST_GEODESIC_DISTANCE(point_1, point_2) DOUBLE 计算两个地理点之间的测地距离,即两个地理点之间地表最短路径距离。 示例如下: Select ST_GEODESIC_DISTANCE(ST_POINT(x1, y1), ST_POINT(x2, y2)) FROM input ST_PERIMETER(polygon) DOUBLE 计算多边形的周长。 示例如下: Select ST_PERIMETER(ST_POLYGON(ARRAY[ST_POINT(x11, y11), ST_POINT(x12, y12), ST_POINT(x11, y11)]) FROM input ST_AREA(polygon) DOUBLE 计算多边形区域的面积。 示例如下: Select ST_AREA(ST_POLYGON(ARRAY[ST_POINT(x11, y11), ST_POINT(x12, y12), ST_POINT(x11, y11)]) FROM input ST_OVERLAPS(polygon_1, polygon_2) BOOLEAN 判断一个多边形是否与另一个多边形有重叠区域。 示例如下: SELECT ST_OVERLAPS(ST_POLYGON(ARRAY[ST_POINT(x11, y11), ST_POINT(x12, y12), ST_POINT(x11, y11)]), ST_POLYGON(ARRAY[ST_POINT(x21, y21), ST_POINT(x22, y22), ST_POINT(x23, y23), ST_POINT(x21, y21)])) FROM input ST_INTERSECT(line1, line2) BOOLEAN 检查两条线段是否相互交叉,而非线条所在的直线是否交叉。 示例如下: SELECT ST_INTERSECT(ST_LINE(ARRAY[ST_POINT(x11, y11), ST_POINT(x12, y12)]), ST_LINE(ARRAY[ST_POINT(x21, y21), ST_POINT(x22, y22), ST_POINT(x23, y23)])) FROM input ST_WITHIN(point, polygon) BOOLEAN 一个点是否包含在几何体(多边形或圆形)内。 示例如下: SELECT ST_WITHIN(ST_POINT(x11, y11), ST_POLYGON(ARRAY[ST_POINT(x21, y21), ST_POINT(x22, y22), ST_POINT(x23, y23), ST_POINT(x21, y21)])) FROM input ST_CONTAINS(polygon_1, polygon_2) BOOLEAN 判断第一个几何体是否包含第二个几何体。 示例如下: SELECT ST_CONTAINS(ST_POLYGON(ARRAY[ST_POINT(x11, y11), ST_POINT(x12, y12), ST_POINT(x11, y11)]), ST_POLYGON(ARRAY[ST_POINT(x21, y21), ST_POINT(x22, y22), ST_POINT(x23, y23), ST_POINT(x21, y21)])) FROM input ST_COVERS(polygon_1, polygon_2) BOOLEAN 第一个几何体是否覆盖第二个几何体。与ST_CONTAINS相似,但在边界重叠情况下ST_COVER判断为TRUE,ST_CONTAINS判断为FALSE。 示例如下: SELECT ST_COVERS(ST_POLYGON(ARRAY[ST_POINT(x11, y11), ST_POINT(x12, y12), ST_POINT(x11, y11)]), ST_POLYGON([ST_POINT(x21, y21), ST_POINT(x22, y22), ST_POINT(x23, y23), ST_POINT(x21, y21)])) FROM input ST_DISJOINT(polygon_1, polygon_2) BOOLEAN 判断一个多边形是否与另一个多边形不相交(不重叠)。 示例如下: SELECT ST_DISJOINT(ST_POLYGON(ARRAY[ST_POINT(x11, y11), ST_POINT(x12, y12), ST_POINT(x11, y11)]), ST_POLYGON(ARRAY[ST_POINT(x21, y21), ST_POINT(x22, y22), ST_POINT(x23, y23), ST_POINT(x21, y21)])) FROM input 地理函数的基准坐标系标准为全球通用的GPS坐标系标准WGS84,GPS坐标不能直接在百度地图(BD09标准)或者google地图(GCJ02标准)上使用,会有偏移现象,为了在不同地理坐标系之间切换,DLI提供了坐标系转换的一系列函数,并且还提供地理距离与米之间的转换函数。详见表4。 表4 地理坐标系转换函数与距离单位转换函数表 函数 返回值 说明 WGS84_TO_BD09(geometry) 对应的百度地图坐标系地理空间几何元素 将GPS坐标系下的地理空间几何元素转换成百度地图坐标系下对应的地理空间几何元素。示例如下: WGS84_TO_BD09(ST_CIRCLE(ST_POINT(x, y), r)) WGS84_TO_CJ02(geometry) 对应的Google地图坐标系地理空间几何元素 将GPS坐标系下的地理空间几何元素转换成Google地图坐标系下对应的地理空间几何元素。示例如下: WGS84_TO_CJ02(ST_CIRCLE(ST_POINT(x, y), r)) BD09_TO_WGS84(geometry) 对应的GPS坐标系地理空间几何元素 将百度地图坐标系下的地理空间几何元素转换成GPS坐标系下对应的地理空间几何元素。示例如下: BD09_TO_WGS84(ST_CIRCLE(ST_POINT(x, y), r)) BD09_TO_CJ02(geometry) 对应的Google地图坐标系地理空间几何元素 将百度地图坐标系下的地理空间几何元素转换成Google地图坐标系下对应的地理空间几何元素。示例如下: BD09_TO_CJ02(ST_CIRCLE(ST_POINT(x, y), r)) CJ02_TO_WGS84(geometry) 对应的GPS坐标系地理空间几何元素 将Google地图坐标系下的地理空间几何元素转换成GPS坐标系下对应的地理空间几何元素。示例如下: CJ02_TO_WGS84(ST_CIRCLE(ST_POINT(x, y), r)) CJ02_TO_BD09(geometry) 对应的百度地图坐标系地理空间几何元素 将Google地图坐标系下的地理空间几何元素转换成百度地图坐标系下对应的地理空间几何元素。示例如下: CJ02_TO_BD09(ST_CIRCLE(ST_POINT(x, y), r)) DEGREE_TO_METER(distance) DOUBLE 将地理函数的距离数值转换成以“米”为单位的数值。示例如下(以米为单位计算地理三边形周长): DEGREE_TO_METER(ST_PERIMETER(ST_POLYGON(ARRAY[ST_POINT(x1,y1), ST_POINT(x2,y2), ST_POINT(x3,y3), ST_POINT(x1,y1)]))) METER_TO_DEGREE(numerical_value) DOUBLE 将以“米”为单位的数值转换成地理函数可计算的距离单位数值。示例如下(画出以指定地理点为圆心,半径1公里的圆): ST_CIRCLE(ST_POINT(x,y), METER_TO_DEGREE(1000)) DLI还提供了基于窗口的SQL地理聚合函数用于SQL逻辑涉及窗口和聚合的场景。详见表5的介绍说明。 表5 时间相关SQL地理聚合函数表 函数 说明 举例 AGG_DISTANCE(point) 距离聚合函数,用于计算窗口内所有相邻地理点的距离总和。 SELECT AGG_DISTANCE(ST_POINT(x,y)) FROM input GROUP BY HOP(rowtime, INTERVAL '1' HOUR, INTERVAL '1' DAY) AVG_SPEED(point) 平均速度聚合函数,用于计算窗口内所有地理点组成的移动轨迹的平均速度,单位为“米/秒”。 SELECT AVG_SPEED(ST_POINT(x,y)) FROM input GROUP BY TUMBLE(proctime, INTERVAL '1' DAY)
  • 语法格式 1 2 3 4 5 6 7 create table printSink ( attr_name attr_type (',' attr_name attr_type) * (',' PRIMARY KEY (attr_name,...) NOT ENFORCED) ) with ( 'connector' = 'print', 'print-identifier' = '', 'standard-error' = '' );
  • 原生数据类型 Flink SQL支持原生数据类型,请参见表1。 表1 原生数据类型 数据类型 描述 存储空间 范围 VARCHAR 可变长度的字符 - - BOOLEAN 布尔类型 - TRUE/FALSE TINYINT 有符号整数 1字节 -128-127 SMALLINT 有符号整数 2字节 -32768-32767 INT 有符号整数 4字节 -2147483648~2147483647 INTEGER 有符号整数 4字节 -2147483648~2147483647 BIGINT 有符号整数 8字节 -9223372036854775808~9223372036854775807 REAL 单精度浮点型 4字节 - FLOAT 单精度浮点型 4字节 - DOUBLE 双精度浮点型 8字节 - DECIMAL 固定有效位数和小数位数的数据类型 - - DATE 日期类型,描述了特定的年月日,以yyyy-MM-dd格式表示,例如2014-05-29 - DATE类型不包含时间,所表示日期的范围为0000-01-01 to 9999-12-31 TIME 时间类型,以HH:mm:ss表示。 例如20:17:40 - - TIMESTAMP(3) 完整日期,包括日期和时间。 例如:1969-07-20 20:17:40 - - INTERVAL timeUnit [TO timeUnit] 时间间隔 例如:INTERVAL '1:5' YEAR TO MONTH, INTERVAL '45' DAY - -
  • 配置Event Time Event Time是指事件产生的时间,即数据产生时自带时间戳。 语法格式 1 2 3 CREATE SOURCE STREAM stream_name(...) WITH (...) TIMESTAMP BY {attr_name}.rowtime SET WATERMARK (RANGE {time_interval} | ROWS {literal}, {time_interval}); 语法说明 设置Event Time需要选定流中的某一个属性来作为时间戳,同时需要设置Watermark策略。 由于网络等原因,有时会导致乱序的产生;对于迟来的数据,需要Watermark来保证一个特定的时间后去触发Window进行计算。Watermark主要是用来处理乱序数据,流处理从事件产生,到发送到DLI服务,中间有一个过程。 Watermark有两种设置策略: 按时间周期 1 SET WATERMARK(range interval {time_unit}, interval {time_unit}) 按事件个数 1 SET WATERMARK(rows literal, interval {time_unit}) 一个逗号表示一个参数,第一个参数表示Watermark发送周期,第二个参数表示允许最大延迟时间。 注意事项 无。 示例 time2事件产生时间开始,每10s发送一次watermark,事件最大允许延迟时间为20s。 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 CREATE SOURCE STREAM student_scores ( student_number STRING, /* 学号 */ student_name STRING, /* 姓名 */ subject STRING, /* 学科 */ score INT, /* 成绩 */ time2 TIMESTAMP ) WITH ( type = "dis", region = "", channel = "dliinput", partition_count = "1", encode = "csv", field_delimiter="," ) TIMESTAMP BY time2.rowtime SET WATERMARK (RANGE interval 10 second, interval 20 second); INSERT INTO score_greate_90 SELECT student_name, sum(score) over (order by time2 RANGE UNBOUNDED PRECEDING) FROM student_scores; 每收到10个数据发送一次watermark,事件最大允许延迟时间为20s。 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 CREATE SOURCE STREAM student_scores ( student_number STRING, /* 学号 */ student_name STRING, /* 姓名 */ subject STRING, /* 学科 */ score INT, /* 成绩 */ time2 TIMESTAMP ) WITH ( type = "dis", region = "", channel = "dliinput", partition_count = "1", encode = "csv", field_delimiter="," ) TIMESTAMP BY time2.rowtime SET WATERMARK (ROWS 10, interval 20 second); INSERT INTO score_greate_90 SELECT student_name, sum(score) over (order by time2 RANGE UNBOUNDED PRECEDING) FROM student_scores;
  • 配置Processing Time Processing Time是指系统时间,与数据本身的时间戳无关,即在Flink算子内计算完成的时间。 语法格式 1 2 3 4 CREATE SOURCE STREAM stream_name(...) WITH (...) TIMESTAMP BY proctime.proctime; CREATE TEMP STREAM stream_name(...) TIMESTAMP BY proctime.proctime; 语法说明 设置Processing Time只需在timestamp by后配置proctime.proctime即可,后续可以直接使用proctime字段。 注意事项 无。 示例 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 CREATE SOURCE STREAM student_scores ( student_number STRING, /* 学号 */ student_name STRING, /* 姓名 */ subject STRING, /* 学科 */ score INT /* 成绩 */ ) WITH ( type = "dis", region = "", channel = "dliinput", partition_count = "1", encode = "csv", field_delimiter="," )TIMESTAMP BY proctime.proctime; INSERT INTO score_greate_90 SELECT student_name, sum(score) over (order by proctime RANGE UNBOUNDED PRECEDING) FROM student_scores;
  • Union/Union ALL/Intersect/Except 语法格式 1 query UNION [ ALL ] | Intersect | Except query 语法说明 UNION返回多个查询结果的并集。 Intersect返回多个查询结果的交集。 Except返回多个查询结果的差集。 注意事项 集合运算是以一定条件将表首尾相接,所以其中每一个SELECT语句返回的列数必须相同,列的类型一定要相同,列名不一定要相同。 UNION默认是去重的,UNION ALL是不去重的。 示例 输出Orders1和Orders2的并集,不包含重复记录。 1 2 insert into temp SELECT * FROM Orders1 UNION SELECT * FROM Orders2;
  • IN 语法格式 1 2 3 SELECT [ ALL | DISTINCT ] { * | projectItem [, projectItem ]* } FROM tableExpression WHERE column_name IN (value (, value)* ) | query 语法说明 IN操作符允许在where子句中规定多个值。若表达式在给定的表子查询中存在,则返回 true 。 注意事项 子查询表必须由单个列构成,且该列的数据类型需与表达式保持一致。 示例 输出Orders中NewProducts中product的user和amount信息。 1 2 3 4 5 insert into temp SELECT user, amount FROM Orders WHERE product IN ( SELECT product FROM NewProducts );
  • 示例 将car_infos数据输出到OBS的obs-sink桶下,输出目录为car_infos, 输出文件以greater_30作为文件名前缀,当单个文件超过100M时新起一个文件,同时数据输出用csv编码,使用逗号作为属性分隔符,换行符作为行分隔符。 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 CREATE SINK STREAM car_infos ( car_id STRING, car_owner STRING, car_brand STRING, car_price INT, car_timestamp LONG ) WITH ( type = "obs", encode = "csv", region = "xxx", field_delimiter = ",", row_delimiter = "\n", obs_dir = "obs-sink/car_infos", file_prefix = "greater_30", rolling_size = "100m" ); orc编码格式示例 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 CREATE SINK STREAM car_infos ( car_id STRING, car_owner STRING, car_brand STRING, car_price INT, car_timestamp LONG ) WITH ( type = "obs", region = "xxx", encode = "orc", obs_dir = "dli-append-2/obsorc", FILE_PREFIX = "es_info", max_record_num_per_file = "100000", dump_interval = "60" ); parquet编码示例请参考文件系统输出流(推荐)中的示例。
  • 语法格式 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 CREATE SINK STREAM stream_id (attr_name attr_type (',' attr_name attr_type)* ) WITH ( type = "obs", region = "", encode = "", field_delimiter = "", row_delimiter = "", obs_dir = "", file_prefix = "", rolling_size = "", rolling_interval = "", quote = "", array_bracket = "", append = "", max_record_num_per_file = "", dump_interval = "", dis_notice_channel = "" )
  • 功能描述 创建sink流将DLI数据输出到对象存储服务(OBS)。DLI可以将作业分析结果输出到OBS上。适用于大数据分析、原生云应用程序数据、静态网站托管、备份/活跃归档、深度/冷归档等场景。 对象存储服务(Object Storage Service,简称OBS)是一个基于对象的海量存储服务,为客户提供海量、安全、高可靠、低成本的数据存储能力。OBS的更多信息,请参见《对象存储服务控制台指南》。 推荐使用《文件系统输出流(推荐)》。
  • 关键字 表1 关键字说明 参数 是否必选 说明 type 是 输出通道类型,“obs”表示输出到对象存储服务。 region 是 对象存储服务所在区域。 ak 否 访问密钥ID(Access Key ID)。访问密钥获取方式请参见我的凭证。 sk 否 Secret Access Key,与访问密钥ID结合使用的密钥。访问密钥获取方式请参见我的凭证。 encode 是 编码方式。当前支持csv/json/orc/avro/avro_merge/parquet格式。 field_delimiter 否 属性分隔符。 仅当编码方式为csv时需要配置,若不配置,默认分隔符为逗号。 row_delimiter 否 行分隔符。当编码格式为csv、json时需要设置。 json_config 否 当编码格式为json时,用户可以通过该参数来指定json字段和流定义字段的映射关系,格式为“field1=data_json.field1;field2=data_json.field2”。 obs_dir 是 文件存储目录。格式为{桶名}/{目录名}, 如obs-a1/dir1/subdir。当编码格式为csv(append为false)、json(append为false)、avro_merge、parquet时,支持参数化。 file_prefix 否 输出文件名前缀。生成的文件会以file_prefix.x的方式命名,如file_prefix.1、 file_prefix.2,若没有设置,默认文件前缀为temp。 rolling_size 否 单个文件最大允许大小。 说明: rolling_size和rolling_interval必须至少配一样或者都配置。 当文件大小超过设置size后,会生成新文件。 支持的单位包括KB/MB/GB,若没写单位,表示单位为字节数。 当编码格式为orc时不需要设置。 rolling_interval 否 数据保存到对应目录的时间模式。 说明: rolling_size和rolling_interval必须至少配一样或者都配置。 设置后数据会按照输出时间输出到相应时间目录下。 支持的格式为yyyy/MM/dd/HH/mm, 最小单位只到分钟,大小写敏感。例如配置为yyyy/MM/dd/HH, 则数据会写入对应小时这个时间点所产生的目录下,比如2018-09-10 16时产生的数据就会写到{obs_dir}/2018-09-10_16目录下。 当rolling_size和rolling_interval都配置时,表示每个时间所对应的目录下,单个文件超过设置大小时,另起新文件。 quote 否 修饰符,仅当编码格式为csv时可配置,配置后会在每个属性前后各加上修饰符,建议使用不可见字符配置,如"\u0007"。 array_bracket 否 数组括号,仅当编码格式为csv时可配置, 可选值为"()", "{}", "[]", 例如配置了"{}", 则数组输出格式为{a1,a2}。 append 否 值为true或者false,默认为true。 当OBS不支持append模式,且编码格式为csv和json时,可将该参数设置为false。Append为false时需要设置max_record_num_per_file和dump_interval。 max_record_num_per_file 否 文件最大记录数,当编码格式为csv(append为false)、json(append为false)、orc、avro、avro_merge和parquet时需配置,表明一个文件最多存储记录数,当达到最大值,则另起新文件。 dump_interval 否 触发周期, 当编码格式为orc或者配置了DIS通知提醒时需进行配置。 在orc编码方式中,该配置表示周期到达时,即使文件记录数未达到最大个数配置,也将文件上传到OBS上。 在DIS通知提醒功能中,该配置表示每周期往DIS发送一个通知提醒,表明该目录已写完。 dis_notice_channel 否 OBS目录完成通知通道。表示每周期往DIS通道中发送一条记录,该记录内容为OBS目录路径,表明该目录已书写完毕。 encoded_data 否 当编码格式为json(append为false)、avro_merge和parquet时,可通过配置该参数指定真正需要编码的数据,格式为${field_name},表示直接将该流字段的内容作为一个完整的记录进行编码。
  • 前提条件 Kafka服务端的端口如果监听在hostname上,则需要将Kafka Broker节点的hostname和IP的对应关系添加到DLI队列中。Kafka Broker节点的hostname和IP请联系Kafka服务的部署人员。如何添加IP域名映射,请参见《 数据湖探索 用户指南》中修改主机信息章节。 Kafka是线下集群,需要通过增强型跨源连接功能将Flink作业与Kafka进行对接。且用户可以根据实际所需设置相应安全组规则。 如何建立增强型跨源连接,请参考《 数据湖 探索用户指南》中增强型跨源连接章节。 如何设置安全组规则,请参见《虚拟私有云用户指南》中“安全组”章节。
  • 语法格式 1 2 3 4 5 6 7 CREATE SINK STREAM stream_id (attr_name attr_type (',' attr_name attr_type)* ) WITH( type = "kafka", kafka_bootstrap_servers = "", kafka_topic = "", encode = "json" )
  • 关键字 表1 关键字说明 参数 是否必选 说明 type 是 输出通道类型,"kafka"表示输出到Kafka中。 kafka_bootstrap_servers 是 Kafka的连接端口,需要确保能连通(需要通过增强型跨源开通DLI队列和Kafka集群的连接)。 kafka_topic 是 写入的topic。 encode 是 编码格式,当前支持“json”和“user_defined”。 若编码格式为“user_defined”,则需配置“encode_class_name”和“encode_class_parameter”属性。 encode_class_name 否 当encode为user_defined时,需配置该参数,指定用户自实现编码类的类名(包含完整包路径),该类需继承类DeserializationSchema。 encode_class_parameter 否 当encode为user_defined时,可以通过配置该参数指定用户自实现编码类的入参,仅支持一个string类型的参数。 krb_auth 否 创建跨源认证的认证名。开启kerberos认证时,需配置该参数。如果创建的 MRS 集群未开启kerb认证的集群,请确保在DLI队列host文件中添加MRS集群master节点的“/etc/hosts”信息。 kafka_properties 否 可通过该参数配置kafka的原生属性,格式为"key1=value1;key2=value2" kafka_certificate_name 否 跨源认证信息名称。跨源认证信息类型为“Kafka_SSL”时,该参数有效。 说明: 指定该配置项时,服务仅加载该认证下指定的文件和密码,系统将自动设置到“kafka_properties”属性中。 Kafka SSL认证需要的其他配置信息,需要用户手动在“kafka_properties”属性中配置。
  • 示例 将数据输出到Kafka中。 示例一 1 2 3 4 5 6 7 CREATE SINK STREAM kafka_sink (name STRING) WITH ( type="kafka", kafka_bootstrap_servers = "ip1:port1,ip2:port2", kafka_topic = "testsink", encode = "json" ); 示例二 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 CREATE SINK STREAM kafka_sink ( a1 string, a2 string, a3 string, a4 INT ) // 输出字段 WITH ( type="kafka", kafka_bootstrap_servers = "192.x.x.x:9093, 192.x.x.x:9093, 192.x.x.x:9093", kafka_topic = "testflink", // 写入的topic encode = "csv", // 编码格式,支持json/csv kafka_certificate_name = "Flink", kafka_properties_delimiter = ",", kafka_properties = "sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username=\"xxx\" password=\"xxx\";,sasl.mechanism=PLAIN,security.protocol=SASL_SSL" );
  • 示例 从Kafka名称为test的topic中读取数据。 1 2 3 4 5 6 7 8 9 10 11 CREATE SOURCE STREAM kafka_source ( name STRING, age int ) WITH ( type = "kafka", kafka_bootstrap_servers = "ip1:port1,ip2:port2", kafka_group_id = "sourcegroup1", kafka_topic = "test", encode = "json" ); 从Kafka读取对象为test的topic,使用json_config将json数据和表字段对应。 数据编码格式为json且不含嵌套,例如: {"attr1": "lilei", "attr2": 18} 建表语句参考如下: 1 2 3 4 5 6 7 8 9 CREATE SOURCE STREAM kafka_source (name STRING, age int) WITH ( type = "kafka", kafka_bootstrap_servers = "ip1:port1,ip2:port2", kafka_group_id = "sourcegroup1", kafka_topic = "test", encode = "json", json_config = "name=attr1;age=attr2" );
  • 前提条件 Kafka服务端的端口如果监听在hostname上,则需要将Kafka Broker节点的hostname和IP的对应关系添加到DLI队列中。Kafka Broker节点的hostname和IP请联系Kafka服务的部署人员。如何添加IP域名映射,请参见《数据湖探索用户指南》中修改主机信息章节。 Kafka是线下集群,需要通过增强型跨源连接功能将Flink作业与Kafka进行对接。且用户可以根据实际所需设置相应安全组规则。 如何建立增强型跨源连接,请参考《数据湖探索用户指南》中增强型跨源连接章节。 如何设置安全组规则,请参见《虚拟私有云用户指南》中“安全组”章节。
  • 语法格式 1 2 3 4 5 6 7 8 CREATE SOURCE STREAM stream_id (attr_name attr_type (',' attr_name attr_type)* ) WITH ( type = "kafka", kafka_bootstrap_servers = "", kafka_group_id = "", kafka_topic = "", encode = "json" );
共100000条