华为云用户手册

  • 功能描述 DLI 将Flink作业的输出数据输出到分布式缓存服务(D CS )的Redis中。Redis是一种支持Key-Value等多种数据结构的存储系统。可用于缓存、事件发布或订阅、高速队列等场景,提供字符串、哈希、列表、队列、集合结构直接存取,基于内存,可持久化。有关Redis的详细信息,请访问Redis官方网站https://redis.io/。 分布式缓存服务(DCS)为DLI提供兼容Redis的即开即用、安全可靠、弹性扩容、便捷管理的在线分布式缓存能力,满足用户高并发及快速数据访问的业务诉求。 DCS的更多信息,请参见《分布式缓存服务用户指南》。
  • 关键字 表1 关键字说明 参数 是否必选 说明 type 是 输出通道类型,dcs_redis表示输出到分布式缓存服务的Redis存储系统中。 region 是 数据所在的DCS所在区域。 cluster_address 是 Redis实例连接地址。 password 否 Redis实例连接密码,当设置为免密访问时,省略该配置项。 value_type 是 该参数可配置为如下选项或选项的组合: 支持指定插入数据类型,包括:string, list, hash, set, zset; 支持设置key的过期时间,包括expire, pexpire, expireAt, pexpireAt; 支持删除key命令,包括del, hdel; 当需要使用多个命令时,用“;”分隔。 key_value 是 设置具体的key和value,key_value对必须与value_type所指定的类型数相对应,用“;”分隔,且key和value均支持参数化,动态列名采用${列名}表示。
  • 前提条件 请务必确保您的账户下已在 数据仓库 服务(DWS)里创建了DWS集群。 如何创建DWS集群,请参考《数据仓库服务管理指南》中“创建集群”章节。 请确保已创建DWS数据库表。 该场景作业需要运行在DLI的独享队列上,因此要与DWS集群建立增强型跨源连接,且用户可以根据实际所需设置相应安全组规则。 如何建立增强型跨源连接,请参考《 数据湖探索 用户指南》中增强型跨源连接章节。 如何设置安全组规则,请参见《虚拟私有云用户指南》中“安全组”章节。
  • 语法格式 1 2 3 4 5 6 7 8 9 10 11 12 13 create table dwsSource ( attr_name attr_type (',' attr_name attr_type)* (','PRIMARY KEY (attr_name, ...) NOT ENFORCED) (',' watermark for rowtime_column_name as watermark-strategy_expression) ) with ( 'connector.type' = 'gaussdb', 'connector.url' = '', 'connector.table' = '', 'connector.username' = '', 'connector.password' = '' );
  • 示例 使用gsjdbc4驱动连接时,加载的数据库驱动类为:org.postgresql.Driver。该驱动为默认,创建表时可以不填该驱动参数。 表car_info没有在schema下时。 1 2 3 4 5 6 7 8 9 10 11 12 create table dwsSource( car_id STRING, car_owner STRING, car_brand STRING, car_speed INT ) with ( 'connector.type' = 'gaussdb', 'connector.url' = 'jdbc:postgresql://xx.xx.xx.xx:8000/xx', 'connector.table' = 'car_info', 'connector.username' = 'xx', 'connector.password' = 'xx' ); 当DWS表test在名为test_schema的schema下时,可以参考如下样例。 1 2 3 4 5 6 7 8 9 10 11 12 create table dwsSource( car_id STRING, car_owner STRING, car_brand STRING, car_speed INT ) with ( 'connector.type' = 'gaussdb', 'connector.url' = 'jdbc:postgresql://xx.xx.xx.xx:8000/xx', 'connector.table' = 'test_schema\".\"test', 'connector.username' = 'xx', 'connector.password' = 'xx' ); 使用gsjdbc200驱动连接时,加载的数据库驱动类为:com.huawei.gauss200.jdbc.Driver。 当DWS表test在名为ads_game_sdk_base的schema下时,可以参考如下样例。 create table dwsSource( car_id STRING, car_owner STRING, car_brand STRING, car_speed INT ) with ( 'connector.type' = 'gaussdb', 'connector.table' = 'ads_game_sdk_base\".\"test', 'connector.driver' = 'com.huawei.gauss200.jdbc.Driver', 'connector.url' = 'jdbc:gaussdb://xx.xx.xx.xx:8000/xx', 'connector.username' = 'xx', 'connector.password' = 'xx' );
  • 参数说明 表1 参数说明 参数 是否必选 说明 connector.type 是 connector类型,需配置为'gaussdb' connector.url 是 jdbc连接地址,格式为:jdbc:postgresql://${ip}:${port}/${dbName} 。DWS数据库版本为8.1.0以后的版本时,格式为:jdbc:gaussdb://${ip}:${port}/${dbName}。 connector.table 是 操作的表名。如果该DWS表在某schema下,则格式为:'schema\".\"具体表名',具体可以参考示例说明。 connector.driver 否 jdbc连接驱动,默认为: org.postgresql.Driver。 DWS数据库版本为8.1.0以后的版本时,连接驱动为:com.huawei.gauss200.jdbc.Driver。 connector.username 否 数据库认证用户名,需要和'connector.password'一起配置 connector.password 否 数据库认证密码,需要和'connector.username'一起配置 connector.read.partition.column 否 用于对输入进行分区的列名 与connector.read.partition.lower-bound、connector.read.partition.upper-bound、 connector.read.partition.num必须同时存在或者同时不存在 connector.read.partition.lower-bound 否 第一个分区的最小值 与connector.read.partition.column、connector.read.partition.upper-bound、 connector.read.partition.num必须同时存在或者同时不存在 connector.read.partition.upper-bound 否 最后一个分区的最大值 与connector.read.partition.column、connector.read.partition.lower-bound、 connector.read.partition.num必须同时存在或者同时不存在 connector.read.partition.num 否 分区的个数 与connector.read.partition.column、connector.read.partition.upper-bound、 connector.read.partition.upper-bound必须同时存在或者同时不存在 connector.read.fetch-size 否 每次从数据库拉取数据的行数。默认值为0,表示忽略该提示
  • 功能描述 DLI将Flink作业从数据仓库服务(DWS)中读取数据。DWS数据库内核兼容PostgreSQL,PostgreSQL数据库可存储更加复杂类型的数据,支持空间信息服务、多版本并发控制(MVCC)、高并发,适用场景包括位置应用、金融保险、互联网电商等。 数据仓库服务(Data Warehouse Service,简称DWS)是一种基于基础架构和平台的在线数据处理数据库,为用户提供海量数据挖掘和分析服务。DWS的更多信息,请参见《数据仓库服务管理指南》。
  • 语法格式 1 2 3 4 5 6 7 8 9 10 11 CREATE TABLE table_id ( attr_name attr_type (',' attr_name attr_type)* ) WITH ( 'connector.type' = 'jdbc', 'connector.url' = '', 'connector.table' = '', 'connector.username' = '', 'connector.password' = '' );
  • 参数说明 表1 参数说明 参数 是否必选 说明 connector.type 是 数据源类型,‘jdbc’表示使用JDBC connector,必须为jdbc connector.url 是 数据库的URL connector.table 是 读取数据库中的数据所在的表名 connector.driver 否 连接数据库所需要的驱动。若未配置,则会自动通过URL提取 connector.username 否 数据库认证用户名,需要和'connector.password'一起配置 connector.password 否 数据库认证密码,需要和'connector.username'一起配置 connector.read.partition.column 否 用于对输入进行分区的列名 与connector.read.partition.lower-bound、connector.read.partition.upper-bound、 connector.read.partition.num必须同时存在或者同时不存在 connector.read.partition.lower-bound 否 第一个分区的最小值 与connector.read.partition.column、connector.read.partition.upper-bound、 connector.read.partition.num必须同时存在或者同时不存在 connector.read.partition.upper-bound 否 最后一个分区的最大值 与connector.read.partition.column、connector.read.partition.lower-bound、 connector.read.partition.num必须同时存在或者同时不存在 connector.read.partition.num 否 分区的个数 与connector.read.partition.column、connector.read.partition.upper-bound、 connector.read.partition.upper-bound必须同时存在或者同时不存在 connector.read.fetch-size 否 每次从数据库拉取数据的行数。默认值为0,表示忽略该提示。 connector.lookup.cache.max-rows 否 维表配置,缓存的最大行数,超过该值时,老的数据会被踢除。-1表示不使用缓存。 connector.lookup.cache.ttl 否 维表配置,缓存超时时间,超过该时间的数据会被剔除。格式为:{length value}{time unit label},如123ms, 321s,支持的时间单位包括: d,h,min,s,ms等,默认为ms。 connector.lookup.max-retries 否 维表配置,数据拉取最大重试次数,默认为3。
  • 示例 RDS表用于与输入流连接。 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 CREATE TABLE car_infos ( car_id STRING, car_owner STRING, car_brand STRING, car_price INT, proctime as PROCTIME() ) WITH ( 'connector.type' = 'dis', 'connector.region' = 'cn-north-1', 'connector.channel' = 'disInput', 'format.type' = 'csv' ); CREATE TABLE db_info ( car_id STRING, car_owner STRING, car_brand STRING, car_price INT ) WITH ( 'connector.type' = 'jdbc', 'connector.url' = 'jdbc:mysql://xx.xx.xx.xx:3306/xx', 'connector.table' = 'jdbc_table_name', 'connector.driver' = 'com.mysql.jdbc.Driver', 'connector.username' = 'xxx', 'connector.password' = 'xxxxx' ); CREATE TABLE audi_cheaper_than_30w ( car_id STRING, car_owner STRING, car_brand STRING, car_price INT ) WITH ( 'connector.type' = 'dis', 'connector.region' = 'cn-north-1', 'connector.channel' = 'disOutput', 'connector.partition-key' = 'car_id,car_owner', 'format.type' = 'csv' ); INSERT INTO audi_cheaper_than_30w SELECT a.car_id, b.car_owner, b.car_brand, b.car_price FROM car_infos as a join db_info FOR SYSTEM_TIME AS OF a.proctime AS b on a.car_id = b.car_id;
  • OVER WINDOW Over Window与Group Window区别在于Over window每一行都会输出一条记录。 语法格式 1 2 3 4 5 6 7 8 9 10 11 SELECT agg1(attr1) OVER ( [PARTITION BY partition_name] ORDER BY proctime|rowtime ROWS BETWEEN (UNBOUNDED|rowCOUNT) PRECEDING AND CURRENT ROW FROM TABLENAME SELECT agg1(attr1) OVER ( [PARTITION BY partition_name] ORDER BY proctime|rowtime RANGE BETWEEN (UNBOUNDED|timeInterval) PRECEDING AND CURRENT ROW FROM TABLENAME 语法说明 表5 参数说明 参数 参数说明 PARTITION BY 指定分组的主键,每个分组各自进行计算。 ORDER BY 指定数据按processing time或event time作为时间戳。 ROWS 个数窗口。 RANGE 时间窗口。 注意事项 所有的聚合必须定义到同一个窗口中,即相同的分区、排序和区间。 当前仅支持 PRECEDING (无界或有界) 到 CURRENT ROW 范围内的窗口、FOLLOWING 所描述的区间并未支持。 ORDER BY 必须指定于单个的时间属性。 示例 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 // 计算从规则启动到目前为止的计数及总和(in proctime) insert into temp SELECT name, count(amount) OVER (PARTITION BY name ORDER BY proctime RANGE UNBOUNDED preceding) as cnt1, sum(amount) OVER (PARTITION BY name ORDER BY proctime RANGE UNBOUNDED preceding) as cnt2 FROM Orders; // 计算最近四条记录的计数及总和(in proctime) insert into temp SELECT name, count(amount) OVER (PARTITION BY name ORDER BY proctime ROWS BETWEEN 4 PRECEDING AND CURRENT ROW) as cnt1, sum(amount) OVER (PARTITION BY name ORDER BY proctime ROWS BETWEEN 4 PRECEDING AND CURRENT ROW) as cnt2 FROM Orders; // 计算最近60s的计数及总和(in eventtime),基于事件时间处理,事件时间为Orders中的timeattr字段。 insert into temp SELECT name, count(amount) OVER (PARTITION BY name ORDER BY timeattr RANGE BETWEEN INTERVAL '60' SECOND PRECEDING AND CURRENT ROW) as cnt1, sum(amount) OVER (PARTITION BY name ORDER BY timeattr RANGE BETWEEN INTERVAL '60' SECOND PRECEDING AND CURRENT ROW) as cnt2 FROM Orders;
  • GROUP WINDOW 语法说明 Group Window定义在GROUP BY里,每个分组只输出一条记录,包括以下几种: 分组函数 表1 分组函数表 分组窗口函数 说明 TUMBLE(time_attr, interval) 定义一个滚动窗口。滚动窗口把行分配到有固定持续时间( interval )的不重叠的连续窗口。比如,5 分钟的滚动窗口以 5 分钟为间隔对行进行分组。滚动窗口可以定义在事件时间(批处理、流处理)或处理时间(流处理)上。 HOP(time_attr, interval, interval) 定义一个跳跃的时间窗口(在 Table API 中称为滑动窗口)。滑动窗口有一个固定的持续时间( 第二个 interval 参数 )以及一个滑动的间隔(第一个 interval 参数 )。若滑动间隔小于窗口的持续时间,滑动窗口则会出现重叠;因此,行将会被分配到多个窗口中。比如,一个大小为 15 分组的滑动窗口,其滑动间隔为 5 分钟,将会把每一行数据分配到 3 个 15 分钟的窗口中。滑动窗口可以定义在事件时间(批处理、流处理)或处理时间(流处理)上。 SESSION(time_attr, interval) 定义一个会话时间窗口。会话时间窗口没有一个固定的持续时间,但是它们的边界会根据 interval 所定义的不活跃时间所确定;即一个会话时间窗口在定义的间隔时间内没有时间出现,该窗口会被关闭。例如时间窗口的间隔时间是 30 分钟,当其不活跃的时间达到30分钟后,若观测到新的记录,则会启动一个新的会话时间窗口(否则该行数据会被添加到当前的窗口),且若在 30 分钟内没有观测到新纪录,这个窗口将会被关闭。会话时间窗口可以使用事件时间(批处理、流处理)或处理时间(流处理)。 注意: 在流处理表中的 SQL 查询中,分组窗口函数的 time_attr 参数必须引用一个合法的时间属性,且该属性需要指定行的处理时间或事件时间。 time_attr设置为event-time时参数类型为timestamp(3)类型。 time_attr设置为processing-time时无需指定类型。 对于批处理的 SQL 查询,分组窗口函数的 time_attr 参数必须是一个timestamp类型的属性。 窗口辅助函数 可以使用以下辅助函数选择组窗口的开始和结束时间戳以及时间属性 表2 窗口辅助函数表 辅助函数 说明 TUMBLE_START(time_attr, interval) HOP_START(time_attr, interval, interval) SESSION_START(time_attr, interval) 返回相对应的滚动、滑动和会话窗口范围内的下界时间戳。 TUMBLE_END(time_attr, interval) HOP_END(time_attr, interval, interval) SESSION_END(time_attr, interval) 返回相对应的滚动、滑动和会话窗口范围以外的上界时间戳。 注意: 范围以外的上界时间戳不可以 在随后基于时间的操作中,作为行时间属性使用,比如基于时间窗口的join以及分组窗口或分组窗口上的聚合。 TUMBLE_ROWTIME(time_attr, interval) HOP_ROWTIME(time_attr, interval, interval) SESSION_ROWTIME(time_attr, interval) 返回的是一个可用于后续需要基于时间的操作的时间属性(rowtime attribute),比如基于时间窗口的join以及 分组窗口或分组窗口上的聚合。 TUMBLE_PROCTIME(time_attr, interval) HOP_PROCTIME(time_attr, interval, interval) SESSION_PROCTIME(time_attr, interval) 返回一个可用于后续需要基于时间的操作的 处理时间参数,比如基于时间窗口的join以及分组窗口或分组窗口上的聚合. 注意:辅助函数必须使用与GROUP BY 子句中的分组窗口函数完全相同的参数来调用. 示例 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 // 每天计算SUM(金额)(事件时间)。 insert into temp SELECT name, TUMBLE_START(ts, INTERVAL '1' DAY) as wStart, SUM(amount) FROM Orders GROUP BY TUMBLE(ts, INTERVAL '1' DAY), name; // 每天计算SUM(金额)(处理时间)。 insert into temp SELECT name, SUM(amount) FROM Orders GROUP BY TUMBLE(proctime, INTERVAL '1' DAY), name; // 每个小时计算事件时间中最近24小时的SUM(数量)。 insert into temp SELECT product, SUM(amount) FROM Orders GROUP BY HOP(ts, INTERVAL '1' HOUR, INTERVAL '1' DAY), product; // 计算每个会话的SUM(数量),间隔12小时的不活动间隙(事件时间)。 insert into temp SELECT name, SESSION_START(ts, INTERVAL '12' HOUR) AS sStart, SESSION_END(ts, INTERVAL '12' HOUR) AS sEnd, SUM(amount) FROM Orders GROUP BY SESSION(ts, INTERVAL '12' HOUR), name;
  • 示例 通过将student_info与course_info两张表中的课程编号匹配建立JOIN连接,来查看学生姓名及所选课程名称。 1 2 SELECT student_info.name, course_info.courseName FROM student_info JOIN course_info ON (student_info.courseId = course_info.courseId);
  • 示例 在DLI数据多版本中,恢复非分区表test_table数据到版本20210930。 1 RESTORE TABLE test_table TO VERSION '20210930'; 在DLI数据多版本中,恢复分区表test_table对应dt分区数据到版本20210930。 1 RESTORE TABLE test_table PARTITION (dt='2021-07-27') TO VERSION '20210930';
  • 示例 将流weather_out的数据输出到 表格存储服务 CloudTable的OpenTSDB中。 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 CREATE SINK STREAM weather_out ( timestamp_value LONG, /* 时间 */ temperature FLOAT, /* 温度值 */ humidity FLOAT, /* 湿度值 */ location STRING /* 地点 */ ) WITH ( type = "opentsdb", region = "xxx", cluster_id = "e05649d6-00e2-44b4-b0ff-7194adaeab3f", tsdb_metrics = "weather", tsdb_timestamps = "${timestamp_value}", tsdb_values = "${temperature}; ${humidity}", tsdb_tags = "location:${location},signify:temperature; location:${location},signify:humidity", batch_insert_data_num = "10" );
  • 关键字 表1 关键字说明 参数 是否必选 说明 type 是 输出通道类型,“opentsdb”表示输出到 表格存储 服务CloudTable(OpenTSDB)。 region 是 表格存储服务所在区域。 cluster_id 否 待插入数据所属集群的id,该参数与tsdb_link_address必须指定其中一个。 tsdb_metrics 是 数据点的metric,支持参数化。 tsdb_timestamps 是 数据点的timestamp,数据类型支持LONG、INT、SHORT和STRING,仅支持指定动态列。 tsdb_values 是 数据点的value,数据类型支持SHORT、INT、LONG、FLOAT、DOUBLE和STRING,支持指定动态列或者常数值。 tsdb_tags 是 数据点的tags,每个tags里面至少一个标签值,最多8个标签值,支持参数化。 batch_insert_data_num 否 表示一次性批量写入的数据量(即数据条数),值必须为正整数,上限为65536,默认值为8。 tsdb_link_address 否 待插入数据所属集群的OpenTsdb链接地址,使用该参数时,作业需要运行在独享DLI队列,且DLI队列需要与CloudTable集群建立增强型跨源,该参数与cluster_id必须指定其中一个。 说明: 如何建立增强型跨源连接,请参考《 数据湖 探索用户指南》中增强型跨源连接章节。
  • 功能描述 DLI将Flink作业的输出数据输出到CloudTable的OpenTSDB中。OpenTSDB是基于HBase的分布式的,可伸缩的时间序列数据库。它存储的是时间序列数据,时间序列数据是指在不同时间点上收集到的数据,这类数据反映了一个对象随时间的变化状态或程度。支持秒级别数据的采集监控,进行永久存储,索引和查询,可用于系统监控和测量、物联网数据、金融数据和科学实验结果数据的收集监控。 表格存储服务(CloudTable),是基于Apache HBase提供的分布式、可伸缩、全托管的KeyValue数据存储服务,为DLI提供了高性能的随机读写能力,适用于海量结构化数据、半结构化数据以及时序数据的存储和查询应用,适用于物联网IOT应用和通用海量KeyValue数据存储与查询等场景。CloudTable的更多信息,请参见《表格存储服务用户指南》。
  • 语法格式 1 2 3 4 5 6 7 8 9 10 11 CREATE SINK STREAM stream_id (attr_name attr_type (',' attr_name attr_type)* ) WITH ( type = "opentsdb", region = "", cluster_id = "", tsdb_metrics = "", tsdb_timestamps = "", tsdb_values = "", tsdb_tags = "", batch_insert_data_num = "" )
  • 示例 完整的SQL作业提交流程您可以参考《快速入门》中的《提交SQL作业》等章节描述。 队列是使用DLI服务的基础,执行SQL前需要先创建队列。具体可以参考《用户指南》中的“创建队列”章节。 在DLI管理控制台,单击左侧导航栏中的“SQL编辑器”,可进入SQL作业“SQL编辑器”页面。 在“SQL编辑器”页面右侧的编辑窗口中,输入如下创建数据库的SQL语句,单击“执行”。阅读并同意隐私协议,单击“确定”。 若testdb数据库不存在,则创建数据库testdb。 1 CREATE DATABASE IF NOT EXISTS testdb;
  • 按列GROUP BY 功能描述 按列进行分组操作。 语法格式 1 2 3 4 SELECT [ ALL | DISTINCT ] { * | projectItem [, projectItem ]* } FROM tableExpression [ WHERE booleanExpression ] [ GROUP BY { groupItem [, groupItem ]* } ] 语法说明 GROUP BY:按列可分为单列GROUP BY与多列GROUP BY。 单列GROUP BY:指GROUP BY子句中仅包含一列。 多列GROUP BY:指GROUP BY子句中不止一列,查询语句将按照GROUP BY的所有字段分组,所有字段都相同的记录将被放在同一组中。 注意事项 GroupBy在流处理表中会产生更新结果 示例 根据score及name两个字段对表student进行分组,并返回分组结果。 1 2 insert into temp SELECT name,score, max(score) FROM student GROUP BY name,score;
  • 表达式GROUP BY 功能描述 按表达式对流进行分组操作。 语法格式 1 2 3 4 SELECT [ ALL | DISTINCT ] { * | projectItem [, projectItem ]* } FROM tableExpression [ WHERE booleanExpression ] [ GROUP BY { groupItem [, groupItem ]* } ] 语法说明 groupItem:可以是单字段,多字段,也可以是字符串函数等调用,不能是聚合函数。 注意事项 无 示例 先利用substring函数取字段name的子字符串,并按照该子字符串进行分组,返回每个子字符串及对应的记录数。 1 2 insert into temp SELECT substring(name,6),count(name) FROM student GROUP BY substring(name,6);
  • Grouping sets, Rollup, Cube 功能描述 GROUPING SETS 的 GROUP BY 子句可以生成一个等效于由多个简单 GROUP BY 子句的 UNION ALL 生成的结果集,并且其效率比 GROUP BY 要高。 ROLLUP与CUBE按一定的规则产生多种分组,然后按各种分组统计数据。 CUBE生成的结果集显示了所选列中值的所有组合的聚合。 Rollup生成的结果集显示了所选列中值的某一层次结构的聚合。 语法格式 SELECT [ ALL | DISTINCT ] { * | projectItem [, projectItem ]* } FROM tableExpression [ WHERE booleanExpression ] [ GROUP BY groupingItem] 语法说明 groupingItem:是Grouping sets(columnName [, columnName]*)、Rollup(columnName [, columnName]*)、Cube(columnName [, columnName]*) 注意事项 无 示例 分别产生基于user和product的结果 INSERT INTO temp SELECT SUM(amount) FROM Orders GROUP BY GROUPING SETS ((user), (product));
  • Union/Union ALL/Intersect/Except 语法格式 1 query UNION [ ALL ] | Intersect | Except query 语法说明 UNION返回多个查询结果的并集。 Intersect返回多个查询结果的交集。 Except返回多个查询结果的差集。 注意事项 集合运算是以一定条件将表首尾相接,所以其中每一个SELECT语句返回的列数必须相同,列的类型一定要相同,列名不一定要相同。 UNION默认是去重的,UNION ALL是不去重的。 示例 输出Orders1和Orders2的并集,不包含重复记录。 1 2 insert into temp SELECT * FROM Orders1 UNION SELECT * FROM Orders2;
  • IN 语法格式 1 2 3 SELECT [ ALL | DISTINCT ] { * | projectItem [, projectItem ]* } FROM tableExpression WHERE column_name IN (value (, value)* ) | query 语法说明 IN操作符允许在where子句中规定多个值。若表达式在给定的表子查询中存在,则返回 true 。 注意事项 子查询表必须由单个列构成,且该列的数据类型需与表达式保持一致。 示例 输出Orders中NewProducts中product的user和amount信息。 1 2 3 4 5 insert into temp SELECT user, amount FROM Orders WHERE product IN ( SELECT product FROM NewProducts );
  • 示例 建OBS表时仅有一个分区列,建表成功后添加分区数据。 先使用DataSource语法创建一个OBS分区表,分区列为external_data,数据存储在obs://bucketName/datapath路径下。 create table testobstable(id varchar(128), external_data varchar(16)) using JSON OPTIONS (path 'obs://bucketName/datapath') PARTITIONED by (external_data); 拷贝分区数据目录到obs://bucketName/datapath路径下。例如当前拷贝external_data=22的分区目录下所有文件到obs://bucketName/datapath路径下。 执行添加分区命令,将分区的元数据信息生效。 ALTER TABLE testobstable ADD PARTITION (external_data='22') LOCATION 'obs://bucketName/datapath/external_data=22'; 添加分区成功后,即可根据分区列进行数据查询等操作。 select * from testobstable where external_data='22'; 建OBS表时有多个分区列,建表成功后添加分区数据。 先使用DataSource语法创建一个OBS分区表,分区列为external_data和dt,数据存储在obs://bucketName/datapath路径下。 1 2 3 4 5 create table testobstable( id varchar(128), external_data varchar(16), dt varchar(16) ) using JSON OPTIONS (path 'obs://bucketName/datapath') PARTITIONED by (external_data, dt); 拷贝分区数据目录到obs://bucketName/datapath路径下。例如拷贝external_data=22及其子目录dt=2021-07-27和目录下文件到obs://bucketName/datapath路径下。 执行添加分区命令,将分区的元数据信息生效。 1 2 3 4 ALTER TABLE testobstable ADD PARTITION (external_data = '22', dt = '2021-07-27') LOCATION 'obs://bucketName/datapath/external_data=22/dt=2021-07-27'; 添加分区成功后,即可根据分区列进行数据查询等操作。 1 2 select * from testobstable where external_data = '22'; select * from testobstable where external_data = '22' and dt='2021-07-27';
  • 注意事项 向表中添加分区时,此表和分区列(建表时PARTITIONED BY指定的列)必须已存在,而所要添加的分区不能重复添加,否则将出错。已添加的分区可通过IF NOT EXISTS避免报错。 若分区表是按照多个字段进行分区的,添加分区时需要指定所有的分区字段,指定字段的顺序可任意。 “partition_specs”中的参数默认带有“( )”。例如:PARTITION (dt='2009-09-09',city='xxx')。 在添加分区时若指定OBS路径,则该OBS路径必须是已经存在的,否则会出错。 若添加多个分区,每组PARTITION partition_specs LOCATION 'obs_path'之间用空格隔开。例如: PARTITION partition_specs LOCATION 'obs_path' PARTITION partition_specs LOCATION 'obs_path'。 若新增分区指定的路径包含子目录(或嵌套子目录),则子目录下面的所有文件类型及内容也将作为该分区的记录。用户需要保证该分区目录下所有文件类型和文件内容与表的字段一致,否则查询将报错。
  • 功能描述 创建OBS分区表成功后,OBS表实际还没有生成分区信息。生成分区信息主要有以下两种场景: 给OBS分区表插入对应的分区数据,数据插入成功后OBS表才会生成分区元数据信息,后续则可以根据对应分区列进行查询等操作。 手工拷贝分区目录和数据到OBS分区表路径下,执行本章节介绍的分区添加命令生成分区元数据信息,后续即可根据对应分区列进行查询等操作。 本章节重点介绍使用ALTER TABLE命令添加分区的基本操作和使用说明。
  • 关键字 表1 关键字说明 参数 是否必选 说明 type 是 输出通道类型,"kafka"表示输出到Kafka中。 kafka_bootstrap_servers 是 Kafka的连接端口,需要确保能连通(需要通过增强型跨源开通DLI队列和Kafka集群的连接)。 kafka_topic 是 写入的topic。 encode 是 编码格式,当前支持“json”和“user_defined”。 若编码格式为“user_defined”,则需配置“encode_class_name”和“encode_class_parameter”属性。 encode_class_name 否 当encode为user_defined时,需配置该参数,指定用户自实现编码类的类名(包含完整包路径),该类需继承类DeserializationSchema。 encode_class_parameter 否 当encode为user_defined时,可以通过配置该参数指定用户自实现编码类的入参,仅支持一个string类型的参数。 krb_auth 否 创建跨源认证的认证名。开启kerberos认证时,需配置该参数。如果创建的 MRS 集群未开启kerb认证的集群,请确保在DLI队列host文件中添加MRS集群master节点的“/etc/hosts”信息。 kafka_properties 否 可通过该参数配置kafka的原生属性,格式为"key1=value1;key2=value2" kafka_certificate_name 否 跨源认证信息名称。跨源认证信息类型为“Kafka_SSL”时,该参数有效。 说明: 指定该配置项时,服务仅加载该认证下指定的文件和密码,系统将自动设置到“kafka_properties”属性中。 Kafka SSL认证需要的其他配置信息,需要用户手动在“kafka_properties”属性中配置。
  • 语法格式 1 2 3 4 5 6 7 CREATE SINK STREAM stream_id (attr_name attr_type (',' attr_name attr_type)* ) WITH( type = "kafka", kafka_bootstrap_servers = "", kafka_topic = "", encode = "json" )
  • 示例 将数据输出到Kafka中。 示例一 1 2 3 4 5 6 7 CREATE SINK STREAM kafka_sink (name STRING) WITH ( type="kafka", kafka_bootstrap_servers = "ip1:port1,ip2:port2", kafka_topic = "testsink", encode = "json" ); 示例二 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 CREATE SINK STREAM kafka_sink ( a1 string, a2 string, a3 string, a4 INT ) // 输出字段 WITH ( type="kafka", kafka_bootstrap_servers = "192.x.x.x:9093, 192.x.x.x:9093, 192.x.x.x:9093", kafka_topic = "testflink", // 写入的topic encode = "csv", // 编码格式,支持json/csv kafka_certificate_name = "Flink", kafka_properties_delimiter = ",", kafka_properties = "sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username=\"xxx\" password=\"xxx\";,sasl.mechanism=PLAIN,security.protocol=SASL_SSL" );
共100000条