步骤5:在
DLI 作业开发时使用LakeFormation元数据 DLI对接LakeFormation默认实例且完成LakeFormation的资源授权后,即可以在作业开发时使用LakeFormation元数据。 DLI SQL: LakeFormation SQL语法说明请参考DLI Spark SQL语法参考。 在执行SQL作业时,您可以在控制台选择执行SQL所在的catalog,如图2所示,或在SQL命令中指定catalogName。catalogName是DLI控制台的数据目录映射名。 图2 在SQL编辑器页面选择数据目录 对接LakeFormation实例场景,在创建数据库时需要指定数据库存储的OBS路径。 对接LakeFormation实例场景,在创建表时不支持设置表生命周期和多版本。 对接LakeFormation实例场景,LOAD DATA语句不支持datasource表,且LOAD DATA分区表必须指定分区。 在LakeFormation控制台创建的数据库和表中包含中文字符时,不支持在DLI执行相关数据库和表的操作。 对接LakeFormation实例场景,不支持指定筛选条件删除分区。 对接LakeFormation实例场景,不支持创建Truncate Datasource/Hive外表。 DLI暂不支持使用LakeFormation行过滤条件功能。 DLI读取binary类型的数据进行console展示时,会对binary数据进行Base64转换。 在DLI暂不支持LakeFormation的路径授权。 DLI Spark Jar: 本节介绍在DLI管理控制台提交Spark Jar作业时使用LakeFormation元数据的配置操作。 Spark Jar 示例 SparkSession spark = SparkSession.builder()
.enableHiveSupport()
.appName("java_spark_demo")
.getOrCreate();
spark.sql("show databases").show(); DLI管理控制台Spark Jar作业配置说明 (推荐)方式一:使用控制台提供的参数项(委托、元数据来源等)配置Spark Jar作业访问LakeFormation元数据 新建或编辑Spark Jar作业时,请参考表3Spark Jar作业访问LakeFormation元数据。 表3 配置Spark Jar作业访问LakeFormation元数据 参数 说明 配置示例 Spark版本 Spark 3.3.x及以上版本支持对接LakeFormation。 3.3.1 委托 使用Spark 3.3.1及以上版本的引擎执行作业时,需要您先在
IAM 页面创建相关委托,并在此处添加新建的委托信息。选择该参数后系统将自动为您的作业添加以下配置: spark.dli.job.agency.name=agency 委托权限示例请参考创建DLI自定义委托权限和常见场景的委托权限策略。 - 访问元数据 配置开启Spark作业访问元数据功能。 是 元数据来源 配置Spark作业访问的元数据类型。本场景下请选择Lakeformation。 选择该参数后系统将自动为您的作业添加以下配置项用于加载lakeformation相关依赖。 spark.sql.catalogImplementation=hive
spark.hadoop.hive-ext.dlcatalog.metastore.client.enable=true
spark.hadoop.hive-ext.dlcatalog.metastore.session.client.class=com.huawei.cloud.dalf.lakecat.client.hiveclient.LakeCatMetaStoreClient
og
// lakeformation相关依赖加载
spark.driver.extraClassPath=/usr/share/extension/dli/spark-jar/lakeformation/*
spark.executor.extraClassPath=/usr/share/extension/dli/spark-jar/lakeformation/* “元数据来源”还支持在Spark(--conf)参数中配置,且系统优先以Spark(--conf)中配置信息为准。 优先推荐您使用控制台提供的“元数据来源”参数项进行配置。 Lakeformation 数据目录名称 配置Spark作业访问的数据目录名称。 此处选择的是在DLI管理控制台创建的数据目录,即DLI与Lakeformation默认实例下的数据目录的映射,该数据目录连接的是LakeFormation默认实例下的数据目录。如需指定LakeFormation其他实例请参考◦方式二:使用Spark(--conf)参数配置...在Spark(--conf)中配置连接的Lakeformation实例和数据目录。 选择该参数后系统将自动为您的作业添加以下配置项用于连接Lakeformation默认实例下的数据目录。 spark.hadoop.lakecat.catalogname.default=lfcatalog “数据目录名称”还支持在Spark(--conf)参数中配置,且系统优先以Spark(--conf)中配置信息为准。 优先推荐您使用控制台提供的“数据目录名称”参数项进行配置。 - Spark参数(--conf) “元数据来源”和“数据目录名称”均支持在Spark(--conf)参数中配置,且系统优先以Spark(--conf)中配置信息为准。 如果您需要配置访问Hudi数据表,可在Spark(--conf)参数中填加以下配置项。 spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension
spark.hadoop.hoodie.write.lock.provider=org.apache.hudi.lakeformation.LakeCatMetastoreBasedLockProvider 如果您需要配置访问Delta数据表,可在Spark(--conf)参数中填加以下配置项。 spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension - 方式二:使用Spark(--conf)参数配置Spark Jar作业访问LakeFormation元数据 新建或编辑Spark Jar作业时,请在作业配置页面的Spark(--conf)参数中按需配置以下信息以访问LakeFormation元数据。 spark.sql.catalogImplementation=hive
spark.hadoop.hive-ext.dlcatalog.metastore.client.enable=true
spark.hadoop.hive-ext.dlcatalog.metastore.session.client.class=com.huawei.cloud.dalf.lakecat.client.hiveclient.LakeCatMetaStoreClient
spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension //支持hudi,可选
spark.hadoop.hoodie.write.lock.provider=org.apache.hudi.lakeformation.LakeCatMetastoreBasedLockProvider //支持hudi,可选
// 使用有OBS和lakeformation权限的委托访问,建议用户设置最小权限集
spark.dli.job.agency.name=agencyForLakeformation
//需要访问的lakeformation实例ID,在lakeformation console查看。可选,如不填写访问Lakeformation的默认实例
spark.hadoop.lakeformation.instance.id=xxx
//需要访问的lakeformation侧的CATA
LOG 名称,在lakeformation console查看。可选,如不填写则默认值为hive
spark.hadoop.lakecat.catalogname.default=lfcatalog
// lakeformation相关依赖加载
spark.driver.extraClassPath=/usr/share/extension/dli/spark-jar/lakeformation/*
spark.executor.extraClassPath=/usr/share/extension/dli/spark-jar/lakeformation/* DLI Flink OpenSource SQL 示例1:委托的方式对接Lakeformation 创建Flink OpenSource SQL作业并配置如下参数: 参数 说明 配置示例 Flink版本 Flink 1.15及以上版本支持对接LakeFormation。 1.15 委托 使用Flink 1.15及以上版本的引擎执行作业时,需要您先在IAM页面创建相关委托,并在此处添加新建的委托信息。选择该参数后系统将自动为您的作业添加以下配置: flink.dli.job.agency.name=agency 委托权限示例请参考创建DLI自定义委托权限和常见场景的委托权限策略。 - 开启checkpoint 勾选开启checkpoint。 开启 自定义参数 配置Flink作业访问的元数据类型。 本场景下请选择Lakeformation。 flink.dli.job.catalog.type=lakeformation 配置Flink作业访问的数据目录名称。 flink.dli.job.catalog.name=[lakeformation中的catalog名称] 此处选择的是在DLI管理控制台创建的数据目录,即DLI与Lakeformation默认实例下的数据目录的映射,该数据目录连接的是LakeFormation默认实例下的数据目录。 - 示例中关于Catalog的参数说明请参考表4 表4 Flink OpenSource SQL示例中关于Catalog的参数说明 参数 说明 是否必填 参数值 type catalog类型 是 固定值hive hive-conf-dir hive-conf路径,固定值/opt/flink/conf 是 固定值/opt/flink/conf default-database 默认数据库名称 否 默认default库 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31 CREATE CATALOG hive
WITH
(
'type' = 'hive',
'hive-conf-dir' = '/opt/flink/conf', -- 固定配置/opt/flink/conf
'default-database'='default'
);
USE CATALOG hive;
CREATE TABLE IF NOT EXISTS
dataGenSource612 (user_id string, amount int)
WITH
(
'connector' = 'datagen',
'rows-per-second' = '1',
'fields.user_id.kind' = 'random',
'fields.user_id.length' = '3'
);
CREATE table IF NOT EXISTS
printSink612 (user_id string, amount int)
WITH
('connector' = 'print');
INSERT INTO
printSink612
SELECT
*
FROM
dataGenSource612;
示例2:DEW的方式对接Lakeformation 创建Flink OpenSource SQL作业并配置如下参数: 参数 说明 配置示例 Flink版本 Flink 1.15及以上版本支持对接LakeFormation。 1.15 委托 使用Flink 1.15及以上版本的引擎执行作业时,需要您先在IAM页面创建相关委托,并在此处添加新建的委托信息。选择该参数后系统将自动为您的作业添加以下配置: flink.dli.job.agency.name=agency 委托权限示例请参考创建DLI自定义委托权限和常见场景的委托权限策略。 - 开启checkpoint 勾选开启checkpoint。 开启 自定义参数 配置Flink作业访问的元数据类型。 本场景下请选择Lakeformation。 flink.dli.job.catalog.type=lakeformation 配置Flink作业访问的数据目录名称。 flink.dli.job.catalog.name=[lakeformation中的catalog名称] 此处选择的是在DLI管理控制台创建的数据目录,即DLI与Lakeformation默认实例下的数据目录的映射,该数据目录连接的是LakeFormation默认实例下的数据目录。 - 示例中关于Catalog的参数说明请参考表5 需要指定properties.catalog.lakeformation.auth.identity.util.class参数值为com.huawei.flink.provider.lakeformation.FlinkDewIdentityGenerator,并且配置dew相关配置。 表5 Flink OpenSource SQL示例中关于Catalog的参数说明(DEW方式) 参数 说明 是否必填 参数值 type catalog类型 是 固定值hive hive-conf-dir hive-conf路径,固定值/opt/flink/conf 是 固定值/opt/flink/conf default-database 默认数据库名称 否 不填默认default库 properties.catalog.lakecat.auth.identity.util.class 认证信息获取类 是 dew方式必填,固定配置为com.huawei.flink.provider.lakeformation.FlinkDewIdentityGenerator properties.catalog.dew.projectId DEW所在的项目ID, 默认是Flink作业所在的项目ID。 是 使用dew方式必填 properties.catalog.dew.endpoint 指定要使用的DEW服务所在的endpoint信息。 是 使用dew方式必填。 配置示例:kms.xxx.com properties.catalog.dew.csms.secretName 在DEW服务的凭据管理中新建的通用凭据的名称。 是 使用dew方式必填 properties.catalog.dew.csms.version 在DEW服务的凭据管理中新建的通用凭据的版本号。 是 使用dew方式必填 properties.catalog.dew.access.key 在DEW服务的凭据中配置access.key值对应的key 是 使用dew方式必填 properties.catalog.dew.secret.key 在DEW服务的凭据中配置secret.key值对应的key 是 使用dew方式必填 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36 CREATE CATALOG myhive
WITH
(
'type' = 'hive',
'hive-conf-dir' = '/opt/flink/conf',
'default-database'='default',
--下边是dew相关配置,请根据实际情况修改参数值
'properties.catalog.lakeformation.auth.identity.util.class' = 'com.huawei.flink.provider.lakeformation.FlinkDewIdentityGenerator',
'properties.catalog.dew.endpoint'='kms.xxx.com',
'properties.catalog.dew.csms.secretName'='obsAksK',
'properties.catalog.dew.access.key' = 'myak',
'properties.catalog.dew.secret.key' = 'mysk',
'properties.catalog.dew.projectId'='330e068af1334c9782f4226xxxxxxxxx',
'properties.catalog.dew.csms.version'='v9'
);
USE CATALOG myhive;
create table IF NOT EXISTS dataGenSource_dew612(
user_id string,
amount int
) with (
'connector' = 'datagen',
'rows-per-second' = '1',
'fields.user_id.kind' = 'random',
'fields.user_id.length' = '3'
);
create table IF NOT EXISTS printSink_dew612(
user_id string,
amount int
) with (
'connector' = 'print'
);
insert into printSink_dew612 select * from dataGenSource_dew612;
示例3:委托的方式对接Lakeformation写hudi表 创建Flink OpenSource SQL作业并配置如下参数: 参数 说明 配置示例 Flink版本 Flink 1.15及以上版本支持对接LakeFormation。 1.15 委托 使用Flink 1.15及以上版本的引擎执行作业时,需要您先在IAM页面创建相关委托,并在此处添加新建的委托信息。选择该参数后系统将自动为您的作业添加以下配置: flink.dli.job.agency.name=agency 委托权限示例请参考创建DLI自定义委托权限和常见场景的委托权限策略。 - 开启checkpoint 勾选开启checkpoint。 开启 自定义参数 配置Flink作业访问的元数据类型。 本场景下请选择Lakeformation。 flink.dli.job.catalog.type=lakeformation 配置Flink作业访问的数据目录名称。 flink.dli.job.catalog.name=[lakeformation中的catalog名称] 此处选择的是在DLI管理控制台创建的数据目录,即DLI与Lakeformation默认实例下的数据目录的映射,该数据目录连接的是LakeFormation默认实例下的数据目录。 - 示例中关于Catalog的参数说明请参考表6。 表6 hudi类型Catalog参数说明 参数 说明 是否必填 参数值 type catalog类型 是 hudi表配置为hudi。 hive-conf-dir hive-conf路径,固定值/opt/flink/conf 是 固定值/opt/flink/conf。 default-database 默认数据库名称 否 默认default库。 mode 取值'hms' 或 'non-hms'。 'hms' 表示创建的 Hudi Catalog 会使用 Hive Metastore 存储元数据信息。 'non-hms'表示不使用Hive Metastore存储元数据信息。 是 固定值hms。 表7 hudi类型sink表的connector参数 参数 说明 是否必填 参数值 connector flink connector类型。 配置为hudi表示sink表是hudi表。 是 hudi path 表的基本路径。如果该路径不存在,则会创建它。 是 请参考示例代码中的配置值。 hoodie.datasource.write.recordkey.field hoodie表的唯一键字段名 否 这里配置order_id为唯一键。 EXTERNAL 是否外表 是 hudi表必填,且设置为true true CREATE CATALOG hive_catalog
WITH (
'type'='hive',
'hive-conf-dir' = '/opt/flink/conf',
'default-database'='test'
);
USE CATALOG hive_catalog;
create table if not exists genSource618 (
order_id STRING,
order_name STRING,
price INT,
weight INT
) with (
'connector' = 'datagen',
'rows-per-second' = '1',
'fields.order_id.kind' = 'random',
'fields.order_id.length' = '8',
'fields.order_name.kind' = 'random',
'fields.order_name.length' = '5'
);
CREATE CATALOG hoodie_catalog
WITH (
'type'='hudi',
'hive.conf.dir' = '/opt/flink/conf',
'mode'='hms' -- supports 'dfs' mode that uses the DFS backend for table DDLs persistence
);
CREATE TABLE if not exists hoodie_catalog.`test`.`hudiSink618` (
`order_id` STRING PRIMARY KEY NOT ENFORCED,
`order_name` STRING,
`price` INT,
`weight` INT,
`create_time` BIGINT,
`create_date` String
) PARTITIONED BY (create_date) WITH (
'connector' = 'hudi',
'path' = 'obs://xxx/catalog/dbtest3/hudiSink618',
'hoodie.datasource.write.recordkey.field' = 'order_id',
'write.precombine.field' = 'create_time',
'EXTERNAL' = 'true' -- must be set
);
insert into hoodie_catalog.`test`.`hudiSink618`
select
order_id,
order_name,
price,
weight,
UNIX_TIMESTAMP() as create_time,
FROM_UNIXTIME(UNIX_TIMESTAMP(), 'yyyyMMdd') as create_date
from genSource618; DLI Flink Jar 示例1:委托方式对接Lakeformation 开发Flink jar程序,编译并上传jar包到obs,本例上传到obs://obs-test/dlitest/目录 示例代码如下: 本例通过DataGen表产生随机数据并输出到Print结果表中。 其他connector类型可参考Flink 1.15支持的connector列表。 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73 package com.huawei.test;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.text.SimpleDateFormat;
@SuppressWarnings({"deprecation", "rawtypes", "unchecked"})
public class GenToPrintTaskAgency {
private static final Logger LOGGER = LoggerFactory.getLogger(GenToPrintTaskAgency.class);
private static final String datePattern = "yyyy-MM-dd_HH-mm-ss";
public static void main(String[] args) {
LOGGER.info("Start task.");
ParameterTool paraTool = ParameterTool.fromArgs(args);
String checkpointInterval = "180000000";
// set up execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings settings = EnvironmentSettings.newInstance()
.inStreamingMode().build();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setCheckpointInterval(Long.valueOf(checkpointInterval));
env.getCheckpointConfig().enableExternalizedCheckpoints(
CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
SimpleDateFormat dateTimeFormat = new SimpleDateFormat(datePattern);
String time = dateTimeFormat.format(System.currentTimeMillis());
RocksDBStateBackend rocksDbBackend =
new RocksDBStateBackend(
new FsStateBackend("obs://obs/xxx/testcheckpoint/" + time), true);
env.setStateBackend(rocksDbBackend);
String createCatalog = "CREATE CATALOG lf_catalog WITH (\n" +
" 'type' = 'hive',\n" +
" 'hive-conf-dir' = '/opt/hadoop/conf'\n" +
" );";
tEnv.executeSql(createCatalog);
String dataSource = "CREATE TABLE if not exists lf_catalog.`testdb`.`dataGenSourceJar618_1` (\n" +
" user_id string,\n" +
" amount int\n" +
") WITH (\n" +
" 'connector' = 'datagen',\n" +
" 'rows-per-second' = '1',\n" +
" 'fields.user_id.kind' = 'random',\n" +
" 'fields.user_id.length' = '3'\n" +
")";
/*testdb是用户自定义的数数据库*/
tEnv.executeSql(dataSource);
String printSink = "CREATE TABLE if not exists lf_catalog.`testdb`.`printSinkJar618_1` (\n" +
" user_id string,\n" +
" amount int\n" +
") WITH ('connector' = 'print')";
tEnv.executeSql(printSink);
/*testdb是用户自定义的数数据库*/
String query = "insert into lf_catalog.`test`.`printSinkJar618_1` " +
"select * from lf_catalog.`test`.`dataGenSourceJar618_1`";
tEnv.executeSql(query);
}
}
创建Flink jar作业并配置如下参数。 参数 说明 配置示例 Flink版本 Flink 1.15及以上版本支持对接LakeFormation。 1.15 委托 使用Flink 1.15及以上版本的引擎执行作业时,需要您先在IAM页面创建相关委托,并在此处添加新建的委托信息。选择该参数后系统将自动为您的作业添加以下配置: flink.dli.job.agency.name=agency 委托权限示例请参考创建DLI自定义委托权限和常见场景的委托权限策略。 - 优化参数 配置Flink作业访问的元数据类型。 本场景下请选择Lakeformation。 flink.dli.job.catalog.type=lakeformation 配置Flink作业访问的数据目录名称。 flink.dli.job.catalog.name=[lakeformation中的catalog名称] 此处选择的是在DLI管理控制台创建的数据目录,即DLI与Lakeformation默认实例下的数据目录的映射,该数据目录连接的是LakeFormation默认实例下的数据目录。 - 示例2:DEW方式对接Lakeformation 开发Flink jar程序,编译并上传jar包到obs,本例上传到obs://obs-test/dlitest/目录 示例代码如下: 本例通过DataGen表产生随机数据并输出到Print结果表中。 其他connector类型可参考Flink 1.15支持的connector列表。 package com.huawei.test;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.text.SimpleDateFormat;
@SuppressWarnings({"deprecation", "rawtypes", "unchecked"})
public class GenToPrintTaskDew {
private static final Logger LOGGER = LoggerFactory.getLogger(GenToPrintTaskAgency.class);
private static final String datePattern = "yyyy-MM-dd_HH-mm-ss";
public static void main(String[] args) {
LOGGER.info("Start task.");
ParameterTool paraTool = ParameterTool.fromArgs(args);
String checkpointInterval = "180000000";
// set up execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings settings = EnvironmentSettings.newInstance()
.inStreamingMode().build();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setCheckpointInterval(Long.valueOf(checkpointInterval));
env.getCheckpointConfig().enableExternalizedCheckpoints(
CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
SimpleDateFormat dateTimeFormat = new SimpleDateFormat(datePattern);
String time = dateTimeFormat.format(System.currentTimeMillis());
RocksDBStateBackend rocksDbBackend =
new RocksDBStateBackend(
new FsStateBackend("obs://obs/xxx/testcheckpoint/" + time), true);
env.setStateBackend(rocksDbBackend);
String createCatalog = "CREATE CATALOG lf_catalog WITH (\n" +
" 'type' = 'hive',\n" +
" 'hive-conf-dir' = '/opt/hadoop/conf',\n" +
" 'properties.catalog.lakeformation.auth.identity.util.class' = 'com.huawei.flink.provider.lakeformation.FlinkDewIdentityGenerator',\n" +
" 'properties.catalog.dew.endpoint'='kms.xxx.xxx.com',\n" +
" 'properties.catalog.dew.csms.secretName'='obsAksK',\n" +
" 'properties.catalog.dew.access.key' = 'ak',\n" +
" 'properties.catalog.dew.secret.key' = 'sk',\n" +
" 'properties.catalog.dew.projectId'='330e068af1334c9782f4226xxxxxxxxxx',\n" +
" 'properties.catalog.dew.csms.version'='v9'\n" +
" );";
tEnv.executeSql(createCatalog);
String dataSource = "CREATE TABLE if not exists lf_catalog.`testdb`.`dataGenSourceJarDew618_1` (\n" +
" user_id string,\n" +
" amount int\n" +
") WITH (\n" +
" 'connector' = 'datagen',\n" +
" 'rows-per-second' = '1',\n" +
" 'fields.user_id.kind' = 'random',\n" +
" 'fields.user_id.length' = '3'\n" +
")";
tEnv.executeSql(dataSource);
/*testdb是用户自定义的数数据库*/
String printSink = "CREATE TABLE if not exists lf_catalog.`testdb`.`printSinkJarDew618_1` (\n" +
" user_id string,\n" +
" amount int\n" +
") WITH ('connector' = 'print')";
tEnv.executeSql(printSink);
/*testdb是用户自定义的数数据库*/
String query = "insert into lf_catalog.`test`.`printSinkJarDew618_1` " +
"select * from lf_catalog.`test`.`dataGenSourceJarDew618_1`";
tEnv.executeSql(query);
}
} 创建Flink jar作业并配置如下参数。 参数 说明 配置示例 Flink版本 Flink 1.15及以上版本支持对接LakeFormation。 1.15 委托 使用Flink 1.15及以上版本的引擎执行作业时,需要您先在IAM页面创建相关委托,并在此处添加新建的委托信息。选择该参数后系统将自动为您的作业添加以下配置: flink.dli.job.agency.name=agency 委托权限示例请参考创建DLI自定义委托权限和常见场景的委托权限策略。 - 优化参数 配置Flink作业访问的元数据类型。 本场景下请选择Lakeformation。 flink.dli.job.catalog.type=lakeformation 配置Flink作业访问的数据目录名称。 flink.dli.job.catalog.name=[lakeformation中的catalog名称] 此处选择的是在DLI管理控制台创建的数据目录,即DLI与Lakeformation默认实例下的数据目录的映射,该数据目录连接的是LakeFormation默认实例下的数据目录。 - 示例3:Flink jar支持Hudi表 开发Flink jar程序,编译并上传jar包到obs,本例上传到obs://obs-test/dlitest/目录 示例代码如下: 本例通过DataGen表产生随机数据并输出到Hudi结果表中。 其他connector类型可参考Flink 1.15支持的connector列表。 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91 package com.huawei.test;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.text.SimpleDateFormat;
public class GenToHudiTask4 {
private static final Logger LOGGER = LoggerFactory.getLogger(GenToHudiTask4.class);
private static final String datePattern = "yyyy-MM-dd_HH-mm-ss";
public static void main(String[] args) throws IOException {
LOGGER.info("Start task.");
ParameterTool paraTool = ParameterTool.fromArgs(args);
String checkpointInterval = "30000";
// set up execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings settings = EnvironmentSettings.newInstance()
.inStreamingMode().build();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setCheckpointInterval(Long.valueOf(checkpointInterval));
env.getCheckpointConfig().enableExternalizedCheckpoints(
CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
SimpleDateFormat dateTimeFormat = new SimpleDateFormat(datePattern);
String time = dateTimeFormat.format(System.currentTimeMillis());
RocksDBStateBackend rocksDbBackend =
new RocksDBStateBackend(
new FsStateBackend("obs://xxx/jobs/testcheckpoint/" + time), true);
env.setStateBackend(rocksDbBackend);
String catalog = "CREATE CATALOG hoodie_catalog\n" +
" WITH (\n" +
" 'type'='hudi',\n" +
" 'hive.conf.dir' = '/opt/hadoop/conf',\n" +
" 'mode'='hms'\n" +
" )";
tEnv.executeSql(catalog);
String dwsSource = "CREATE TABLE if not exists genSourceJarForHudi618_1 (\n" +
" order_id STRING,\n" +
" order_name STRING,\n" +
" price INT,\n" +
" weight INT\n" +
") WITH (\n" +
" 'connector' = 'datagen',\n" +
" 'rows-per-second' = '1',\n" +
" 'fields.order_id.kind' = 'random',\n" +
" 'fields.order_id.length' = '8',\n" +
" 'fields.order_name.kind' = 'random',\n" +
" 'fields.order_name.length' = '8'\n" +
")";
tEnv.executeSql(dwsSource);
/*testdb是用户自定义的数数据库*/
String printSinkdws =
"CREATE TABLE if not exists hoodie_catalog.`testdb`.`hudiSinkJarHudi618_1` (\n" +
" order_id STRING PRIMARY KEY NOT ENFORCED,\n" +
" order_name STRING,\n" +
" price INT,\n" +
" weight INT,\n" +
" create_time BIGINT,\n" +
" create_date String\n" +
") WITH (" +
"'connector' = 'hudi',\n" +
"'path' = 'obs://xxx/catalog/dbtest3/hudiSinkJarHudi618_1',\n" +
"'hoodie.datasource.write.recordkey.field' = 'order_id',\n" +
"'EXTERNAL' = 'true'\n" +
")";
tEnv.executeSql(printSinkdws);
/*testdb是用户自定义的数数据库*/
String query = "insert into hoodie_catalog.`testdb`.`hudiSinkJarHudi618_1` select\n" +
" order_id,\n" +
" order_name,\n" +
" price,\n" +
" weight,\n" +
" UNIX_TIMESTAMP() as create_time,\n" +
" FROM_UNIXTIME(UNIX_TIMESTAMP(), 'yyyyMMdd') as create_date\n" +
" from genSourceJarForHudi618_1";
tEnv.executeSql(query);
}
}
表8 hudi类型sink表的connector参数 参数 说明 是否必填 参数值 connector flink connector类型。 配置为hudi表示sink表是hudi表。 是 hudi path 表的基本路径。如果该路径不存在,则会创建它。 是 请参考示例代码中的配置值。 hoodie.datasource.write.recordkey.field hoodie表的唯一键字段名 否 这里配置order_id为唯一键。 EXTERNAL 是否外表 是 hudi表必填,且设置为true true 创建Flink jar作业并配置如下参数。 参数 说明 配置示例 Flink版本 Flink 1.15及以上版本支持对接LakeFormation。 1.15 委托 使用Flink 1.15及以上版本的引擎执行作业时,需要您先在IAM页面创建相关委托,并在此处添加新建的委托信息。选择该参数后系统将自动为您的作业添加以下配置: flink.dli.job.agency.name=agency 委托权限示例请参考创建DLI自定义委托权限和常见场景的委托权限策略。 - 优化参数 配置Flink作业访问的元数据类型。 本场景下请选择Lakeformation。 flink.dli.job.catalog.type=lakeformation 配置Flink作业访问的数据目录名称。 flink.dli.job.catalog.name=[lakeformation中的catalog名称] 此处选择的是在DLI管理控制台创建的数据目录,即DLI与Lakeformation默认实例下的数据目录的映射,该数据目录连接的是LakeFormation默认实例下的数据目录。 -