云服务器内容精选

  • 请求示例 使用ID为100000的模板创建名为myjob的Flink SQL作业,该作业执行在testQueue队列上以独享的模式运行。 { "name": "myjob", "desc": "这是个做字符记数的作业", "template_id": 100000, "queue_name": "testQueue", "sql_body": "select * from source_table", "run_mode": "exclusive_cluster", "cu_number": 2, "parallel_number": 1, "checkpoint_enabled": false, "checkpoint_mode": "exactly_once", "checkpoint_interval": 0, "obs_bucket": "my_obs_bucket", "log_enabled": false, "restart_when_exception": false, "idle_state_retention": 3600, "job_type": "flink_sql_job", "dirty_data_strategy": "0", "execution_agency_urn": "myAgencyName", "udf_jar_url": "group/test.jar" }
  • 响应消息 表7 响应参数说明 参数名称 是否必选 参数类型 说明 is_success 否 String 执行请求是否成功。“true”表示请求执行成功。 message 否 String 消息内容。 job 否 Object 作业状态信息。具体请参考表8。 表8 job参数说明 参数名称 是否必选 参数类型 说明 job_id 是 Long 作业ID。 status_name 否 String 当前状态名称。参数说明可以参考查询作业详情中status作业状态字段说明。 status_desc 否 String 当前状态描述。包含异常状态原因及建议。
  • 请求消息 表2 请求参数说明 参数名称 是否必选 参数类型 说明 name 是 String 作业名称。长度限制:1-57个字符。 desc 否 String 作业描述。长度限制:0-512个字符。 template_id 否 Integer 模板ID。 如果“template_id”和“sql_body”都不为空,优先选择“sql_body”的内容;如果“template_id”不为空,“sql_body”为空,选择“template_id”的内容填充“sql_body”。 queue_name 否 String 队列名称。长度限制:0-128个字符。 sql_body 否 String Stream SQL语句,至少包含source, query, sink三个部分。长度限制:1024*1024个字符。 run_mode 否 String 作业运行模式: shared_cluster:共享。 exclusive_cluster:独享。 edge_node:边缘节点。 默认值为“shared_cluster”。 cu_number 否 Integer 用户为作业选择的CU数。默认值为“2”。 CU数量为 DLI 的计算单元数量和管理单元数量总和,CU也是DLI的计费单位,1CU=1核4G。当前配置的CU数量为运行作业时所需的CU数,不能超过其绑定队列的CU数量。管理单元参数设置详见:manager_cu_number。 parallel_number 否 Integer 用户设置的作业并行数目。默认值为“1”。 并行数是指同时运行Flink SQL作业的最大任务数。适度增加并行数会提高作业整体算力,但也须考虑线程增多带来的切换开销。最大并行数不能大于计算单元(CU数量-管理单元)的4倍。 管理单元参数设置详见:manager_cu_number。 checkpoint_enabled 否 Boolean 是否开启作业自动快照功能。 开启:true 关闭:false 默认:false checkpoint_mode 否 Integer 快照模式,。两种可选: 1:表示exactly_once,数据只被消费一次。 2:表示at_least_once,数据至少被消费一次。 默认值为1。 checkpoint_interval 否 Integer 快照时间间隔。单位为秒,默认值为“10”。 obs_bucket 否 String 当“checkpoint_enabled”为“true”时,该参数是用户授权保存快照的OBS桶名。 当“log_enabled” 为“true”时,该参数是用户授权保存作业日志的OBS桶名。 log_enabled 否 Boolean 是否开启作业的日志上传到用户的OBS功能。默认为“false”。 smn_topic 否 String 当作业异常时,向该 SMN 主题推送告警信息。 restart_when_exception 否 Boolean 是否开启作业异常自动重启。默认为“false”。 idle_state_retention 否 Integer 空闲状态保留时间。单位为秒,默认值为“3600”。 job_type 否 String 作业类型:flink_sql_job、flink_opensource_sql_job。 默认值:“flink_opensource_sql_job”。 “run_mode”为“exclusive_cluster”时,作业类型须为“flink_sql_job”或“flink_opensource_sql_job”。 “run_mode””为“shared_cluster”时作业类型必须为”flink_sql_job“。 edge_group_ids 否 Array of Strings 边缘计算组ID列表, 多个ID以逗号分隔。 dirty_data_strategy 否 String 作业脏数据策略。 “2:obsDir”:保存,obsDir表示脏数据存储路径。 “1”:抛出异常。 “0”:忽略。 默认值为“0”。 udf_jar_url 否 String 用户已上传到DLI资源管理系统的资源包名,用户sql作业的udf jar包通过该参数传入。 Flink1.15及以上版本在创建作业时仅支持配置OBS中的程序包,不支持读取DLI程序包。 manager_cu_number 否 Integer 用户为作业选择的管理单元(jobmanager)CU数量,默认值为“1”。 tm_cus 否 Integer 每个taskmanager的CU数,默认值为“1”。 tm_slot_num 否 Integer 每个taskmanager的slot数,默认值为“(parallel_number*tm_cus)/(cu_number-manager_cu_number)”。 resume_checkpoint 否 Boolean 异常重启是否从checkpoint恢复。 resume_max_num 否 Integer 异常重试最大次数,单位:次/小时。取值范围:-1或大于0。默认值为“-1”,表示无限次数。 tags 否 Array of Objects Flink SQL作业的标签。具体请参考表3。 runtime_config 否 String Flink作业运行时自定义优化参数。 flink_version 否 String Flink版本。 execution_agency_urn 否 String 授权给DLI的委托名。Flink1.15版本时支持配置该参数。 resource_config_version 否 String 资源配置版本。可选值 "v1" ,"v2".默认为“v1”。 v2版本对比于v1模版不支持设置CU数量,支持直接设置Job Manager Memory和Task Manager Memory。 v1:适用于Flink 1.12、Flink 1.13、Flink 1.15 V2:适用于Flink 1.13、Flink 1.15、Flink 1.17 优先推荐使用V2版本的参数设置。 resource_config 否 Object Flink 作业的资源配置。 具体参数说明请参考表4。 资源配置版本 为 "v2"时,配置生效,资源配置版本 为 "v1"时,配置无效。 表3 tags参数 参数名称 是否必选 参数类型 说明 key 是 String 标签的键。 说明: 标签的键的最大长度为128个字符,标签的键可以包含任意语种字母、数字、空格和_ . : +-@ ,但首尾不能含有空格,不能以_sys_开头。 value 是 String 标签的值。 说明: 标签值的最大长度为255个字符,标签的值可以包含任意语种字母、数字、空格和_ . : +-@ 。 表4 resource_config参数说明 参数名称 是否必选 参数类型 说明 max_slot 否 integer 该参数用于设置单个TaskManager可以提供的并行任务数量。每个Task Slot可以并行执行一个任务。增加 Task Slots 可以提高 TaskManager 的并行处理能力,但也会增加资源消耗。 Task Slots的数量与TaskManager的CPU数相关联,因为每个CPU可以提供一个Task Slot。 单TM Slot默认值为1。最小并行数不能小于1。 parallel_number 否 integer 作业的并行数,指作业中各个算子的并行执行的子任务的数量,算子的子任务数就是其对应算子的并行度。默认值为“1”。 jobmanager_resource_spec 否 Object JobManager资源规格。具体参数说明请参考表5。 taskmanager_resource_spec 否 Object TaskManager资源规格。具体参数说明请参考表6。 表5 jobmanager_resource_spec参数说明 参数名称 是否必选 参数类型 说明 cpu 否 double JobManager可以使用的CPU核数,默认值为1.0个CPU核数,最低不得小于0.5个CPU核数 memory 否 string JobManager可以使用的内存,单位MB,GB。默认GB。默认值为4GB,最低不得小于2G。 表6 taskmanager_resource_spec参数说明 参数名称 是否必选 参数类型 说明 cpu 否 double TaskManager可以使用的CPU核数,默认值为1.0个CPU核数,最低不得小于0.5个CPU核数 memory 否 string TaskManager可以使用的内存,单位MB,GB。默认GB。默认值为4GB,最低不得小于2G
  • 响应消息 表3 响应参数 参数名称 是否必选 参数类型 说明 is_success 是 Boolean 执行请求是否成功。“true”表示请求执行成功。 message 是 String 系统提示信息。执行成功时,信息可能为空。 object_name 否 String 对象名称。 object_type 否 String 对象类型。 privileges 否 Array of Object 权限信息。具体内容请参考表4。 count 否 Integer 权限总数量。 表4 privileges参数 参数名称 是否必选 参数类型 说明 object 否 String 授权对象,和赋权API中的“object”对应。 is_admin 否 Boolean 判断用户是否为管理员。 user_name 否 String 用户名称,即该用户在当前数据库上有权限。 privileges 否 Array of Strings 该用户在数据库上的权限。
  • 响应示例 { "is_success": true, "message": "", "object_name": "9561", "object_type": "flink", "count": 2, "privileges": [ { "user_name": "testuser1", "is_admin": true, "privileges": [ "ALL" ] }, { "user_name": "user1", "is_admin": false, "privileges": [ "GET" ] } ] }
  • 响应消息 表7 响应参数说明 参数名称 是否必选 参数类型 说明 is_success 否 String 执行请求是否成功。“true”表示请求执行成功。 message 否 String 消息内容。 job 否 Object 作业状态信息。具体请参考表8。 表8 job参数说明 参数名称 是否必选 参数类型 说明 job_id 是 Long 作业ID。 status_name 否 String 当前状态名称。 status_desc 否 String 当前状态描述。包含异常状态原因及建议。
  • 请求示例 新建名为test的Flink Jar作业,并设置作业执行在testQueue上,设置作业运行所使用的CU数、开启作业日志。 { "name": "test", "desc": "job for test", "queue_name": "testQueue", "manager_cu_number": 1, "cu_number": 2, "parallel_number": 1, "tm_cus": 1, "tm_slot_num": 1, "log_enabled": true, "obs_bucket": "bucketName", "smn_topic": "topic", "main_class": "org.apache.flink.examples.streaming.JavaQueueStream", "restart_when_exception": false, "entrypoint": "javaQueueStream.jar", "entrypoint_args":"-windowSize 2000 -rate 3", "dependency_jars": [ "myGroup/test.jar", "myGroup/test1.jar" ], "execution_agency_urn": "myAgencyName", "dependency_files": [ "myGroup/test.csv", "myGroup/test1.csv" ] }
  • 请求消息 表2 参数说明 参数名称 是否必选 参数类型 说明 name 是 String 作业名称。长度限制:1-57个字符。 desc 否 String 作业描述。长度限制:0-512个字符。 queue_name 否 String 队列名称。长度限制:0-128个字符。 cu_number 否 Integer 用户为作业选择的CU数量。 manager_cu_number 否 Integer 用户为作业选择的管理节点CU数量,对应为flink jobmanager数量。默认值为“1”。 parallel_number 否 Integer 用户为作业选择的并发量。 log_enabled 否 Boolean 是否开启作业日志。 开启:true 关闭:false 默认:false obs_bucket 否 String 当“log_enabled”为“true”时, 用户授权保存作业日志的OBS桶名。 smn_topic 否 String 当作业异常时,向该SMN主题推送告警信息。 main_class 否 String 作业入口类。 entrypoint_args 否 String 作业入口类参数,多个参数之间空格分隔。 restart_when_exception 否 Boolean 是否开启异常重启功能,默认值为“false”。 entrypoint 否 String 用户已上传到OBS的程序包名,用户自定义作业主类所在的jar包。 Flink1.15推荐配置OBS中的程序包,不推荐使用DLI程序包。Flink1.15以上版本将不再支持读取DLI程序包。 示例:obs://bucket_name/test.jar dependency_jars 否 Array of Strings 用户已上传到OBS的程序包名,用户自定义作业的其他依赖包。 Flink1.15推荐配置OBS中的程序包,不推荐使用DLI程序包。Flink1.15以上版本将不再支持读取DLI程序包。 示例“obs://bucket_name/test1.jar, obs://bucket_name/test2.jar”。 dependency_files 否 Array of Strings 用户已上传到OBS的资源包名,用户自定义作业的依赖文件。 Flink1.15推荐配置OBS中的程序包,不推荐使用DLI程序包。Flink1.15以上版本将不再支持读取DLI程序包。 示例:"[obs://bucket_name/file1, obs://bucket_name/file2]"。 通过在应用程序中添加以下内容可访问对应的依赖文件。其中,“fileName”为需要访问的文件名,“ClassName”为需要访问该文件的类名。 ClassName.class.getClassLoader().getResource("userData/fileName") tm_cus 否 Integer 每个taskmanager的CU数,默认值为“1”。 tm_slot_num 否 Integer 每个taskmanager的slot数,默认值为“(parallel_number*tm_cus)/(cu_number-manager_cu_number)”。 feature 否 String 作业特性。表示用户作业使用的Flink镜像类型。 basic:表示使用DLI提供的基础Flink镜像。 custom:表示使用用户自定义的Flink镜像。 flink_version 否 String Flink版本。当用户设置“feature”为“basic”时,该参数生效。用户可通过与“feature”参数配合使用,指定作业运行使用的DLI基础Flink镜像的版本。 execution_agency_urn 否 String 授权给DLI的委托名。Flink1.15版本时支持配置该参数。 image 否 String 自定义镜像。格式为:组织名/镜像名:镜像版本。 当用户设置“feature”为“custom”时,该参数生效。用户可通过与“feature”参数配合使用,指定作业运行使用自定义的Flink镜像。关于如何使用自定义镜像,请参考《 数据湖探索 用户指南》。 resume_checkpoint 否 Boolean 异常重启是否从checkpoint恢复。 resume_max_num 否 Integer 异常重试最大次数,单位:次/小时。取值范围:-1或大于0。默认值为“-1”,表示无限次数。 checkpoint_path 否 String 用户Jar中checkpoint的储存地址,不同作业路径需要保持不同。 tags 否 Array of Objects Flink jar作业的标签。具体请参考表3。 runtime_config 否 String Flink作业运行时自定义优化参数。 resource_config_version 否 String 资源配置版本。可选值 "v1" ,"v2".默认为“v1”。 v2版本对比于v1模版不支持设置CU数量,支持直接设置Job Manager Memory和Task Manager Memory。 v1:适用于Flink 1.12、Flink 1.13、Flink 1.15 v2:适用于Flink 1.13、Flink 1.15、Flink 1.17 优先推荐使用V2版本的参数设置。 resource_config 否 Object Flink 作业的资源配置。 具体参数说明请参考表4。 资源配置版本 为 "v2"时,配置生效,资源配置版本 为 "v1"时,配置无效。 表3 tags参数 参数名称 是否必选 参数类型 说明 key 是 String 标签的键。 说明: 标签的键的最大长度为128个字符,标签的键可以包含任意语种字母、数字、空格和_ . : =+-@ ,但首尾不能含有空格,不能以_sys_开头。 value 是 String 标签的值。 说明: 标签值的最大长度为255个字符,标签的值可以包含任意语种字母、数字、空格和_ . : =+-@ ,但首尾不能含有空格。 表4 resource_config参数说明 参数名称 是否必选 参数类型 说明 max_slot 否 integer 该参数用于设置单个TaskManager可以提供的并行任务数量。每个Task Slot可以并行执行一个任务。增加 Task Slots 可以提高 TaskManager 的并行处理能力,但也会增加资源消耗。 Task Slots的数量与TaskManager的CPU数相关联,因为每个CPU可以提供一个Task Slot。 单TM Slot默认值为1。最小并行数不能小于1。 parallel_number 否 integer 作业的并行数,指作业中各个算子的并行执行的子任务的数量,算子的子任务数就是其对应算子的并行度。默认值为“1”。 jobmanager_resource_spec 否 ResourceSpec JobManager资源规格。具体参数说明请参考表5。 taskmanager_resource_spec 否 ResourceSpec TaskManager资源规格。具体参数说明请参考表6。 表5 jobmanager_resource_spec参数说明 参数名称 是否必选 参数类型 说明 cpu 否 double JobManager可以使用的CPU核数,默认值为1.0个CPU核数,最低不得小于0.5个CPU核数 memory 否 string JobManager可以使用的内存,单位MB,GB。默认GB。默认值为4GB,最低不得小于2G。 表6 taskmanager_resource_spec参数说明 参数名称 是否必选 参数类型 说明 cpu 否 double TaskManager可以使用的CPU核数,默认值为1.0个CPU核数,最低不得小于0.5个CPU核数 memory 否 string TaskManager可以使用的内存,单位MB,GB。默认GB。默认值为4GB,最低不得小于2G
  • 请求消息 表2 请求参数说明 参数 是否必选 参数类型 描述 elastic_resource_pool_name 是 String 新建的弹性资源池名称,名称只能包含数字、英文字母和下划线,但不能是纯数字,且不能以下划线开头。长度限制:1~128个字符。 说明: 若名称中包含大写字母,将会自动转换为小写字母。 description 否 String 描述信息。长度限制:256个字符以内。 cidr_in_vpc 否 String 虚拟集群关联的vpc cidr。如果不填,默认值为172.16.0.0/12。 max_cu 是 Integer 最大CU大于等于该资源池下任意一个队列的最大CU之和且大于min_cu。 标准版:最小值为64CUs 基础版:最小值为16CUs,最大值为64CUs min_cu 是 Integer 最小CU大于等于该资源池下所有队列最小CU之和,最小值为64。 标准版:最小值为64CUs 基础版:最小值为16CUs,最大值为64CUs charging_mode 否 Integer 计费类型:默认值是1,表示按需计费。 enterprise_project_id 否 String 企业ID,不填默认为“0”。 tags 否 Array of Objects 队列的标签,使用标签标识云资源。包括“标签键”和“标签值”,具体请参考表3。 label 否 map 弹性资源池属性字段。 如果需要购买基础版,配置该参数值为{"spec":"basic"}。 不配置该参数时默认为标准版弹性资源池。 ipv6_enable 否 boolean 是否启用IPv6。 开启IPv6后,将自动为资源池分配IPv6网段,暂不支持自定义IPv6网段。该功能一旦开启,将不能关闭。默认值:false true:启用IPv6。 false,不开启IPv6。 表3 tags参数 参数名称 是否必选 参数类型 说明 key 是 String 标签的键。 说明: 标签的键的最大长度为128个字符,标签的键可以包含任意语种字母、数字、空格和_ . : +-@ ,但首尾不能含有空格,不能以_sys_开头。 value 是 String 标签的值。 说明: 标签值的最大长度为255个字符,标签的值可以包含任意语种字母、数字、空格和_ . : +-@ 。
  • 请求示例 创建一个弹性资源池,最大CU为684,最小CU为684。 { "elastic_resource_pool_name" : "elastic_pool_0623_02", "description" : "test", "cidr_in_vpc" : "172.16.0.0/14", "charging_mode" : "1", "max_cu" : 684, "min_cu" : 684 }
  • 请求参数 表2 请求Body参数 参数 是否必选 参数类型 描述 elastic_resource_pool_name 是 String 新建的弹性资源池名称。 名称只能包含数字、小写英文字母和下划线,但不能是纯数字,且不能以下划线开头。长度限制:1~128个字符。 description 否 String 描述信息。长度限制:256个字符以内。 cidr_in_vpc 否 String 弹性资源池的子网网段。 如果不填,默认值为172.16.0.0//12 最大长度:18 max_cu 是 Integer 最大CU大于等于该资源池下任意一个队列的最大CU之和且大于min_cu。 标准版:最小值为64CUs 基础版:最小值为16CUs,最大值为64CUs min_cu 是 Integer 最小CU大于等于该资源池下所有队列最小CU之和,最小值为64。 标准版:最小值为64CUs 基础版:最小值为16CUs,最大值为64CUs enterprise_project_id 否 String 企业项目ID,“0”表示default,即默认的企业项目。 开通了企业管理服务的用户可设置该参数绑定指定的项目。 auto_renew 否 Boolean 是否自动续费。 true:是。 false:否。 默认false charging_mode 是 Integer 计费模式。 包周期计费模式默认取值为2。 period_type 是 String 包周期类型,year或者month。 枚举值: year month period_num 否 Integer 周期数,默认1。 缺省值:1 tags 否 Array of objects 弹性资源池标签。请参考TmsTag。 表3 tag参数说明 参数 是否必选 参数类型 描述 key 是 String 标签名称。 value 是 String 标签值。
  • 响应示例 状态码: 403 Forbidden { "error_code" : "DLI.0003", "error_msg" : "Forbidden" } 状态码: 404 Not Found { "error_code" : "DLI.0002", "error_msg" : "Not Found" } 状态码: 500 Internal Server Error { "error_code" : "DLI.0999", "error_msg" : "Internal Server Error" }
  • 响应参数 状态码: 200 表4 响应Body参数 参数 参数类型 描述 message String 系统提示信息,执行成功时,信息可能为空。 状态码: 400 表5 响应Body参数 参数 参数类型 描述 error_code String 错误码。 error_msg String 错误描述信息。 状态码: 500 表6 响应Body参数 参数 参数类型 描述 error_code String 错误码 error_msg String 失败原因
  • 步骤5:在DLI作业开发时使用LakeFormation元数据 DLI对接LakeFormation默认实例且完成LakeFormation的资源授权后,即可以在作业开发时使用LakeFormation元数据。 DLI SQL: LakeFormation SQL语法说明请参考DLI Spark SQL语法参考。 在执行SQL作业时,您可以在控制台选择执行SQL所在的catalog,如图2所示,或在SQL命令中指定catalogName。catalogName是DLI控制台的数据目录映射名。 图2 在SQL编辑器页面选择数据目录 对接LakeFormation实例场景,在创建数据库时需要指定数据库存储的OBS路径。 对接LakeFormation实例场景,在创建表时不支持设置表生命周期和多版本。 对接LakeFormation实例场景,LOAD DATA语句不支持datasource表,且LOAD DATA分区表必须指定分区。 在LakeFormation控制台创建的数据库和表中包含中文字符时,不支持在DLI执行相关数据库和表的操作。 对接LakeFormation实例场景,不支持指定筛选条件删除分区。 对接LakeFormation实例场景,不支持创建Truncate Datasource/Hive外表。 DLI暂不支持使用LakeFormation行过滤条件功能。 DLI读取binary类型的数据进行console展示时,会对binary数据进行Base64转换。 在DLI暂不支持LakeFormation的路径授权。 DLI Spark Jar: 本节介绍在DLI管理控制台提交Spark Jar作业时使用LakeFormation元数据的配置操作。 Spark Jar 示例 SparkSession spark = SparkSession.builder() .enableHiveSupport() .appName("java_spark_demo") .getOrCreate(); spark.sql("show databases").show(); DLI管理控制台Spark Jar作业配置说明 (推荐)方式一:使用控制台提供的参数项(委托、元数据来源等)配置Spark Jar作业访问LakeFormation元数据 新建或编辑Spark Jar作业时,请参考表3Spark Jar作业访问LakeFormation元数据。 表3 配置Spark Jar作业访问LakeFormation元数据 参数 说明 配置示例 Spark版本 Spark 3.3.x及以上版本支持对接LakeFormation。 3.3.1 委托 使用Spark 3.3.1及以上版本的引擎执行作业时,需要您先在 IAM 页面创建相关委托,并在此处添加新建的委托信息。选择该参数后系统将自动为您的作业添加以下配置: spark.dli.job.agency.name=agency 委托权限示例请参考创建DLI自定义委托权限和常见场景的委托权限策略。 - 访问元数据 配置开启Spark作业访问元数据功能。 是 元数据来源 配置Spark作业访问的元数据类型。本场景下请选择Lakeformation。 选择该参数后系统将自动为您的作业添加以下配置项用于加载lakeformation相关依赖。 spark.sql.catalogImplementation=hive spark.hadoop.hive-ext.dlcatalog.metastore.client.enable=true spark.hadoop.hive-ext.dlcatalog.metastore.session.client.class=com.huawei.cloud.dalf.lakecat.client.hiveclient.LakeCatMetaStoreClient og // lakeformation相关依赖加载 spark.driver.extraClassPath=/usr/share/extension/dli/spark-jar/lakeformation/* spark.executor.extraClassPath=/usr/share/extension/dli/spark-jar/lakeformation/* “元数据来源”还支持在Spark(--conf)参数中配置,且系统优先以Spark(--conf)中配置信息为准。 优先推荐您使用控制台提供的“元数据来源”参数项进行配置。 Lakeformation 数据目录名称 配置Spark作业访问的数据目录名称。 此处选择的是在DLI管理控制台创建的数据目录,即DLI与Lakeformation默认实例下的数据目录的映射,该数据目录连接的是LakeFormation默认实例下的数据目录。如需指定LakeFormation其他实例请参考◦方式二:使用Spark(--conf)参数配置...在Spark(--conf)中配置连接的Lakeformation实例和数据目录。 选择该参数后系统将自动为您的作业添加以下配置项用于连接Lakeformation默认实例下的数据目录。 spark.hadoop.lakecat.catalogname.default=lfcatalog “数据目录名称”还支持在Spark(--conf)参数中配置,且系统优先以Spark(--conf)中配置信息为准。 优先推荐您使用控制台提供的“数据目录名称”参数项进行配置。 - Spark参数(--conf) “元数据来源”和“数据目录名称”均支持在Spark(--conf)参数中配置,且系统优先以Spark(--conf)中配置信息为准。 如果您需要配置访问Hudi数据表,可在Spark(--conf)参数中填加以下配置项。 spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension spark.hadoop.hoodie.write.lock.provider=org.apache.hudi.lakeformation.LakeCatMetastoreBasedLockProvider 如果您需要配置访问Delta数据表,可在Spark(--conf)参数中填加以下配置项。 spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension - 方式二:使用Spark(--conf)参数配置Spark Jar作业访问LakeFormation元数据 新建或编辑Spark Jar作业时,请在作业配置页面的Spark(--conf)参数中按需配置以下信息以访问LakeFormation元数据。 spark.sql.catalogImplementation=hive spark.hadoop.hive-ext.dlcatalog.metastore.client.enable=true spark.hadoop.hive-ext.dlcatalog.metastore.session.client.class=com.huawei.cloud.dalf.lakecat.client.hiveclient.LakeCatMetaStoreClient spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension //支持hudi,可选 spark.hadoop.hoodie.write.lock.provider=org.apache.hudi.lakeformation.LakeCatMetastoreBasedLockProvider //支持hudi,可选 // 使用有OBS和lakeformation权限的委托访问,建议用户设置最小权限集 spark.dli.job.agency.name=agencyForLakeformation //需要访问的lakeformation实例ID,在lakeformation console查看。可选,如不填写访问Lakeformation的默认实例 spark.hadoop.lakeformation.instance.id=xxx //需要访问的lakeformation侧的CATA LOG 名称,在lakeformation console查看。可选,如不填写则默认值为hive spark.hadoop.lakecat.catalogname.default=lfcatalog // lakeformation相关依赖加载 spark.driver.extraClassPath=/usr/share/extension/dli/spark-jar/lakeformation/* spark.executor.extraClassPath=/usr/share/extension/dli/spark-jar/lakeformation/* DLI Flink OpenSource SQL 示例1:委托的方式对接Lakeformation 创建Flink OpenSource SQL作业并配置如下参数: 参数 说明 配置示例 Flink版本 Flink 1.15及以上版本支持对接LakeFormation。 1.15 委托 使用Flink 1.15及以上版本的引擎执行作业时,需要您先在IAM页面创建相关委托,并在此处添加新建的委托信息。选择该参数后系统将自动为您的作业添加以下配置: flink.dli.job.agency.name=agency 委托权限示例请参考创建DLI自定义委托权限和常见场景的委托权限策略。 - 开启checkpoint 勾选开启checkpoint。 开启 自定义参数 配置Flink作业访问的元数据类型。 本场景下请选择Lakeformation。 flink.dli.job.catalog.type=lakeformation 配置Flink作业访问的数据目录名称。 flink.dli.job.catalog.name=[lakeformation中的catalog名称] 此处选择的是在DLI管理控制台创建的数据目录,即DLI与Lakeformation默认实例下的数据目录的映射,该数据目录连接的是LakeFormation默认实例下的数据目录。 - 示例中关于Catalog的参数说明请参考表4 表4 Flink OpenSource SQL示例中关于Catalog的参数说明 参数 说明 是否必填 参数值 type catalog类型 是 固定值hive hive-conf-dir hive-conf路径,固定值/opt/flink/conf 是 固定值/opt/flink/conf default-database 默认数据库名称 否 默认default库 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 CREATE CATALOG hive WITH ( 'type' = 'hive', 'hive-conf-dir' = '/opt/flink/conf', -- 固定配置/opt/flink/conf 'default-database'='default' ); USE CATALOG hive; CREATE TABLE IF NOT EXISTS dataGenSource612 (user_id string, amount int) WITH ( 'connector' = 'datagen', 'rows-per-second' = '1', 'fields.user_id.kind' = 'random', 'fields.user_id.length' = '3' ); CREATE table IF NOT EXISTS printSink612 (user_id string, amount int) WITH ('connector' = 'print'); INSERT INTO printSink612 SELECT * FROM dataGenSource612; 示例2:DEW的方式对接Lakeformation 创建Flink OpenSource SQL作业并配置如下参数: 参数 说明 配置示例 Flink版本 Flink 1.15及以上版本支持对接LakeFormation。 1.15 委托 使用Flink 1.15及以上版本的引擎执行作业时,需要您先在IAM页面创建相关委托,并在此处添加新建的委托信息。选择该参数后系统将自动为您的作业添加以下配置: flink.dli.job.agency.name=agency 委托权限示例请参考创建DLI自定义委托权限和常见场景的委托权限策略。 - 开启checkpoint 勾选开启checkpoint。 开启 自定义参数 配置Flink作业访问的元数据类型。 本场景下请选择Lakeformation。 flink.dli.job.catalog.type=lakeformation 配置Flink作业访问的数据目录名称。 flink.dli.job.catalog.name=[lakeformation中的catalog名称] 此处选择的是在DLI管理控制台创建的数据目录,即DLI与Lakeformation默认实例下的数据目录的映射,该数据目录连接的是LakeFormation默认实例下的数据目录。 - 示例中关于Catalog的参数说明请参考表5 需要指定properties.catalog.lakeformation.auth.identity.util.class参数值为com.huawei.flink.provider.lakeformation.FlinkDewIdentityGenerator,并且配置dew相关配置。 表5 Flink OpenSource SQL示例中关于Catalog的参数说明(DEW方式) 参数 说明 是否必填 参数值 type catalog类型 是 固定值hive hive-conf-dir hive-conf路径,固定值/opt/flink/conf 是 固定值/opt/flink/conf default-database 默认数据库名称 否 不填默认default库 properties.catalog.lakecat.auth.identity.util.class 认证信息获取类 是 dew方式必填,固定配置为com.huawei.flink.provider.lakeformation.FlinkDewIdentityGenerator properties.catalog.dew.projectId DEW所在的项目ID, 默认是Flink作业所在的项目ID。 是 使用dew方式必填 properties.catalog.dew.endpoint 指定要使用的DEW服务所在的endpoint信息。 是 使用dew方式必填。 配置示例:kms.xxx.com properties.catalog.dew.csms.secretName 在DEW服务的凭据管理中新建的通用凭据的名称。 是 使用dew方式必填 properties.catalog.dew.csms.version 在DEW服务的凭据管理中新建的通用凭据的版本号。 是 使用dew方式必填 properties.catalog.dew.access.key 在DEW服务的凭据中配置access.key值对应的key 是 使用dew方式必填 properties.catalog.dew.secret.key 在DEW服务的凭据中配置secret.key值对应的key 是 使用dew方式必填 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 CREATE CATALOG myhive WITH ( 'type' = 'hive', 'hive-conf-dir' = '/opt/flink/conf', 'default-database'='default', --下边是dew相关配置,请根据实际情况修改参数值 'properties.catalog.lakeformation.auth.identity.util.class' = 'com.huawei.flink.provider.lakeformation.FlinkDewIdentityGenerator', 'properties.catalog.dew.endpoint'='kms.xxx.com', 'properties.catalog.dew.csms.secretName'='obsAksK', 'properties.catalog.dew.access.key' = 'myak', 'properties.catalog.dew.secret.key' = 'mysk', 'properties.catalog.dew.projectId'='330e068af1334c9782f4226xxxxxxxxx', 'properties.catalog.dew.csms.version'='v9' ); USE CATALOG myhive; create table IF NOT EXISTS dataGenSource_dew612( user_id string, amount int ) with ( 'connector' = 'datagen', 'rows-per-second' = '1', 'fields.user_id.kind' = 'random', 'fields.user_id.length' = '3' ); create table IF NOT EXISTS printSink_dew612( user_id string, amount int ) with ( 'connector' = 'print' ); insert into printSink_dew612 select * from dataGenSource_dew612; 示例3:委托的方式对接Lakeformation写hudi表 创建Flink OpenSource SQL作业并配置如下参数: 参数 说明 配置示例 Flink版本 Flink 1.15及以上版本支持对接LakeFormation。 1.15 委托 使用Flink 1.15及以上版本的引擎执行作业时,需要您先在IAM页面创建相关委托,并在此处添加新建的委托信息。选择该参数后系统将自动为您的作业添加以下配置: flink.dli.job.agency.name=agency 委托权限示例请参考创建DLI自定义委托权限和常见场景的委托权限策略。 - 开启checkpoint 勾选开启checkpoint。 开启 自定义参数 配置Flink作业访问的元数据类型。 本场景下请选择Lakeformation。 flink.dli.job.catalog.type=lakeformation 配置Flink作业访问的数据目录名称。 flink.dli.job.catalog.name=[lakeformation中的catalog名称] 此处选择的是在DLI管理控制台创建的数据目录,即DLI与Lakeformation默认实例下的数据目录的映射,该数据目录连接的是LakeFormation默认实例下的数据目录。 - 示例中关于Catalog的参数说明请参考表6。 表6 hudi类型Catalog参数说明 参数 说明 是否必填 参数值 type catalog类型 是 hudi表配置为hudi。 hive-conf-dir hive-conf路径,固定值/opt/flink/conf 是 固定值/opt/flink/conf。 default-database 默认数据库名称 否 默认default库。 mode 取值'hms' 或 'non-hms'。 'hms' 表示创建的 Hudi Catalog 会使用 Hive Metastore 存储元数据信息。 'non-hms'表示不使用Hive Metastore存储元数据信息。 是 固定值hms。 表7 hudi类型sink表的connector参数 参数 说明 是否必填 参数值 connector flink connector类型。 配置为hudi表示sink表是hudi表。 是 hudi path 表的基本路径。如果该路径不存在,则会创建它。 是 请参考示例代码中的配置值。 hoodie.datasource.write.recordkey.field hoodie表的唯一键字段名 否 这里配置order_id为唯一键。 EXTERNAL 是否外表 是 hudi表必填,且设置为true true CREATE CATALOG hive_catalog WITH ( 'type'='hive', 'hive-conf-dir' = '/opt/flink/conf', 'default-database'='test' ); USE CATALOG hive_catalog; create table if not exists genSource618 ( order_id STRING, order_name STRING, price INT, weight INT ) with ( 'connector' = 'datagen', 'rows-per-second' = '1', 'fields.order_id.kind' = 'random', 'fields.order_id.length' = '8', 'fields.order_name.kind' = 'random', 'fields.order_name.length' = '5' ); CREATE CATALOG hoodie_catalog WITH ( 'type'='hudi', 'hive.conf.dir' = '/opt/flink/conf', 'mode'='hms' -- supports 'dfs' mode that uses the DFS backend for table DDLs persistence ); CREATE TABLE if not exists hoodie_catalog.`test`.`hudiSink618` ( `order_id` STRING PRIMARY KEY NOT ENFORCED, `order_name` STRING, `price` INT, `weight` INT, `create_time` BIGINT, `create_date` String ) PARTITIONED BY (create_date) WITH ( 'connector' = 'hudi', 'path' = 'obs://xxx/catalog/dbtest3/hudiSink618', 'hoodie.datasource.write.recordkey.field' = 'order_id', 'write.precombine.field' = 'create_time', 'EXTERNAL' = 'true' -- must be set ); insert into hoodie_catalog.`test`.`hudiSink618` select order_id, order_name, price, weight, UNIX_TIMESTAMP() as create_time, FROM_UNIXTIME(UNIX_TIMESTAMP(), 'yyyyMMdd') as create_date from genSource618; DLI Flink Jar 示例1:委托方式对接Lakeformation 开发Flink jar程序,编译并上传jar包到obs,本例上传到obs://obs-test/dlitest/目录 示例代码如下: 本例通过DataGen表产生随机数据并输出到Print结果表中。 其他connector类型可参考Flink 1.15支持的connector列表。 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 package com.huawei.test; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.contrib.streaming.state.RocksDBStateBackend; import org.apache.flink.runtime.state.filesystem.FsStateBackend; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.environment.CheckpointConfig; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.text.SimpleDateFormat; @SuppressWarnings({"deprecation", "rawtypes", "unchecked"}) public class GenToPrintTaskAgency { private static final Logger LOGGER = LoggerFactory.getLogger(GenToPrintTaskAgency.class); private static final String datePattern = "yyyy-MM-dd_HH-mm-ss"; public static void main(String[] args) { LOGGER.info("Start task."); ParameterTool paraTool = ParameterTool.fromArgs(args); String checkpointInterval = "180000000"; // set up execution environment StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); EnvironmentSettings settings = EnvironmentSettings.newInstance() .inStreamingMode().build(); StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings); env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); env.getCheckpointConfig().setCheckpointInterval(Long.valueOf(checkpointInterval)); env.getCheckpointConfig().enableExternalizedCheckpoints( CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); SimpleDateFormat dateTimeFormat = new SimpleDateFormat(datePattern); String time = dateTimeFormat.format(System.currentTimeMillis()); RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend( new FsStateBackend("obs://obs/xxx/testcheckpoint/" + time), true); env.setStateBackend(rocksDbBackend); String createCatalog = "CREATE CATALOG lf_catalog WITH (\n" + " 'type' = 'hive',\n" + " 'hive-conf-dir' = '/opt/hadoop/conf'\n" + " );"; tEnv.executeSql(createCatalog); String dataSource = "CREATE TABLE if not exists lf_catalog.`testdb`.`dataGenSourceJar618_1` (\n" + " user_id string,\n" + " amount int\n" + ") WITH (\n" + " 'connector' = 'datagen',\n" + " 'rows-per-second' = '1',\n" + " 'fields.user_id.kind' = 'random',\n" + " 'fields.user_id.length' = '3'\n" + ")"; /*testdb是用户自定义的数数据库*/ tEnv.executeSql(dataSource); String printSink = "CREATE TABLE if not exists lf_catalog.`testdb`.`printSinkJar618_1` (\n" + " user_id string,\n" + " amount int\n" + ") WITH ('connector' = 'print')"; tEnv.executeSql(printSink); /*testdb是用户自定义的数数据库*/ String query = "insert into lf_catalog.`test`.`printSinkJar618_1` " + "select * from lf_catalog.`test`.`dataGenSourceJar618_1`"; tEnv.executeSql(query); } } 创建Flink jar作业并配置如下参数。 参数 说明 配置示例 Flink版本 Flink 1.15及以上版本支持对接LakeFormation。 1.15 委托 使用Flink 1.15及以上版本的引擎执行作业时,需要您先在IAM页面创建相关委托,并在此处添加新建的委托信息。选择该参数后系统将自动为您的作业添加以下配置: flink.dli.job.agency.name=agency 委托权限示例请参考创建DLI自定义委托权限和常见场景的委托权限策略。 - 优化参数 配置Flink作业访问的元数据类型。 本场景下请选择Lakeformation。 flink.dli.job.catalog.type=lakeformation 配置Flink作业访问的数据目录名称。 flink.dli.job.catalog.name=[lakeformation中的catalog名称] 此处选择的是在DLI管理控制台创建的数据目录,即DLI与Lakeformation默认实例下的数据目录的映射,该数据目录连接的是LakeFormation默认实例下的数据目录。 - 示例2:DEW方式对接Lakeformation 开发Flink jar程序,编译并上传jar包到obs,本例上传到obs://obs-test/dlitest/目录 示例代码如下: 本例通过DataGen表产生随机数据并输出到Print结果表中。 其他connector类型可参考Flink 1.15支持的connector列表。 package com.huawei.test; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.contrib.streaming.state.RocksDBStateBackend; import org.apache.flink.runtime.state.filesystem.FsStateBackend; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.environment.CheckpointConfig; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.text.SimpleDateFormat; @SuppressWarnings({"deprecation", "rawtypes", "unchecked"}) public class GenToPrintTaskDew { private static final Logger LOGGER = LoggerFactory.getLogger(GenToPrintTaskAgency.class); private static final String datePattern = "yyyy-MM-dd_HH-mm-ss"; public static void main(String[] args) { LOGGER.info("Start task."); ParameterTool paraTool = ParameterTool.fromArgs(args); String checkpointInterval = "180000000"; // set up execution environment StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); EnvironmentSettings settings = EnvironmentSettings.newInstance() .inStreamingMode().build(); StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings); env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); env.getCheckpointConfig().setCheckpointInterval(Long.valueOf(checkpointInterval)); env.getCheckpointConfig().enableExternalizedCheckpoints( CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); SimpleDateFormat dateTimeFormat = new SimpleDateFormat(datePattern); String time = dateTimeFormat.format(System.currentTimeMillis()); RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend( new FsStateBackend("obs://obs/xxx/testcheckpoint/" + time), true); env.setStateBackend(rocksDbBackend); String createCatalog = "CREATE CATALOG lf_catalog WITH (\n" + " 'type' = 'hive',\n" + " 'hive-conf-dir' = '/opt/hadoop/conf',\n" + " 'properties.catalog.lakeformation.auth.identity.util.class' = 'com.huawei.flink.provider.lakeformation.FlinkDewIdentityGenerator',\n" + " 'properties.catalog.dew.endpoint'='kms.xxx.xxx.com',\n" + " 'properties.catalog.dew.csms.secretName'='obsAksK',\n" + " 'properties.catalog.dew.access.key' = 'ak',\n" + " 'properties.catalog.dew.secret.key' = 'sk',\n" + " 'properties.catalog.dew.projectId'='330e068af1334c9782f4226xxxxxxxxxx',\n" + " 'properties.catalog.dew.csms.version'='v9'\n" + " );"; tEnv.executeSql(createCatalog); String dataSource = "CREATE TABLE if not exists lf_catalog.`testdb`.`dataGenSourceJarDew618_1` (\n" + " user_id string,\n" + " amount int\n" + ") WITH (\n" + " 'connector' = 'datagen',\n" + " 'rows-per-second' = '1',\n" + " 'fields.user_id.kind' = 'random',\n" + " 'fields.user_id.length' = '3'\n" + ")"; tEnv.executeSql(dataSource); /*testdb是用户自定义的数数据库*/ String printSink = "CREATE TABLE if not exists lf_catalog.`testdb`.`printSinkJarDew618_1` (\n" + " user_id string,\n" + " amount int\n" + ") WITH ('connector' = 'print')"; tEnv.executeSql(printSink); /*testdb是用户自定义的数数据库*/ String query = "insert into lf_catalog.`test`.`printSinkJarDew618_1` " + "select * from lf_catalog.`test`.`dataGenSourceJarDew618_1`"; tEnv.executeSql(query); } } 创建Flink jar作业并配置如下参数。 参数 说明 配置示例 Flink版本 Flink 1.15及以上版本支持对接LakeFormation。 1.15 委托 使用Flink 1.15及以上版本的引擎执行作业时,需要您先在IAM页面创建相关委托,并在此处添加新建的委托信息。选择该参数后系统将自动为您的作业添加以下配置: flink.dli.job.agency.name=agency 委托权限示例请参考创建DLI自定义委托权限和常见场景的委托权限策略。 - 优化参数 配置Flink作业访问的元数据类型。 本场景下请选择Lakeformation。 flink.dli.job.catalog.type=lakeformation 配置Flink作业访问的数据目录名称。 flink.dli.job.catalog.name=[lakeformation中的catalog名称] 此处选择的是在DLI管理控制台创建的数据目录,即DLI与Lakeformation默认实例下的数据目录的映射,该数据目录连接的是LakeFormation默认实例下的数据目录。 - 示例3:Flink jar支持Hudi表 开发Flink jar程序,编译并上传jar包到obs,本例上传到obs://obs-test/dlitest/目录 示例代码如下: 本例通过DataGen表产生随机数据并输出到Hudi结果表中。 其他connector类型可参考Flink 1.15支持的connector列表。 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 package com.huawei.test; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.contrib.streaming.state.RocksDBStateBackend; import org.apache.flink.runtime.state.filesystem.FsStateBackend; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.environment.CheckpointConfig; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.text.SimpleDateFormat; public class GenToHudiTask4 { private static final Logger LOGGER = LoggerFactory.getLogger(GenToHudiTask4.class); private static final String datePattern = "yyyy-MM-dd_HH-mm-ss"; public static void main(String[] args) throws IOException { LOGGER.info("Start task."); ParameterTool paraTool = ParameterTool.fromArgs(args); String checkpointInterval = "30000"; // set up execution environment StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); EnvironmentSettings settings = EnvironmentSettings.newInstance() .inStreamingMode().build(); StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings); env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); env.getCheckpointConfig().setCheckpointInterval(Long.valueOf(checkpointInterval)); env.getCheckpointConfig().enableExternalizedCheckpoints( CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); SimpleDateFormat dateTimeFormat = new SimpleDateFormat(datePattern); String time = dateTimeFormat.format(System.currentTimeMillis()); RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend( new FsStateBackend("obs://xxx/jobs/testcheckpoint/" + time), true); env.setStateBackend(rocksDbBackend); String catalog = "CREATE CATALOG hoodie_catalog\n" + " WITH (\n" + " 'type'='hudi',\n" + " 'hive.conf.dir' = '/opt/hadoop/conf',\n" + " 'mode'='hms'\n" + " )"; tEnv.executeSql(catalog); String dwsSource = "CREATE TABLE if not exists genSourceJarForHudi618_1 (\n" + " order_id STRING,\n" + " order_name STRING,\n" + " price INT,\n" + " weight INT\n" + ") WITH (\n" + " 'connector' = 'datagen',\n" + " 'rows-per-second' = '1',\n" + " 'fields.order_id.kind' = 'random',\n" + " 'fields.order_id.length' = '8',\n" + " 'fields.order_name.kind' = 'random',\n" + " 'fields.order_name.length' = '8'\n" + ")"; tEnv.executeSql(dwsSource); /*testdb是用户自定义的数数据库*/ String printSinkdws = "CREATE TABLE if not exists hoodie_catalog.`testdb`.`hudiSinkJarHudi618_1` (\n" + " order_id STRING PRIMARY KEY NOT ENFORCED,\n" + " order_name STRING,\n" + " price INT,\n" + " weight INT,\n" + " create_time BIGINT,\n" + " create_date String\n" + ") WITH (" + "'connector' = 'hudi',\n" + "'path' = 'obs://xxx/catalog/dbtest3/hudiSinkJarHudi618_1',\n" + "'hoodie.datasource.write.recordkey.field' = 'order_id',\n" + "'EXTERNAL' = 'true'\n" + ")"; tEnv.executeSql(printSinkdws); /*testdb是用户自定义的数数据库*/ String query = "insert into hoodie_catalog.`testdb`.`hudiSinkJarHudi618_1` select\n" + " order_id,\n" + " order_name,\n" + " price,\n" + " weight,\n" + " UNIX_TIMESTAMP() as create_time,\n" + " FROM_UNIXTIME(UNIX_TIMESTAMP(), 'yyyyMMdd') as create_date\n" + " from genSourceJarForHudi618_1"; tEnv.executeSql(query); } } 表8 hudi类型sink表的connector参数 参数 说明 是否必填 参数值 connector flink connector类型。 配置为hudi表示sink表是hudi表。 是 hudi path 表的基本路径。如果该路径不存在,则会创建它。 是 请参考示例代码中的配置值。 hoodie.datasource.write.recordkey.field hoodie表的唯一键字段名 否 这里配置order_id为唯一键。 EXTERNAL 是否外表 是 hudi表必填,且设置为true true 创建Flink jar作业并配置如下参数。 参数 说明 配置示例 Flink版本 Flink 1.15及以上版本支持对接LakeFormation。 1.15 委托 使用Flink 1.15及以上版本的引擎执行作业时,需要您先在IAM页面创建相关委托,并在此处添加新建的委托信息。选择该参数后系统将自动为您的作业添加以下配置: flink.dli.job.agency.name=agency 委托权限示例请参考创建DLI自定义委托权限和常见场景的委托权限策略。 - 优化参数 配置Flink作业访问的元数据类型。 本场景下请选择Lakeformation。 flink.dli.job.catalog.type=lakeformation 配置Flink作业访问的数据目录名称。 flink.dli.job.catalog.name=[lakeformation中的catalog名称] 此处选择的是在DLI管理控制台创建的数据目录,即DLI与Lakeformation默认实例下的数据目录的映射,该数据目录连接的是LakeFormation默认实例下的数据目录。 -
  • 步骤4:授权使用LakeFormation资源 SQL作业场景 在进行SQL作业提交之前,需完成LakeFormation元数据、数据库、表、列和函数等资源授权,确保作业在执行过程中能够顺利访问所需的数据和资源。LakeFormation SQL资源权限支持列表提供了LakeFormation权限支持列表。 使用LakeFormation资源需要分别完成LakeFormation的IAM细粒度授权和LakeFormation SQL资源授权。 LakeFormation的IAM细粒度授权:授权使用LakeFormation API。 IAM服务通常提供了管理用户、组和角色的访问权限的方式。您可以在IAM控制台中创建策略(Policy),定义哪些用户或角色可以调用LakeFormation的API。然后,将这些策略附加到相应的用户或角色上。 方法1:基于角色授权: 即IAM最初提供的一种根据用户的工作职能定义权限的粗粒度授权机制。该机制以服务为粒度,提供有限的服务相关角色用于授权。 例如参考LakeFormation权限管理授予用户只读权限,允许查询LakeFormation相关元数据资源的权限。 或如下示例授予LakeFormation相关元数据资源的所有操作权限。 示例: { "Version": "1.1", "Statement": [ { "Effect": "Allow", "Action": [ "lakeformation:table:*", "lakeformation:database:*", "lakeformation:catalog:*", "lakeformation:function:*", "lakeformation:transaction:*", "lakeformation:policy:describe", "lakeformation:credential:describe" ] } ] } 方法2:基于策略的精细化授权: IAM提供的细粒度授权的能力,可以精确到具体服务的操作、资源以及请求条件等。 LakeFormation权限策略请参考LakeFormation权限和授权项。 IAM授权的具体操作请参考创建用户并授权使用LakeFormation。 LakeFormation SQL资源授权:授权使用LakeFormation具体资源(元数据、数据库、表、列和函数等)。 LakeFormation资源授权是指允许用户对特定资源的访问的权限,以此来控制对LakeFormation的数据和元数据的访问。 LakeFormation资源授权有两种方式: 方式一:在LakeFormation管理控制台对资源授权。 了解LakeFormation SQL资源权限请参考数据权限概述。 方式二:在DLI管理控制台使用GRANT SQL语句授权 GRANT语句是SQL语言中用于授权的一种方式。 您可以使用GRANT语句来授予用户或角色对数据库、表、列、函数等的访问权限。 LakeFormation SQL资源权限支持列表提供了LakeFormation资源授权的策略。 Catalog资源暂时不支持在DLI SQL授权,请参考▪方式一:在LakeFormation管理控制台...在LakeFormation 管理控制台完成授权。 Spark Jar、Flink OpenSource SQL、Flink Jar作业场景: 方式1:使用委托授权:使用Spark 3.3.1及以上版本、Flink 1.15版本的引擎执行作业时,需要您先在IAM页面创建相关委托,并在配置作业时添加新建的委托信息。 委托权限示例请参考创建DLI自定义委托权限和常见场景的委托权限策略。 方式2:使用DEW授权: 已为授予IAM用户所需的IAM和Lakeformation权限,具体请参考•SQL作业场景的IAM授权的操作步骤。 已在DEW服务创建通用凭证,并存入凭据值。具体操作请参考创建通用凭据。 已创建DLI访问DEW的委托并完成委托授权。该委托需具备以下权限: DEW中的查询凭据的版本与凭据值ShowSecretVersion接口权限,csms:secretVersion:get。 DEW中的查询凭据的版本列表ListSecretVersions接口权限,csms:secretVersion:list。 DEW解密凭据的权限,kms:dek:decrypt。 委托权限示例请参考创建DLI自定义委托权限和常见场景的委托权限策略。