华为云用户手册

  • 关键字 表1 关键字说明 参数 是否必选 说明 type 是 数据源类型,“Kafka”表示数据源。 kafka_bootstrap_servers 是 Kafka的连接端口,需要确保能连通(需要通过增强型跨源开通 DLI 队列和Kafka集群的连接)。 kafka_group_id 否 group id。 kafka_topic 是 读取的Kafka的topic。目前只支持读取单个topic。 encode 是 数据编码格式,可选为“csv”、“json”、“blob”和“user_defined”。 若编码格式为“csv”,则需配置“field_delimiter”属性。 若编码格式为“json”,则需配置“json_config”属性。 当编码格式为"blob"时,表示不对接收的数据进行解析,当前表仅能有一个且为Array[TINYINT]类型的表字段。 若编码格式为“user_defined”,则需配置“encode_class_name”和“encode_class_parameter”属性。 encode_class_name 否 当encode为user_defined时,需配置该参数,指定用户自实现解码类的类名(包含完整包路径),该类需继承类DeserializationSchema。 encode_class_parameter 否 当encode为user_defined时,可以通过配置该参数指定用户自实现解码类的入参,仅支持一个string类型的参数。 json_config 否 当encode为json时,用户可以通过该参数指定json字段和流属性字段的映射关系。 格式:"field1=json_field1;field2=json_field2" 格式说明:field1、field2为创建的表字段名称。json_field1、json_field2为kafka输入数据json串的key字段名称。 具体使用方法可以参考示例说明。 说明: 如果定义的source stream中的属性和json中的属性名称相同,json_configs可以不用配置。 field_delimiter 否 当encode为csv时,用于指定csv字段分隔符,默认为逗号。 quote 否 可以指定数据格式中的引用符号,在两个引用符号之间的属性分隔符会被当做普通字符处理。 当引用符号为双引号时,请设置quote = "\u005c\u0022"进行转义。 当引用符号为单引号时,则设置quote = "'"。 说明: 目前仅适用于 CS V格式。 设置引用符号后,必须保证每个字段中包含0个或者偶数个引用符号,否则会解析失败。 start_time 否 kafka数据读取起始时间。 当该参数配置时则从配置的时间开始读取数据,有效格式为yyyy-MM-dd HH:mm:ss。start_time要不大于当前时间,若大于当前时间,则不会有数据读取出。 该参数配置后,只会读取Kafka topic在该时间点后产生的数据。 kafka_properties 否 可通过该参数配置kafka的原生属性,格式为"key1=value1;key2=value2"。具体的属性值可以参考Apache Kafka中的描述。 kafka_certificate_name 否 跨源认证信息名称。跨源认证信息类型为“Kafka_SSL”时,该参数有效。 说明: 指定该配置项时,服务仅加载该认证下指定的文件和密码,系统将自动设置到“kafka_properties”属性中。 Kafka SSL认证需要的其他配置信息,需要用户手动在“kafka_properties”属性中配置。
  • 语法格式 1 2 3 4 5 6 7 8 9 CREATE SOURCE STREAM stream_id (attr_name attr_type (',' attr_name attr_type)* ) WITH ( type = "kafka", kafka_bootstrap_servers = "", kafka_group_id = "", kafka_topic = "", encode = "json", json_config="" );
  • 前提条件 Kafka服务端的端口如果监听在hostname上,则需要将Kafka Broker节点的hostname和IP的对应关系添加到DLI队列中。Kafka Broker节点的hostname和IP请联系Kafka服务的部署人员。如何添加IP域名映射,请参见《 数据湖探索 用户指南》中修改主机信息章节。 Kafka是线下集群,需要通过增强型跨源连接功能将Flink作业与Kafka进行对接。且用户可以根据实际所需设置相应安全组规则。 如何建立增强型跨源连接,请参考《 数据湖 探索用户指南》中增强型跨源连接章节。 如何设置安全组规则,请参见《虚拟私有云用户指南》中“安全组”章节。
  • 关键字 表1 关键字说明 参数 是否必选 说明 type 是 输出通道类型,“opentsdb”表示输出到 表格存储服务 CloudTable(OpenTSDB)。 region 是 表格存储 服务所在区域。 cluster_id 否 待插入数据所属集群的id,该参数与tsdb_link_address必须指定其中一个。 tsdb_metrics 是 数据点的metric,支持参数化。 tsdb_timestamps 是 数据点的timestamp,数据类型支持LONG、INT、SHORT和STRING,仅支持指定动态列。 tsdb_values 是 数据点的value,数据类型支持SHORT、INT、LONG、FLOAT、DOUBLE和STRING,支持指定动态列或者常数值。 tsdb_tags 是 数据点的tags,每个tags里面至少一个标签值,最多8个标签值,支持参数化。 batch_insert_data_num 否 表示一次性批量写入的数据量(即数据条数),值必须为正整数,上限为65536,默认值为8。 tsdb_link_address 否 待插入数据所属集群的OpenTsdb链接地址,使用该参数时,作业需要运行在独享DLI队列,且DLI队列需要与CloudTable集群建立增强型跨源,该参数与cluster_id必须指定其中一个。 说明: 如何建立增强型跨源连接,请参考《数据湖探索用户指南》中增强型跨源连接章节。
  • 示例 将流weather_out的数据输出到表格存储服务CloudTable的OpenTSDB中。 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 CREATE SINK STREAM weather_out ( timestamp_value LONG, /* 时间 */ temperature FLOAT, /* 温度值 */ humidity FLOAT, /* 湿度值 */ location STRING /* 地点 */ ) WITH ( type = "opentsdb", region = "xxx", cluster_id = "e05649d6-00e2-44b4-b0ff-7194adaeab3f", tsdb_metrics = "weather", tsdb_timestamps = "${timestamp_value}", tsdb_values = "${temperature}; ${humidity}", tsdb_tags = "location:${location},signify:temperature; location:${location},signify:humidity", batch_insert_data_num = "10" );
  • 语法格式 1 2 3 4 5 6 7 8 9 10 11 CREATE SINK STREAM stream_id (attr_name attr_type (',' attr_name attr_type)* ) WITH ( type = "opentsdb", region = "", cluster_id = "", tsdb_metrics = "", tsdb_timestamps = "", tsdb_values = "", tsdb_tags = "", batch_insert_data_num = "" )
  • 功能描述 DLI将Flink作业的输出数据输出到CloudTable的OpenTSDB中。OpenTSDB是基于HBase的分布式的,可伸缩的时间序列数据库。它存储的是时间序列数据,时间序列数据是指在不同时间点上收集到的数据,这类数据反映了一个对象随时间的变化状态或程度。支持秒级别数据的采集监控,进行永久存储,索引和查询,可用于系统监控和测量、物联网数据、金融数据和科学实验结果数据的收集监控。 表格存储服务(CloudTable),是基于Apache HBase提供的分布式、可伸缩、全托管的KeyValue数据存储服务,为DLI提供了高性能的随机读写能力,适用于海量结构化数据、半结构化数据以及时序数据的存储和查询应用,适用于物联网IOT应用和通用海量KeyValue数据存储与查询等场景。CloudTable的更多信息,请参见《表格存储服务用户指南》。
  • 参数说明 表1 参数说明 参数 是否必选 说明 默认值 transientThreshold 否 连续transientThreshold个窗口发生数据改变表示发生数据概念迁移。 5 numTrees 否 随机森林中Tree的数量。 15 maxLeafCount 否 Tree最大叶子节点数量。 15 maxTreeHeight 否 Tree最大高度。 12 seed 否 算法使用的随机种子值。 4010 numClusters 否 分类数,默认包含异常和非异常两类。 2 dataViewMode 否 算法学习模式。 history:学习所有历史数据。 horizon:仅考虑最近一段时间历史数据,默认为4个窗口。 history
  • 语法格式 1 2 3 4 5 6 7 8 CREATE SINK STREAM stream_id (attr_name attr_type (',' attr_name attr_type)* ) WITH ( type = "rds", username = "", password = "", db_url = "", table_name = "" );
  • 示例 将流audi_cheaper_than_30w的数据输出到数据库test的audi_cheaper_than_30w表下。 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 CREATE SINK STREAM audi_cheaper_than_30w ( car_id STRING, car_owner STRING, car_brand STRING, car_price INT ) WITH ( type = "rds", username = "root", password = "xxxxxx", db_url = "postgresql://192.168.1.1:8000/test", table_name = "audi_cheaper_than_30w" ); insert into audi_cheaper_than_30w select "1","2","3",4;
  • 关键字 表1 关键字说明 参数 是否必选 说明 type 是 输出通道类型,rds表示输出到关系型数据库或者 数据仓库 服务中。 username 是 数据库连接用户名。 password 是 数据库连接密码。 db_url 是 数据库连接地址格式为:postgresql://ip:port/database。 table_name 是 要插入数据的数据库表名。数据库表需事先创建好。 db_columns 否 支持配置输出流属性和数据库表属性的对应关系,需严格按照输出流的属性顺序配置。 示例: 1 2 3 4 5 6 7 8 9 create sink stream a3(student_name string, student_age int) with ( type = "rds", username = "root", password = "xxxxxxxx", db_url = "postgresql://192.168.0.102:8000/test1", db_columns = "name,age", table_name = "t1" ); student_name对应数据库里的name属性,student_age对应数据库里的age属性。 说明: 当不配置db_columns时,若输出流属性个数小于数据库表属性个数,并且数据库多出的属性都是nullable或者有默认值时,这种情况也允许。 primary_key 否 如果想通过主键实时更新表中的数据,需要在创建数据表的时候增加primary_key配置项,如下面例子中的c_timeminute。配置primary_key后,在进行数据写入操作时,如果primary_key存在,则进行更新操作,否则进行插入操作。 示例: 1 2 3 4 5 6 7 8 9 CREATE SINK STREAM test(c_timeminute LONG, c_cnt LONG) WITH ( type = "rds", username = "root", password = "xxxxxxxx", db_url = "postgresql://192.168.0.12:8000/test", table_name = "test", primary_key = "c_timeminute" );
  • 前提条件 请务必确保您的账户下已在数据仓库服务(DWS)里创建了DWS集群。 如何创建DWS集群,请参考《数据仓库服务管理指南》中“创建集群”章节。 请确保已创建DWS数据库表。 该场景作业需要运行在DLI的独享队列上,因此要与DWS集群建立增强型跨源连接,且用户可以根据实际所需设置相应安全组规则。 如何建立增强型跨源连接,请参考《数据湖探索用户指南》中增强型跨源连接章节。 如何设置安全组规则,请参见《虚拟私有云用户指南》中“安全组”章节。
  • 功能描述 DLI将Flink作业的输出数据输出到数据仓库服务(DWS)中。DWS数据库内核兼容PostgreSQL,PostgreSQL数据库可存储更加复杂类型的数据,支持空间信息服务、多版本并发控制(MVCC)、高并发,适用场景包括位置应用、金融保险、互联网电商等。 数据仓库服务(Data Warehouse Service,简称DWS)是一种基于基础架构和平台的在线数据处理数据库,为用户提供海量数据挖掘和分析服务。DWS的更多信息,请参见《数据仓库服务管理指南》。
  • 语法格式 create table disSource ( attr_name attr_type (',' attr_name attr_type)* (','PRIMARY KEY (attr_name, ...) NOT ENFORCED) (',' watermark for rowtime_column_name as watermark-strategy_expression) ) with ( 'connector.type' = 'dis', 'connector.region' = '', 'connector.channel' = '', 'format-type' = '' );
  • 参数说明 表1 参数说明 参数 是否必选 说明 connector.type 是 数据源类型,“dis”表示数据源为 数据接入服务 ,必须为dis。 connector.region 是 数据所在的DIS区域。 connector.ak 否 访问密钥ID(Access Key ID),需与sk同时设置 connector.sk 否 Secret Access Key,需与ak同时设置 connector.channel 是 数据所在的DIS通道名称。 connector.partition-count 否 读取从0分区开始计算的partition-count个通道范围内的数据。 该参数和partition-range参数不能同时配置。 当两个参数都没有配置的时候默认读取所有partition。 connector.partition-range 否 指定作业从DIS通道读取的分区范围。该参数和partition-count参数不能同时配置。当两个参数没有配置的时候默认读取所有partition。 partition-range = "[0:2]"时,表示读取的分区范围是1-3,包括分区1、分区2和分区3,范围设置要在dis相应通道的范围内。 connector.offset 否 用户可以根据需求设置该参数的数值,读取数据的起始位置,与start-time不能同时设置。 connector.start-time 否 DIS数据读取从该起始时间的数据。 当该参数配置时则从配置的时间开始读取数据,有效格式为yyyy-MM-dd HH:mm:ss。 当没有配置start-time也没配置offset的时候,读取最新数据。 connector. enable-checkpoint 否 是否启用checkpoint功能,可配置为true(启用)或者false(停用), 默认为false。 勿与offset或start-time同时设置;若enable-checkpoint为true,与checkpoint-app-name需要同时配置。 connector. checkpoint-app-name 否 DIS服务的消费者标识,当不同作业消费相同通道时,需要区分不同的消费者标识,以免checkpoint混淆。 勿与offset或start-time同时设置;若enable-checkpoint为true,则需要同时配置。 connector. checkpoint-interval 否 DIS源算子做checkpoint的时间间隔,默认为60s。格式为d、day/h、hour/min、minute/s、sec、second 勿与offset或start-time同时设置。 format.type 是 数据编码格式,可选为“csv”、“json” format.field-delimiter 否 属性分隔符,仅当编码格式为csv时,用户可以自定义属性分隔符,默认为“,”英文逗号。
  • 示例 1 2 3 4 5 6 7 8 9 10 11 12 create table disCsvSource ( car_id STRING, car_owner STRING, car_age INT, average_speed INT, total_miles INT) with ( 'connector.type' = 'dis', 'connector.region' = 'cn-north-1', 'connector.channel' = 'disInput', 'format.type' = 'csv' );
  • 功能描述 创建source流从数据接入服务(DIS)获取数据。用户数据从DIS接入,Flink作业从DIS的通道读取数据,作为作业的输入数据。Flink作业可通过DIS的source源将数据从生产者快速移出,进行持续处理,适用于将云服务外数据导入云服务后进行过滤、实时分析、监控报告和转储等场景。 数据接入服务(Data Ingestion Service,简称DIS)为处理或分析流数据的自定义应用程序构建数据流管道,主要解决云服务外的数据实时传输到云服务内的问题。数据接入服务每小时可从数十万种数据源(如IoT数据采集、日志和定位追踪事件、网站点击流、社交媒体源等)中连续捕获、传送和存储数TB数据。DIS的更多信息,请参见《数据接入服务用户指南》。
  • 参数说明 表1 参数说明 参数 是否必选 默认值 数据类型 说明 connector 是 无 String connector类型,需配置为'redis'。 host 是 无 String redis连接地址。 port 否 6379 Integer redis连接端口。 password 否 无 String redis认证密码。 namespace 否 无 String redis key的namespace delimiter 否 : String redis的key和namespace之间的分隔符。 data-type 否 hash String redis的数据类型,有下列选项: hash list set sorted-set string data-type取值约束详见data-type取值约束说明。 schema-syntax 否 fields String redis的schema语义,包含以下值(其具体使用请参考注意事项和常见问题): fields:适用于所有数据类型 fields-scores:适用于sorted set数据类型 array:适用于list、set、sorted set数据类型 array-scores:适用于sorted set数据类型 map:适用于hash、sorted set数据类型 schema-syntax取值约束详见schema-syntax取值约束说明。 deploy-mode 否 standalone String redis集群的部署模式,支持standalone、master-replica、cluster。默认为standalone。 retry-count 否 5 Integer 连接redis集群的尝试次数。 connection-timeout-millis 否 10000 Integer 尝试连接redis集群时的最大超时时间。 commands-timeout-millis 否 2000 Integer 等待操作完成响应的最大时间。 rebalancing-timeout-millis 否 15000 Integer redis集群失败时的休眠时间。 scan-keys-count 否 1000 Integer 每次扫描时读取的数量。 default-score 否 0 Double 当data-type设置为“sorted-set”时的默认score。 deserialize-error-policy 否 fail-job Enum 数据解析失败时的处理方式。枚举类型,包含以下值: fail-job:作业失败 skip-row:跳过当前数据 null-field:设置当前数据为null skip-null-values 否 true Boolean 是否跳过null。 pwd_auth_name 否 无 String DLI侧创建的Password类型的跨源认证名称。 使用跨源认证则无需在作业中配置账号和密码。
  • 前提条件 创建该作业前,需要建立DLI和Redis的增强型跨源连接,且用户可以根据实际所需设置相应安全组规则。 如何建立增强型跨源连接,请参考《数据湖探索用户指南》中增强型跨源连接章节。 如何设置安全组规则,请参见《虚拟私有云用户指南》中“安全组”章节。 Flink跨源开发场景中直接配置跨源认证信息存在密码泄露的风险,优先推荐您使用DLI提供的跨源认证。 跨源认证简介及操作方法请参考跨源认证简介。
  • 语法格式 1 2 3 4 5 6 7 8 9 10 create table dwsSource ( attr_name attr_type (',' attr_name attr_type)* (',' watermark for rowtime_column_name as watermark-strategy_expression) ,PRIMARY KEY (attr_name, ...) NOT ENFORCED ) with ( 'connector' = 'redis', 'host' = '' );
  • 语法格式 1 2 3 4 5 6 7 8 9 10 11 CREATE TABLE sink_table ( name string, num INT, p_day string, p_hour string ) partitioned by (p_day, p_hour) WITH ( 'connector' = 'filesystem', 'path' = 'obs://*** ', 'format' = 'parquet', 'auto-compaction' = 'true' );
  • 示例二 使用datagen随机生成数据写入obs的bucketName桶下的fileName目录中。文件生成时间与checkpoint有关,达到checkpoint间隔或达到100MB时,生成新文件。 create table orders( name string, num INT ) with ( 'connector' = 'datagen', 'rows-per-second' = '100', 'fields.name.kind' = 'random', 'fields.name.length' = '5' ); CREATE TABLE sink_table ( name string, num INT ) WITH ( 'connector' = 'filesystem', 'path' = 'obs://bucketName/fileName', 'format' = 'csv', 'sink.rolling-policy.file-size'='128m', 'sink.rolling-policy.rollover-interval'='30 min', 'auto-compaction'='true', 'compaction.file-size'='100m' ); INSERT into sink_table SELECT * from orders;
  • 示例一 使用datagen随机生成数据写入obs的bucketName桶下的fileName目录中。文件生成时间与checkpoint无关,达到30min或128MB时,生成新文件。 create table orders( name string, num INT ) with ( 'connector' = 'datagen', 'rows-per-second' = '100', 'fields.name.kind' = 'random', 'fields.name.length' = '5' ); CREATE TABLE sink_table ( name string, num INT ) WITH ( 'connector' = 'filesystem', 'path' = 'obs://bucketName/fileName', 'format' = 'csv', 'sink.rolling-policy.file-size'='128m', 'sink.rolling-policy.rollover-interval'='30 min' ); INSERT into sink_table SELECT * from orders;
  • 参数说明 表1 参数说明 参数 是否必选 默认值 类型 说明 connector 是 无 String 固定位filesystem。 path 是 无 String OBS路径。 format 是 无 String 文件格式。 支持csv、parquet格式。 sink.rolling-policy.file-size 否 128MB MemorySize 单个part文件最大大小,超过该数值会滚动产生新文件。 说明: RollingPolicy 定义了何时关闭给定的In-progress Part文件,并将其转换为Pending状态,然后在转换为Finished状态。 Finished状态的文件,可供查看并且可以保证数据的有效性,在出现故障时不会恢复。 在STREAMING模式下,滚动策略结合Checkpoint间隔(到下一个Checkpoint成功时,文件的Pending状态才转换为Finished状态)共同控制Part文件对下游readers是否可见以及这些文件的大小和数量。 sink.rolling-policy.rollover-interval 否 30 min Duration 单个Part文件处于打开状态的最长时间,超过该时间会滚动产生新文件(默认值30分钟,以避免产生大量小文件)。检查频率是通过sink.rolling-policy.check-interval参数控制的。 说明: 该参数数字与单位之间必须要有空格。 支持的时间单位包括: d,h,min,s,ms等。 对于bulk格式的文件(parquet、orc、avro),checkpoint的时间间隔也会控制单个part文件打开的最长时间。 sink.rolling-policy.check-interval 否 1 min Duration 基于时间的滚动策略的检查间隔。 该属性控制了基于sink.rolling-policy.rollover-interval属性检查文件是否该被滚动的检查频率。 auto-compaction 否 false Boolean 在流式 sink 中是否开启自动合并功能。数据首先会被写入临时文件。当checkpoint完成后,该checkpoint产生的临时文件会被合并。 compaction.file-size 否 `sink.rolling-policy.file-size`的大小 MemorySize 合并目标文件大小,默认值为滚动文件大小。 说明: 只有在同个checkpoint内的文件会被合并,因此最终文件的数量至少等于checkpoint的数量。 如果合并时间较长,可能会引起反压,延长checkpoint所需时间。 开启该功能后,checkpoint时会产生最终文件,并打开新的文件接收下个checkpoint产生的数据。
  • 功能描述 FileSystem sink用于将数据输出到分布式文件系统HDFS或者 对象存储服务 OBS等文件系统。适用于数据转储、大数据分析、备份或活跃归档、深度或冷归档等场景。 考虑到输入流可以是无界的,每个桶中的数据被组织成有限大小的Part文件。完全可以配置为基于时间的方式往桶中写入数据,比如可以设置每个小时的数据写入一个新桶中。即桶中将包含一个小时间隔内接收到的记录。 桶目录中的数据被拆分成多个Part文件。对于相应的接收数据的桶的Sink的每个Subtask,每个桶将至少包含一个Part文件。将根据配置的滚动策略来创建其他Part文件。对于Row Formats默认的策略是根据Part文件大小进行滚动,需要指定文件打开状态最长时间的超时以及文件关闭后的非活动状态的超时时间。对于Bulk Formats在每次创建Checkpoint时进行滚动,并且用户也可以添加基于大小或者时间等的其他条件。 在STREAMING模式下使用FileSink需要开启Checkpoint功能。Part文件只在Checkpoint成功时生成。如果没有开启Checkpoint功能,文件将永远停留在in-progress或者pending的状态,并且下游系统将不能安全读取该文件数据。 sink end算子的接受记录数为checkpoint的个数,非实际的发送数据,实际发送数据量请参考streaming-writer或StreamingFileWriter算子的记录数。
  • 示例 图片分类预测我们采用Mnist数据集作为流的输入,通过加载预训练的deeplearning4j模型或者keras模型,可以实时预测每张图片代表的数字。 1 2 3 4 5 6 CREATE SOURCE STREAM Mnist( image Array[TINYINT] ) SELECT DL_IMAGE_MAX_PREDICTION_INDEX(image, 'your_dl4j_model_path', false) FROM Mnist SELECT DL_IMAGE_MAX_PREDICTION_INDEX(image, 'your_keras_model_path', true) FROM Mnist SELECT DL_IMAGE_MAX_PREDICTION_INDEX(image, 'your_keras_model_config_path', 'keras_weights_path') FROM Mnist 文本分类预测我们采用一组新闻标题数据作为流的输入,通过加载预训练的deeplearning4j模型或者keras模型,可以实时预测每个新闻标题所属的类别,比如经济,体育,娱乐等。 1 2 3 4 5 6 7 CREATE SOURCE STREAM News( title String ) SELECT DL_TEXT_MAX_PREDICTION_INDEX(title, 'your_dl4j_word2vec_model_path','your_dl4j_model_path', false) FROM News SELECT DL_TEXT_MAX_PREDICTION_INDEX(title, 'your_keras_word2vec_model_path','your_keras_model_path', true) FROM News SELECT DL_TEXT_MAX_PREDICTION_INDEX(title, 'your_dl4j_model_path', false) FROM New SELECT DL_TEXT_MAX_PREDICTION_INDEX(title, 'your_keras_model_path', true) FROM New
  • 参数说明 表1 参数说明 参数 是否必选 说明 field_name 是 数据在数据流中的字段名。 图像分类中field_name类型需声明为ARRAY[TINYINT]。 文本分类中field_name类型需声明为String。 model_path 是 模型存放在OBS上的完整路径,包括模型结构和模型权值。 is_dl4j_model 是 是否是deeplearning4j的模型。 true代表是deeplearning4j,false代表是keras模型。 keras_model_config_path 是 模型结构存放在OBS上的完整路径。在keras中通过model.to_json()可得到模型结构。 keras_weights_path 是 模型权值存放在OBS上的完整路径。在keras中通过model.save_weights(filepath)可得到模型权值。 word2vec_path 是 word2vec模型存放在OBS上的完整路径。
  • 语法格式 1 2 3 4 5 6 7 -- 图像分类, 返回预测图像分类的类别id DL_IMAGE_MAX_PREDICTION_INDEX(field_name, model_path, is_dl4j_model) DL_IMAGE_MAX_PREDICTION_INDEX(field_name, keras_model_config_path, keras_weights_path) -- 适用于Keras模型 -- 文本分类,返回预测文本分类的类别id DL_TEXT_MAX_PREDICTION_INDEX(field_name, model_path, is_dl4j_model) -- 采用默认word2vec模型 DL_TEXT_MAX_PREDICTION_INDEX(field_name, word2vec_path, model_path, is_dl4j_model) 模型及配置文件等需存储在用户的OBS中,路径格式为"obs://your_ak:your_sk@obs.your_obs_region.xxx.com:443/your_model_path"。例如你的模型存放在OBS上,桶名为dl_model,文件名为model.h5,则路径填写为"obs://your_ak:your_sk@obs.xxx.com:443/dl_model/model.h5"。
  • 注意事项 暂不支持通过python写UDF、UDTF、UDAF自定义函数。 如果使用IntelliJ IDEA工具对创建的自定义函数进行调试,则需要在IDEA上勾选:include dependencies with "Provided" scope,否则本地调试运行时会加载不到pom文件中的依赖包。 具体操作以IntelliJ IDEA版本2020.2为例,参考如下: 在IntelliJ IDEA界面,选择调试的配置文件,单击“Edit Configurations”。 在“Run/Debug Configurations”界面,勾选:include dependencies with "Provided" scope。 单击“OK”完成应用配置。
  • 按列GROUP BY 功能描述 按列进行分组操作。 语法格式 1 2 3 4 SELECT [ ALL | DISTINCT ] { * | projectItem [, projectItem ]* } FROM tableExpression [ WHERE booleanExpression ] [ GROUP BY { groupItem [, groupItem ]* } ] 语法说明 GROUP BY:按列可分为单列GROUP BY与多列GROUP BY。 单列GROUP BY:指GROUP BY子句中仅包含一列。 多列GROUP BY:指GROUP BY子句中不止一列,查询语句将按照GROUP BY的所有字段分组,所有字段都相同的记录将被放在同一组中。 注意事项 GroupBy在流处理表中会产生更新结果 示例 根据score及name两个字段对表student进行分组,并返回分组结果。 1 2 insert into temp SELECT name,score, max(score) FROM student GROUP BY name,score;
共100000条