云服务器内容精选

  • 步骤5:在 DLI 作业开发时使用LakeFormation元数据 DLI对接LakeFormation默认实例且完成LakeFormation的资源授权后,即可以在作业开发时使用LakeFormation元数据。 DLI SQL: LakeFormation SQL语法说明请参考DLI Spark SQL语法参考。 在执行SQL作业时,您可以在控制台选择执行SQL所在的catalog,如图2所示,或在SQL命令中指定catalogName。catalogName是DLI控制台的数据目录映射名。 图2 在SQL编辑器页面选择数据目录 对接LakeFormation实例场景,在创建数据库时需要指定数据库存储的OBS路径。 对接LakeFormation实例场景,在创建表时不支持设置表生命周期和多版本。 对接LakeFormation实例场景,LOAD DATA语句不支持datasource表,且LOAD DATA分区表必须指定分区。 在LakeFormation控制台创建的数据库和表中包含中文字符时,不支持在DLI执行相关数据库和表的操作。 对接LakeFormation实例场景,不支持指定筛选条件删除分区。 对接LakeFormation实例场景,不支持创建Truncate Datasource/Hive外表。 DLI暂不支持使用LakeFormation行过滤条件功能。 DLI读取binary类型的数据进行console展示时,会对binary数据进行Base64转换。 在DLI暂不支持LakeFormation的路径授权。 DLI Spark Jar: 本节介绍在DLI管理控制台提交Spark Jar作业时使用LakeFormation元数据的配置操作。 Spark Jar 示例 SparkSession spark = SparkSession.builder() .enableHiveSupport() .appName("java_spark_demo") .getOrCreate(); spark.sql("show databases").show(); DLI管理控制台Spark Jar作业配置说明 (推荐)方式一:使用控制台提供的参数项(委托、元数据来源等)配置Spark Jar作业访问LakeFormation元数据 新建或编辑Spark Jar作业时,请参考表3Spark Jar作业访问LakeFormation元数据。 表3 配置Spark Jar作业访问LakeFormation元数据 参数 说明 配置示例 Spark版本 Spark 3.3.x及以上版本支持对接LakeFormation。 3.3.1 委托 使用Spark 3.3.1及以上版本的引擎执行作业时,需要您先在 IAM 页面创建相关委托,并在此处添加新建的委托信息。选择该参数后系统将自动为您的作业添加以下配置: spark.dli.job.agency.name=agency 委托权限示例请参考创建DLI自定义委托权限和常见场景的委托权限策略。 - 访问元数据 配置开启Spark作业访问元数据功能。 是 元数据来源 配置Spark作业访问的元数据类型。本场景下请选择Lakeformation。 选择该参数后系统将自动为您的作业添加以下配置项用于加载lakeformation相关依赖。 spark.sql.catalogImplementation=hive spark.hadoop.hive-ext.dlcatalog.metastore.client.enable=true spark.hadoop.hive-ext.dlcatalog.metastore.session.client.class=com.huawei.cloud.dalf.lakecat.client.hiveclient.LakeCatMetaStoreClient og // lakeformation相关依赖加载 spark.driver.extraClassPath=/usr/share/extension/dli/spark-jar/lakeformation/* spark.executor.extraClassPath=/usr/share/extension/dli/spark-jar/lakeformation/* “元数据来源”还支持在Spark(--conf)参数中配置,且系统优先以Spark(--conf)中配置信息为准。 优先推荐您使用控制台提供的“元数据来源”参数项进行配置。 Lakeformation 数据目录名称 配置Spark作业访问的数据目录名称。 此处选择的是在DLI管理控制台创建的数据目录,即DLI与Lakeformation默认实例下的数据目录的映射,该数据目录连接的是LakeFormation默认实例下的数据目录。如需指定LakeFormation其他实例请参考◦方式二:使用Spark(--conf)参数配置...在Spark(--conf)中配置连接的Lakeformation实例和数据目录。 选择该参数后系统将自动为您的作业添加以下配置项用于连接Lakeformation默认实例下的数据目录。 spark.hadoop.lakecat.catalogname.default=lfcatalog “数据目录名称”还支持在Spark(--conf)参数中配置,且系统优先以Spark(--conf)中配置信息为准。 优先推荐您使用控制台提供的“数据目录名称”参数项进行配置。 - Spark参数(--conf) “元数据来源”和“数据目录名称”均支持在Spark(--conf)参数中配置,且系统优先以Spark(--conf)中配置信息为准。 如果您需要配置访问Hudi数据表,可在Spark(--conf)参数中填加以下配置项。 spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension spark.hadoop.hoodie.write.lock.provider=org.apache.hudi.lakeformation.LakeCatMetastoreBasedLockProvider 如果您需要配置访问Delta数据表,可在Spark(--conf)参数中填加以下配置项。 spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension - 方式二:使用Spark(--conf)参数配置Spark Jar作业访问LakeFormation元数据 新建或编辑Spark Jar作业时,请在作业配置页面的Spark(--conf)参数中按需配置以下信息以访问LakeFormation元数据。 spark.sql.catalogImplementation=hive spark.hadoop.hive-ext.dlcatalog.metastore.client.enable=true spark.hadoop.hive-ext.dlcatalog.metastore.session.client.class=com.huawei.cloud.dalf.lakecat.client.hiveclient.LakeCatMetaStoreClient spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension //支持hudi,可选 spark.hadoop.hoodie.write.lock.provider=org.apache.hudi.lakeformation.LakeCatMetastoreBasedLockProvider //支持hudi,可选 // 使用有OBS和lakeformation权限的委托访问,建议用户设置最小权限集 spark.dli.job.agency.name=agencyForLakeformation //需要访问的lakeformation实例ID,在lakeformation console查看。可选,如不填写访问Lakeformation的默认实例 spark.hadoop.lakeformation.instance.id=xxx //需要访问的lakeformation侧的CATA LOG 名称,在lakeformation console查看。可选,如不填写则默认值为hive spark.hadoop.lakecat.catalogname.default=lfcatalog // lakeformation相关依赖加载 spark.driver.extraClassPath=/usr/share/extension/dli/spark-jar/lakeformation/* spark.executor.extraClassPath=/usr/share/extension/dli/spark-jar/lakeformation/* DLI Flink OpenSource SQL 示例1:委托的方式对接Lakeformation 创建Flink OpenSource SQL作业并配置如下参数: 参数 说明 配置示例 Flink版本 Flink 1.15及以上版本支持对接LakeFormation。 1.15 委托 使用Flink 1.15及以上版本的引擎执行作业时,需要您先在IAM页面创建相关委托,并在此处添加新建的委托信息。选择该参数后系统将自动为您的作业添加以下配置: flink.dli.job.agency.name=agency 委托权限示例请参考创建DLI自定义委托权限和常见场景的委托权限策略。 - 开启checkpoint 勾选开启checkpoint。 开启 自定义参数 配置Flink作业访问的元数据类型。 本场景下请选择Lakeformation。 flink.dli.job.catalog.type=lakeformation 配置Flink作业访问的数据目录名称。 flink.dli.job.catalog.name=[lakeformation中的catalog名称] 此处选择的是在DLI管理控制台创建的数据目录,即DLI与Lakeformation默认实例下的数据目录的映射,该数据目录连接的是LakeFormation默认实例下的数据目录。 - 示例中关于Catalog的参数说明请参考表4 表4 Flink OpenSource SQL示例中关于Catalog的参数说明 参数 说明 是否必填 参数值 type catalog类型 是 固定值hive hive-conf-dir hive-conf路径,固定值/opt/flink/conf 是 固定值/opt/flink/conf default-database 默认数据库名称 否 默认default库 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 CREATE CATALOG hive WITH ( 'type' = 'hive', 'hive-conf-dir' = '/opt/flink/conf', -- 固定配置/opt/flink/conf 'default-database'='default' ); USE CATALOG hive; CREATE TABLE IF NOT EXISTS dataGenSource612 (user_id string, amount int) WITH ( 'connector' = 'datagen', 'rows-per-second' = '1', 'fields.user_id.kind' = 'random', 'fields.user_id.length' = '3' ); CREATE table IF NOT EXISTS printSink612 (user_id string, amount int) WITH ('connector' = 'print'); INSERT INTO printSink612 SELECT * FROM dataGenSource612; 示例2:DEW的方式对接Lakeformation 创建Flink OpenSource SQL作业并配置如下参数: 参数 说明 配置示例 Flink版本 Flink 1.15及以上版本支持对接LakeFormation。 1.15 委托 使用Flink 1.15及以上版本的引擎执行作业时,需要您先在IAM页面创建相关委托,并在此处添加新建的委托信息。选择该参数后系统将自动为您的作业添加以下配置: flink.dli.job.agency.name=agency 委托权限示例请参考创建DLI自定义委托权限和常见场景的委托权限策略。 - 开启checkpoint 勾选开启checkpoint。 开启 自定义参数 配置Flink作业访问的元数据类型。 本场景下请选择Lakeformation。 flink.dli.job.catalog.type=lakeformation 配置Flink作业访问的数据目录名称。 flink.dli.job.catalog.name=[lakeformation中的catalog名称] 此处选择的是在DLI管理控制台创建的数据目录,即DLI与Lakeformation默认实例下的数据目录的映射,该数据目录连接的是LakeFormation默认实例下的数据目录。 - 示例中关于Catalog的参数说明请参考表5 需要指定properties.catalog.lakeformation.auth.identity.util.class参数值为com.huawei.flink.provider.lakeformation.FlinkDewIdentityGenerator,并且配置dew相关配置。 表5 Flink OpenSource SQL示例中关于Catalog的参数说明(DEW方式) 参数 说明 是否必填 参数值 type catalog类型 是 固定值hive hive-conf-dir hive-conf路径,固定值/opt/flink/conf 是 固定值/opt/flink/conf default-database 默认数据库名称 否 不填默认default库 properties.catalog.lakecat.auth.identity.util.class 认证信息获取类 是 dew方式必填,固定配置为com.huawei.flink.provider.lakeformation.FlinkDewIdentityGenerator properties.catalog.dew.projectId DEW所在的项目ID, 默认是Flink作业所在的项目ID。 是 使用dew方式必填 properties.catalog.dew.endpoint 指定要使用的DEW服务所在的endpoint信息。 是 使用dew方式必填。 配置示例:kms.xxx.com properties.catalog.dew.csms.secretName 在DEW服务的凭据管理中新建的通用凭据的名称。 是 使用dew方式必填 properties.catalog.dew.csms.version 在DEW服务的凭据管理中新建的通用凭据的版本号。 是 使用dew方式必填 properties.catalog.dew.access.key 在DEW服务的凭据中配置access.key值对应的key 是 使用dew方式必填 properties.catalog.dew.secret.key 在DEW服务的凭据中配置secret.key值对应的key 是 使用dew方式必填 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 CREATE CATALOG myhive WITH ( 'type' = 'hive', 'hive-conf-dir' = '/opt/flink/conf', 'default-database'='default', --下边是dew相关配置,请根据实际情况修改参数值 'properties.catalog.lakeformation.auth.identity.util.class' = 'com.huawei.flink.provider.lakeformation.FlinkDewIdentityGenerator', 'properties.catalog.dew.endpoint'='kms.xxx.com', 'properties.catalog.dew.csms.secretName'='obsAksK', 'properties.catalog.dew.access.key' = 'myak', 'properties.catalog.dew.secret.key' = 'mysk', 'properties.catalog.dew.projectId'='330e068af1334c9782f4226xxxxxxxxx', 'properties.catalog.dew.csms.version'='v9' ); USE CATALOG myhive; create table IF NOT EXISTS dataGenSource_dew612( user_id string, amount int ) with ( 'connector' = 'datagen', 'rows-per-second' = '1', 'fields.user_id.kind' = 'random', 'fields.user_id.length' = '3' ); create table IF NOT EXISTS printSink_dew612( user_id string, amount int ) with ( 'connector' = 'print' ); insert into printSink_dew612 select * from dataGenSource_dew612; 示例3:委托的方式对接Lakeformation写hudi表 创建Flink OpenSource SQL作业并配置如下参数: 参数 说明 配置示例 Flink版本 Flink 1.15及以上版本支持对接LakeFormation。 1.15 委托 使用Flink 1.15及以上版本的引擎执行作业时,需要您先在IAM页面创建相关委托,并在此处添加新建的委托信息。选择该参数后系统将自动为您的作业添加以下配置: flink.dli.job.agency.name=agency 委托权限示例请参考创建DLI自定义委托权限和常见场景的委托权限策略。 - 开启checkpoint 勾选开启checkpoint。 开启 自定义参数 配置Flink作业访问的元数据类型。 本场景下请选择Lakeformation。 flink.dli.job.catalog.type=lakeformation 配置Flink作业访问的数据目录名称。 flink.dli.job.catalog.name=[lakeformation中的catalog名称] 此处选择的是在DLI管理控制台创建的数据目录,即DLI与Lakeformation默认实例下的数据目录的映射,该数据目录连接的是LakeFormation默认实例下的数据目录。 - 示例中关于Catalog的参数说明请参考表6。 表6 hudi类型Catalog参数说明 参数 说明 是否必填 参数值 type catalog类型 是 hudi表配置为hudi。 hive-conf-dir hive-conf路径,固定值/opt/flink/conf 是 固定值/opt/flink/conf。 default-database 默认数据库名称 否 默认default库。 mode 取值'hms' 或 'non-hms'。 'hms' 表示创建的 Hudi Catalog 会使用 Hive Metastore 存储元数据信息。 'non-hms'表示不使用Hive Metastore存储元数据信息。 是 固定值hms。 表7 hudi类型sink表的connector参数 参数 说明 是否必填 参数值 connector flink connector类型。 配置为hudi表示sink表是hudi表。 是 hudi path 表的基本路径。如果该路径不存在,则会创建它。 是 请参考示例代码中的配置值。 hoodie.datasource.write.recordkey.field hoodie表的唯一键字段名 否 这里配置order_id为唯一键。 EXTERNAL 是否外表 是 hudi表必填,且设置为true true CREATE CATALOG hive_catalog WITH ( 'type'='hive', 'hive-conf-dir' = '/opt/flink/conf', 'default-database'='test' ); USE CATALOG hive_catalog; create table if not exists genSource618 ( order_id STRING, order_name STRING, price INT, weight INT ) with ( 'connector' = 'datagen', 'rows-per-second' = '1', 'fields.order_id.kind' = 'random', 'fields.order_id.length' = '8', 'fields.order_name.kind' = 'random', 'fields.order_name.length' = '5' ); CREATE CATALOG hoodie_catalog WITH ( 'type'='hudi', 'hive.conf.dir' = '/opt/flink/conf', 'mode'='hms' -- supports 'dfs' mode that uses the DFS backend for table DDLs persistence ); CREATE TABLE if not exists hoodie_catalog.`test`.`hudiSink618` ( `order_id` STRING PRIMARY KEY NOT ENFORCED, `order_name` STRING, `price` INT, `weight` INT, `create_time` BIGINT, `create_date` String ) PARTITIONED BY (create_date) WITH ( 'connector' = 'hudi', 'path' = 'obs://xxx/catalog/dbtest3/hudiSink618', 'hoodie.datasource.write.recordkey.field' = 'order_id', 'write.precombine.field' = 'create_time', 'EXTERNAL' = 'true' -- must be set ); insert into hoodie_catalog.`test`.`hudiSink618` select order_id, order_name, price, weight, UNIX_TIMESTAMP() as create_time, FROM_UNIXTIME(UNIX_TIMESTAMP(), 'yyyyMMdd') as create_date from genSource618; DLI Flink Jar 示例1:委托方式对接Lakeformation 开发Flink jar程序,编译并上传jar包到obs,本例上传到obs://obs-test/dlitest/目录 示例代码如下: 本例通过DataGen表产生随机数据并输出到Print结果表中。 其他connector类型可参考Flink 1.15支持的connector列表。 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 package com.huawei.test; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.contrib.streaming.state.RocksDBStateBackend; import org.apache.flink.runtime.state.filesystem.FsStateBackend; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.environment.CheckpointConfig; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.text.SimpleDateFormat; @SuppressWarnings({"deprecation", "rawtypes", "unchecked"}) public class GenToPrintTaskAgency { private static final Logger LOGGER = LoggerFactory.getLogger(GenToPrintTaskAgency.class); private static final String datePattern = "yyyy-MM-dd_HH-mm-ss"; public static void main(String[] args) { LOGGER.info("Start task."); ParameterTool paraTool = ParameterTool.fromArgs(args); String checkpointInterval = "180000000"; // set up execution environment StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); EnvironmentSettings settings = EnvironmentSettings.newInstance() .inStreamingMode().build(); StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings); env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); env.getCheckpointConfig().setCheckpointInterval(Long.valueOf(checkpointInterval)); env.getCheckpointConfig().enableExternalizedCheckpoints( CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); SimpleDateFormat dateTimeFormat = new SimpleDateFormat(datePattern); String time = dateTimeFormat.format(System.currentTimeMillis()); RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend( new FsStateBackend("obs://obs/xxx/testcheckpoint/" + time), true); env.setStateBackend(rocksDbBackend); String createCatalog = "CREATE CATALOG lf_catalog WITH (\n" + " 'type' = 'hive',\n" + " 'hive-conf-dir' = '/opt/hadoop/conf'\n" + " );"; tEnv.executeSql(createCatalog); String dataSource = "CREATE TABLE if not exists lf_catalog.`testdb`.`dataGenSourceJar618_1` (\n" + " user_id string,\n" + " amount int\n" + ") WITH (\n" + " 'connector' = 'datagen',\n" + " 'rows-per-second' = '1',\n" + " 'fields.user_id.kind' = 'random',\n" + " 'fields.user_id.length' = '3'\n" + ")"; /*testdb是用户自定义的数数据库*/ tEnv.executeSql(dataSource); String printSink = "CREATE TABLE if not exists lf_catalog.`testdb`.`printSinkJar618_1` (\n" + " user_id string,\n" + " amount int\n" + ") WITH ('connector' = 'print')"; tEnv.executeSql(printSink); /*testdb是用户自定义的数数据库*/ String query = "insert into lf_catalog.`test`.`printSinkJar618_1` " + "select * from lf_catalog.`test`.`dataGenSourceJar618_1`"; tEnv.executeSql(query); } } 创建Flink jar作业并配置如下参数。 参数 说明 配置示例 Flink版本 Flink 1.15及以上版本支持对接LakeFormation。 1.15 委托 使用Flink 1.15及以上版本的引擎执行作业时,需要您先在IAM页面创建相关委托,并在此处添加新建的委托信息。选择该参数后系统将自动为您的作业添加以下配置: flink.dli.job.agency.name=agency 委托权限示例请参考创建DLI自定义委托权限和常见场景的委托权限策略。 - 优化参数 配置Flink作业访问的元数据类型。 本场景下请选择Lakeformation。 flink.dli.job.catalog.type=lakeformation 配置Flink作业访问的数据目录名称。 flink.dli.job.catalog.name=[lakeformation中的catalog名称] 此处选择的是在DLI管理控制台创建的数据目录,即DLI与Lakeformation默认实例下的数据目录的映射,该数据目录连接的是LakeFormation默认实例下的数据目录。 - 示例2:DEW方式对接Lakeformation 开发Flink jar程序,编译并上传jar包到obs,本例上传到obs://obs-test/dlitest/目录 示例代码如下: 本例通过DataGen表产生随机数据并输出到Print结果表中。 其他connector类型可参考Flink 1.15支持的connector列表。 package com.huawei.test; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.contrib.streaming.state.RocksDBStateBackend; import org.apache.flink.runtime.state.filesystem.FsStateBackend; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.environment.CheckpointConfig; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.text.SimpleDateFormat; @SuppressWarnings({"deprecation", "rawtypes", "unchecked"}) public class GenToPrintTaskDew { private static final Logger LOGGER = LoggerFactory.getLogger(GenToPrintTaskAgency.class); private static final String datePattern = "yyyy-MM-dd_HH-mm-ss"; public static void main(String[] args) { LOGGER.info("Start task."); ParameterTool paraTool = ParameterTool.fromArgs(args); String checkpointInterval = "180000000"; // set up execution environment StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); EnvironmentSettings settings = EnvironmentSettings.newInstance() .inStreamingMode().build(); StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings); env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); env.getCheckpointConfig().setCheckpointInterval(Long.valueOf(checkpointInterval)); env.getCheckpointConfig().enableExternalizedCheckpoints( CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); SimpleDateFormat dateTimeFormat = new SimpleDateFormat(datePattern); String time = dateTimeFormat.format(System.currentTimeMillis()); RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend( new FsStateBackend("obs://obs/xxx/testcheckpoint/" + time), true); env.setStateBackend(rocksDbBackend); String createCatalog = "CREATE CATALOG lf_catalog WITH (\n" + " 'type' = 'hive',\n" + " 'hive-conf-dir' = '/opt/hadoop/conf',\n" + " 'properties.catalog.lakeformation.auth.identity.util.class' = 'com.huawei.flink.provider.lakeformation.FlinkDewIdentityGenerator',\n" + " 'properties.catalog.dew.endpoint'='kms.xxx.xxx.com',\n" + " 'properties.catalog.dew.csms.secretName'='obsAksK',\n" + " 'properties.catalog.dew.access.key' = 'ak',\n" + " 'properties.catalog.dew.secret.key' = 'sk',\n" + " 'properties.catalog.dew.projectId'='330e068af1334c9782f4226xxxxxxxxxx',\n" + " 'properties.catalog.dew.csms.version'='v9'\n" + " );"; tEnv.executeSql(createCatalog); String dataSource = "CREATE TABLE if not exists lf_catalog.`testdb`.`dataGenSourceJarDew618_1` (\n" + " user_id string,\n" + " amount int\n" + ") WITH (\n" + " 'connector' = 'datagen',\n" + " 'rows-per-second' = '1',\n" + " 'fields.user_id.kind' = 'random',\n" + " 'fields.user_id.length' = '3'\n" + ")"; tEnv.executeSql(dataSource); /*testdb是用户自定义的数数据库*/ String printSink = "CREATE TABLE if not exists lf_catalog.`testdb`.`printSinkJarDew618_1` (\n" + " user_id string,\n" + " amount int\n" + ") WITH ('connector' = 'print')"; tEnv.executeSql(printSink); /*testdb是用户自定义的数数据库*/ String query = "insert into lf_catalog.`test`.`printSinkJarDew618_1` " + "select * from lf_catalog.`test`.`dataGenSourceJarDew618_1`"; tEnv.executeSql(query); } } 创建Flink jar作业并配置如下参数。 参数 说明 配置示例 Flink版本 Flink 1.15及以上版本支持对接LakeFormation。 1.15 委托 使用Flink 1.15及以上版本的引擎执行作业时,需要您先在IAM页面创建相关委托,并在此处添加新建的委托信息。选择该参数后系统将自动为您的作业添加以下配置: flink.dli.job.agency.name=agency 委托权限示例请参考创建DLI自定义委托权限和常见场景的委托权限策略。 - 优化参数 配置Flink作业访问的元数据类型。 本场景下请选择Lakeformation。 flink.dli.job.catalog.type=lakeformation 配置Flink作业访问的数据目录名称。 flink.dli.job.catalog.name=[lakeformation中的catalog名称] 此处选择的是在DLI管理控制台创建的数据目录,即DLI与Lakeformation默认实例下的数据目录的映射,该数据目录连接的是LakeFormation默认实例下的数据目录。 - 示例3:Flink jar支持Hudi表 开发Flink jar程序,编译并上传jar包到obs,本例上传到obs://obs-test/dlitest/目录 示例代码如下: 本例通过DataGen表产生随机数据并输出到Hudi结果表中。 其他connector类型可参考Flink 1.15支持的connector列表。 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 package com.huawei.test; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.contrib.streaming.state.RocksDBStateBackend; import org.apache.flink.runtime.state.filesystem.FsStateBackend; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.environment.CheckpointConfig; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.text.SimpleDateFormat; public class GenToHudiTask4 { private static final Logger LOGGER = LoggerFactory.getLogger(GenToHudiTask4.class); private static final String datePattern = "yyyy-MM-dd_HH-mm-ss"; public static void main(String[] args) throws IOException { LOGGER.info("Start task."); ParameterTool paraTool = ParameterTool.fromArgs(args); String checkpointInterval = "30000"; // set up execution environment StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); EnvironmentSettings settings = EnvironmentSettings.newInstance() .inStreamingMode().build(); StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings); env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); env.getCheckpointConfig().setCheckpointInterval(Long.valueOf(checkpointInterval)); env.getCheckpointConfig().enableExternalizedCheckpoints( CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); SimpleDateFormat dateTimeFormat = new SimpleDateFormat(datePattern); String time = dateTimeFormat.format(System.currentTimeMillis()); RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend( new FsStateBackend("obs://xxx/jobs/testcheckpoint/" + time), true); env.setStateBackend(rocksDbBackend); String catalog = "CREATE CATALOG hoodie_catalog\n" + " WITH (\n" + " 'type'='hudi',\n" + " 'hive.conf.dir' = '/opt/hadoop/conf',\n" + " 'mode'='hms'\n" + " )"; tEnv.executeSql(catalog); String dwsSource = "CREATE TABLE if not exists genSourceJarForHudi618_1 (\n" + " order_id STRING,\n" + " order_name STRING,\n" + " price INT,\n" + " weight INT\n" + ") WITH (\n" + " 'connector' = 'datagen',\n" + " 'rows-per-second' = '1',\n" + " 'fields.order_id.kind' = 'random',\n" + " 'fields.order_id.length' = '8',\n" + " 'fields.order_name.kind' = 'random',\n" + " 'fields.order_name.length' = '8'\n" + ")"; tEnv.executeSql(dwsSource); /*testdb是用户自定义的数数据库*/ String printSinkdws = "CREATE TABLE if not exists hoodie_catalog.`testdb`.`hudiSinkJarHudi618_1` (\n" + " order_id STRING PRIMARY KEY NOT ENFORCED,\n" + " order_name STRING,\n" + " price INT,\n" + " weight INT,\n" + " create_time BIGINT,\n" + " create_date String\n" + ") WITH (" + "'connector' = 'hudi',\n" + "'path' = 'obs://xxx/catalog/dbtest3/hudiSinkJarHudi618_1',\n" + "'hoodie.datasource.write.recordkey.field' = 'order_id',\n" + "'EXTERNAL' = 'true'\n" + ")"; tEnv.executeSql(printSinkdws); /*testdb是用户自定义的数数据库*/ String query = "insert into hoodie_catalog.`testdb`.`hudiSinkJarHudi618_1` select\n" + " order_id,\n" + " order_name,\n" + " price,\n" + " weight,\n" + " UNIX_TIMESTAMP() as create_time,\n" + " FROM_UNIXTIME(UNIX_TIMESTAMP(), 'yyyyMMdd') as create_date\n" + " from genSourceJarForHudi618_1"; tEnv.executeSql(query); } } 表8 hudi类型sink表的connector参数 参数 说明 是否必填 参数值 connector flink connector类型。 配置为hudi表示sink表是hudi表。 是 hudi path 表的基本路径。如果该路径不存在,则会创建它。 是 请参考示例代码中的配置值。 hoodie.datasource.write.recordkey.field hoodie表的唯一键字段名 否 这里配置order_id为唯一键。 EXTERNAL 是否外表 是 hudi表必填,且设置为true true 创建Flink jar作业并配置如下参数。 参数 说明 配置示例 Flink版本 Flink 1.15及以上版本支持对接LakeFormation。 1.15 委托 使用Flink 1.15及以上版本的引擎执行作业时,需要您先在IAM页面创建相关委托,并在此处添加新建的委托信息。选择该参数后系统将自动为您的作业添加以下配置: flink.dli.job.agency.name=agency 委托权限示例请参考创建DLI自定义委托权限和常见场景的委托权限策略。 - 优化参数 配置Flink作业访问的元数据类型。 本场景下请选择Lakeformation。 flink.dli.job.catalog.type=lakeformation 配置Flink作业访问的数据目录名称。 flink.dli.job.catalog.name=[lakeformation中的catalog名称] 此处选择的是在DLI管理控制台创建的数据目录,即DLI与Lakeformation默认实例下的数据目录的映射,该数据目录连接的是LakeFormation默认实例下的数据目录。 -
  • 步骤4:授权使用LakeFormation资源 SQL作业场景 在进行SQL作业提交之前,需完成LakeFormation元数据、数据库、表、列和函数等资源授权,确保作业在执行过程中能够顺利访问所需的数据和资源。LakeFormation SQL资源权限支持列表提供了LakeFormation权限支持列表。 使用LakeFormation资源需要分别完成LakeFormation的IAM细粒度授权和LakeFormation SQL资源授权。 LakeFormation的IAM细粒度授权:授权使用LakeFormation API。 IAM服务通常提供了管理用户、组和角色的访问权限的方式。您可以在IAM控制台中创建策略(Policy),定义哪些用户或角色可以调用LakeFormation的API。然后,将这些策略附加到相应的用户或角色上。 方法1:基于角色授权: 即IAM最初提供的一种根据用户的工作职能定义权限的粗粒度授权机制。该机制以服务为粒度,提供有限的服务相关角色用于授权。 例如参考LakeFormation权限管理授予用户只读权限,允许查询LakeFormation相关元数据资源的权限。 或如下示例授予LakeFormation相关元数据资源的所有操作权限。 示例: { "Version": "1.1", "Statement": [ { "Effect": "Allow", "Action": [ "lakeformation:table:*", "lakeformation:database:*", "lakeformation:catalog:*", "lakeformation:function:*", "lakeformation:transaction:*", "lakeformation:policy:describe", "lakeformation:credential:describe" ] } ] } 方法2:基于策略的精细化授权: IAM提供的细粒度授权的能力,可以精确到具体服务的操作、资源以及请求条件等。 LakeFormation权限策略请参考LakeFormation权限和授权项。 IAM授权的具体操作请参考创建用户并授权使用LakeFormation。 LakeFormation SQL资源授权:授权使用LakeFormation具体资源(元数据、数据库、表、列和函数等)。 LakeFormation资源授权是指允许用户对特定资源的访问的权限,以此来控制对LakeFormation的数据和元数据的访问。 LakeFormation资源授权有两种方式: 方式一:在LakeFormation管理控制台对资源授权。 了解LakeFormation SQL资源权限请参考数据权限概述。 方式二:在DLI管理控制台使用GRANT SQL语句授权 GRANT语句是SQL语言中用于授权的一种方式。 您可以使用GRANT语句来授予用户或角色对数据库、表、列、函数等的访问权限。 LakeFormation SQL资源权限支持列表提供了LakeFormation资源授权的策略。 Catalog资源暂时不支持在DLI SQL授权,请参考▪方式一:在LakeFormation管理控制台...在LakeFormation 管理控制台完成授权。 Spark Jar、Flink OpenSource SQL、Flink Jar作业场景: 方式1:使用委托授权:使用Spark 3.3.1及以上版本、Flink 1.15版本的引擎执行作业时,需要您先在IAM页面创建相关委托,并在配置作业时添加新建的委托信息。 委托权限示例请参考创建DLI自定义委托权限和常见场景的委托权限策略。 方式2:使用DEW授权: 已为授予IAM用户所需的IAM和Lakeformation权限,具体请参考•SQL作业场景的IAM授权的操作步骤。 已在DEW服务创建通用凭证,并存入凭据值。具体操作请参考创建通用凭据。 已创建DLI访问DEW的委托并完成委托授权。该委托需具备以下权限: DEW中的查询凭据的版本与凭据值ShowSecretVersion接口权限,csms:secretVersion:get。 DEW中的查询凭据的版本列表ListSecretVersions接口权限,csms:secretVersion:list。 DEW解密凭据的权限,kms:dek:decrypt。 委托权限示例请参考创建DLI自定义委托权限和常见场景的委托权限策略。
  • 步骤3:在DLI管理控制台创建数据目录 在DLI管理控制台需要创建到Catalog的连接,才可以访问LakeFormation实例中存储的Catalog。 DLI仅支持对接LakeFormation默认实例,请在LakeFormation设置实例为默认实例。 LakeFormation中每一个数据目录只能创建一个映射,不能创建多个。 例如用户在DLI创建了映射名catalogMapping1对应LakeFormation数据目录:catalogA。创建成功后,在同一个项目空间下,不能再创建到catalogA的映射。 登录DLI管理控制台。 选择“SQL编辑器 ”。 在SQL编辑器页面,选择“数据目录”。 单击创建数据目录。 配置数据目录相关信息。 表2 数据目录配置信息 参数名称 是否必填 说明 外部数据目录名称 是 LakeFormation默认实例下的Catalog名称。 类型 是 当前只支持LakeFormation。 该选项已固定,无需填写。 数据目录映射名称 是 在DLI使用的Catalog映射名,用户在执行SQL语句的时候需要指定Catalog映射,以此来标识访问的外部的元数据。建议与外部数据目录名称保持一致。 当前仅支持连接LakeFormation默认实例的数据目录。 描述 否 自定义数据目录的描述信息。 单击“确定”创建数据目录。
  • 步骤1:创建LakeFormation实例用于元数据存储 LakeFormation实例为元数据的管理提供基础资源,DLI仅支持对接LakeFormation的默认实例。 创建实例 登录LakeFormation管理控制台。 单击页面右上角“立即购买”或“购买实例”,进入实例购买页面。 首次创建实例时界面显示“立即购买”,如果界面已有LakeFormation实例则显示为“购买实例”。 按需配置LakeFormation实例参数,完成实例创建。 本例创建按需计费的共享型实例。 更多参数配置及说明,请参考创建LakeFormation实例。 设置实例为默认实例 查看实例“基本信息”中“是否为默认实例”的参数值。 “true”表示当前实例为默认实例。 “false”表示当前实例不为默认实例。 如果需要设置当前实例为默认实例,请单击页面右上角“设为默认实例”。 勾选操作影响后单击“确定”,将当前实例设置为默认实例。 当前DLI仅对接LakeFormation默认实例,变更默认实例后,可能对使用LakeFormation的周边服务产生影响,请谨慎操作。
  • 操作场景 LakeFormation是企业级一站式 湖仓构建 服务,提供元数据统一管理能力,支持无缝对接多种计算引擎及大数据云服务,便捷高效地构建 数据湖 和运营相关业务,加速释放业务数据价值。 在DLI的Spark作业和SQL作业场景,支持对接LakeFormation实现元数据的统一管理,本节操作介绍配置DLI与LakeFormation的数据连接的操作步骤。 LakeFormation Spark语法请参考Spark语法参考。 LakeFormation Flink语法请参考Flink语法参考。 HetuEngine SQL语法请参考HetuEngine SQL语法参考。
  • 约束限制 在表1中提供了支持对接LakeFormation获取元数据的队列和引擎类型。 查看队列的引擎类型和版本请参考查看队列的基本信息。 表1 LakeFormation获取元数据的队列和引擎类型 队列类型 引擎类型和支持的版本 default队列 Spark 3.3.x:支持对接LakeFormation获取元数据的队列和引擎。 HetuEngine 2.1.0:支持对接LakeFormation获取元数据的队列和引擎。 SQL队列 Spark 3.3.x:支持对接LakeFormation获取元数据的队列和引擎。 HetuEngine 2.1.0:支持对接LakeFormation获取元数据的队列和引擎。 通用队列 Flink作业场景:Flink 1.15及以上版本且使用弹性资源池队列时支持对接LakeFormation获取元数据。 DLI仅支持对接LakeFormation默认实例,请在LakeFormation设置实例为默认实例。 DLI支持读取Lakeformation的中Avro、Json、Parquet、Csv、Orc、Text、Hudi格式的数据。 LakeFormation数据目录中的库、表权限统一由LakeFormation管理。 DLI支持对接LakeFormation后,DLI原始库表下移至dli的数据目录下。
  • 常见问题 问题一:查询OBS分区表报错,报错信息如下: DLI.0005: There should be at least one partition pruning predicate on partitioned table `xxxx`.`xxxx`.; 问题根因:查询OBS分区表时没有携带分区字段。 解决方案:查询OBS分区表时,where条件中至少包含一个分区字段。 问题二:使用DataSource语法指定OBS文件路径创建OBS表,insert数据到OBS表,显示作业运行失败,报:“DLI.0007: The output path is a file, don't support INSERT...SELECT” 错误。 问题示例语句参考如下: CREATE TABLE testcsvdatasource (name string, id int) USING csv OPTIONS (path "obs://dli-test-021/data/test.csv"); 问题根因:创建OBS表指定的OBS路径为具体文件,导致不能插入数据。例如上述示例中的OBS路径为:"obs://dli-test-021/data/test.csv"。 解决方案:使用DataSource语法创建OBS表指定的OBS文件路径改为文件目录即可,后续即可通过insert插入数据。上述示例,建表语句可以修改为: CREATE TABLE testcsvdatasource (name string, id int) USING csv OPTIONS (path "obs://dli-test-021/data"); 问题三:使用Hive语法创建OBS分区表时,提示语法格式不对。例如,如下使用Hive语法创建以classNo为分区的OBS表: CREATE TABLE IF NOT EXISTS testtable(name STRING, score DOUBLE, classNo INT) PARTITIONED BY (classNo) STORED AS TEXTFILE LOCATION 'obs://dli-test-021/data7'; 问题根因:使用Hive语法创建OBS分区表时,分区字段不能出现在表名后的字段列表中,只能定义在PARTITIONED BY后。 解决方案:使用Hive语法创建OBS分区表时,分区字段指定在PARTITIONED BY后。例如: CREATE TABLE IF NOT EXISTS testtable(name STRING, score DOUBLE) PARTITIONED BY (classNo INT) STORED AS TEXTFILE LOCATION 'obs://dli-test-021/data7';
  • 使用DataSource语法创建OBS表 以下通过创建 CS V格式的OBS表举例,创建其他数据格式的OBS表方法类似,此处不一一列举。 创建OBS非分区表 指定OBS数据文件,创建csv格式的OBS表。 按照以下文件内容创建“test.csv”文件,并将“test.csv”文件上传到OBS桶“dli-test-021”的根目录下。 Jordon,88,23 Kim,87,25 Henry,76,26 登录DLI管理控制台,选择“SQL编辑器”,在SQL编辑器中“执行引擎”选择“spark”,“队列”选择已创建的SQL队列,数据库选择“testdb”,执行以下命令创建OBS表。 CREATE TABLE testcsvdatasource (name STRING, score DOUBLE, classNo INT ) USING csv OPTIONS (path "obs://dli-test-021/test.csv"); 如果是通过指定的数据文件创建的OBS表,后续不支持在DLI通过insert表操作插入数据。OBS文件内容和表数据保持同步。 查询已创建的“testcsvdatasource”表数据。 select * from testcsvdatasource; 图1 查询结果 本地修改原始的OBS表文件“test.csv”,增加一行“Aarn,98,20”数据,重新替换OBS桶目录下的“test.csv”文件。 Jordon,88,23 Kim,87,25 Henry,76,26 Aarn,98,20 在DLI的SQL编辑器中再次查询“testcsvdatasource”表数据,DLI上可以查询到新增的“Aarn,98,20”数据。 select * from testcsvdatasource; 图2 查询结果 指定OBS数据文件目录,创建csv格式的OBS表。 指定的OBS数据目录不包含数据文件。 在OBS桶“dli-test-021”根目录下创建数据文件目录“data”。 登录DLI管理控制台,选择“SQL编辑器”,在SQL编辑器中“执行引擎”选择“spark”,“队列”选择已创建的SQL队列,数据库选择“testdb”。在DLI的“testdb”数据库下创建OBS表“testcsvdata2source”。 CREATE TABLE testcsvdata2source (name STRING, score DOUBLE, classNo INT) USING csv OPTIONS (path "obs://dli-test-021/data"); 通过insert语句插入表数据。 insert into testcsvdata2source VALUES('Aarn','98','20'); insert作业运行成功后,查询OBS表“testcsvdata2source”数据。 select * from testcsvdata2source; 图3 查询结果 在OBS桶的“obs://dli-test-021/data”目录下刷新后查询,生成了csv数据文件,文件内容为insert插入的数据内容。 图4 查询结果 指定的OBS数据目录包含数据文件。 在OBS桶“dli-test-021”根目录下创建数据文件目录“data2”。创建如下内容的测试数据文件“test.csv”,并上传文件到“obs://dli-test-021/data2”目录下。 Jordon,88,23 Kim,87,25 Henry,76,26 登录DLI管理控制台,选择“SQL编辑器”,在SQL编辑器中“执行引擎”选择“spark”,“队列”选择已创建的SQL队列,数据库选择“testdb”。在DLI的“testdb”数据库下创建OBS表“testcsvdata3source”。 CREATE TABLE testcsvdata3source (name STRING, score DOUBLE, classNo INT) USING csv OPTIONS (path "obs://dli-test-021/data2"); 通过insert语句插入表数据。 insert into testcsvdata3source VALUES('Aarn','98','20'); insert作业运行成功后,查询OBS表“testcsvdata3source”数据。 select * from testcsvdata3source; 图5 查询结果 在OBS桶的“obs://dli-test-021/data2”目录下刷新后查询,生成了一个csv数据文件,内容为insert插入的表数据内容。 图6 查询结果 创建OBS分区表 创建单分区OBS表 在OBS桶“dli-test-021”根目录下创建数据文件目录“data3”。 登录DLI管理控制台,选择“SQL编辑器”,在SQL编辑器中“执行引擎”选择“spark”,“队列”选择已创建的SQL队列,数据库选择“testdb”。在DLI的“testdb”数据库下创建以“classNo”列为分区的OBS分区表“testcsvdata4source”,指定OBS目录“obs://dli-test-021/data3”。 CREATE TABLE testcsvdata4source (name STRING, score DOUBLE, classNo INT) USING csv OPTIONS (path "obs://dli-test-021/data3") PARTITIONED BY (classNo); 在OBS桶的“obs://dli-test-021/data3”目录下创建“classNo=25”的分区目录。根据以下文件内容创建数据文件“test.csv”,并上传到OBS的“obs://dli-test-021/data3/classNo=25”目录下。 Jordon,88,25 Kim,87,25 Henry,76,25 在SQL编辑器中执行以下命令,导入分区数据到OBS表“testcsvdata4source ”。 ALTER TABLE testcsvdata4source ADD PARTITION (classNo = 25) LOCATION 'obs://dli-test-021/data3/classNo=25'; 查询OBS表“testcsvdata4source ”classNo分区为“25”的数据: select * from testcsvdata4source where classNo = 25; 图7 查询结果 插入如下数据到OBS表“testcsvdata4source ”: insert into testcsvdata4source VALUES('Aarn','98','25'); insert into testcsvdata4source VALUES('Adam','68','24'); 查询OBS表“testcsvdata4source ”classNo分区为“25”和“24”的数据。 分区表在进行查询时where条件中必须携带分区字段,否则会查询失败,报:DLI.0005: There should be at least one partition pruning predicate on partitioned table。 select * from testcsvdata4source where classNo = 25; 图8 查询结果 select * from testcsvdata4source where classNo = 24; 图9 查询结果 在OBS桶的“obs://dli-test-021/data3”目录下点击刷新,该目录下生成了对应的分区文件,分别存放新插入的表数据。 图10 OBS上classNo分区为“25”文件数据 图11 OBS上classNo分区为“24”文件数据 创建多分区OBS表 在OBS桶“dli-test-021”根目录下创建数据文件目录“data4”。 登录DLI管理控制台,选择“SQL编辑器”,在SQL编辑器中“执行引擎”选择“spark”,“队列”选择已创建的SQL队列,数据库选择“testdb”。在“testdb”数据库下创建以“classNo”和“dt”列为分区的OBS分区表“testcsvdata5source”,指定OBS目录“obs://dli-test-021/data4”。 CREATE TABLE testcsvdata5source (name STRING, score DOUBLE, classNo INT, dt varchar(16)) USING csv OPTIONS (path "obs://dli-test-021/data4") PARTITIONED BY (classNo,dt); 给 testcsvdata5source表插入如下测试数据: insert into testcsvdata5source VALUES('Aarn','98','25','2021-07-27'); insert into testcsvdata5source VALUES('Adam','68','25','2021-07-28'); 根据classNo分区列查询testcsvdata5source数据。 select * from testcsvdata5source where classNo = 25; 图12 查询结果 根据dt分区列查询testcsvdata5source数据。 select * from testcsvdata5source where dt like '2021-07%'; 图13 查询结果 在OBS桶“obs://dli-test-021/data4”目录下刷新后查询,会生成如下数据文件: 文件目录1:obs://dli-test-021/data4/xxxxxx/classNo=25/dt=2021-07-27 图14 查询结果 文件目录2:obs://dli-test-021/data4/xxxxxx/classNo=25/dt=2021-07-28 图15 查询结果 在OBS桶的“obs://dli-test-021/data4”目录下创建“classNo=24”的分区目录,再在“classNo=24”目录下创建子分区目录“dt=2021-07-29”。根据以下文件内容创建数据文件“test.csv”,并上传到OBS的“obs://dli-test-021/data4/classNo=24/dt=2021-07-29”目录下。 Jordon,88,24,2021-07-29 Kim,87,24,2021-07-29 Henry,76,24,2021-07-29 在SQL编辑器中执行以下命令,导入分区数据到OBS表“testcsvdata5source ”。 ALTER TABLE testcsvdata5source ADD PARTITION (classNo = 24,dt='2021-07-29') LOCATION 'obs://dli-test-021/data4/classNo=24/dt=2021-07-29'; 根据classNo分区列查询testcsvdata5source数据。 select * from testcsvdata5source where classNo = 24; 图16 查询结果 根据dt分区列查询所有“2021-07”月的所有数据。 select * from testcsvdata5source where dt like '2021-07%'; 图17 查询结果
  • DataSource和Hive两种语法创建OBS表的区别 DataSource语法和Hive语法主要区别在于支持的表数据存储格式范围、支持的分区数等有差异。两种语法创建OBS表主要差异点参见表1。 表1 DataSource语法和Hive语法创建OBS表的差异点 语法 支持的数据类型范围 创建分区表时分区字段差异 支持的分区数 DataSource语法 支持ORC,PARQUET,JSON,CSV,AVRO类型 创建分区表时,分区字段在表名和PARTITIONED BY后都需要指定。具体可以参考DataSource语法创建单分区OBS表。 单表分区数最多允许7000个。 Hive语法 支持TEXTFILE, AVRO, ORC, SEQUENCEFILE, RCFILE, PARQUET 创建分区表时,指定的分区字段不能出现在表后,只能通过PARTITIONED BY指定分区字段名和类型。具体可以参考Hive语法创建OBS分区表。 单表分区数最多允许100000个。 创建OBS表的DataSource语法可以参考使用DataSource语法创建OBS表。 创建OBS表的Hive语法可以参考使用Hive语法创建OBS表。
  • 监控指标 表1 数据湖探索 服务支持的监控指标 指标ID 指标名称 指标含义 取值范围 单位 进制 测量对象 监控周期(原始指标) queue_cu_num 队列CU使用量 展示用户队列申请的CU数 ≥0 Count 不涉及 队列 5分钟 queue_job_launching_num 提交中作业数 展示用户队列中状态为提交中的作业数。 ≥0 Count 不涉及 队列 5分钟 queue_job_running_num 运行中作业数 展示用户队列中状态为运行中的作业数。 ≥0 Count 不涉及 队列 5分钟 queue_job_succeed_num 已完成作业数 展示用户队列中状态为已完成的作业数。 ≥0 Count 不涉及 队列 5分钟 queue_job_failed_num 已失败作业数 展示用户队列中状态为已失败的作业数。 ≥0 Count 不涉及 队列 5分钟 queue_job_cancelled_num 已取消作业数 展示用户队列中状态为已取消的作业数。 ≥0 Count 不涉及 队列 5分钟 queue_alloc_cu_num 队列CU分配量 展示用户队列的CU分配情况。 ≥0 Count 不涉及 队列 5分钟 queue_min_cu_num 队列最小CU 展示用户队列中的最小CU。 ≥0 Count 不涉及 队列 5分钟 queue_max_cu_num 队列最大CU 展示用户队列中的最大CU。 ≥0 Count 不涉及 队列 5分钟 queue_priority 队列优先级 展示用户队列的优先级。 1~100 不涉及 不涉及 队列 5分钟 queue_cpu_usage 队列CPU使用率 展示用户队列的CPU使用率。 0~100 % 不涉及 队列 该指标仅适用于普通队列。 5分钟 queue_disk_usage 队列磁盘使用率 展示用户队列的磁盘使用率。 0~100 % 不涉及 队列 该指标仅适用于普通队列。 5分钟 queue_disk_used 队列磁盘使用率最大值 展示用户队列的磁盘使用率的最大值。 0~100 % 不涉及 队列 该指标仅适用于普通队列。 5分钟 queue_mem_usage 队列内存使用率 展示用户队列的内存使用率。 0~100 % 不涉及 队列 该指标仅适用于普通队列。 5分钟 queue_mem_used 队列内存使用量 展示用户队列的内存使用量。 ≥0 MB 不涉及 队列 该指标仅适用于普通队列。 5分钟 queue_job_launching_max_duration 作业提交最大时长 该指标用于统计采样时间点提交中的作业最长的持续时间。(包括SQL作业、Flink作业、Spark作业) ≥0 Seconds 不涉及 队列 5分钟 该指标属于瞬时采样指标(非连续性采样),用于记录采样时刻为 “提交中”或者“启动中” 的作业的最大提交时长,并非对全量作业的统计性指标。不涉及对历史作业或已完成作业的数据统计。仅适用于监控队列运行状态。 queue_sql_job_running_max_duration SQL作业运行最大时长 该指标用于统计采样时间点运行中的SQL作业最长的持续时间。 ≥0 Seconds 不涉及 队列 5分钟 该指标属于瞬时采样指标(非连续性采样),用于记录采样时刻为 “运行中” 的SQL作业的最大运行时长,并非对全量作业的统计性指标。不涉及对历史作业或已完成作业的数据统计。仅适用于监控队列运行状态 queue_spark_job_running_max_duration Spark作业运行最大时长 该指标用于统计采样时间点运行中的Spark作业最长的持续时间。 ≥0 Seconds 不涉及 队列 5分钟 该指标属于瞬时采样指标(非连续性采样),用于记录采样时刻为 “运行中” 的Spark作业的最大运行时长,并非对全量作业的统计性指标。不涉及对历史作业或已完成作业的数据统计。仅适用于监控队列运行状态 flink_read_records_per_second Flink作业数据输入速率 展示用户Flink作业的数据输入速率,供监控和调试使用。 ≥0 record/s 不涉及 Flink作业 10秒钟 flink_write_records_per_second Flink作业数据输出速率 展示用户Flink作业的数据输出速率,供监控和调试使用。 ≥0 record/s 不涉及 Flink作业 10秒钟 flink_read_records_total Flink作业数据输入总数 展示用户Flink作业的数据输入总数,供监控和调试使用。 ≥0 record/s 不涉及 Flink作业 10秒钟 flink_write_records_total Flink作业数据输出总数 展示用户Flink作业的数据输出总数,供监控和调试使用。 ≥0 record/s 不涉及 Flink作业 10秒钟 flink_read_bytes_per_second Flink作业字节输入速率 展示用户Flink作业每秒输入的字节数。 ≥0 byte/s 1024(IEC) Flink作业 10秒钟 flink_write_bytes_per_second Flink作业字节输出速率 展示用户Flink作业每秒输出的字节数。 ≥0 byte/s 1024(IEC) Flink作业 10秒钟 flink_read_bytes_total Flink作业字节输入总数 展示用户Flink作业字节的输入总数。 ≥0 byte/s 1024(IEC) Flink作业 10秒钟 flink_write_bytes_total Flink作业字节输出总数 展示用户Flink作业字节的输出总数。 ≥0 byte/s 1024(IEC) Flink作业 10秒钟 flink_cpu_usage Flink作业CPU使用率 展示用户Flink作业的CPU使用率。 0~100 % 不涉及 Flink作业 10秒钟 flink_mem_usage Flink作业内存使用率 展示用户Flink作业的内存使用率。 0~100 % 不涉及 Flink作业 10秒钟 flink_max_op_latency Flink作业最大算子延迟 展示用户Flink作业的最大算子延迟时间,单位ms。 ≥0 ms 不涉及 Flink作业 10秒钟 flink_max_op_backpressure_level Flink作业最大算子反压 展示用户Flink作业的最大算子反压值,数值越大,反压越严重。 0:表示OK 50:表示Low 100:表示High 0~100 不涉及 不涉及 Flink作业 10秒钟 elastic_resource_pool_cpu_usage 弹性资源池CPU使用率 展示用户弹性资源池的CPU使用率。 0~100 % 不涉及 弹性资源池 5分钟 elastic_resource_pool_mem_usage 弹性资源池内存使用率 展示用户弹性资源池的内存使用率。 0~100 % 不涉及 弹性资源池 5分钟 elastic_resource_pool_disk_usage 弹性资源池磁盘使用率 展示用户弹性资源池的磁盘使用率。 0~100 % 不涉及 弹性资源池 5分钟 elastic_resource_pool_disk_max_usage 弹性资源池磁盘使用率最大值 展示用户弹性资源池的磁盘使用率最大值。 0~100 % 不涉及 弹性资源池 5分钟 elastic_resource_pool_cu_num 弹性资源池CU使用量 展示用户弹性资源池的CU使用量。 ≥0 Count 不涉及 弹性资源池 5分钟 elastic_resource_pool_alloc_cu_num 弹性资源池CU分配量 展示用户弹性资源池的CU分配情况。 ≥0 Count 不涉及 弹性资源池 5分钟 elastic_resource_pool_min_cu_num 弹性资源池最小CU 展示用户弹性资源池的最小CU。 ≥0 Count 不涉及 弹性资源池 5分钟 elastic_resource_pool_max_cu_num 弹性资源池最大CU 展示用户弹性资源池的最大CU。 ≥0 Count 不涉及 弹性资源池 5分钟
  • Lakeformation权限策略(HetuEngine) 表3 HetuEngine语法LakeFormation权限配置参考表 类型 语法 SQL鉴权所需 LakeFormation权限 调用元数据接口所需 LakeFormation权限 schema create schema catalog:CREATE_DATABASE catalog:CREATE_DATABASE catalog:DESCRIBE show schemas catalog:LIST_DATABASE catalog:LIST_DATABASE drop schema database:DROP catalog:LIST_DATABASE database:DESCRIBE database:DROP alter schema set location/owner database:ALTER catalog:LIST_DATABASE database:DESCRIBE database:ALTER desc schema database:LIST_DATABASE database:LIST_DATABASE database:DESCRIBE table create table database:CREATE_TABLE database:DESCRIBE database:CREATE_TABLE create table as select database:CREATE_TABLE 源表:SELECT(或列:SELECT) database:DESCRIBE database:CREATE_TABLE table:DESCRIBE(源表) table:select(源表) show create table table:DESCRIBE table:DESCRIBE table:select select from table table:SELECT(或column:SELECT) table:DESCRIBE table:SELECT(或column:SELECT) insert into table table:INSERT table:SELECT(或column:SELECT) table:DESCRIBE table:ALTER alter table table:ALTER table:DESCRIBE table:ALTER show tables database:LIST_TABLE catalog:LIST_DATABASE database:LIST_TABLE drop table table:DROP table:DESCRIBE table:DROP truncate table table:DELETE table:DESCRIBE desc table table:DESCRIBE catalog:LIST_DATABASE table:DESCRIBE comment table:ALTER table:DESCRIBE table:ALTER view create view database:CREATE_TABLE 源表:SELECT(或列:SELECT) database:CREATE_TABLE table:DESCRIBE(源表) table:select(源表) drop view table:DROP table:DESCRIBE table:DROP alter view table:ALTER table:DESCRIBE table:ALTER (table:SELECT) select from view table:DESCRIBE(源表和视图) table:select(源表和视图) table:DESCRIBE(源表和视图) table:select(源表和视图) show views database:LIST_TABLE catalog:LIST_DATABASE database:LIST_TABLE table:DESCRIBE show create view table:DESCRIBE table:DESCRIBE column show columns table:SELECT(或column:SELECT) catalog:LIST_DATABASE table:DESCRIBE table:SELECT(或column:SELECT) select [column] from table table:SELECT(或column:SELECT) table:DESCRIBE table:SELECT(或column:SELECT) stats show stats table:SELECT(或column:SELECT) table:DESCRIBE table:SELECT(或column:SELECT) analyze table:INSERT table:SELECT(或column:SELECT) table:DESCRIBE table:ALTER
  • LakeFormation SQL资源权限支持列表 DLI支持SQL资源鉴权的操作列表请参考数据权限列表。 LakeFormation SQL资源权限支持列表请参考表1。 表1 LakeFormation SQL资源权限支持列表 资源类型 权限类型 Database ALL ALTER DROP DESCRIBE LIST_TABLE LIST_FUNC CREATE_TABLE CREATE_FUNC Table/View ALL ALTER DROP DESCRIBE UPDATE INSERT SELECT DELETE Column SELECT Function ALL ALTER DROP DESCRIBE EXEC
  • Lakeformation权限策略(Spark) 表2 Lakeformation权限策略 类型 SQL语句 元数据IAM鉴权权限 SQL资源鉴权权限 DDL语句 ALTER DATABASE database:describe database:alter database:DESCRIBE database:ALTER ALTER TABLE database:describe table:describe table:alter database:create database:DESCRIBE table:DESCRIBE table:ALTER database:CREATE_TABLE column:SELECT或table:SELECT ALTER VIEW database:describe table:describe table:alter database:DESCRIBE table:DESCRIBE column:SELECT table:ALTER CREATE DATABASE database:describe database:create database:DESCRIBE catalog:CREATE_DATABASE CREATE OR REPLACE FUNCTION (CREATE) database:describe function:create database:DESCRIBE database:CREATE_FUNC CREATE OR REPLACE FUNCTION (REPLACE) database:describe function:describe function:alter database:CREATE_FUNC database:DESCRIBE function:DESCRIBE function:ALTER CREATE TABLE database:describe table:describe table:create database:DESCRIBE database:CREATE_TABLE CREATE VIEW database:describe table:describe table:drop table:create database:CREATE_TABLE table:DESCRIBE(source\target) table:DROP(target) column:SELECT DROP DATABASE database:describe database:drop database:DESCRIBE database:DROP DROP FUNCTION database:describe function:describe function:drop database:DESCRIBE function:DESCRIBE function:DROP DROP TABLE database:describe table:describe credential:describe table:drop database:DESCRIBE table:DESCRIBE table:DROP DROP VIEW database:describe table:describe table:drop database:DESCRIBE table:DESCRIBE(target\source) table:DROP(target) REPAIR TABLE database:describe table:describe credential:describe table:alter database:DESCRIBE table:DESCRIBE table:ALTER table:SELECT TRUNCATE TABLE database:describe table:describe table:alter database:DESCRIBE table:DESCRIBE table:SELECT table:UPDATE DML语句 INSERT TABLE database:describe table:describe table:alter credential:describe database:DESCRIBE table:DESCRIBE table:ALTER table:INSERT column:SELECT或table:SELECT LOAD DATA database:describe table:describe credential:describe database:DESCRIBE table:DESCRIBE table:UPDATE table:ALTER table:SELECT DR语句 SELECT database:describe table:describe credential:describe database:DESCRIBE table:DESCRIBE column:SELECT EXPLAIN 取决于执行sql 取决于执行sql Auxiliary 语句 ANALYZE TABLE database:describe table:describe credential:describe table:alter database:DESCRIBE table:DESCRIBE table:SELECT table:ALTER DESCRIBE DATABASE database:describe database:DESCRIBE DESCRIBE FUNCTION database:describe function:describe database:DESCRIBE function:DESCRIBE DESCRIBE QUERY database:describe table:describe database:DESCRIBE table:DESCRIBE table:SELECT DESCRIBE TABLE database:describe table:describe database:DESCRIBE table:DESCRIBE REFRESH TABLE database:describe table:describe credential:describe database:DESCRIBE table:DESCRIBE table:SELECT REFRESH FUNCTION database:describe function:describe database:DESCRIBE function:DESCRIBE SHOW COLUMNS database:describe table:describe database:DESCRIBE table:DESCRIBE SHOW CREATE TABLE database:describe table:describe database:DESCRIBE table:DESCRIBE SHOW DATABASES database:describe catalog:LIST_DATABASE database:DESCRIBE SHOW FUNCTIONS database:describe function:describe database:DESCRIBE SHOW PARTITIONS database:describe table:describe database:DESCRIBE table:DESCRIBE SHOW TABLE EXTENDED database:describe table:describe catalog:LIST_DATABASE database:DESCRIBE table:DESCRIBE database:LIST_TABLE SHOW TABLES database:describe table:describe catalog:LIST_DATABASE database:LIST_TABLE database:DESCRIBE SHOW TBLPROPERTIES database:describe table:describe database:DESCRIBE table:DESCRIBE SHOW VIEWS database:describe table:describe catalog:LIST_DATABASE database:LIST_TABLE database:DESCRIBE
  • 数据库和表的约束与限制 表1 DLI资源相关约束限制 限制项 说明 数据库 “default”为内置数据库,不能创建名为“default”的数据库。 DLI支持创建的数据库的最大数量为50个。 数据表 DLI支持创建的表的最大数量为5000个。 DLI支持创建表类型: Managed:数据存储位置为DLI的表。 External:数据存储位置为OBS的表。 View:视图,视图只能通过SQL语句创建。 跨源表:表类型同样为External。 创建DLI表时不支持指定存储路径。 数据导入 仅支持将OBS上的数据导入DLI或OBS中。 支持将OBS中CSV,Parquet,ORC,JSON和Avro格式的数据导入到在DLI中创建的表。 将CSV格式数据导入分区表,需在数据源中将分区列放在最后一列。 导入数据的编码格式仅支持UTF-8。 数据导出 只支持将DLI表(表类型为“Managed”)中的数据导出到OBS桶中,且导出的路径必须指定到文件夹级别。 导出文件格式为json格式,且文本格式仅支持UTF-8。 支持跨账号导出数据,即B账户对A账户授权后,A账户拥有B账户OBS桶的元数据信息和权限信息的读取权限,以及路径的读写权限,则A账户可将数据导出至B账户的OBS路径中。
  • DLI支持创建的表类型 DLI表 DLI表是存储在DLI数据湖中的数据表。支持多种数据格式,可以存储结构化、半结构化和非结构化数据。 DLI表的数据存储在DLI服务内部,查询性能更好,适用于对时延敏感类的业务,如交互类的查询等。 库表管理中表的列表页面,表类型为Managed的即代表DLI表。 OBS表 OBS表的数据存储在OBS上,适用于对时延不敏感的业务,如历史数据统计分析等。 OBS表通常以对象的形式存储数据,每个对象包含数据和相关的元数据。 库表管理中表的列表页面,表类型为External,存储位置为OBS路径的即代表OBS表。 视图表 视图表(View)是一种虚拟表,它不存储实际的数据,而是根据定义的查询逻辑动态生成数据。视图通常用于简化复杂的查询,或者为不同的用户或应用提供定制化的数据视图。 视图表可以基于一个或多个表创建,提供了一种灵活的方式来展示数据,而不影响底层数据的存储和组织。 库表管理中表的列表页面,表类型为View的即代表视图表。 View只能通过SQL语句进行创建,不能通过“创建表”页面进行创建。视图中包含的表或视图信息不可被更改,如有更改可能会造成查询失败。 跨源表 跨源表是指能够跨越多个数据源进行查询和分析的数据表。这种表可以整合来自不同数据源的数据,提供统一的数据视图。 跨源表常用于 数据仓库 和数据湖架构中,允许用户执行跨多个数据源的复杂查询。 库表管理中表的列表页面,表类型为External,存储位置非OBS路径的即代表跨源表。
提示

您即将访问非华为云网站,请注意账号财产安全