华为云用户手册

  • JSON_VAL函数使用说明 语法 STRING JSON_VAL(STRING json_string, STRING json_path) 表2 参数说明 参数 数据类型 说明 json_string STRING 需要解析的JSON对象,使用字符串表示。 json_path STRING 解析JSON的路径表达式,使用字符串表示。 目前path支持如下表达式参考下表表3。 表3 json_path参数支持的表达式 表达式 说明 $ 根对象 [] 数组下标 * 数组通配符 . 取子元素 示例 测试输入数据。 测试数据源kafka,具体消息内容参考如下: "{name:James,age:24,gender:male,grade:{math:95,science:[80,85],english:100}}" "{name:James,age:24,gender:male,grade:{math:95,science:[80,85],english:100}]" 使用JSON_VAL编写SQL create table kafkaSource( message STRING ) with ( 'connector.type' = 'kafka', 'connector.version' = '0.11', 'connector.topic' = 'topic-swq', 'connector.properties.bootstrap.servers' = 'xxx.xxx.xxx.xxx:9092,yyy.yyy.yyy:9092,zzz.zzz.zzz.zzz:9092', 'connector.startup-mode' = 'earliest-offset', 'format.field-delimiter' = '|', 'format.type' = 'csv' ); create table kafkaSink( message1 STRING, message2 STRING, message3 STRING, message4 STRING, message5 STRING, message6 STRING ) with ( 'connector.type' = 'kafka', 'connector.version' = '0.11', 'connector.topic' = 'topic-swq-out', 'connector.properties.bootstrap.servers' = 'xxx.xxx.xxx.xxx:9092,yyy.yyy.yyy:9092,zzz.zzz.zzz.zzz:9092', 'format.type' = 'json' ); INSERT INTO kafkaSink SELECT JSON_VAL(message,""), JSON_VAL(message,"$.name"), JSON_VAL(message,"$.grade.science"), JSON_VAL(message,"$.grade.science[*]"), JSON_VAL(message,"$.grade.science[1]"), JSON_VAL(message,"$.grade.dddd") FROM kafkaSource; 查看输出结果 {"message1":null,"message2":"swq","message3":"[80,85]","message4":"[80,85]","message5":"85","message6":null} {"message1":null,"message2":null,"message3":null,"message4":null,"message5":null,"message6":null}
  • 字符串函数 数据湖探索 DLI )提供了丰富的字符串函数,用于处理和转换字符串数据。这些函数包括拼接、大小写转换、截取子串、替换、正则匹配、编码解码、格式转换等。此外,还支持字符串长度计算、位置查找、填充、反转等功能,以及从JSON字符串中提取值的JSON_VAL函数。这些功能广泛应用于数据清洗、文本处理和数据分析场景,为开发者提供强大的工具支持。 字符串函数简介请参考表1,更多内容参考Apache Flink。 表1 字符串函数 函数 返回类型 描述 string1 || string2 STRING 返回两个字符串的拼接 CHAR_LENGTH(string) CHARACTER_LENGTH(string) INT 返回字符串中的字符数量 UPPER(string) STRING 返回字符串的大写形式 LOWER(string) STRING 返回字符串的小写形式 POSITION(string1 IN string2) INT 返回第一个字符串在第二个字符串中首次出现的位置。若第一个字符串不存在与第二个字符串,则返回0 TRIM([ BOTH | LEADING | TRAILING ] string1 FROM string2) STRING 去除string2字符串的首尾(或首部、或尾部)的string1字符串 LTRIM(string) STRING 返回去除首部空格后的字符串 例如LTRIM(' This is a test String.') 返回"This is a test String." RTRIM(string) STRING 返回去除尾部空格后的字符串 例如RTRIM('This is a test String. ') 返回"This is a test String." REPEAT(string, integer) STRING 返回integer个string连接后的字符串 例如REPEAT('This is a test String.', 2) 返回"This is a test String.This is a test String." REGEXP_REPLACE(string1, string2, string3) STRING 用string3代替string1中的符合正则表达式string2的字符串,并返回替换后的string1字符串 例如REGEXP_REPLACE('foobar', 'oo|ar', '') 返回"fb" REGEXP_REPLACE('ab\ab', '\\', 'e')返回"abeab" OVERLAY(string1 PLACING string2 FROM integer1 [ FOR integer2 ]) STRING 用string2代替string1中的字符串,从integer1开始,替换长度为integer2,并返回替换后的string1字符串 integer2默认为string2的长度 例如OVERLAY('This is an old string' PLACING ' new' FROM 10 FOR 5)返回"This is a new string" SUBSTRING(string FROM integer1 [ FOR integer2 ]) STRING 返回string中从integer1位置开始的长度为integer2的子字符串。若integer2未配置,则默认返回从integer1开始到末尾的子字符串 REPLACE(string1, string2, string3) STRING 用string3代替string1中的string2后的字符串,并返回替换后的string1字符串 例如:REPLACE('hello world', 'world', 'flink') 返回"hello flink" REPLACE('ababab', 'abab', 'z') 返回"zab" REPLACE('ab\\ab', '\\', 'e')返回"abeab" REGEXP_EXTRACT(string1, string2[, integer]) STRING 使用正则表达式string2匹配抽取字符串string1中的第integer个字串,integer从1开始,正则匹配提取。 若参数为 NULL或者正则不合法,则返回NULL。 例如REGEXP_EXTRACT('foothebar', 'foo(.*?)(bar)', 2)" 返回"bar" INITCAP(string) STRING 返回将字符串的首字符大写其余字符转为小写后的字符串 CONCAT(string1, string2,...) STRING 返回将两个或多个字符串拼接后的新字符串。 例如 CONCAT('AA', 'BB', 'CC') 返回"AABBCC" CONCAT_WS(string1, string2, string3,...) STRING 返回将每个参数和第一个参数指定的分隔符依次连接到一起组成的字符串。若string1是null,则返回null。若其他参数为null,在执行拼接过程中跳过取值为null的参数 例如CONCAT_WS('~', 'AA', NULL, 'BB', '', 'CC') 返回"AA~BB~~CC" LPAD(string1, integer, string2) STRING 将string2字符串拼接到string1字符串的左端,直到新的字符串达到指定长度integer为止 任意参数为null时,返回null 若integer为负数,则返回null 若integer不大于string1的长度,则返回string1裁剪为integer长度的字符串 例如LPAD('hi',4,'??') 返回"??hi" LPAD('hi',1,'??') 返回"h" RPAD(string1, integer, string2) STRING 将string2字符串拼接到string1字符串的右端,直到新的字符串达到指定长度integer为止 任意参数为null时,返回null 若integer为负数,则返回null 若integer不大于string1的长度,则返回string1裁剪为integer长度的字符串 例如RPAD('hi',4,'??') 返回 "hi??" RPAD('hi',1,'??') 返回"h" FROM_BASE64(string) STRING 将base64编码的字符串str解析成对应字符串 若字符串为null,则返回null 例如FROM_BASE64('aGVsbG8gd29ybGQ=') 返回"hello world" TO_BASE64(string) STRING 将字符串基于base64编码 若字符串为null,则返回null 例如TO_BASE64('hello world') 返回"aGVsbG8gd29ybGQ=" ASCII(string) INT 返回字符串的第一个字符的ASCII值 若字符串为null,则返回null 例如ascii('abc') 返回97 ascii(CAST(NULL AS VARCHAR)) 返回NULL CHR(integer) STRING 将ASCII码转换为字符 若integer大于255,则计算出integer除以255的余数,并将余数作为ASCII码值 若integer为null,则返回null chr(97) 返回a chr(353) 返回a DECODE(binary, string) STRING 使用提供的字符集string解码参数binary,字符集可以为'US-ASCII', 'ISO-8859-1', 'UTF-8', 'UTF-16BE', 'UTF-16LE', 'UTF-16' 若任意参数为null,则返回null ENCODE(strinh1, string2) STRING 使用提供的字符集string2编码字符串string1,字符集可以为'US-ASCII', 'ISO-8859-1', 'UTF-8', 'UTF-16BE', 'UTF-16LE', 'UTF-16' 若任意参数为null,则返回null INSTR(string1, string2) INT 返回string2在string1中首次出现的位置 若有参数为null,则返回null LEFT(string, integer) STRING 返回最左边的integer个字符 若integer为负数,则返回空 若存在参数为null,则返回null RIGHT(string, integer) STRING 返回最右侧的integer个字符 若integer为负数,则返回空 若存在参数为null,则返回null LOCATE(string1, string2[, integer]) INT 返回string1在string2的位置integer之后首次出现的位置 若string1在string2的位置integer之后不存在,则返回0 若integer不存在,则默认为0 若存在参数为null,则返回null PARSE_URL(string1, string2[, string3]) STRING 返回URL string1中指定的部分解析后的值 string2为'HOST'、'PATH'、'QUERY'、'REF'、'PROTOCOL'、'AUTHORITY'、'FILE'或'USERINFO' 若存在参数为null,则返回null 若string2为QUERY,也可以指定QUERY中的key为string3 例如: parse_url('http://facebook.com/path1/p.php?k1=v1&k2=v2#Ref1', 'HOST')返回 'facebook.com' parse_url('http://facebook.com/path1/p.php?k1=v1&k2=v2#Ref1', 'QUERY', 'k1') 返回'v1' REGEXP(string1, string2) BOOLEAN 对指定的字符串执行一个正则表达式搜索,并返回一个BOOLEAN值表示是否找到指定的匹配模式。若找到,则返回TRUE。其中string1表示指定的字符串,string2表示正则表达式 若存在参数为null,则返回null REVERSE(string) STRING 反转字符串,返回字符串值的相反顺序。 若存在参数为null,则返回null SPLIT_INDEX(string1, string2, integer1) STRING 以string2作为分隔符,将字符串string1分割成若干段,取其中的第integer1段。integer1从0开始 若integer1为负数,则返回null 如果任一参数为null,则返回null STR_TO_MAP(string1[, string2, string3]]) MAP 使用string2分隔符将string1分割成K-V对,并使用string3分隔每个K-V对,组装成MAP返回 string2默认为',' string3默认为'=' SUBSTR(string[, integer1[, integer2]]) STRING 截取从位置integer1开始,长度为integer2的子串,并返回 若为指定integer2,翻截取到字符串结尾 JSON_VAL(STRING json_string, STRING json_path) STRING 从json形式的字符串json_string中提取指定json_path的值。具体函数使用可以参考JSON_VAL函数使用说明说明。 说明: 以下规则优先级按照顺序从高到低。 不允许json_string和json_path为NULL json_string格式必须为合法的json串,否则函数返回NULL json_string为空字符串,则函数返回空字符串 json_path为空字串或路径不存在,则函数返回NULL
  • 前提条件 该场景作业需要运行在DLI的独享队列上,因此要与Doris建立增强型跨源连接,且用户可以根据实际所需设置相应安全组规则。 如何建立增强型跨源连接,请参考《 数据湖 探索用户指南》中增强型跨源连接章节。 如何设置安全组规则,请参见《虚拟私有云用户指南》中“安全组”章节。 如果使用 MRS Doris,请在增强型跨源的主机信息中添加MRS集群所有节点的主机ip信息。 详细操作请参考《数据湖探索用户指南》中的“修改主机信息”章节描述。 集群未启用Kerberos认证(普通模式) 使用admin用户连接Doris后,创建具有管理员权限的角色并绑定给用户。
  • 注意事项 创建Flink OpenSource SQL作业时,在作业编辑界面的“运行参数”处,“Flink版本”需要选择“1.15”,勾选“保存作业日志”并设置保存作业日志的OBS桶,方便后续查看作业日志。 认证用的username和password等硬编码到代码中或者明文存储都有很大的安全风险,建议使用DEW管理凭证。配置文件或者环境变量中密文存放,使用时解密,确保安全。Flink Opensource SQL使用DEW管理访问凭据 集群未启用Kerberos认证(普通模式) Doris的表名是区分大小写。 使用cloudTable的doris时,'fenodes'字段值的端口请用8030,如'xx:8030'。同时安全组请放开端口8030,8040,9030。 开启HTTPS后,需要在创建表的with子句中添加如下配置参数: 'doris.enable.https' = 'true' 'doris.ignore.https.ca' = 'true' 请在Flink“作业编辑”页面选择“运行参数配置”,选择“开启Checkpoint”,否则会导致Doris结果表无法写入数据,且写入Doris的延时取决于设置的Checkpoint的间隔时间。
  • 示例 该示例是从Datagen数据源中生成数据,并将结果写入到Doris结果表中。 参考增强型跨源连接,在DLI上根据Doris所在的虚拟私有云和子网分别创建相应的增强型跨源连接,并绑定所要使用的Flink弹性资源池。参考“修改主机信息”章节描述,在增强型跨源中增加MRS的主机信息。 设置Doris的安全组,添加入向规则使其对Flink的队列网段放通。分别根据Doris的地址测试队列连通性。如果能连通,则表示跨源已经绑定成功,否则表示未成功。 参考测试地址连通性。 参考MRS Doris使用指南,创建doris表,创建语句如下: CREATE TABLE IF NOT EXISTS dorisdemo ( `user_id` varchar(10) NOT NULL, `city` varchar(10), `age` int, `gender` int ) DISTRIBUTED BY HASH(`user_id`) BUCKETS 10 参考创建Flink OpenSource作业,创建flink opensource sql作业,输入以下作业脚本,并提交运行。该作业脚本将Datagen作为数据源,将数据写入到Doris作为结果表中。 create table student_datagen_source( `user_id` String NOT NULL, `city` String, `age` int, `gender` int ) with ( 'connector' = 'datagen', 'rows-per-second' = '1', 'fields.user_id.kind' = 'random', 'fields.user_id.length' = '7', 'fields.city.kind' = 'random', 'fields.city.length' = '7' ); CREATE TABLE dorisDemo ( `user_id` String NOT NULL, `city` String, `age` int, `gender` int ) with ( 'connector' = 'doris', 'fenodes' = 'FE_IP:PORT', 'table.identifier' = 'demo.dorisdemo', 'username' = 'dorisUser', 'password' = 'dorisPassword', 'sink.label-prefix' = 'demo', 'sink.enable-2pc' = 'true', 'sink.buffer-count' = '10' ); insert into dorisDemo select * from student_datagen_source 查看doris结果表是否已成功写入数据。 user_id city age gender 50aff04 93406c5 12 1 681a230 1f27d06 16 1 006eff4 3521ded 18 0
  • 语法格式 create table dorisSource ( attr_name attr_type (',' attr_name attr_type)* ) with ( 'connector' = 'doris', 'fenodes' = 'FE_IP:PORT,FE_IP:PORT,FE_IP:PORT', 'table.identifier' = 'database.table', 'username' = 'dorisUsername', 'password' = 'dorisPassword' );
  • 示例 示例1:该示例是从DMS Kafka数据源中读取数据,并写入到Print结果表中。 参考增强型跨源连接,根据Kafka所在的虚拟私有云和子网创建相应的增强型跨源,并绑定所要使用的Flink弹性资源池。 设置Kafka的安全组,添加入向规则使其对Flink的队列网段放通。参考测试地址连通性根据Kafka的地址测试队列连通性。如果能连通,则表示跨源已经绑定成功,否则表示未成功。 参考创建Flink OpenSource作业,创建flink opensource sql作业,输入以下作业脚本,提交运行作业。 注意:创建作业时,在作业编辑界面的“运行参数”处,“Flink版本”选择“1.15”,勾选“保存作业日志”并设置保存作业日志的OBS桶,方便后续查看作业日志。如下脚本中的加粗参数请根据实际环境修改。 CREATE TABLE upsertKafkaSource ( order_id string, order_channel string, order_time string, pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string, PRIMARY KEY (order_id) NOT ENFORCED ) WITH ( 'connector' = 'upsert-kafka', 'topic' = 'KafkaTopic', 'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkAddress2:KafkaPort', 'key.format' = 'csv', 'value.format' = 'json' ); CREATE TABLE printSink ( order_id string, order_channel string, order_time string, pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string, PRIMARY KEY (order_id) NOT ENFORCED ) WITH ( 'connector' = 'print' ); INSERT INTO printSink SELECT * FROM upsertKafkaSource; 向Kafka中的指定topic中插入如下数据(注意:kafka插入数据时请指定key)。 {"order_id":"202303251202020001", "order_channel":"miniAppShop", "order_time":"2023-03-25 12:02:02", "pay_amount":"60.00", "real_pay":"60.00", "pay_time":"2023-03-25 12:03:00", "user_id":"0002", "user_name":"Bob", "area_id":"330110"} {"order_id":"202303251505050001", "order_channel":"appshop", "order_time":"2023-03-25 15:05:05", "pay_amount":"500.00", "real_pay":"400.00", "pay_time":"2023-03-25 15:10:00", "user_id":"0003", "user_name":"Cindy", "area_id":"330108"} {"order_id":"202303251202020001", "order_channel":"miniAppShop", "order_time":"2023-03-25 12:02:02", "pay_amount":"60.00", "real_pay":"60.00", "pay_time":"2023-03-25 12:03:00", "user_id":"0002", "user_name":"Bob", "area_id":"330111"} 查看taskmanager的out文件,数据结果参考如下: +I(202303251202020001,miniAppShop,2023-03-2512:02:02,60.0,60.0,2023-03-2512:03:00,0002,Bob,330110) +I(202303251505050001,appshop,2023-03-25 15:05:05,500.0,400.0,2023-03-2515:10:00,0003,Cindy,330108) -U(202303251202020001,miniAppShop,2023-03-2512:02:02,60.0,60.0,2023-03-2512:03:00,0002,Bob,330110) +U(202303251202020001,miniAppShop,2023-03-2512:02:02,60.0,60.0,2023-03-2512:03:00,0002,Bob,330111)
  • 参数说明 表2 参数说明 参数 是否必选 默认参数 数据类型 说明 connector 是 无 String connector类型,对于upsert kafka连接器,需配置为'upsert-kafka'。 topic 是 无 String Kafka topic名。 properties.bootstrap.servers 是 无 String Kafka brokers地址,以逗号分隔。 key.format 是 无 String 用于对Kafka消息中key部分序列化和反序列化的格式。key字段由PRIMARY KEY语法指定。支持的格式如下: csv json avro 请参考Format页面以获取更多详细信息和格式参数。 key.fields-prefix 否 无 String 为键格式的所有字段定义自定义前缀,以避免与值格式的字段发生名称冲突。 默认情况下,前缀为空。如果定义了自定义前缀,则表架构和'key.fields'都将使用前缀名称。在构造密钥格式的数据类型时,将删除前缀,并在密钥格式中使用无前缀的名称。请注意,此选项要求'value.fields-include' 必须设置为'EXCEPT_KEY'。 value.format 是 无 String 用于对 Kafka消息中 value 部分序列化和反序列化的格式。支持的格式: csv json avro 请参考Format页面以获取更多详细信息和格式参数。 value.fields-include 是 ALL String 控制哪些字段应该出现在值中。取值范围如下: ALL:消息的value部分将包含schema的所有字段,包括定义中键的字段。 EXCEPT_KEY:记录的value部分包含schema的所有内容,定义为主键的字段除外。 properties.* 否 无 String 该选项可以传递任意的Kafka参数。 “properties.”后的后缀名必须匹配定义在 kafka参数文档中的参数名。 Flink会自动移除选项名中的 "properties." 前缀,并将转换后的键名以及值传入KafkaClient。 例如:您可以通过 'properties.allow.auto.create.topics' = 'false' 来禁止自动创建 topic。 但是'key.deserializer' 和 'value.deserializer' 是不允许通过该方式传递参数,因为Flink会重写这些参数的值。 sink.parallelism 否 无 Integer 定义upsert-kafka sink 算子的并行度。默认情况下,由框架确定并行度,与上游连接算子的并行度保持一致。 sink.buffer-flush.max-rows 否 0 Integer 缓存刷新前,最多能缓存的记录条数。 当sink收到很多同key上的更新时,缓存将保留同 key 的最后一条记录,因此sink缓存能帮助减少发往Kafka topic的数据量,以及避免发送潜在的tombstone消息。可以通过设置为'0'来禁用它。 默认情况下,该选项是未开启的。如果要开启 sink 缓存,需要同时设置'sink.buffer-flush.max-rows'和'sink.buffer-flush.interval'两个选项为大于零的值。 sink.buffer-flush.interval 否 0 Duration 缓存刷新的间隔时间,超过该时间后异步线程将刷新缓存数据。单位可以为毫秒(ms)、秒(s)、分钟(min)或小时(h)。例如'sink.buffer-flush.interval'='10 ms'。 默认情况下,该选项是未开启的。如果要开启 sink 缓存,需要同时设置'sink.buffer-flush.max-rows'和'sink.buffer-flush.interval'两个选项为大于零的值。
  • 功能描述 Apache Kafka是一个快速、可扩展的、高吞吐、可容错的分布式发布订阅消息系统,具有高吞吐量、内置分区、支持数据副本和容错的特性,适合在大规模消息处理场景中使用。Upsert Kafka 连接器支持以upsert方式从Kafka topic中读取数据并将数据写入Kafka topic。表类型支持源表和结果表。 作为source,upsert-kafka 连接器生产changelog流,其中每条数据记录代表一个更新或删除事件。 数据记录中的value被解释为同一key的最后一个value的UPDATE,如果有这个key(如果不存在相应的key,则该更新被视为INSERT)。用表来类比,changelog 流中的数据记录被解释为UPSERT,也称为INSERT/UPDATE,因为任何具有相同key的现有行都被覆盖。另外,value为空的消息将会被视作为DELETE消息。 作为sink,upsert-kafka连接器可以消费changelog流。它会将INSERT/UPDATE_AFTER数据作为正常的Kafka消息写入,并将DELETE数据以value为空的Kafka消息写入(表示对应 key 的消息被删除)。Flink将根据主键列的值对数据进行分区,从而保证主键上的消息有序,因此同一主键上的更新/删除消息将落在同一分区中。 表1 支持类别 类别 详情 支持表类型 源表、结果表
  • 注意事项 创建Flink OpenSource SQL作业时,在作业编辑界面的“运行参数”处,“Flink版本”需要选择“1.15”,勾选“保存作业日志”并设置保存作业日志的OBS桶,方便后续查看作业日志。 认证用的username和password等硬编码到代码中或者明文存储都有很大的安全风险,建议使用DEW管理凭证。配置文件或者环境变量中密文存放,使用时解密,确保安全。Flink Opensource SQL使用DEW管理访问凭据 Upsert Kafka 始终以upsert方式工作,并且需要在DDL中定义主键。在具有相同主键值的消息按序存储在同一个分区的前提下,在 changlog source 定义主键意味着 在物化后的 changelog 上主键具有唯一性。定义的主键将决定哪些字段出现在Kafka消息的key中。 由于该连接器以 upsert 的模式工作,该连接器作为 source 读入时,可以确保具有相同主键值下仅最后一条消息会生效。 数据类型的使用,请参考Format章节。
  • 语法格式 1 2 3 4 5 6 7 8 9 10 11 12 create table kafkaTable( attr_name attr_type (',' attr_name attr_type)* (','PRIMARY KEY (attr_name, ...) NOT ENFORCED) ) with ( 'connector' = 'upsert-kafka', 'topic' = '', 'properties.bootstrap.servers' = '', 'key.format' = '', 'value.format' = '' );
  • 示例 从Kafka源表中读取数据,将Redis表作为维表,并将二者生成的宽表信息写入Kafka结果表中,其具体步骤如下: 参考增强型跨源连接,根据Redis和Kafka所在的虚拟私有云和子网创建相应的增强型跨源,并绑定所要使用的Flink弹性资源池。 设置Redis和Kafka的安全组,添加入向规则使其对Flink的队列网段放通。参考测试地址连通性根据Redis的地址测试队列连通性。如果能连通,则表示跨源已经绑定成功,否则表示未成功。 登录Redis客户端,通过如下命令向Redis发送如下数据: HMSET 330102 area_province_name a1 area_province_name b1 area_county_name c1 area_street_name d1 region_name e1 HMSET 330106 area_province_name a1 area_province_name b1 area_county_name c2 area_street_name d2 region_name e1 HMSET 330108 area_province_name a1 area_province_name b1 area_county_name c3 area_street_name d3 region_name e1 HMSET 330110 area_province_name a1 area_province_name b1 area_county_name c4 area_street_name d4 region_name e1 参考创建Flink OpenSource作业,创建flink opensource sql作业,输入以下作业脚本,提交运行作业。该作业脚本将Kafka为数据源,Redis作为维表,数据写入到Kafka结果表中。 如下脚本中的加粗参数请根据实际环境修改。 CREATE TABLE orders ( order_id string, order_channel string, order_time string, pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string, proctime as Proctime() ) WITH ( 'connector' = 'kafka', 'topic' = 'kafkaSourceTopic', 'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort', 'properties.group.id' = 'GroupId', 'scan.startup.mode' = 'latest-offset', 'format' = 'json' ); --创建地址维表 create table area_info ( area_id string, area_province_name string, area_city_name string, area_county_name string, area_street_name string, region_name string, primary key (area_id) not enforced -- redis的key ) WITH ( 'connector' = 'redis', 'host' = 'RedisIP', 'password' = 'RedisPassword', 'data-type' = 'hash', 'deploy-mode' = 'master-replica' ); --根据地址维表生成详细的包含地址的订单信息宽表 create table order_detail( order_id string, order_channel string, order_time string, pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string, area_province_name string, area_city_name string, area_county_name string, area_street_name string, region_name string ) with ( 'connector' = 'kafka', 'topic' = 'kafkaSinkTopic', 'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort', 'format' = 'json' ); insert into order_detail select orders.order_id, orders.order_channel, orders.order_time, orders.pay_amount, orders.real_pay, orders.pay_time, orders.user_id, orders.user_name, area.area_id, area.area_province_name, area.area_city_name, area.area_county_name, area.area_street_name, area.region_name from orders left join area_info for system_time as of orders.proctime as area on orders.area_id = area.area_id; 连接Kafka集群,向Kafka的source topic中插入如下测试数据: {"order_id":"202103241606060001", "order_channel":"appShop", "order_time":"2021-03-24 16:06:06", "pay_amount":"200.00", "real_pay":"180.00", "pay_time":"2021-03-24 16:10:06", "user_id":"0001", "user_name":"Alice", "area_id":"330106"} {"order_id":"202103251202020001", "order_channel":"miniAppShop", "order_time":"2021-03-25 12:02:02", "pay_amount":"60.00", "real_pay":"60.00", "pay_time":"2021-03-25 12:03:00", "user_id":"0002", "user_name":"Bob", "area_id":"330110"} {"order_id":"202103251505050001", "order_channel":"appShop", "order_time":"2021-03-25 15:05:05", "pay_amount":"500.00", "real_pay":"400.00", "pay_time":"2021-03-25 15:10:00", "user_id":"0003", "user_name":"Cindy", "area_id":"330108"} 连接Kafka集群,在Kafka的sink topic读取数据,结果数据参考如下: {"order_id":"202103241606060001","order_channel":"appShop","order_time":"2021-03-24 16:06:06","pay_amount":200.0,"real_pay":180.0,"pay_time":"2021-03-24 16:10:06","user_id":"0001","user_name":"Alice","area_id":"330106","area_province_name":"a1","area_city_name":"b1","area_county_name":"c2","area_street_name":"d2","region_name":"e1"} {"order_id":"202103251202020001","order_channel":"miniAppShop","order_time":"2021-03-25 12:02:02","pay_amount":60.0,"real_pay":60.0,"pay_time":"2021-03-25 12:03:00","user_id":"0002","user_name":"Bob","area_id":"330110","area_province_name":"a1","area_city_name":"b1","area_county_name":"c4","area_street_name":"d4","region_name":"e1"} {"order_id":"202103251505050001","order_channel":"appshop","order_time":"2021-03-25 15:05:05","pay_amount":500.0,"real_pay":400.0,"pay_time":"2021-03-25 15:10:00","user_id":"0003","user_name":"Cindy","area_id":"330108","area_province_name":"a1","area_city_name":"b1","area_county_name":"c3","area_street_name":"d3","region_name":"e1"}
  • 语法格式 create table dwsSource ( attr_name attr_type (',' attr_name attr_type)* (',' watermark for rowtime_column_name as watermark-strategy_expression) ,PRIMARY KEY (attr_name, ...) NOT ENFORCED ) with ( 'connector' = 'redis', 'host' = '' );
  • 参数说明 表1 参数说明 参数 是否必选 默认值 数据类型 说明 connector 是 无 String connector类型,需配置为'redis'。 host 是 无 String redis连接地址。 port 否 6379 Integer redis连接端口。 password 否 无 String redis认证密码。 namespace 否 无 String redis key的namespace delimiter 否 : String redis的key和namespace之间的分隔符。 data-type 否 hash String redis的数据类型,有下列选项 hash list set sorted-set string data-type取值约束详见data-type取值约束说明。 schema-syntax 否 fields String redis的schema语义,包含以下值: fields:适用于所有数据类型 fields-scores:适用于sorted set数据类型 array:适用于list、set、sorted set数据类型 array-scores:适用于sorted set数据类型 map:适用于hash、sorted set数据类型 schema-syntax取值约束详见schema-syntax取值约束说明。 deploy-mode 否 standalone String redis集群的部署模式,支持standalone、master-replica、cluster,默认standalone。 retry-count 是 5 Integer 设置每个连接请求的队列大小。如果超过队列大小,则命令调用将导致RedisException。将requestQueueSize设置为较低的值将导致在过载期间或连接处于断开状态时更早出现异常。更高的值意味着达到边界需要更长的时间,但可能会有更多的请求排队,并使用更多的堆空间。默认请设置为2147483647。 connection-timeout-millis 否 10000 Integer 尝试连接redis集群时的最大超时时间。 commands-timeout-millis 否 2000 Integer 等待操作完成响应的最大时间。 rebalancing-timeout-millis 否 15000 Integer redis集群失败时的休眠时间。 scan-keys-count 否 1000 Integer 每次扫描时读取的数量。 default-score 否 0 Double 当data-type设置为“sorted-set”数据类型的默认score。 deserialize-error-policy 否 fail-job Enum 数据解析失败时的处理方式。 枚举类型,包含以下值: fail-job:作业失败 skip-row:跳过当前数据 null-field:设置当前数据为null skip-null-values 否 true Boolean 是否跳过null。 lookup.async 否 false Boolean 作为redis维表时,是否使用异步 I/O。 lookup.parallelism 否 无 int 定义查找连接运算符的自定义并行度。默认情况下,如果未定义此选项,则规划器将通过考虑全局配置(如果定义了选项“lookup.parallelism”)来推导并行度,否则将考虑输入运算符的并行度。 lookup.batch.interval 否 1s Duration 批量查找连接可以使用最大延迟来缓冲输入记录。批量查找连接可以使用最大延迟来缓冲输入记录。 lookup.batch.size 否 100L long 可以缓冲的最大输入记录数,以便进行批量查找连接。 lookup.batch 否 false Boolean 指定是否启用批量查找优化。如果启用,用户必须同时设置 lookup.batch.interval 和 lookup.batch.size 选项。此外,由于底层批处理间隔干扰机制的实现,用户必须在 flink 配置中显式启用 table.exec.batch-lookup.enabled' 选项 ignore-retractions 否 false Boolean 连接器应忽略更新插入/撤回流模式下的收回消息。 key-column 否 无 String Redis 表schema的key
  • Hudi Hudi是一种数据湖的存储格式,在Hadoop文件系统之上提供了更新数据和删除数据的能力以及消费变化数据的能力。支持多种计算引擎,提供IUD接口,在HDFS的数据集上提供了插入更新和增量拉取的功能。 表1 支持类别 类别 详情 支持Flink表类型 源表、结果表 支持hudi表类型 MOR表,COW表 支持读写类型 批量读,批量写,流式读,流式写 Hudi源表 Hudi结果表 父主题: Connector列表
  • 参数说明 表1 参数说明 参数 是否必选 默认值 数据类型 说明 connector 是 无 String connector类型,需配置为'redis'。 host 是 无 String redis连接地址。 port 否 6379 Integer redis连接端口。 password 否 无 String redis认证密码。 namespace 否 无 String redis key的namespace。 例如设置该值为"person",假设key为"jack"则redis中会是"person:jack"。 delimiter 否 : String redis的key和namespace之间的分隔符。 data-type 否 hash String redis的数据类型,有下列选项,与redis的数据类型相对应: hash list set sorted-set string data-type取值约束详见data-type取值约束说明。 schema-syntax 否 fields String redis的schema语义,包含以下值: fields:适用于所有数据类型。fields类型是指可以设置多个字段,写入时会取每个字段的值。 fields-scores:适用于sorted set数据类型,表示对每个字段都设置一个字段作为其独立的score。 array:适用于list、set、sorted set数据类型 array-scores:适用于sorted set数据类型 map:适用于hash、sorted set数据类型。 schema-syntax取值约束详见schema-syntax取值约束说明。 deploy-mode 否 standalone String redis集群的部署模式,支持standalone、master-replica、cluster,默认standalone。 该值可参考redis集群的实例类型介绍。 retry-count 否 5 Integer 连接redis集群的尝试次数。 connection-timeout-millis 否 10000 Integer 尝试连接redis集群时的最大超时时间。 commands-timeout-millis 否 2000 Integer 等待操作完成响应的最大时间。 rebalancing-timeout-millis 否 15000 Integer redis集群失败时的休眠时间。 default-score 否 0 Double 当data-type设置为“sorted-set”数据类型的默认score。 ignore-retraction 否 false Boolean 是否忽略retract消息。 skip-null-values 否 true Boolean 是否跳过null。如果为false,则设置为字符串"null"。 ignore-retractions 否 false Boolean 连接器应忽略更新插入/撤回流模式下的收回消息。 key-column 否 无 String Redis 表schema的key sink.delivery-guarantee 否 at-least-once String exactly-once: 记录只传送一次,在故障转移方案下也是如此。如果要生成完整的exactly-once管道,需要源和接收器支持exactly-once,并且已正确配置。 at-least-once: 确保传递记录,但可能会多次传递同一记录。通常,这种比exactly-once模式更快。 none: 记录将尽最大努力交付。这通常是处理记录的最快方法,但可能会发生记录丢失或重复的情况。 sink.parallelism 否 无 int 定义接收器的自定义并行度。默认情况下,如果未定义此选项,则规划器将通过考虑全局配置来单独派生每个语句的并行度。 key-ttl-mode 否 no-ttl String key-ttl-mode是开启Redis sink TTL的功能参数,key-ttl-mode的限制为:no-ttl、expire-msec、expire-at-date、expire-at-timestamp。 no-ttl:不设置过期时间。 expire-msec:设置key多久过期,参数为long类型字符串,单位为毫秒。 expire-at-date:设置key到某个时间点过期,参数为UTC时间。 expire-at-timestamp:设置key到某个时间点过期,参数为时间戳。 key-ttl 否 无 String key-ttl是key-ttl-mode的补充参数,有以下几种参数值: 当key-ttl-mode取值为no-ttl时,不需要配置此参数。 当key-ttl-mode取值为expire-msec时,需要配置为可以解析成Long型的字符串。例如5000,表示5000ms后key过期。 当key-ttl-mode取值为expire-at-date时,需要配置为Date类型字符串,例如2011-12-03T10:15:30,表示到期时间为北京时间2011-12-03 18:15:30。 当key-ttl-mode取值为expire-at-timestamp时,需要配置为timestamp类型字符串,单位为毫秒。例如1679385600000,表示到期时间为2023-03-21 16:00:00。
  • 语法格式 1 2 3 4 5 6 7 8 9 10 create table dwsSource ( attr_name attr_type (',' attr_name attr_type)* (',' watermark for rowtime_column_name as watermark-strategy_expression) ,PRIMARY KEY (attr_name, ...) NOT ENFORCED ) with ( 'connector' = 'redis', 'host' = '' );
  • 参数说明 表1 参数说明 参数 是否必选 默认值 数据类型 说明 connector 是 无 String connector类型,需配置为'redis'。 host 是 无 String redis连接地址。 port 否 6379 Integer redis连接端口。 password 否 无 String redis认证密码。 namespace 否 无 String redis key的namespace delimiter 否 : String redis的key和namespace之间的分隔符。 data-type 否 hash String redis的数据类型,有下列选项: hash list set sorted-set string data-type取值约束详见data-type取值约束说明。 schema-syntax 否 fields String redis的schema语义,包含以下值(其具体使用请参考注意事项和常见问题): fields:适用于所有数据类型 fields-scores:适用于sorted set数据类型 array:适用于list、set、sorted set数据类型 array-scores:适用于sorted set数据类型 map:适用于hash、sorted set数据类型 schema-syntax取值约束详见schema-syntax取值约束说明。 deploy-mode 否 standalone String Redis集群的部署模式,支持standalone、master-replica、cluster。默认为standalone。 Redis实例类型不同配置的部署模式不同: 单机、主备、proxy集群实例都选择standalone, cluster实例选择cluster。 retry-count 否 5 Integer 连接redis集群的尝试次数。 connection-timeout-millis 否 10000 Integer 尝试连接redis集群时的最大超时时间。 commands-timeout-millis 否 2000 Integer 等待操作完成响应的最大时间。 rebalancing-timeout-millis 否 15000 Integer redis集群失败时的休眠时间。 scan-keys-count 否 1000 Integer 每次扫描时读取的数量。 default-score 否 0 Double 当data-type设置为“sorted-set”时的默认score。 deserialize-error-policy 否 fail-job Enum 数据解析失败时的处理方式。枚举类型,包含以下值: fail-job:作业失败 skip-row:跳过当前数据 null-field:设置当前数据为null skip-null-values 否 true Boolean 是否跳过null。 ignore-retractions 否 false Boolean 连接器应忽略更新插入/撤回流模式下的收回消息。 key-column 否 无 String Redis 表schema的key source.parallelism 否 无 int 定义源的自定义并行度。默认情况下,如果未定义此选项,使用全局配置来的并行度。
  • 前提条件 该场景作业需要运行在DLI的独享队列上,因此要与Doris建立增强型跨源连接,且用户可以根据实际所需设置相应安全组规则。 如何建立增强型跨源连接,请参考《数据湖探索用户指南》中增强型跨源连接章节。 如何设置安全组规则,请参见《虚拟私有云用户指南》中“安全组”章节。 如果使用MRS Doris,请在增强型跨源的主机信息中添加MRS集群所有节点的主机ip信息。 详细操作请参考《数据湖探索用户指南》中的“修改主机信息”章节描述。 集群未启用Kerberos认证(普通模式) 使用admin用户连接Doris后,创建具有管理员权限的角色并绑定给用户。
  • 语法格式 create table dorisSource ( attr_name attr_type (',' attr_name attr_type)* ) with ( 'connector' = 'doris', 'fenodes' = 'FE_IP:PORT,FE_IP:PORT,FE_IP:PORT', 'table.identifier' = 'database.table', 'username' = 'dorisUsername', 'password' = 'dorisPassword' );
  • 注意事项 创建Flink OpenSource SQL作业时,在作业编辑界面的“运行参数”处,“Flink版本”需要选择“1.15”,勾选“保存作业日志”并设置保存作业日志的OBS桶,方便后续查看作业日志。 认证用的username和password等硬编码到代码中或者明文存储都有很大的安全风险,建议使用DEW管理凭证。配置文件或者环境变量中密文存放,使用时解密,确保安全。Flink Opensource SQL使用DEW管理访问凭据 集群未启用Kerberos认证(普通模式) Doris的表名是区分大小写。 使用cloudTable的doris时,'fenodes'字段值的端口请用8030,如'xx:8030'。同时安全组请放开端口8030, 8040,9030。 开启HTTPS后,需要在创建表的with子句中添加如下配置参数: 'doris.enable.https' = 'true' 'doris.ignore.https.ca' = 'true'
  • 示例 该示例是从Doris源表读取数据,并输入到 print connector。 参考增强型跨源连接,在DLI上根据Doris所在的虚拟私有云和子网分别创建相应的增强型跨源连接,并绑定所要使用的Flink弹性资源池。参考“修改主机信息”章节描述,在增强型跨源中增加MRS的主机信息。 设置Doris的安全组,添加入向规则使其对Flink的队列网段放通。参考测试地址连通性分别根据Doris的地址测试队列连通性。如果能连通,则表示跨源已经绑定成功,否则表示未成功。 创建Doris表,并插入10条数据。创建语句如下: CREATE TABLE IF NOT EXISTS dorisdemo ( `user_id` varchar(10) NOT NULL, `city` varchar(10), `age` int, `gender` int ) DISTRIBUTED BY HASH(`user_id`) BUCKETS 10; INSERT INTO dorisdemo VALUES ('user1', 'city1', 20, 1); INSERT INTO dorisdemo VALUES ('user2', 'city2', 21, 0); INSERT INTO dorisdemo VALUES ('user3', 'city3', 22, 1); INSERT INTO dorisdemo VALUES ('user4', 'city4', 23, 0); INSERT INTO dorisdemo VALUES ('user5', 'city5', 24, 1); INSERT INTO dorisdemo VALUES ('user6', 'city6', 25, 0); INSERT INTO dorisdemo VALUES ('user7', 'city7', 26, 1); INSERT INTO dorisdemo VALUES ('user8', 'city8', 27, 0); INSERT INTO dorisdemo VALUES ('user9', 'city9', 28, 1); INSERT INTO dorisdemo VALUES ('user10', 'city10', 29, 0); 参考创建Flink OpenSource作业,创建flink opensource sql作业,输入以下作业脚本,并提交运行。该作业脚本读取Doris表,并打印。 CREATE TABLE dorisDemo ( `user_id` String NOT NULL, `city` String, `age` int, `gender` int ) with ( 'connector' = 'doris', 'fenodes' = 'FE_IP:PORT,FE_IP:PORT,FE_IP:PORT', 'table.identifier' = 'demo.dorisdemo', 'username' = 'dorisUser', 'password' = 'dorisPassword', 'doris.request.retries'='3', 'doris.batch.size' = '100' ); CREATE TABLE print ( `user_id` String NOT NULL, `city` String, `age` int, `gender` int ) with ( 'connector' = 'print' ); insert into print select * from dorisDemo; 查看print结果表数据。 +I[user5, city5, 24, 1] +I[user4, city4, 23, 0] +I[user3, city3, 22, 1] +I[user10, city10, 29, 0] +I[user6, city6, 25, 0] +I[user1, city1, 20, 1] +I[user9, city9, 28, 1] +I[user7, city7, 26, 1] +I[user8, city8, 27, 0] +I[user2, city2, 21, 0]
  • 参数说明 表1 参数说明 参数 是否必选 默认值 数据类型 说明 connector 是 无 String 固定为:print。 print-identifier 否 无 String 配置一个标识符作为输出数据的前缀。 standard-error 否 false Boolean 该值只能为true或false,默认为false。 如果为true,则表示输出数据到taskmanager的error文件中。 如果为false,则表示输出数据到taskmanager的out中。 sink.parallelism 否 无 Integer 为Print结果表定义并行度。默认情况下,并行度由框架决定,与上游并行度一致。
  • 语法格式 1 2 3 4 5 6 7 8 9 create table printSink ( attr_name attr_type (',' attr_name attr_type) * (',' PRIMARY KEY (attr_name,...) NOT ENFORCED) ) with ( 'connector' = 'print', 'print-identifier' = '', 'standard-error' = '' );
  • 包年包月弹性资源池设置了定时扩缩容时,怎样计费? 假设您在2023/03/08 15:50:04购买了包年/包月弹性资源池(CU范围:64CUs),购买时长为一个月,在资源运行一段时间后发现使用过程中大部分时间CU数在128CU以上(假设每天累计12个小时实际CUs为128CUs,超出规格64CUs。),因此在2023/03/10 15:50:04变更弹性资源池规格为128CU。弹性资源池计费详情请参见数据湖探索价格详情。 价格仅供参考,实际计算请以数据湖探索价格详情中的价格为准。 该示例计费详情如表1所示。 表1 包年/包月弹性资源池+扩缩容场景计费示例 计费模式 计算规则 付费周期 费用计算 包年/包月 弹性资源池64CUs包周期 1个月 费用:10880元 包年/包月 规格变更增加64CUs,包周期计费 28天 费用:(10880元 / 30) x 28天 = 10154.7元 按需 超出规格部分的按需计费: 计算费用=单价*CU数*小时数 2*12=24小时 费用:0.4 x 64 x 24(小时数)= 614.4 元 - - - 合计:21649.1 元 父主题: 计费相关问题
  • 弹性资源池的实际CUs、CU范围、规格的含义 实际CUs:弹性资源池当前分配的可用CUs。 实际CUs的计算公式: 实际CUs=min{sum(队列maxCU),弹性资源池maxCU}。 计算结果需满足为16CUs的倍数,如果不能整除16CUs则向上取整。 实际CUs的分配示例: 如表1所示,弹性资源池实际CUs分配的计算过程如下: 计算队列maxCU之和:sum(队列maxCU)= 32 + 56 = 88CUs。 比较队列maxCU之和与弹性资源池maxCU,两者取最小值:min{88CUs,112CUs} = 88CUs。 检查88CUs是否为16CU的倍数,由于88不能被16整除,故向上取整为96CUs。 表1 弹性资源池实际CUs分配示例 场景说明 资源类型 CU范围 新建弹性资源池64-112CUs 添加了两个队列,分别为队列A和队列B。两个队列设置的CU范围如下: 队列A的CU范围:16-32CUs 队列B的CU范围:16-56CUs 弹性资源池 64-112CUs 队列A 16-32CUS 队列B 16-56CUS 已使用CUs:已经被作业或任务占用的CU资源。这些资源可能正在执行计算任务,暂时不可用。 HetuEngine已使用CUs和实际CU一致。 CU范围:CU设置主要是为了控制弹性资源池扩缩容的最大最小CU范围,避免无限制的资源扩容风险。 弹性资源池中所有队列的最小CU数之和需要小于等于弹性资源池的最小CU数。 弹性资源池中任意一个队列的最大CU必须小于等于弹性资源池的最大CU。 弹性资源池至少可以满足弹性资源池中所有队列按最小CU运行,尽量满足队列按最大CU运行。 规格(包周期CU):购买弹性资源池时选择的CU范围的最小值即弹性资源池规格。规格是包周期弹性资源池特有的。规格部分以包周期的计费,规格之外的部分按需计费。 父主题: 计费相关问题
  • 排查思路 DLI提供优惠的资源套餐包,购买套餐包后,资源按照套餐包类型计费。 套餐包是用户预先购买的资源使用量配额,超出当前套餐包的额度,将自动转为按需收费。 建议您按以下顺序排查原因。 可能原因 处理措施 未购买对应的套餐包 请参考判断是否经购买了对应的套餐包 套餐包额度已用完 请参考判断套餐包额度是否超出 购买的套餐包没有涵盖到DLI所有计费项 请参考判断是否产生了套餐包未涵盖的计费项 如果处理完某个可能原因仍未解决问题,请继续排查其他可能原因。
  • 判断是否经购买了对应的套餐包 目前DLI支持的套餐包类型如表1所示。 不同套餐包抵扣的费用不同,详见套餐包计费。请确认是否已经购买了产生按需费用的计费项所对应的套餐包。 如果未购买,产生按需计费为正常现象;如果已购买,请继续排查判断套餐包额度是否超出。 表1 套餐包类型及适用场景 类型 说明 适用场景 重置规则 扫描数据量套餐包 按照用户每个作业的数据扫描量(单位为“GB”)收取计算费用。 仅适用于default 队列。 扫描数据量套餐的额度按订购周期会重置。 按订购周期重置:重置周期为月,例如1月5日订购套餐包,之后每月5日同一时间重置免费资源。 弹性资源池CU时套餐包 按照“CU时”收取计算费用,用户在弹性资源池添加的队列上运行作业时按照弹性资源池CU时计费。 适用于可预估队列使用量的场景、或测试项目等资源消耗不高的场景。 弹性资源池CU时套餐包的额度按订购周期重置。 按订购周期重置:重置周期为月,例如1月5日订购套餐包,之后每月5日同一时间重置免费资源。 存储量套餐包 按照存储在DLI服务中的数据存储量(单位为“GB”)收取存储费用。 适用于在DLI 存储表数据的场景,例如时延敏感类的业务将表存储在DLI,使用存储量套餐包可以节省存储费用。 存储套餐的额度每个小时会重置。
  • 数据湖队列什么情况下是空闲状态? 队列空闲状态是指在DLI 作业管理中,该队列下均无SQL 作业运行,或者 Flink 作业运行、Spark 作业运行。 即一个自然小时内无作业运行,该自然小时为空闲状态。不会进行计费。 通常按需计费的队列,在空闲1h后计算资源会被释放,再次使用时,需要重新分配计算资源,可能会耗费5~10min时间。 按需计费以小时为单位进行结算。不足一小时按一小时计费,小时数按整点计算。 例如 12:10 ~12:30、12:10 ~12:55 期间使用,则收取1个小时的费用。 具体的计费模式请参考:计费说明。 父主题: 计费相关问题
  • 包年/包月计费模式的DLI资源可以和按需计费模式相互转换吗? DLI暂不支持变更资源的计费模式。 请您根据业务场景提前做好资源规划,如资源消耗较大,建议您选择包年/包月或购买套餐包的形式更加优惠。 自建队列购买方式如下: 购买队列请参考创建队列。 弹性资源池购买方式如下: 具体弹性资源池的购买和添加队列操作可以参考《数据湖探索用户指南》中的“创建弹性资源池”和“弹性资源池添加队列”相关章节。 父主题: 计费相关问题
共100000条
提示

您即将访问非华为云网站,请注意账号财产安全