云服务器内容精选

  • 修订记录 发布日期 修订记录 2024-05-29 使用Spark作业访问DLI元数据,补充说明不同Spark版本访问元数据的示例代码。 2024-05-17 修改: 修改Flink Jar作业开发基础样例示例。 2024-04-07 新增: 使用Spark Jar作业读取和查询OBS数据,修改“上传Jar包到OBS和DLI下”的操作步骤。 2024-03-30 新增: Flink作业委托场景开发指导。 Spark作业委托场景开发指导 2023-12-08 下线“使用Beeline提交Spark SQL作业”,推荐使用使用JDBC提交Spark SQL作业。 2023-11-09 修改 pyspark样例代码示例代码对3.1.1版本的相关说明。 2023-09-18 DLI从Spark 3.x版本开始不支持内置地理空间查询函数,下线“在Spark SQL作业中使用地理空间函数”。 2023-07-21 下线“使用ODBC提交Spark SQL作业”,您可以参考“ FineBI工具对接DLI Trino”、“PowerBI工具对接DLI Trino”对接DLI Trino。 2023-06-07 Flink Jar作业开发基础样例,补充传参和查看日志的操作指导。 2023-03-09 手册结构调整,按DLI作业类型拆分手册章节。 新增使用Flink Jar读写DIS开发指南。 2023-03-03 使用Spark作业跨源访问数据源,修改对Module的说明。 2022-10-28 使用Spark作业访问DLI元数据,修改Python样例代码。 2022-08-16 使用Spark作业访问DLI元数据,删除“不支持创建加密的DLI表”的相关描述。 2022-08-09 新增Flink作业高可靠推荐配置指导(异常自动重启)。 2022-07-19 新增使用Flink Jar连接开启SASL_SSL认证的Kafka。
  • 步骤8:查看作业运行结果 在Spark作业管理界面显示已提交的作业运行状态。初始状态显示为“启动中”。 如果作业运行成功则作业状态显示为“已成功”,通过以下操作查看创建的数据库和表。 可以在DLI控制台,左侧导航栏,单击“SQL编辑器”。在“数据库”中已显示创建的数据库“test_sparkapp”。 图14 查看创建的数据库 双击数据库名,可以在数据库下查看已创建成功的DLI和OBS表。 图15 查看表 双击DLI表名dli_testtable,单击“执行”查询DLI表数据。 图16 查询DLI表数据 注释掉DLI表查询语句,双击OBS表名dli_testobstable,单击“执行”查询OBS表数据。 图17 查询OBS表数据 如果作业运行失败则作业状态显示为“已失败”,单击“操作”列“更多”下的“Driver日志”,显示当前作业运行的日志,分析报错原因。 图18 查看Driver日志 原因定位解决后,可以在作业“操作”列,单击“编辑”,修改作业相关参数后,单击“执行”重新运行该作业即可。
  • Java样例代码 本示例操作步骤采用Java进行编码,具体完整的样例代码参考如下: package com.huawei.dli.demo; import org.apache.spark.sql.SparkSession; public class DliCatalogTest { public static void main(String[] args) { SparkSession spark = SparkSession .builder() .config("spark.sql.session.state.builder", "org.apache.spark.sql.hive.UQueryHiveACLSessionStateBuilder") .config("spark.sql.catalog.class", "org.apache.spark.sql.hive.UQueryHiveACLExternalCatalog") .config("spark.sql.extensions","org.apache.spark.sql.DliSparkExtension") .appName("java_spark_demo") .getOrCreate(); spark.sql("create database if not exists test_sparkapp").collect(); spark.sql("drop table if exists test_sparkapp.dli_testtable").collect(); spark.sql("create table test_sparkapp.dli_testtable(id INT, name STRING)").collect(); spark.sql("insert into test_sparkapp.dli_testtable VALUES (123,'jason')").collect(); spark.sql("insert into test_sparkapp.dli_testtable VALUES (456,'merry')").collect(); spark.sql("drop table if exists test_sparkapp.dli_testobstable").collect(); spark.sql("create table test_sparkapp.dli_testobstable(age INT, name STRING) using csv options (path 'obs://dli-test-obs01/testdata.csv')").collect(); spark.stop(); } }
  • scala样例代码 object DliCatalogTest { def main(args:Array[String]): Unit = { val sql = args(0) val runDdl = Try(args(1).toBoolean).getOrElse(true) System.out.println(s"sql is $sql runDdl is $runDdl") val sparkConf = new SparkConf(true) sparkConf .set("spark.sql.session.state.builder","org.apache.spark.sql.hive.UQueryHiveACLSessionStateBuilder") .set("spark.sql.catalog.class","org.apache.spark.sql.hive.UQueryHiveACLExternalCatalog") sparkConf.setAppName("dlicatalogtester") val spark = SparkSession.builder .config(sparkConf) .enableHiveSupport() .config("spark.sql.extensions","org.apache.spark.sql.DliSparkExtension") .appName("SparkTest") .getOrCreate() System.out.println("catalog is " + spark.sessionState.catalog.toString) if (runDdl) { val df = spark.sql(sql).collect() } else { spark.sql(sql).show() } spark.close() } }
  • Python样例代码 #!/usr/bin/python # -*- coding: UTF-8 -*- from __future__ import print_function import sys from pyspark.sql import SparkSession if __name__ == "__main__": url = sys.argv[1] creatTbl = "CREATE TABLE test_sparkapp.dli_rds USING JDBC OPTIONS ('url'='jdbc:mysql://%s'," \ "'driver'='com.mysql.jdbc.Driver','dbtable'='test.test'," \ " 'passwdauth' = 'DatasourceRDSTest_pwd','encryption' = 'true')" % url spark = SparkSession \ .builder \ .enableHiveSupport() \ .config("spark.sql.session.state.builder","org.apache.spark.sql.hive.UQueryHiveACLSessionStateBuilder") \ .config("spark.sql.catalog.class", "org.apache.spark.sql.hive.UQueryHiveACLExternalCatalog") \ .config("spark.sql.extensions","org.apache.spark.sql.DliSparkExtension") \ .appName("python Spark test catalog") \ .getOrCreate() spark.sql("CREATE database if not exists test_sparkapp").collect() spark.sql("drop table if exists test_sparkapp.dli_rds").collect() spark.sql(creatTbl).collect() spark.sql("select * from test_sparkapp.dli_rds").show() spark.sql("insert into table test_sparkapp.dli_rds select 12,'aaa'").collect() spark.sql("select * from test_sparkapp.dli_rds").show() spark.sql("insert overwrite table test_sparkapp.dli_rds select 1111,'asasasa'").collect() spark.sql("select * from test_sparkapp.dli_rds").show() spark.sql("drop table test_sparkapp.dli_rds").collect() spark.stop()
  • 开发流程 DLI进行Spark作业访问DLI元数据开发流程参考如下: 图1 Spark作业访问DLI元数据开发流程 表2 开发流程说明 序号 阶段 操作界面 说明 1 创建DLI通用队列 DLI控制台 创建作业运行的DLI队列。 2 OBS桶文件配置 OBS控制台 如果是创建OBS表,则需要上传文件数据到OBS桶下。 配置Spark创建表的元数据信息的存储路径。该文件夹路径用来存储Spark创建表的元数据信息“spark.sql.warehouse.dir”。 3 新建Maven工程,配置pom文件 IntelliJ IDEA 参考样例代码说明,编写程序代码创建DLI表或OBS表。 4 编写程序代码 5 调试,编译代码并导出Jar包 6 上传Jar包到OBS和DLI OBS控制台 将生成的Spark Jar包文件上传到OBS目录下和DLI程序包中。 7 创建Spark Jar作业 DLI控制台 在DLI控制台创建Spark Jar作业并提交运行作业。 8 查看作业运行结果 DLI控制台 查看作业运行状态和作业运行日志。
  • 步骤2:OBS桶文件配置 如果需要创建OBS表,则需要先上传数据到OBS桶目录下。 本次演示的样例代码创建了OBS表,测试数据内容参考如下示例,创建名为的testdata.csv文件。 12,Michael 27,Andy 30,Justin 进入OBS管理控制台,在“桶列表”下,单击已创建的OBS桶名称,本示例桶名为“dli-test-obs01”,进入“概览”页面。 单击左侧列表中的“对象”,选择“上传对象”,将testdata.csv文件上传到OBS桶根目录下。 在OBS桶根目录下,单击“新建文件夹”,创建名为“warehousepath”的文件夹。该文件夹路径用来存储Spark创建表的元数据信息“spark.sql.warehouse.dir”。
  • 环境准备 在进行Spark 作业访问DLI元数据开发前,请准备以下开发环境。 表1 Spark Jar作业开发环境 准备项 说明 操作系统 Windows系统,支持Windows7以上版本。 安装JDK JDK使用1.8版本。 安装和配置IntelliJ IDEA IntelliJ IDEA为进行应用开发的工具,版本要求使用2019.1或其他兼容版本。 安装Maven 开发环境的基本配置。用于项目管理,贯穿软件开发生命周期。
  • 步骤4:编写代码 编写DliCatalogTest程序创建数据库、DLI表和OBS表。 完整的样例请参考Java样例代码,样例代码分段说明如下: 导入依赖的包。 import org.apache.spark.sql.SparkSession; 创建SparkSession会话。 创建SparkSession会话时需要指定Spark参数:"spark.sql.session.state.builder"、"spark.sql.catalog.class"和"spark.sql.extensions",按照样例配置即可。 Spark2.3.x版本 SparkSession spark = SparkSession .builder() .config("spark.sql.session.state.builder", "org.apache.spark.sql.hive.UQueryHiveACLSessionStateBuilder") .config("spark.sql.catalog.class", "org.apache.spark.sql.hive.UQueryHiveACLExternalCatalog") .config("spark.sql.extensions","org.apache.spark.sql.DliSparkExtension") .appName("java_spark_demo") .getOrCreate(); Spark2.4.x版本 SparkSession spark = SparkSession .builder() .config("spark.sql.session.state.builder", "org.apache.spark.sql.hive.UQueryHiveACLSessionStateBuilder") .config("spark.sql.catalog.class", "org.apache.spark.sql.hive.UQueryHiveACLExternalCatalog") .config("spark.sql.extensions","org.apache.spark.sql.DliSparkExtension") .config("spark.sql.hive.implementation","org.apache.spark.sql.hive.client.DliHiveClientImpl") .appName("java_spark_demo") .getOrCreate(); Spark3.1.x版本 SparkSession spark = SparkSession .builder() .config("spark.sql.session.state.builder", "org.apache.spark.sql.hive.UQueryHiveACLSessionStateBuilder") .config("spark.sql.catalog.class", "org.apache.spark.sql.hive.UQueryHiveACLExternalCatalog") .config("spark.sql.extensions","org.apache.spark.sql.DliSparkExtension") .appName("java_spark_demo") .getOrCreate(); Spark3.3.x版本 SparkSession spark = SparkSession .builder() .config("spark.sql.session.state.builder", "org.apache.spark.sql.hive.DliLakeHouseBuilder") .config("spark.sql.catalog.class", "org.apache.spark.sql.hive.DliLakeHouseCatalog") .appName("java_spark_demo") .getOrCreate(); 创建数据库。 如下样例代码演示,创建名为test_sparkapp的数据库。 spark.sql("create database if not exists test_sparkapp").collect(); 创建DLI表并插入测试数据。 spark.sql("drop table if exists test_sparkapp.dli_testtable").collect(); spark.sql("create table test_sparkapp.dli_testtable(id INT, name STRING)").collect(); spark.sql("insert into test_sparkapp.dli_testtable VALUES (123,'jason')").collect(); spark.sql("insert into test_sparkapp.dli_testtable VALUES (456,'merry')").collect(); 创建OBS表。如下示例中的OBS路径需要根据步骤2:OBS桶文件配置中的实际数据路径修改。 spark.sql("drop table if exists test_sparkapp.dli_testobstable").collect(); spark.sql("create table test_sparkapp.dli_testobstable(age INT, name STRING) using csv options (path 'obs://dli-test-obs01/testdata.csv')").collect(); 关闭SparkSession会话spark。 spark.stop();
  • 步骤5:调试、编译代码并导出Jar包 双击IntelliJ IDEA工具右侧的“Maven”,参考下图分别双击“clean”、“compile”对代码进行编译。 编译成功后,双击“package”对代码进行打包。 图10 编译打包 打包成功后,生成的Jar包会放到target目录下,以备后用。本示例将会生成到:“D:\DLITest\SparkJarMetadata\target”下名为“SparkJarMetadata-1.0-SNAPSHOT.jar”。 图11 导出jar包
  • 步骤1:创建DLI通用队列 第一次提交Spark作业,需要先创建队列,例如创建名为“sparktest”的队列,队列类型选择为“通用队列”。 在DLI管理控制台的左侧导航栏中,选择“队列管理”。 单击“队列管理”页面右上角“购买队列”进行创建队列。 创建名为“sparktest”的队列,队列类型选择为“通用队列”。创建队列详细介绍请参考创建队列。 图2 创建队列 单击“立即购买”,确认配置。 配置确认无误,单击“提交”完成队列创建。
  • 约束限制 如果使用Spark 3.1访问元数据,则必须新建队列。 不支持的场景: 在SQL作业中创建了数据库(database),编写程序代码指定在该数据库下创建表。 例如在DLI的SQL编辑器中的某SQL队列下,创建了数据库testdb。后续通过编写程序代码在testdb下创建表testTable,编译打包后提交的Spark Jar作业则会运行失败。 支持的场景 在SQL作业中创建数据库(database),表(table) , 通过SQL或Spark程序作业读取插入数据。 在Spark程序作业中创建数据库(database),表(table), 通过SQL或Spark程序作业读取插入数据。
  • 准备Flink作业数据 创建Flink作业需要输入数据源和数据输出通道,即常说的Source和Sink。用户使用其他服务作为数据源或输出通道时,需要先开通相应服务。 Flink作业支持以下数据源和输出通道: DIS数据源和输出通道 如果用户作业需要DIS作为数据源和输出通道时,则要先开通数据接入服务(DIS)。 用户如何开通DIS服务,具体操作请参见《数据接入服务用户指南》中的“开通DIS通道”章节。 申请DIS通道后,用户可以将本地数据通过DIS通道不断上传至DIS服务,实现向Flink作业提供实时流数据源,具体操作请参见《数据接入服务用户指南》中的“发送数据到DIS服务”章节。 样例数据如下所示: 1,lilei,bmw320i,28 2,hanmeimei,audia4,27 OBS数据源 如果用户作业需要对象存储服务(OBS)作为数据源,则要先开通OBS服务,具体操作请参见《对象存储服务控制台指南》中的“开通OBS服务”章节。 开通OBS服务后,用户需要将本地文件通过Internet上传至OBS指定的位置,具体操作请参见《对象存储服务控制台指南》中的“上传文件”章节。 RDS输出通道 如果用户作业需要RDS作为输出通道,需要创建RDS实例,具体操作请参见《关系型数据库快速入门》中“购买实例”章节。 SMN输出通道 如果用户作业需要SMN作为输出通道,需要先在SMN中创建主题,获取URN资源标识,再添加订阅。具体操作请参见《消息通知服务快速入门》。 Kafka数据源和输出通道 如果用户作业需要Kafka作为数据源和输出通道,则必须要通过创建增强型跨源连接与Kafka进行对接,具体操作请参见增强型跨源连接。 如果Kafka服务端的端口监听在hostname上,则需要将Kafka Broker节点的hostname和ip的对应关系添加到跨源连接中。 CloudTable数据源和输出通道 如果用户作业需要CloudTable作为数据源和输出通道,需要先在CloudTable中创建集群,获取集群ID。具体操作请参见《表格存储服务用户指南》《表格存储服务用户指南》中的“入门”章节。 云搜索服务输出通道 如果用户作业需要云搜索服务作为输出通道,需要先在云搜索服务中创建集群,获取集群内网访问地址。具体操作请参见《云搜索服务用户指南》中的“入门”章节。 DCS输出通道 如果用户作业需要DCS作为输出通道,需要先在DCS中创建Redis类型的缓存实例,获取Redis实例连接地址。具体操作请参见购买Redis实例。 父主题: Flink作业管理
  • 修订记录 发布日期 修订记录 2024-05-28 新增 批作业SQL常用配置项说明新增配置参数:spark.sql.legacy.correlated.scalar.query.enabled 使用Hive语法创建OBS表新增关于设置多字符分隔符的使用示例。 使用Hive语法创建DLI表新增关于设置多字符分隔符的使用示例。 2024-02-01 新增 Spark开源命令支持说明。 2024-01-30 优化 使用DataSource语法创建OBS表,补充CTAS创建分区表约束限制说明。 使用Hive语法创建OBS表,补充CTAS创建分区表约束限制说明。 使用DataSource语法创建DLI表,补充CTAS创建分区表约束限制说明。 使用Hive语法创建DLI表,补充CTAS创建分区表约束限制说明。 2023-11-17 优化 创建函数,删除关键字TEMPORARY。 2023-11-09 优化 使用DataSource语法创建OBS表,补充示例代码。 使用Hive语法创建OBS表,补充示例代码。 使用DataSource语法创建DLI表,补充示例代码。 使用Hive语法创建DLI表,补充示例代码。 2023-09-22 优化 内置函数,优化日期函数、字符串函数、数学函数、聚合函数、分析窗口函数、其他函数的内容。 2023-08-14 新增 修改列注释 2023-08-01 新增 跨源连接Oracle表。 2023-04-02 修改 使用DataSource语法创建OBS表,修改建表相关参数说明。 2023-02-23 修改 使用DataSource语法创建OBS表,修改PERMISSIVE参数说明。 2023-01-11 修改 DDL语法定义,补充对CREATE FUNCTION语句的功能描述。 2022-11-09 修改 创建DLI表关联HBase,补充约束该语法不支持安全集群。 2022-10-26 修改 时间函数,描述信息中补充FROM_UNIXTIME(numeric[, string])的单位。 窗口,补充time_attr参数说明。 2022-09-01 修改OBS输出流,修改关键字说明。
  • 批作业SQL常用配置项说明 本章节为您介绍DLI 批作业SQL语法的常用配置项。 表1 常用配置项 名称 默认值 描述 spark.sql.files.maxRecordsPerFile 0 要写入单个文件的最大记录数。如果该值为零或为负,则没有限制。 spark.sql.shuffle.partitions 200 为连接或聚合过滤数据时使用的默认分区数。 spark.sql.dynamicPartitionOverwrite.enabled false 当前配置设置为“false”时,DLI在覆盖写之前,会删除所有符合条件的分区。例如,分区表中有一个“2021-01”的分区,当使用INSERT OVERWRITE语句向表中写入“2021-02”这个分区的数据时,会把“2021-01”的分区数据也覆盖掉。 当前配置设置为“true”时,DLI不会提前删除分区,而是在运行时覆盖那些有数据写入的分区。 spark.sql.files.maxPartitionBytes 134217728 读取文件时要打包到单个分区中的最大字节数。 spark.sql.badRecordsPath - Bad Records的路径。 spark.sql.legacy.correlated.scalar.query.enabled false 该参数设置为true: 当子查询中数据不重复的情况下,执行关联子查询,不需要对子查询的结果去重。 当子查询中数据重复的情况下,执行关联子查询,会提示异常,必须对子查询的结果做去重处理,比如max(),min()。 该参数设置为false: 不管子查询中数据重复与否,执行关联子查询时,都需要对子查询的结果去重,比如max(),min(),否则提示异常。 spark.sql.keep.distinct.expandThreshold - 参数说明: 对于包含count(distinct)的多维分析(with cube)的查询场景,spark典型的执行计划是将cube使用expand算子来实现,但该操作会导致查询膨胀,为了避免出现查询膨胀,建议执行如下配置: spark.sql.keep.distinct.expandThreshold: 默认值:-1,即使用Spark默认的expand算子。 设置具体数值:即代表定义了查询膨胀的阈值(例如512),超过该阈值count(distinct) 使用distinct聚合算子来执行,不再使用expand算子。 spark.sql.distinct.aggregator.enabled:强制使用distinct聚合算子的开关。配置为true时不再根据spark.sql.keep.distinct.expandThreshold来判断。 适用场景:包含count(distinct)的多维分析(with cube)的查询场景,可能包含多个count(distinct),且包含cube/roll up 典型场景示例: SELECT a1, a2, count(distinct b), count(distinct c) FROM test_distinct group by a1, a2 with cube spark.sql.distinct.aggregator.enabled false 父主题: Spark SQL语法参考(即将下线)