云服务器内容精选
-
修订记录 发布日期 修订记录 2024-05-29 使用Spark作业访问DLI元数据,补充说明不同Spark版本访问元数据的示例代码。 2024-05-17 修改: 修改Flink Jar作业开发基础样例示例。 2024-04-07 新增: 使用Spark Jar作业读取和查询OBS数据,修改“上传Jar包到OBS和DLI下”的操作步骤。 2024-03-30 新增: Flink作业委托场景开发指导。 Spark作业委托场景开发指导 2023-12-08 下线“使用Beeline提交Spark SQL作业”,推荐使用使用JDBC提交Spark SQL作业。 2023-11-09 修改 pyspark样例代码示例代码对3.1.1版本的相关说明。 2023-09-18 DLI从Spark 3.x版本开始不支持内置地理空间查询函数,下线“在Spark SQL作业中使用地理空间函数”。 2023-07-21 下线“使用ODBC提交Spark SQL作业”,您可以参考“ FineBI工具对接DLI Trino”、“PowerBI工具对接DLI Trino”对接DLI Trino。 2023-06-07 Flink Jar作业开发基础样例,补充传参和查看日志的操作指导。 2023-03-09 手册结构调整,按DLI作业类型拆分手册章节。 新增使用Flink Jar读写DIS开发指南。 2023-03-03 使用Spark作业跨源访问数据源,修改对Module的说明。 2022-10-28 使用Spark作业访问DLI元数据,修改Python样例代码。 2022-08-16 使用Spark作业访问DLI元数据,删除“不支持创建加密的DLI表”的相关描述。 2022-08-09 新增Flink作业高可靠推荐配置指导(异常自动重启)。 2022-07-19 新增使用Flink Jar连接开启SASL_SSL认证的Kafka。
-
步骤8:查看作业运行结果 在Spark作业管理界面显示已提交的作业运行状态。初始状态显示为“启动中”。 如果作业运行成功则作业状态显示为“已成功”,通过以下操作查看创建的数据库和表。 可以在DLI控制台,左侧导航栏,单击“SQL编辑器”。在“数据库”中已显示创建的数据库“test_sparkapp”。 图14 查看创建的数据库 双击数据库名,可以在数据库下查看已创建成功的DLI和OBS表。 图15 查看表 双击DLI表名dli_testtable,单击“执行”查询DLI表数据。 图16 查询DLI表数据 注释掉DLI表查询语句,双击OBS表名dli_testobstable,单击“执行”查询OBS表数据。 图17 查询OBS表数据 如果作业运行失败则作业状态显示为“已失败”,单击“操作”列“更多”下的“Driver日志”,显示当前作业运行的日志,分析报错原因。 图18 查看Driver日志 原因定位解决后,可以在作业“操作”列,单击“编辑”,修改作业相关参数后,单击“执行”重新运行该作业即可。
-
Java样例代码 本示例操作步骤采用Java进行编码,具体完整的样例代码参考如下: package com.huawei.dli.demo; import org.apache.spark.sql.SparkSession; public class DliCatalogTest { public static void main(String[] args) { SparkSession spark = SparkSession .builder() .config("spark.sql.session.state.builder", "org.apache.spark.sql.hive.UQueryHiveACLSessionStateBuilder") .config("spark.sql.catalog.class", "org.apache.spark.sql.hive.UQueryHiveACLExternalCatalog") .config("spark.sql.extensions","org.apache.spark.sql.DliSparkExtension") .appName("java_spark_demo") .getOrCreate(); spark.sql("create database if not exists test_sparkapp").collect(); spark.sql("drop table if exists test_sparkapp.dli_testtable").collect(); spark.sql("create table test_sparkapp.dli_testtable(id INT, name STRING)").collect(); spark.sql("insert into test_sparkapp.dli_testtable VALUES (123,'jason')").collect(); spark.sql("insert into test_sparkapp.dli_testtable VALUES (456,'merry')").collect(); spark.sql("drop table if exists test_sparkapp.dli_testobstable").collect(); spark.sql("create table test_sparkapp.dli_testobstable(age INT, name STRING) using csv options (path 'obs://dli-test-obs01/testdata.csv')").collect(); spark.stop(); } }
-
scala样例代码 object DliCatalogTest { def main(args:Array[String]): Unit = { val sql = args(0) val runDdl = Try(args(1).toBoolean).getOrElse(true) System.out.println(s"sql is $sql runDdl is $runDdl") val sparkConf = new SparkConf(true) sparkConf .set("spark.sql.session.state.builder","org.apache.spark.sql.hive.UQueryHiveACLSessionStateBuilder") .set("spark.sql.catalog.class","org.apache.spark.sql.hive.UQueryHiveACLExternalCatalog") sparkConf.setAppName("dlicatalogtester") val spark = SparkSession.builder .config(sparkConf) .enableHiveSupport() .config("spark.sql.extensions","org.apache.spark.sql.DliSparkExtension") .appName("SparkTest") .getOrCreate() System.out.println("catalog is " + spark.sessionState.catalog.toString) if (runDdl) { val df = spark.sql(sql).collect() } else { spark.sql(sql).show() } spark.close() } }
-
Python样例代码 #!/usr/bin/python # -*- coding: UTF-8 -*- from __future__ import print_function import sys from pyspark.sql import SparkSession if __name__ == "__main__": url = sys.argv[1] creatTbl = "CREATE TABLE test_sparkapp.dli_rds USING JDBC OPTIONS ('url'='jdbc:mysql://%s'," \ "'driver'='com.mysql.jdbc.Driver','dbtable'='test.test'," \ " 'passwdauth' = 'DatasourceRDSTest_pwd','encryption' = 'true')" % url spark = SparkSession \ .builder \ .enableHiveSupport() \ .config("spark.sql.session.state.builder","org.apache.spark.sql.hive.UQueryHiveACLSessionStateBuilder") \ .config("spark.sql.catalog.class", "org.apache.spark.sql.hive.UQueryHiveACLExternalCatalog") \ .config("spark.sql.extensions","org.apache.spark.sql.DliSparkExtension") \ .appName("python Spark test catalog") \ .getOrCreate() spark.sql("CREATE database if not exists test_sparkapp").collect() spark.sql("drop table if exists test_sparkapp.dli_rds").collect() spark.sql(creatTbl).collect() spark.sql("select * from test_sparkapp.dli_rds").show() spark.sql("insert into table test_sparkapp.dli_rds select 12,'aaa'").collect() spark.sql("select * from test_sparkapp.dli_rds").show() spark.sql("insert overwrite table test_sparkapp.dli_rds select 1111,'asasasa'").collect() spark.sql("select * from test_sparkapp.dli_rds").show() spark.sql("drop table test_sparkapp.dli_rds").collect() spark.stop()
-
开发流程 DLI进行Spark作业访问DLI元数据开发流程参考如下: 图1 Spark作业访问DLI元数据开发流程 表2 开发流程说明 序号 阶段 操作界面 说明 1 创建DLI通用队列 DLI控制台 创建作业运行的DLI队列。 2 OBS桶文件配置 OBS控制台 如果是创建OBS表,则需要上传文件数据到OBS桶下。 配置Spark创建表的元数据信息的存储路径。该文件夹路径用来存储Spark创建表的元数据信息“spark.sql.warehouse.dir”。 3 新建Maven工程,配置pom文件 IntelliJ IDEA 参考样例代码说明,编写程序代码创建DLI表或OBS表。 4 编写程序代码 5 调试,编译代码并导出Jar包 6 上传Jar包到OBS和DLI OBS控制台 将生成的Spark Jar包文件上传到OBS目录下和DLI程序包中。 7 创建Spark Jar作业 DLI控制台 在DLI控制台创建Spark Jar作业并提交运行作业。 8 查看作业运行结果 DLI控制台 查看作业运行状态和作业运行日志。
-
步骤2:OBS桶文件配置 如果需要创建OBS表,则需要先上传数据到OBS桶目录下。 本次演示的样例代码创建了OBS表,测试数据内容参考如下示例,创建名为的testdata.csv文件。 12,Michael 27,Andy 30,Justin 进入OBS管理控制台,在“桶列表”下,单击已创建的OBS桶名称,本示例桶名为“dli-test-obs01”,进入“概览”页面。 单击左侧列表中的“对象”,选择“上传对象”,将testdata.csv文件上传到OBS桶根目录下。 在OBS桶根目录下,单击“新建文件夹”,创建名为“warehousepath”的文件夹。该文件夹路径用来存储Spark创建表的元数据信息“spark.sql.warehouse.dir”。
-
环境准备 在进行Spark 作业访问DLI元数据开发前,请准备以下开发环境。 表1 Spark Jar作业开发环境 准备项 说明 操作系统 Windows系统,支持Windows7以上版本。 安装JDK JDK使用1.8版本。 安装和配置IntelliJ IDEA IntelliJ IDEA为进行应用开发的工具,版本要求使用2019.1或其他兼容版本。 安装Maven 开发环境的基本配置。用于项目管理,贯穿软件开发生命周期。
-
步骤4:编写代码 编写DliCatalogTest程序创建数据库、DLI表和OBS表。 完整的样例请参考Java样例代码,样例代码分段说明如下: 导入依赖的包。 import org.apache.spark.sql.SparkSession; 创建SparkSession会话。 创建SparkSession会话时需要指定Spark参数:"spark.sql.session.state.builder"、"spark.sql.catalog.class"和"spark.sql.extensions",按照样例配置即可。 Spark2.3.x版本 SparkSession spark = SparkSession .builder() .config("spark.sql.session.state.builder", "org.apache.spark.sql.hive.UQueryHiveACLSessionStateBuilder") .config("spark.sql.catalog.class", "org.apache.spark.sql.hive.UQueryHiveACLExternalCatalog") .config("spark.sql.extensions","org.apache.spark.sql.DliSparkExtension") .appName("java_spark_demo") .getOrCreate(); Spark2.4.x版本 SparkSession spark = SparkSession .builder() .config("spark.sql.session.state.builder", "org.apache.spark.sql.hive.UQueryHiveACLSessionStateBuilder") .config("spark.sql.catalog.class", "org.apache.spark.sql.hive.UQueryHiveACLExternalCatalog") .config("spark.sql.extensions","org.apache.spark.sql.DliSparkExtension") .config("spark.sql.hive.implementation","org.apache.spark.sql.hive.client.DliHiveClientImpl") .appName("java_spark_demo") .getOrCreate(); Spark3.1.x版本 SparkSession spark = SparkSession .builder() .config("spark.sql.session.state.builder", "org.apache.spark.sql.hive.UQueryHiveACLSessionStateBuilder") .config("spark.sql.catalog.class", "org.apache.spark.sql.hive.UQueryHiveACLExternalCatalog") .config("spark.sql.extensions","org.apache.spark.sql.DliSparkExtension") .appName("java_spark_demo") .getOrCreate(); Spark3.3.x版本 SparkSession spark = SparkSession .builder() .config("spark.sql.session.state.builder", "org.apache.spark.sql.hive.DliLakeHouseBuilder") .config("spark.sql.catalog.class", "org.apache.spark.sql.hive.DliLakeHouseCatalog") .appName("java_spark_demo") .getOrCreate(); 创建数据库。 如下样例代码演示,创建名为test_sparkapp的数据库。 spark.sql("create database if not exists test_sparkapp").collect(); 创建DLI表并插入测试数据。 spark.sql("drop table if exists test_sparkapp.dli_testtable").collect(); spark.sql("create table test_sparkapp.dli_testtable(id INT, name STRING)").collect(); spark.sql("insert into test_sparkapp.dli_testtable VALUES (123,'jason')").collect(); spark.sql("insert into test_sparkapp.dli_testtable VALUES (456,'merry')").collect(); 创建OBS表。如下示例中的OBS路径需要根据步骤2:OBS桶文件配置中的实际数据路径修改。 spark.sql("drop table if exists test_sparkapp.dli_testobstable").collect(); spark.sql("create table test_sparkapp.dli_testobstable(age INT, name STRING) using csv options (path 'obs://dli-test-obs01/testdata.csv')").collect(); 关闭SparkSession会话spark。 spark.stop();
-
步骤5:调试、编译代码并导出Jar包 双击IntelliJ IDEA工具右侧的“Maven”,参考下图分别双击“clean”、“compile”对代码进行编译。 编译成功后,双击“package”对代码进行打包。 图10 编译打包 打包成功后,生成的Jar包会放到target目录下,以备后用。本示例将会生成到:“D:\DLITest\SparkJarMetadata\target”下名为“SparkJarMetadata-1.0-SNAPSHOT.jar”。 图11 导出jar包
-
步骤1:创建DLI通用队列 第一次提交Spark作业,需要先创建队列,例如创建名为“sparktest”的队列,队列类型选择为“通用队列”。 在DLI管理控制台的左侧导航栏中,选择“队列管理”。 单击“队列管理”页面右上角“购买队列”进行创建队列。 创建名为“sparktest”的队列,队列类型选择为“通用队列”。创建队列详细介绍请参考创建队列。 图2 创建队列 单击“立即购买”,确认配置。 配置确认无误,单击“提交”完成队列创建。
-
约束限制 如果使用Spark 3.1访问元数据,则必须新建队列。 不支持的场景: 在SQL作业中创建了数据库(database),编写程序代码指定在该数据库下创建表。 例如在DLI的SQL编辑器中的某SQL队列下,创建了数据库testdb。后续通过编写程序代码在testdb下创建表testTable,编译打包后提交的Spark Jar作业则会运行失败。 支持的场景 在SQL作业中创建数据库(database),表(table) , 通过SQL或Spark程序作业读取插入数据。 在Spark程序作业中创建数据库(database),表(table), 通过SQL或Spark程序作业读取插入数据。
-
准备Flink作业数据 创建Flink作业需要输入数据源和数据输出通道,即常说的Source和Sink。用户使用其他服务作为数据源或输出通道时,需要先开通相应服务。 Flink作业支持以下数据源和输出通道: DIS数据源和输出通道 如果用户作业需要DIS作为数据源和输出通道时,则要先开通数据接入服务(DIS)。 用户如何开通DIS服务,具体操作请参见《数据接入服务用户指南》中的“开通DIS通道”章节。 申请DIS通道后,用户可以将本地数据通过DIS通道不断上传至DIS服务,实现向Flink作业提供实时流数据源,具体操作请参见《数据接入服务用户指南》中的“发送数据到DIS服务”章节。 样例数据如下所示: 1,lilei,bmw320i,28 2,hanmeimei,audia4,27 OBS数据源 如果用户作业需要对象存储服务(OBS)作为数据源,则要先开通OBS服务,具体操作请参见《对象存储服务控制台指南》中的“开通OBS服务”章节。 开通OBS服务后,用户需要将本地文件通过Internet上传至OBS指定的位置,具体操作请参见《对象存储服务控制台指南》中的“上传文件”章节。 RDS输出通道 如果用户作业需要RDS作为输出通道,需要创建RDS实例,具体操作请参见《关系型数据库快速入门》中“购买实例”章节。 SMN输出通道 如果用户作业需要SMN作为输出通道,需要先在SMN中创建主题,获取URN资源标识,再添加订阅。具体操作请参见《消息通知服务快速入门》。 Kafka数据源和输出通道 如果用户作业需要Kafka作为数据源和输出通道,则必须要通过创建增强型跨源连接与Kafka进行对接,具体操作请参见增强型跨源连接。 如果Kafka服务端的端口监听在hostname上,则需要将Kafka Broker节点的hostname和ip的对应关系添加到跨源连接中。 CloudTable数据源和输出通道 如果用户作业需要CloudTable作为数据源和输出通道,需要先在CloudTable中创建集群,获取集群ID。具体操作请参见《表格存储服务用户指南》《表格存储服务用户指南》中的“入门”章节。 云搜索服务输出通道 如果用户作业需要云搜索服务作为输出通道,需要先在云搜索服务中创建集群,获取集群内网访问地址。具体操作请参见《云搜索服务用户指南》中的“入门”章节。 DCS输出通道 如果用户作业需要DCS作为输出通道,需要先在DCS中创建Redis类型的缓存实例,获取Redis实例连接地址。具体操作请参见购买Redis实例。 父主题: Flink作业管理
-
修订记录 发布日期 修订记录 2024-05-28 新增 批作业SQL常用配置项说明新增配置参数:spark.sql.legacy.correlated.scalar.query.enabled 使用Hive语法创建OBS表新增关于设置多字符分隔符的使用示例。 使用Hive语法创建DLI表新增关于设置多字符分隔符的使用示例。 2024-02-01 新增 Spark开源命令支持说明。 2024-01-30 优化 使用DataSource语法创建OBS表,补充CTAS创建分区表约束限制说明。 使用Hive语法创建OBS表,补充CTAS创建分区表约束限制说明。 使用DataSource语法创建DLI表,补充CTAS创建分区表约束限制说明。 使用Hive语法创建DLI表,补充CTAS创建分区表约束限制说明。 2023-11-17 优化 创建函数,删除关键字TEMPORARY。 2023-11-09 优化 使用DataSource语法创建OBS表,补充示例代码。 使用Hive语法创建OBS表,补充示例代码。 使用DataSource语法创建DLI表,补充示例代码。 使用Hive语法创建DLI表,补充示例代码。 2023-09-22 优化 内置函数,优化日期函数、字符串函数、数学函数、聚合函数、分析窗口函数、其他函数的内容。 2023-08-14 新增 修改列注释 2023-08-01 新增 跨源连接Oracle表。 2023-04-02 修改 使用DataSource语法创建OBS表,修改建表相关参数说明。 2023-02-23 修改 使用DataSource语法创建OBS表,修改PERMISSIVE参数说明。 2023-01-11 修改 DDL语法定义,补充对CREATE FUNCTION语句的功能描述。 2022-11-09 修改 创建DLI表关联HBase,补充约束该语法不支持安全集群。 2022-10-26 修改 时间函数,描述信息中补充FROM_UNIXTIME(numeric[, string])的单位。 窗口,补充time_attr参数说明。 2022-09-01 修改OBS输出流,修改关键字说明。
-
批作业SQL常用配置项说明 本章节为您介绍DLI 批作业SQL语法的常用配置项。 表1 常用配置项 名称 默认值 描述 spark.sql.files.maxRecordsPerFile 0 要写入单个文件的最大记录数。如果该值为零或为负,则没有限制。 spark.sql.shuffle.partitions 200 为连接或聚合过滤数据时使用的默认分区数。 spark.sql.dynamicPartitionOverwrite.enabled false 当前配置设置为“false”时,DLI在覆盖写之前,会删除所有符合条件的分区。例如,分区表中有一个“2021-01”的分区,当使用INSERT OVERWRITE语句向表中写入“2021-02”这个分区的数据时,会把“2021-01”的分区数据也覆盖掉。 当前配置设置为“true”时,DLI不会提前删除分区,而是在运行时覆盖那些有数据写入的分区。 spark.sql.files.maxPartitionBytes 134217728 读取文件时要打包到单个分区中的最大字节数。 spark.sql.badRecordsPath - Bad Records的路径。 spark.sql.legacy.correlated.scalar.query.enabled false 该参数设置为true: 当子查询中数据不重复的情况下,执行关联子查询,不需要对子查询的结果去重。 当子查询中数据重复的情况下,执行关联子查询,会提示异常,必须对子查询的结果做去重处理,比如max(),min()。 该参数设置为false: 不管子查询中数据重复与否,执行关联子查询时,都需要对子查询的结果去重,比如max(),min(),否则提示异常。 spark.sql.keep.distinct.expandThreshold - 参数说明: 对于包含count(distinct)的多维分析(with cube)的查询场景,spark典型的执行计划是将cube使用expand算子来实现,但该操作会导致查询膨胀,为了避免出现查询膨胀,建议执行如下配置: spark.sql.keep.distinct.expandThreshold: 默认值:-1,即使用Spark默认的expand算子。 设置具体数值:即代表定义了查询膨胀的阈值(例如512),超过该阈值count(distinct) 使用distinct聚合算子来执行,不再使用expand算子。 spark.sql.distinct.aggregator.enabled:强制使用distinct聚合算子的开关。配置为true时不再根据spark.sql.keep.distinct.expandThreshold来判断。 适用场景:包含count(distinct)的多维分析(with cube)的查询场景,可能包含多个count(distinct),且包含cube/roll up 典型场景示例: SELECT a1, a2, count(distinct b), count(distinct c) FROM test_distinct group by a1, a2 with cube spark.sql.distinct.aggregator.enabled false 父主题: Spark SQL语法参考(即将下线)
更多精彩内容
CDN加速
GaussDB
文字转换成语音
免费的服务器
如何创建网站
域名网站购买
私有云桌面
云主机哪个好
域名怎么备案
手机云电脑
SSL证书申请
云点播服务器
免费OCR是什么
电脑云桌面
域名备案怎么弄
语音转文字
文字图片识别
云桌面是什么
网址安全检测
网站建设搭建
国外CDN加速
SSL免费证书申请
短信批量发送
图片OCR识别
云数据库MySQL
个人域名购买
录音转文字
扫描图片识别文字
OCR图片识别
行驶证识别
虚拟电话号码
电话呼叫中心软件
怎么制作一个网站
Email注册网站
华为VNC
图像文字识别
企业网站制作
个人网站搭建
华为云计算
免费租用云托管
云桌面云服务器
ocr文字识别免费版
HTTPS证书申请
图片文字识别转换
国外域名注册商
使用免费虚拟主机
云电脑主机多少钱
鲲鹏云手机
短信验证码平台
OCR图片文字识别
SSL证书是什么
申请企业邮箱步骤
免费的企业用邮箱
云免流搭建教程
域名价格