华为云用户手册

  • 示例 将流audi_cheaper_than_30w的数据输出到数据库test的audi_cheaper_than_30w表下。 1 2 3 4 5 6 7 8 9 10 11 12 13 CREATE SINK STREAM audi_cheaper_than_30w ( car_id STRING, car_owner STRING, car_brand STRING, car_price INT ) WITH ( type = "rds", username = "root", password = "xxxxxx", db_url = "mysql://192.168.1.1:8635/test", table_name = "audi_cheaper_than_30w" );
  • 功能描述 DLI 将Flink作业的输出数据输出到关系型数据库(RDS)中。目前支持PostgreSQL和MySQL两种数据库。PostgreSQL数据库可存储更加复杂类型的数据,支持空间信息服务、多版本并发控制(MVCC)、高并发,适用场景包括位置应用、金融保险、互联网电商等。MySQL数据库适用于各种WEB应用、电子商务应用、企业应用、移动应用等场景,减少IT部署和维护成本。 关系型数据库(Relational Database Service,简称RDS)是一种基于 云计算平台 的在线关系型数据库服务。 RDS的更多信息,请参见《关系型数据库用户指南》。
  • 前提条件 请务必确保您的账户下已在关系型数据库(RDS)里创建了PostgreSQL或MySQL类型的RDS实例。 如何创建RDS实例,请参见《关系型数据库快速入门》中“购买实例”章节。 该场景作业需要运行在DLI的独享队列上,因此要与RDS实例建立增强型跨源连接,且用户可以根据实际所需设置相应安全组规则。 如何建立增强型跨源连接,请参考《 数据湖探索 用户指南》中增强型跨源连接章节。 如何设置安全组规则,请参见《虚拟私有云用户指南》中“安全组”章节。
  • 示例 create table upsertKafkaSink( car_id STRING, car_owner STRING, car_brand STRING, car_speed INT, primary key (car_id) not enforced ) with ( 'connector.type' = 'upsert-kafka', 'connector.version' = '0.11', 'connector.topic' = 'test-topic', 'connector.properties.bootstrap.servers' = 'xx.xx.xx.xx:9092', 'format.type' = 'csv' );
  • 参数说明 表1 参数说明 参数 是否必选 说明 connector.type 是 connector类型,对于upsert kafka,需配置为'upsert-kafka' connector.version 否 Kafka版本,仅支持:'0.11' format.type 是 数据序列化格式,支持:'csv', 'json'及'avro'等 connector.topic 是 kafka topic名 connector.properties.bootstrap.servers 是 kafka brokers地址,以逗号分隔 connector.sink-partitioner 否 记录分区方式,支持:'fixed', 'round-robin'及'custom' connector.sink-partitioner-class 否 'sink-partitioner'为'custom'时,需配置,如'org.mycompany.MyPartitioner' connector.sink.ignore-retraction 否 是否忽略回撤消息,默认为false。回撤消息将以null值写入kafka update-mode 否 支持:'append', 'retract'及'upsert'三种写入模式 connector.properties.* 否 配置kafka任意原生属性
  • 语法格式 1 2 3 4 5 6 7 8 9 10 11 12 create table kafkaSource( attr_name attr_type (',' attr_name attr_type)* (','PRIMARY KEY (attr_name, ...) NOT ENFORCED) ) with ( 'connector.type' = 'upsert-kafka', 'connector.version' = '', 'connector.topic' = '', 'connector.properties.bootstrap.servers' = '', 'format.type' = '' );
  • 语法格式 1 2 3 4 5 6 7 CREATE SOURCE STREAM stream_id (attr_name attr_type (',' attr_name attr_type)* ) WITH ( type = "user_defined", type_class_name = "", type_class_parameter = "" ) (TIMESTAMP BY timeindicator (',' timeindicator)?);timeindicator:PROCTIME '.' PROCTIME| ID '.' ROWTIME
  • 语法格式 DWS结果表中不允许指定所有属性为PRIMARY KEY。 1 2 3 4 5 6 7 8 9 10 11 12 13 create table dwsSink ( attr_name attr_type (',' attr_name attr_type)* (','PRIMARY KEY (attr_name, ...) NOT ENFORCED) ) with ( 'connector.type' = 'gaussdb', 'connector.url' = '', 'connector.table' = '', 'connector.driver' = '', 'connector.username' = '', 'connector.password' = '' );
  • 示例 使用gsjdbc4驱动连接时,加载的数据库驱动类为:org.postgresql.Driver。该驱动为默认,创建表时可以不填该驱动参数。 使用upsert模式,写入数据到DWS 1 2 3 4 5 6 7 8 9 10 11 12 13 14 create table dwsSink( car_id STRING, car_owner STRING, car_brand STRING, car_speed INT ) with ( 'connector.type' = 'gaussdb', 'connector.url' = 'jdbc:postgresql://xx.xx.xx.xx:8000/xx', 'connector.table' = 'car_info', 'connector.username' = 'xx', 'connector.password' = 'xx', 'connector.write.mode' = 'upsert', 'connector.write.flush.interval' = '30s' ); 当DWS表test在名为ads_game_sdk_base的schema下时,可以参考如下样例。 CREATE TABLE ads_rpt_game_sdk_realtime_ada_reg_user_pay_mm ( ddate DATE, dmin TIMESTAMP(3), game_appkey VARCHAR, channel_id VARCHAR, pay_user_num_1m bigint, pay_amt_1m bigint, PRIMARY KEY (ddate, dmin, game_appkey, channel_id) NOT ENFORCED ) WITH ( 'connector.type' = 'gaussdb', 'connector.url' = 'jdbc:postgresql://xx.xx.xx.xx:8000/dws_bigdata_db', 'connector.table' = 'ads_game_sdk_base\".\"test', 'connector.username' = 'xxxx', 'connector.password' = 'xxxxx', 'connector.write.mode' = 'upsert', 'connector.write.flush.interval' = '30s' ); 使用gsjdbc200驱动连接时,加载的数据库驱动类为:com.huawei.gauss200.jdbc.Driver。 当DWS表test在名为ads_game_sdk_base的schema下时,可以参考如下样例。 create table dwsSink( car_id STRING, car_owner STRING, car_brand STRING, car_speed INT ) with ( 'connector.type' = 'gaussdb', 'connector.table' = 'ads_game_sdk_base\".\"test', 'connector.driver' = 'com.huawei.gauss200.jdbc.Driver', 'connector.url' = 'jdbc:gaussdb://xx.xx.xx.xx:8000/xx', 'connector.username' = 'xx', 'connector.password' = 'xx', 'connector.write.mode' = 'upsert', 'connector.write.flush.interval' = '30s' );
  • 功能描述 DLI将Flink作业的输出数据输出到 数据仓库 服务(DWS)中。DWS数据库内核兼容PostgreSQL,PostgreSQL数据库可存储更加复杂类型的数据,支持空间信息服务、多版本并发控制(MVCC)、高并发,适用场景包括位置应用、金融保险、互联网电商等。 数据仓库服务(Data Warehouse Service,简称DWS)是一种基于基础架构和平台的在线数据处理数据库,为用户提供海量数据挖掘和分析服务。DWS的更多信息,请参见《数据仓库服务管理指南》。
  • 前提条件 请务必确保您的账户下已在数据仓库服务(DWS)里创建了DWS集群。 如何创建DWS集群,请参考《数据仓库服务管理指南》中“创建集群”章节。 请确保已创建DWS数据库表。 该场景作业需要运行在DLI的独享队列上,因此要与DWS集群建立增强型跨源连接,且用户可以根据实际所需设置相应安全组规则。 如何建立增强型跨源连接,请参考《 数据湖 探索用户指南》中增强型跨源连接章节。 如何设置安全组规则,请参见《虚拟私有云用户指南》中“安全组”章节。
  • 参数说明 表1 参数说明 参数 是否必选 说明 connector.type 是 connector类型,需配置为'gaussdb' connector.url 是 jdbc连接地址,格式为:jdbc:postgresql://${ip}:${port}/${dbName} 。 connector.table 是 操作的表名。如果该DWS表在某schema下,则格式为:'schema\".\"具体表名',具体可以参考示例说明。 connector.driver 否 jdbc连接驱动,默认为: org.postgresql.Driver。 connector.username 否 数据库认证用户名,需要和'connector.password'一起配置 connector.password 否 数据库认证密码,需要和'connector.username'一起配置 connector.write.mode 否 数据写入模式,支持: copy, insert以及upsert三种。默认值为upsert。 该参数与'primary key'配合使用。 未配置'primary key'时,支持copy及insert两种模式追加写入。 配置'primary key',支持copy、upsert以及insert三种模式更新写入。 注意:由于dws不支持更新分布列,因而配置的更新主键必须包含dws表中定义的所有分布列。 connector.write.flush.max-rows 否 数据flush大小,超过该值将触发写入flush。默认为5000。 connector.write.flush.interval 否 数据flush周期,周期性触发写入flush。格式为:{length value}{time unit label},如123ms, 321s,支持的时间单位包括: d,h,min,s,ms等,默认为ms。不填写则默认不根据时间刷新。 connector.write.max-retries 否 写入最大重试次数,默认为3。 connector.write.merge.filter-key 否 配置PRIMARY KEY,并且“connector.write.mode”配置为copy时,可以配置merge时的过滤列名。 connector.write.escape-string-value 否 是否对string类型值进行转义,默认为false。
  • 详细样例代码 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 /** source **/ CREATE SOURCE STREAM car_infos (cast_int_to_varchar int, cast_String_to_int string, case_string_to_timestamp string, case_timestamp_to_date timestamp) WITH ( type = "dis", region = "xxxxx", channel = "dis-input", partition_count = "1", encode = "json", offset = "13", json_config = "cast_int_to_varchar=cast_int_to_varchar;cast_String_to_int=cast_String_to_int;case_string_to_timestamp=case_string_to_timestamp;case_timestamp_to_date=case_timestamp_to_date" ); /** sink **/ CREATE SINK STREAM cars_infos_out (cast_int_to_varchar varchar, cast_String_to_int int, case_string_to_timestamp timestamp, case_timestamp_to_date date) WITH ( type = "dis", region = "xxxxx", channel = "dis-output", partition_count = "1", encode = "json", offset = "4", json_config = "cast_int_to_varchar=cast_int_to_varchar;cast_String_to_int=cast_String_to_int;case_string_to_timestamp=case_string_to_timestamp;case_timestamp_to_date=case_timestamp_to_date", enable_output_null="true" ); /** 统计car的静态信息 **/ INSERT INTO cars_infos_out SELECT cast(cast_int_to_varchar as varchar), cast(cast_String_to_int as int), cast(case_string_to_timestamp as timestamp), cast(case_timestamp_to_date as date) FROM car_infos; 返回数据 {"case_string_to_timestamp":1514736001000,"cast_int_to_varchar":"5","case_timestamp_to_date":"2018-01-01","cast_String_to_int":100}
  • 常用类型转换函数 表1 常用类型转换函数 函数 说明 cast(v1 as varchar) 将v1转换为字符串类型,v1可以是数值类型,TIMESTAMP/DATE/TIME。 cast (v1 as int) 将v1转换为int, v1可以是数值类型或字符类。 cast(v1 as timestamp) 将v1转换为timestamp类型,v1可以是字符串或DATE/TIME。 cast(v1 as date) 将v1转换为date类型, v1可以是字符串或者TIMESTAMP。
  • 语法格式 1 2 3 4 5 6 7 8 9 CREATE SINK STREAM stream_id (attr_name attr_type (',' attr_name attr_type)* ) WITH ( type = "cloudtable", region = "", cluster_id = "", table_name = "", table_columns = "", create_if_not_exist = "" )
  • 功能描述 DLI将作业的输出数据输出到CloudTable的HBase中。HBase是一个稳定可靠,性能卓越、可伸缩、面向列的分布式 云存储 系统,适用于海量数据存储以及分布式计算的场景,用户可以利用HBase搭建起TB至PB级数据规模的存储系统,对数据轻松进行过滤分析,毫秒级得到响应,快速发现数据价值。HBase支持消息数据、报表数据、推荐类数据、风控类数据、日志数据、订单数据等结构化、半结构化的KeyValue数据存储。 利用DLI,用户可方便地将海量数据高速、低时延写入HBase。 表格存储服务 (CloudTable),是基于Apache HBase提供的分布式、可伸缩、全托管的KeyValue数据存储服务,为DLI提供了高性能的随机读写能力,适用于海量结构化数据、半结构化数据以及时序数据的存储和查询应用,适用于物联网IOT应用和通用海量KeyValue数据存储与查询等场景。CloudTable的更多信息,请参见《 表格存储 服务用户指南》。
  • 示例 将流qualified_cars的数据输出到表格存储服务CloudTable的HBase中。 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 CREATE SINK STREAM qualified_cars ( car_id STRING, car_owner STRING, car_age INT, average_speed INT, total_miles INT ) WITH ( type = "cloudtable", region = "xxx", cluster_id = "209ab1b6-de25-4c48-8e1e-29e09d02de28", table_name = "car_pass_inspect_with_age_${car_age}", table_columns = "rowKey,info:owner,,car:speed,car:miles", illegal_data_table = "illegal_data", create_if_not_exist = "true", batch_insert_data_num = "20" );
  • 关键字 表1 关键字说明 参数 是否必选 说明 type 是 输出通道类型,“cloudtable”表示输出到CloudTable(HBase)。 region 是 表格存储服务所在区域。 cluster_id 是 待插入数据所属集群的id。 table_name 是 待插入数据的表名,支持参数化,例如当需要某一列或者几列作为表名的一部分时,可表示为”car_pass_inspect_with_age_${car_age}“,其中car_age为列名。 table_columns 是 待插入的列,具体形式如:"rowKey,f1:c1,f1:c2,f2:c1",其中必须指定rowKey,当某列不需要加入数据库时,以第三列为例,可表示为"rowKey,f1:c1,,f2:c1"。 illegal_data_table 否 如果指定该参数,异常数据(比如:rowKey不存在)会写入该表(rowKey为时间戳加六位随机数字,schema为info:data, info:reason),否则会丢弃。 create_if_not_exist 否 当待写入的表或者列族不存在时,是否创建,值为true或者false,默认值为false。 batch_insert_data_num 否 表示一次性批量写入的数据条数,值必须为正整数,上限为100,默认值为10。
  • 关键字 ALL:返回重复的行。为默认选项。其后只能跟*,否则会出错。 DISTINCT:从结果集移除重复的行。 WHERE:WHERE子句嵌套将利用子查询的结果作为过滤条件。 operator:包含关系运算符中的等式与不等式操作符及IN,NOT IN,EXISTS,NOT EXISTS操作符。 当operator为IN或者NOT IN时,子查询的返回结果必须是单列。 当operator为EXISTS或者NOT EXISTS时,子查询中一定要包含WHERE条件过滤。当子查询中有字段与外部查询相同时,需要在该字段前加上表名。
  • 语法格式 1 2 3 4 5 6 7 8 9 10 11 12 13 create table dwsSink ( attr_name attr_type (',' attr_name attr_type)* (','PRIMARY KEY (attr_name, ...) NOT ENFORCED) ) with ( 'connector.type' = 'redis', 'connector.host' = '', 'connector.port' = '', 'connector.password' = '', 'connector.table-name' = '', 'connector.key-column' = '' );
  • 参数说明 表1 参数说明 参数 是否必选 说明 connector.type 是 connector类型,对于redis,需配置为'redis'。 connector.host 是 redis连接地址。 connector.port 是 redis连接端口。 connector.password 否 redis认证密码。 connector.deploy-mode 否 redis部署模式,支持standalone/cluster,默认standalone connector.table-name 否 table存储模式下必配,redis中存储表名。在table存储模式下,数据将以hash类型存储到redis,其中key为:${table-name}:${ext-key},field名为列名。 说明: table存储模式:将connector.table-name、connector.key-column作为redis的key。redis的hash类型,每个key对应一个hashmap,hashmap的hashkey为源表的字段名,hashvalue为源表的字段值。 connector.key-column 否 table存储模式下可配置,将该字段值作为redis中的ext-key,未配置时,ext-key为生成的uuid connector.write-schema 否 table存储模式下可配置,是否将当前schema写入到redis,默认为false connector.data-type 否 数据存储类型,用户自定义存储模式必配。支持:string, list, hash, set类型。其中string/list以及sets中schema字段数必须为2,hash字段数必须为3 connector.ignore-retraction 否 是否忽略retraction消息,默认为false
  • 示例 配置“connector.table-name”参数时的table存储模式示例。 table模式采用hash类型存储数据,与基本hash类型将表的三个字段分别作为key、hash_key、hash_value不同,table模式下的key值可以通过“connector.table-name”和“connector.key-column”两个参数设置,将表中的所有字段名作为hash_key,字段值作为hash_value写入到hash中。 create table redisSink( car_id STRING, car_owner STRING, car_brand STRING, car_speed INT ) with ( 'connector.type' = 'redis', 'connector.host' = 'xx.xx.xx.xx', 'connector.port' = '6379', 'connector.password' = 'xx', 'connector.table-name'='car_info', 'connector.key-column'='car_id' ); insert into redisSink (car_id,car_owner,car_brand,car_speed) VALUES ("A1234","OwnA","A1234",30); 以下示例演示“connector.data-type”为string, list, hash, set类型时的建表语句。 “connector.data-type”为string类型。 表为2列:第一列为key,第二列为value。 create table redisSink( attr1 STRING, attr2 STRING ) with ( 'connector.type' = 'redis', 'connector.host' = 'xx.xx.xx.xx', 'connector.port' = '6379', 'connector.password' = 'xx', 'connector.data-type' = 'string' ); insert into redisSink (attr1,attr2) VALUES ("car_id","A1234"); “connector.data-type”为list类型。 表为2列:第一列为key,第二列为value。 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 create table redisSink( attr1 STRING, attr2 STRING ) with ( 'connector.type' = 'redis', 'connector.host' = 'xx.xx.xx.xx', 'connector.port' = '6379', 'connector.password' = 'xx', 'connector.data-type' = 'list' ); insert into redisSink (attr1,attr2) VALUES ("car_id","A1234"); “connector.data-type”为set类型。 表为2列:第一列为key,第二列为value。 create table redisSink( attr1 STRING, attr2 STRING ) with ( 'connector.type' = 'redis', 'connector.host' = 'xx.xx.xx.xx', 'connector.port' = '6379', 'connector.password' = 'xx', 'connector.data-type' = 'set' ); insert into redisSink (attr1,attr2) VALUES ("car_id","A1234"); “connector.data-type”为hash类型。 表为3列:第一列为key,第二列为hash_key,第三列为hash_value。 create table redisSink( attr1 STRING, attr2 STRING, attr3 STRING ) with ( 'connector.type' = 'redis', 'connector.host' = 'xx.xx.xx.xx', 'connector.port' = '6379', 'connector.password' = 'xx', 'connector.data-type' = 'hash' ); insert into redisSink (attr1,attr2,attr3) VALUES ("car_info","car_id","A1234");
  • 示例 将流qualified_cars的数据输出到 云搜索服务 的集群。 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 CREATE SINK STREAM qualified_cars ( car_id STRING, car_owner STRING, car_age INT, average_speed INT, total_miles INT ) WITH ( type = "es", region = "xxx", cluster_address = "192.168.0.212:9200", es_index = "car", es_type = "information", es_fields = "id,owner,age,speed,miles", batch_insert_data_num = "10" );
  • 语法格式 1 2 3 4 5 6 7 8 9 10 CREATE SINK STREAM stream_id (attr_name attr_type (',' attr_name attr_type)* ) WITH ( type = "es", region = "", cluster_address = "", es_index = "", es_type= "", es_fields= "", batch_insert_data_num= "" );
  • 关键字 表1 关键字说明 参数 是否必选 说明 type 是 输出通道类型,es表示输出到 云搜索 服务中。 region 是 数据所在的云搜索服务所在区域。 cluster_address 是 云搜索服务集群的内网访问地址,例如:x.x.x.x:x,多个地址时以逗号分隔。 es_index 是 待插入数据的索引,支持参数化。对应 CSS 服务中的index。 具体请参考《云搜索服务产品介绍》。 es_type 是 待插入数据的文档类型,支持参数化。对应 CS S服务中的type。 具体请参考《云搜索服务产品介绍》。 若使用的es版本为6.x,则该值不能以"_"开头。 若使用的es版本为7.x,如果提前预置CSS服务中的type,则es_type需为"_doc",否则可为符合CSS规范的值。 es_fields 是 待插入数据字段的key,具体形式如:"id,f1,f2,f3,f4",并且保证与sink中数据列一一对应;如果不使用key,而是采用随机的属性字段,则无需使用id关键字,具体形式如:"f1,f2,f3,f4,f5"。对应CSS服务中的filed。 具体请参考《云搜索服务产品介绍》。 batch_insert_data_num 是 表示一次性批量写入的数据量,值必须为正整数,单位为:条。上限为65536,默认值为10。 action 否 当值为add时,表示遇到相同id时,数据被强制覆盖,当值为upsert时,表示遇到相同id时,更新数据(选择upsert时,es_fields字段中必须指定id),默认值为add。 enable_output_null 否 使用该参数来配置是否输出空字段。当该参数为true表示输出空字段(值为null),若为false表示不输出空字段。默认为false。 max_record_num_cache 否 记录最大缓存数。 es_certificate_name 否 跨源认证信息名称。 创建跨源认证请参考跨源认证。 若es集群开启安全模式且开启https,则使用证书进行访问,创建的跨源认证类型需要为“CSS”。 若es集群开启安全模式,但关闭https,则使用证书和账号密码进行访问,创建的跨源认证类型需要为"Password"。
  • 功能描述 DLI将Flink作业的输出数据输出到云搜索服务CSS的Elasticsearch中。Elasticsearch是基于Lucene的当前流行的企业级搜索服务器,具备分布式多用户的能力。其主要功能包括全文检索、结构化搜索、分析、聚合、高亮显示等。能为用户提供实时搜索、稳定可靠的服务。适用于 日志分析 、站内搜索等场景。 云搜索服务(Cloud Search Service,简称CSS)为DLI提供托管的分布式搜索引擎服务,完全兼容开源Elasticsearch搜索引擎,支持结构化、非结构化文本的多条件检索、统计、报表。 云搜索服务的更多信息,请参见《云搜索服务用户指南》。 创建CSS集群时如果开启了安全模式,后续将无法关闭。
  • 语法格式 1 2 3 4 5 6 7 8 9 CREATE SOURCE STREAM stream_id (attr_name attr_type (',' attr_name attr_type)* ) WITH ( type = "kafka", kafka_bootstrap_servers = "", kafka_group_id = "", kafka_topic = "", encode = "json", json_config="" );
  • 关键字 表1 关键字说明 参数 是否必选 说明 type 是 数据源类型,“Kafka”表示数据源。 kafka_bootstrap_servers 是 Kafka的连接端口,需要确保能连通(需要通过增强型跨源开通DLI队列和Kafka集群的连接)。 kafka_group_id 否 group id。 kafka_topic 是 读取的Kafka的topic。目前只支持读取单个topic。 encode 是 数据编码格式,可选为“csv”、“json”、“blob”和“user_defined”。 若编码格式为“csv”,则需配置“field_delimiter”属性。 若编码格式为“json”,则需配置“json_config”属性。 当编码格式为"blob"时,表示不对接收的数据进行解析,当前表仅能有一个且为Array[TINYINT]类型的表字段。 若编码格式为“user_defined”,则需配置“encode_class_name”和“encode_class_parameter”属性。 encode_class_name 否 当encode为user_defined时,需配置该参数,指定用户自实现解码类的类名(包含完整包路径),该类需继承类DeserializationSchema。 encode_class_parameter 否 当encode为user_defined时,可以通过配置该参数指定用户自实现解码类的入参,仅支持一个string类型的参数。 json_config 否 当encode为json时,用户可以通过该参数指定json字段和流属性字段的映射关系。 格式:"field1=json_field1;field2=json_field2" 格式说明:field1、field2为创建的表字段名称。json_field1、json_field2为kafka输入数据json串的key字段名称。 具体使用方法可以参考示例说明。 说明: 如果定义的source stream中的属性和json中的属性名称相同,json_configs可以不用配置。 field_delimiter 否 当encode为csv时,用于指定csv字段分隔符,默认为逗号。 quote 否 可以指定数据格式中的引用符号,在两个引用符号之间的属性分隔符会被当做普通字符处理。 当引用符号为双引号时,请设置quote = "\u005c\u0022"进行转义。 当引用符号为单引号时,则设置quote = "'"。 说明: 目前仅适用于CSV格式。 设置引用符号后,必须保证每个字段中包含0个或者偶数个引用符号,否则会解析失败。 start_time 否 kafka数据读取起始时间。 当该参数配置时则从配置的时间开始读取数据,有效格式为yyyy-MM-dd HH:mm:ss。start_time要不大于当前时间,若大于当前时间,则不会有数据读取出。 该参数配置后,只会读取Kafka topic在该时间点后产生的数据。 kafka_properties 否 可通过该参数配置kafka的原生属性,格式为"key1=value1;key2=value2"。具体的属性值可以参考Apache Kafka中的描述。 kafka_certificate_name 否 跨源认证信息名称。跨源认证信息类型为“Kafka_SSL”时,该参数有效。 说明: 指定该配置项时,服务仅加载该认证下指定的文件和密码,系统将自动设置到“kafka_properties”属性中。 Kafka SSL认证需要的其他配置信息,需要用户手动在“kafka_properties”属性中配置。
  • 前提条件 Kafka服务端的端口如果监听在hostname上,则需要将Kafka Broker节点的hostname和IP的对应关系添加到DLI队列中。Kafka Broker节点的hostname和IP请联系Kafka服务的部署人员。如何添加IP域名映射,请参见《数据湖探索用户指南》中修改主机信息章节。 Kafka是线下集群,需要通过增强型跨源连接功能将Flink作业与Kafka进行对接。且用户可以根据实际所需设置相应安全组规则。 如何建立增强型跨源连接,请参考《数据湖探索用户指南》中增强型跨源连接章节。 如何设置安全组规则,请参见《虚拟私有云用户指南》中“安全组”章节。
  • 参数说明 表1 参数说明 参数 是否必选 说明 默认值 transientThreshold 否 连续transientThreshold个窗口发生数据改变表示发生数据概念迁移。 5 numTrees 否 随机森林中Tree的数量。 15 maxLeafCount 否 Tree最大叶子节点数量。 15 maxTreeHeight 否 Tree最大高度。 12 seed 否 算法使用的随机种子值。 4010 numClusters 否 分类数,默认包含异常和非异常两类。 2 dataViewMode 否 算法学习模式。 history:学习所有历史数据。 horizon:仅考虑最近一段时间历史数据,默认为4个窗口。 history
共100000条