云服务器内容精选

  • 常见问题 Q:作业运行失败,运行日志中有如下报错信息,应该怎么解决? java.lang.NoSuchMethodError: org.apache.flink.api.java.ClosureCleaner.clean(Ljava/lang/Object;Z)V A:该问题是因为所选择的huaweicloud-dis-flink-connector_2.11版本过低导致,请选择2.0.1及以上版本。 Q:运行作业读取DIS数据时,无法读出数据且Taskmanager的运行日志中有如下报错信息,应该怎么解决? ERROR com.huaweicloud.dis.adapter.common.consumer.Coordinator [] - Failed to getCheckpointAsync, error : [400 : {"errorCode":"DIS.4332","message":"app not found. "}], request : [{"stream_name":"xx","partition_id":"shardId-0000000000","checkpoint_type":"LAST_READ","app_name":"xx"}] A: 该问题是因为读取DIS数据所使用的group.id在DIS的Apps中并没有提前创建。
  • 环境准备 已在DLI控制台购买了通用队列。 已购买了DIS通道。开通DIS通道。 用户在使用Flink 1.12版本,则依赖的Dis connector版本需要不低于2.0.1,详细代码参考DISFlinkConnector相关依赖,如何配置connector,详细参考自定义Flink Streaming作业。 若读取DIS,且配置groupId,则需要提前在DIS的“App管理”中创建所需的App名称。 请勿将disToDis.properties放在生成的jar包中,在代码里有关于disToDis.properties的路径,如果放在jar包中,代码会找不到disToDis.properties路径。
  • 配置节点调度任务(实时作业) 配置实时处理作业的节点调度任务,支持单次调度、周期调度、事件驱动调度三种方式。操作方法如下: 单击画布中的节点,在右侧显示“调度配置”页签,单击此页签,展开配置页面,配置如表4所示的参数。 表4 节点调度配置 参数 说明 调度方式 选择作业的调度方式: 单次调度:手动触发作业单次运行。 周期调度:周期性自动运行作业。 事件驱动调度:根据外部条件触发作业运行。 “周期调度”的参数 生效时间 调度任务的生效时间段。 系统支持生效时间可以快速选到今天和明天。单击生效时间的时间框,在时间框界面单击“今天”或“明天”,可以快速选择当前日期。 调度周期 选择调度任务的执行周期,并配置相关参数: 分钟 小时 系统支持按间隔小时和离散小时配置调度周期,离散小时调度仅支持自然周期调度。 天 周 月 同时系统支持可以选择“每月最后一天”进行业务调度。 调度周期需要合理设置,如CDM、ETL作业的调度周期至少应在5分钟以上,并根据作业表的数据量、源端表更新频次等调整。 已经在运行中的作业,可以修改其调度周期。 跨周期依赖 选择作业下实例之间的依赖关系。 不依赖上一调度周期 自依赖(等待上一调度周期结束才能继续运行) 跳过等待的实例,运行最近的批次。 “事件驱动调度”的参数 触发事件类型 选择触发作业运行的事件类型。 DIS通道名称 选择DIS通道,当指定的DIS通道有新消息时,数据开发模块将新消息传递给作业,触发该作业运行。 连接名称 选择数据连接,需先在“管理中心”创建kafka数据连接。 Topic 选择需要发往kafka的消息Topic。 消费组 消费者组是kafka提供的可扩展且具有容错性的消费者机制。 它是一个组,所以内部可以有多个消费者,这些消费者共用一个ID,一个组内的所有消费者共同协作,完成对订阅的主题的所有分区进行消费。其中一个主题中的一个分区只能由一个消费者消费。 说明: 一个消费者组可以有多个消费者。 Group ID是一个字符串,在一个kafka集群中,它标识唯一的一个消费者组。 每个消费者组订阅的所有主题中,每个主题的每个分区只能由一个消费者消费。消费者组之间不影响。 当触发事件类型选择了DIS或KAFKA时,会自动关联出消费组的ID,用户也可以手动修改。 事件处理并发数 选择作业并行处理的数量,最大并发数为10。 事件检测间隔 配置时间间隔,检测DIS通道下是否有新的消息。时间间隔单位可以配置为秒或分钟。 失败策略 选择节点执行失败后的策略: 挂起 忽略失败,继续调度
  • 约束限制 调度周期需要合理设置,单个作业最多允许5个实例并行执行,如果作业实际执行时间大于作业配置的调度周期,会导致后面批次的作业实例堆积,从而出现计划时间和开始时间相差大。例如CDM、ETL作业的调度周期至少应在5分钟以上,并根据作业表的数据量、源端表更新频次等调整。 如果通过DataArts Studio数据开发调度CDM迁移作业,CDM迁移作业处也配置了定时任务,则两种调度均会生效。为了业务运行逻辑统一和避免调度冲突,推荐您启用数据开发调度即可,无需配置CDM定时任务。
  • 相关操作 怎样设置作业的参数? 在FLink作业列表中选择待编辑的作业。 单击操作列“编辑”。 在参数区域输入参数信息。 指定类的参数列表,参数之间使用空格分隔。 参数输入格式:--key1 value1 --key2 value2 例如:控制台入输入的参数 --bootstrap.server 192.168.168.xxx:9092 通过ParameterTool解析后的参数如下所示: 图5 解析后的参数 怎样查看作业日志? 在FLink作业列表中点击作业名称,进入作业详情页面。 单击“运行日志”,即可在控制台查看作业日志。 此处只展示最新的运行日志,更多信息请查看保存日志的OBS桶。
  • 环境准备 登录MRS管理控制台,创建MRS集群,选择“开启kerberos”,勾选“Kafka”, “HBase”, “HDFS”等。请参见《MapReduce服务用户指南》的“购买自定义集群”的章节创建MRS。 “安全组规则”开通对应UDP/TCP端口。详细内容请参考《私有云用户指南》中的“添加安全组规则”章节。 进入MRS manager管理界面: 创建机机账号,需确保该用户含有“hdfs_admin”, “hbase_admin”权限,下载该用户认证凭据,其中包含“user.keytab” 和 “krb5.conf” 文件。 由于人机账号的keytab会随用户密码过期而失效,故建议使用机机账号进行配置。 单击“服务管理”,下载客户端,单击“确定”。 在MRS节点上下载配置文件,所需集群配置文件包含“hbase-site.xml”和“hiveclient.properties”。 创建弹性资源池和队列。 弹性资源池与队列为DLI作业提供计算资源,创建弹性资源池,弹性资源池添加队列。 使用该DLI独享队列与MRS集群建立增强型跨源连接,且用户可以根据实际所需设置相应安全组规则。 如何建立增强型跨源连接,请参考《数据湖探索用户指南》中增强型跨源连接章节。 如何设置安全组规则,请参见《虚拟私有云用户指南》中“安全组”章节。 获取MRS集群全部节点的ip和域名映射,在DLI跨源连接修改主机信息中配置host映射。 如何添加IP域名映射,请参见《数据湖探索用户指南》中“修改主机信息”章节。 Kafka服务端的端口如果监听在hostname上,则需要将Kafka Broker节点的hostname和IP的对应关系添加到DLI队列中。Kafka Broker节点的hostname和IP请联系Kafka服务的部署人员。
  • 步骤8:查看作业运行结果 在Spark作业管理界面显示已提交的作业运行状态。初始状态显示为“启动中”。 如果作业运行成功则作业状态显示为“已成功”,通过以下操作查看创建的数据库和表。 可以在DLI控制台,左侧导航栏,单击“SQL编辑器”。在“数据库”中已显示创建的数据库“test_sparkapp”。 图14 查看创建的数据库 双击数据库名,可以在数据库下查看已创建成功的DLI和OBS表。 图15 查看表 双击DLI表名dli_testtable,单击“执行”查询DLI表数据。 图16 查询DLI表数据 注释掉DLI表查询语句,双击OBS表名dli_testobstable,单击“执行”查询OBS表数据。 图17 查询OBS表数据 如果作业运行失败则作业状态显示为“已失败”,单击“操作”列“更多”下的“Driver日志”,显示当前作业运行的日志,分析报错原因。 图18 查看Driver日志 原因定位解决后,可以在作业“操作”列,单击“编辑”,修改作业相关参数后,单击“执行”重新运行该作业即可。
  • Java样例代码 本示例操作步骤采用Java进行编码,具体完整的样例代码参考如下: package com.huawei.dli.demo; import org.apache.spark.sql.SparkSession; public class DliCatalogTest { public static void main(String[] args) { SparkSession spark = SparkSession .builder() .config("spark.sql.session.state.builder", "org.apache.spark.sql.hive.UQueryHiveACLSessionStateBuilder") .config("spark.sql.catalog.class", "org.apache.spark.sql.hive.UQueryHiveACLExternalCatalog") .config("spark.sql.extensions","org.apache.spark.sql.DliSparkExtension") .appName("java_spark_demo") .getOrCreate(); spark.sql("create database if not exists test_sparkapp").collect(); spark.sql("drop table if exists test_sparkapp.dli_testtable").collect(); spark.sql("create table test_sparkapp.dli_testtable(id INT, name STRING)").collect(); spark.sql("insert into test_sparkapp.dli_testtable VALUES (123,'jason')").collect(); spark.sql("insert into test_sparkapp.dli_testtable VALUES (456,'merry')").collect(); spark.sql("drop table if exists test_sparkapp.dli_testobstable").collect(); spark.sql("create table test_sparkapp.dli_testobstable(age INT, name STRING) using csv options (path 'obs://dli-test-obs01/testdata.csv')").collect(); spark.stop(); } }
  • scala样例代码 object DliCatalogTest { def main(args:Array[String]): Unit = { val sql = args(0) val runDdl = Try(args(1).toBoolean).getOrElse(true) System.out.println(s"sql is $sql runDdl is $runDdl") val sparkConf = new SparkConf(true) sparkConf .set("spark.sql.session.state.builder","org.apache.spark.sql.hive.UQueryHiveACLSessionStateBuilder") .set("spark.sql.catalog.class","org.apache.spark.sql.hive.UQueryHiveACLExternalCatalog") sparkConf.setAppName("dlicatalogtester") val spark = SparkSession.builder .config(sparkConf) .enableHiveSupport() .config("spark.sql.extensions","org.apache.spark.sql.DliSparkExtension") .appName("SparkTest") .getOrCreate() System.out.println("catalog is " + spark.sessionState.catalog.toString) if (runDdl) { val df = spark.sql(sql).collect() } else { spark.sql(sql).show() } spark.close() } }
  • Python样例代码 #!/usr/bin/python # -*- coding: UTF-8 -*- from __future__ import print_function import sys from pyspark.sql import SparkSession if __name__ == "__main__": url = sys.argv[1] creatTbl = "CREATE TABLE test_sparkapp.dli_rds USING JDBC OPTIONS ('url'='jdbc:mysql://%s'," \ "'driver'='com.mysql.jdbc.Driver','dbtable'='test.test'," \ " 'passwdauth' = 'DatasourceRDSTest_pwd','encryption' = 'true')" % url spark = SparkSession \ .builder \ .enableHiveSupport() \ .config("spark.sql.session.state.builder","org.apache.spark.sql.hive.UQueryHiveACLSessionStateBuilder") \ .config("spark.sql.catalog.class", "org.apache.spark.sql.hive.UQueryHiveACLExternalCatalog") \ .config("spark.sql.extensions","org.apache.spark.sql.DliSparkExtension") \ .appName("python Spark test catalog") \ .getOrCreate() spark.sql("CREATE database if not exists test_sparkapp").collect() spark.sql("drop table if exists test_sparkapp.dli_rds").collect() spark.sql(creatTbl).collect() spark.sql("select * from test_sparkapp.dli_rds").show() spark.sql("insert into table test_sparkapp.dli_rds select 12,'aaa'").collect() spark.sql("select * from test_sparkapp.dli_rds").show() spark.sql("insert overwrite table test_sparkapp.dli_rds select 1111,'asasasa'").collect() spark.sql("select * from test_sparkapp.dli_rds").show() spark.sql("drop table test_sparkapp.dli_rds").collect() spark.stop()
  • 开发流程 DLI进行Spark作业访问DLI元数据开发流程参考如下: 图1 Spark作业访问DLI元数据开发流程 表2 开发流程说明 序号 阶段 操作界面 说明 1 创建DLI通用队列 DLI控制台 创建作业运行的DLI队列。 2 OBS桶文件配置 OBS控制台 如果是创建OBS表,则需要上传文件数据到OBS桶下。 配置Spark创建表的元数据信息的存储路径。该文件夹路径用来存储Spark创建表的元数据信息“spark.sql.warehouse.dir”。 3 新建Maven工程,配置pom文件 IntelliJ IDEA 参考样例代码说明,编写程序代码创建DLI表或OBS表。 4 编写程序代码 5 调试,编译代码并导出Jar包 6 上传Jar包到OBS和DLI OBS控制台 将生成的Spark Jar包文件上传到OBS目录下和DLI程序包中。 7 创建Spark Jar作业 DLI控制台 在DLI控制台创建Spark Jar作业并提交运行作业。 8 查看作业运行结果 DLI控制台 查看作业运行状态和作业运行日志。
  • 步骤2:OBS桶文件配置 如果需要创建OBS表,则需要先上传数据到OBS桶目录下。 本次演示的样例代码创建了OBS表,测试数据内容参考如下示例,创建名为的testdata.csv文件。 12,Michael 27,Andy 30,Justin 进入OBS管理控制台,在“桶列表”下,单击已创建的OBS桶名称,本示例桶名为“dli-test-obs01”,进入“概览”页面。 单击左侧列表中的“对象”,选择“上传对象”,将testdata.csv文件上传到OBS桶根目录下。 在OBS桶根目录下,单击“新建文件夹”,创建名为“warehousepath”的文件夹。该文件夹路径用来存储Spark创建表的元数据信息“spark.sql.warehouse.dir”。
  • 环境准备 在进行Spark 作业访问DLI元数据开发前,请准备以下开发环境。 表1 Spark Jar作业开发环境 准备项 说明 操作系统 Windows系统,支持Windows7以上版本。 安装JDK JDK使用1.8版本。 安装和配置IntelliJ IDEA IntelliJ IDEA为进行应用开发的工具,版本要求使用2019.1或其他兼容版本。 安装Maven 开发环境的基本配置。用于项目管理,贯穿软件开发生命周期。
  • 步骤4:编写代码 编写DliCatalogTest程序创建数据库、DLI表和OBS表。 完整的样例请参考Java样例代码,样例代码分段说明如下: 导入依赖的包。 import org.apache.spark.sql.SparkSession; 创建SparkSession会话。 创建SparkSession会话时需要指定Spark参数:"spark.sql.session.state.builder"、"spark.sql.catalog.class"和"spark.sql.extensions",按照样例配置即可。 Spark2.3.x版本 SparkSession spark = SparkSession .builder() .config("spark.sql.session.state.builder", "org.apache.spark.sql.hive.UQueryHiveACLSessionStateBuilder") .config("spark.sql.catalog.class", "org.apache.spark.sql.hive.UQueryHiveACLExternalCatalog") .config("spark.sql.extensions","org.apache.spark.sql.DliSparkExtension") .appName("java_spark_demo") .getOrCreate(); Spark2.4.x版本 SparkSession spark = SparkSession .builder() .config("spark.sql.session.state.builder", "org.apache.spark.sql.hive.UQueryHiveACLSessionStateBuilder") .config("spark.sql.catalog.class", "org.apache.spark.sql.hive.UQueryHiveACLExternalCatalog") .config("spark.sql.extensions","org.apache.spark.sql.DliSparkExtension") .config("spark.sql.hive.implementation","org.apache.spark.sql.hive.client.DliHiveClientImpl") .appName("java_spark_demo") .getOrCreate(); Spark3.1.x版本 SparkSession spark = SparkSession .builder() .config("spark.sql.session.state.builder", "org.apache.spark.sql.hive.UQueryHiveACLSessionStateBuilder") .config("spark.sql.catalog.class", "org.apache.spark.sql.hive.UQueryHiveACLExternalCatalog") .config("spark.sql.extensions","org.apache.spark.sql.DliSparkExtension") .appName("java_spark_demo") .getOrCreate(); Spark3.3.x版本 SparkSession spark = SparkSession .builder() .config("spark.sql.session.state.builder", "org.apache.spark.sql.hive.DliLakeHouseBuilder") .config("spark.sql.catalog.class", "org.apache.spark.sql.hive.DliLakeHouseCatalog") .appName("java_spark_demo") .getOrCreate(); 创建数据库。 如下样例代码演示,创建名为test_sparkapp的数据库。 spark.sql("create database if not exists test_sparkapp").collect(); 创建DLI表并插入测试数据。 spark.sql("drop table if exists test_sparkapp.dli_testtable").collect(); spark.sql("create table test_sparkapp.dli_testtable(id INT, name STRING)").collect(); spark.sql("insert into test_sparkapp.dli_testtable VALUES (123,'jason')").collect(); spark.sql("insert into test_sparkapp.dli_testtable VALUES (456,'merry')").collect(); 创建OBS表。如下示例中的OBS路径需要根据步骤2:OBS桶文件配置中的实际数据路径修改。 spark.sql("drop table if exists test_sparkapp.dli_testobstable").collect(); spark.sql("create table test_sparkapp.dli_testobstable(age INT, name STRING) using csv options (path 'obs://dli-test-obs01/testdata.csv')").collect(); 关闭SparkSession会话spark。 spark.stop();
  • 步骤5:调试、编译代码并导出Jar包 双击IntelliJ IDEA工具右侧的“Maven”,参考下图分别双击“clean”、“compile”对代码进行编译。 编译成功后,双击“package”对代码进行打包。 图10 编译打包 打包成功后,生成的Jar包会放到target目录下,以备后用。本示例将会生成到:“D:\DLITest\SparkJarMetadata\target”下名为“SparkJarMetadata-1.0-SNAPSHOT.jar”。 图11 导出jar包