华为云用户手册

  • 基于API的Glob路径模式以获取LocatedFileStatus和从FileStatus打开文件 在DistributedFileSystem中添加了以下API,以获取具有块位置的FileStatus,并从FileStatus对象打开文件。这些API将减少从客户端到Namenode的RPC调用的数量。 表6 FileSystem API接口说明 Interface接口 Description说明 public LocatedFileStatus[] globLocatedStatus(Path, PathFilter, boolean) throws IOException 返回一个LocatedFileStatus对象数组,其对应文件路径符合路径过滤规则。 public FSDataInputStream open(FileStatus stat) throws IOException 如果stat对象是LocatedFileStatusHdfs的实例,该实例已具有位置信息,则直接创建InputStream而不联系Namenode。
  • 样例代码 使用Python方式提交数据分析任务,参考样例程序中的“hive-examples/python-examples/pyCLI_sec.py”。 导入HAConnection类。 from pyhs2.haconnection import HAConnection 声明HiveServer的IP地址列表。本例中hosts代表HiveServer的节点,xxx.xxx.xxx.xxx代表业务IP地址。 hosts = ["xxx.xxx.xxx.xxx", "xxx.xxx.xxx.xxx"] 如果HiveServer实例被迁移,原始的示例程序会失效。在HiveServer实例迁移之后,用户需要更新示例程序中使用的HiveServer的IP地址。 在HAConnection的第三个参数填写正确的用户名,密码可以不填写。创建连接,执行HQL,样例代码中仅执行查询所有表功能,可根据实际情况修改HQL内容,输出查询的列名和结果到控制台。 try: with HAConnection(hosts = hosts, port = 10000, authMechanism = "PLAIN", user='root', password='******') as haConn: with haConn.getConnection() as conn: with conn.cursor() as cur: # Show databases print cur.getDatabases() # Execute query cur.execute("show tables") # Return column info from query print cur.getSchema() # Fetch table results for i in cur.fetch(): print iexcept Exception, e: print e
  • SpringBoot样例工程的命令行形式运行 在IDEA界面左下方单击“Terminal”进入终端,执行命令mvn clean package进行编译。 当输出“BUILD SUC CES S”,表示编译成功,如下图所示。编译成功后将会在样例工程的target下生成含有“-with-dependencies”字段的Jar包。 在Windows或Linux上创建一个目录作为运行目录,如“D:\hive-rest-client-example”(Windows环境)或“/opt/hive-rest-client-example”(Linux环境),将1中生成的“target”目录下包名中含有“-with-dependencies”字段的Jar包放进该路径下,并在该目录下创建子目录“src/main/resources"。将hive-rest-client-example工程resources目录下的所有文件复制到“resources”下。 执行以下命令启动SpringBoot服务: 在Windows环境下执行: cd /d d:\hive-rest-client-example java -jar hive-rest-client-example-8.1.0.1-3.5.1-SNAPSHOT-jar-with-dependencies.jar 在Linux环境下执行: chmod +x /opt/hive-rest-client-example -R cd /opt/hive-rest-client-example java -jar hive-rest-client-example-8.1.0.1-3.5.1-SNAPSHOT-jar-with-dependencies.jar 以上Jar包名称仅供参考,具体名称以实际生成为主。 调用Hive的SpringBoot样例接口触发样例代码运行: Windows环境运行方式: 打开浏览器,输入:http://localhost:8080/hive/example/executesql。 Linux环境下执行运行方式: 在2中存放Jar的节点上执行curl http://localhost:8080/hive/example/executesql命令。 运行样例代码时日志中可能会打印以下日志信息,虽然日志级别显示ERROR,但是不影响执行结果: ERROR 51320 --- [c-8-EventThread] o.a.c.framework.imps.EnsembleTracker : Invalid config event received: {version=100000000, server.48=ZooKeeper节点IP地址:ZooKeeper端口号:ZooKeeper端口号:participant...} 查看样例代码中的HQL所查询出的结果。 Windows环境运行成功结果会有如下信息: =========================== Hive Example Start ===========================Start create table.Table created successfully.Start to insert data into the table.Inserting data to the table succeeded.Start to query table data.Query result :employees_infoa.id employees_infoa.age employees_infoa.name1 31 SJK2 25 HS3 28 HTQuerying table data succeeded.Start to delete the table.Table deleted successfully.=========================== Hive Example End =========================== Linux环境运行成功结果会有如下信息: =========================== Hive Example Start ===========================Start create table.Table created successfully.Start to insert data into the table.Inserting data to the table succeeded.Start to query table data.Query result :employees_infoa.id employees_infoa.age employees_infoa.name1 31 SJK2 25 HS3 28 HTQuerying table data succeeded.Start to delete the table.Table deleted successfully.=========================== Hive Example End ===========================
  • MapReduce应用开发流程介绍 开发流程中各阶段的说明如图1和表1所示。 图1 MapReduce应用程序开发流程 表1 MapReduce应用开发的流程说明 阶段 说明 参考文档 了解基本概念 在开始开发应用前,需要了解MapReduce的基本概念。 MapReduce应用开发简介 准备开发和运行环境 使用IntelliJ IDEA工具,请根据指导完成开发环境配置。 MapReduce的运行环境即MapReduce客户端,请根据指导完成客户端的安装和配置。 准备MapReduce开发和运行环境 准备工程 MapReduce提供了不同场景下的样例程序,您可以导入样例工程进行程序学习。或者您可以根据指导,新建一个MapReduce工程。 导入并配置MapReduce样例工程 (可选)创建MapReduce样例工程 根据场景开发工程 提供了样例工程。 帮助用户快速了解MapReduce各部件的编程接口。 开发MapReduce应用 编译并运行程序 指导用户将开发好的程序编译并提交运行。 调测MapReduce应用 查看程序运行结果 程序运行结果会写在用户指定的路径下。用户还可以通过UI查看应用运行情况。 调测MapReduce应用 父主题: MapReduce开发指南(普通模式)
  • Oozie应用开发流程 本文档主要基于Java API对Oozie进行应用开发。 开发流程中各阶段的说明如图1和表1所示。 图1 Oozie应用程序开发流程 表1 Oozie应用开发的流程说明 阶段 说明 参考文档 了解基本概念 在开始开发应用前,需要了解Oozie的基本概念,了解场景需求等。 Oozie应用开发常见概念 准备开发和运行环境 Oozie的应用程序当前推荐使用Java语言进行开发。可使用IDEA工具。 准备本地应用开发环境 准备工程 Oozie提供了不同场景下的样例程序,您可以导入样例工程进行程序学习。 导入并配置Oozie样例工程 根据场景开发工程 提供了Java语言的样例工程。 开发Oozie应用 编译并运行程序 指导用户将开发好的程序编译并提交运行。 调测Oozie应用 查看程序运行结果 程序运行结果会写在用户指定的路径下。用户还可以通过UI查看应用运行情况。 父主题: Oozie开发指南(普通模式)
  • 功能介绍 主要分为三个部分: 从HDFS原文件中抽取name信息,查询HBase、Hive相关数据,并进行数据拼接,通过类MultiComponentMapper继承Mapper抽象类实现。 获取拼接后的数据取最后一条输出到HBase、HDFS,通过类MultiComponentReducer继承Reducer抽象类实现。 main方法提供建立一个MapReduce job,并提交MapReduce作业到Hadoop集群。
  • 数据规划 确保以多主实例模式启动了JD BCS erver服务,并至少有一个实例可连接客户端。在JDB CS erver节点上分别创建“/home/data”文件,内容如下: Miranda,32Karlie,23Candice,27 确保其对启动JDBCServer的用户有读写权限。 确保客户端classpath下有“hive-site.xml”文件,且根据实际集群情况配置所需要的参数。JDBCServer相关参数详情,请参见Spark JDBCServer接口介绍。
  • 运行任务 进入Spark客户端目录,使用java -cp命令运行代码(类名与文件名等请与实际代码保持一致,此处仅为示例): 运行Java样例代码: java -cp $SPARK_HOME/jars/*:$SPARK_HOME/jars/hive/*:$SPARK_HOME/conf:/opt/female/SparkThriftServerJavaExample-1.0.jar com.huawei.bigdata.spark.examples.ThriftServerQueriesTest $SPARK_HOME/conf/hive-site.xml $SPARK_HOME/conf/spark-defaults.conf 运行Scala样例代码: java -cp $SPARK_HOME/jars/*:$SPARK_HOME/jars/hive/*:$SPARK_HOME/conf:/opt/female/SparkThriftServerExample-1.0.jar com.huawei.bigdata.spark.examples.ThriftServerQueriesTest $SPARK_HOME/conf/hive-site.xml $SPARK_HOME/conf/spark-defaults.conf 集群开启ZooKeeper的SSL特性后(查看ZooKeeper服务的ssl.enabled参数),请在执行命令中添加-Dzookeeper.client.secure=true -Dzookeeper.clientCnxnSocket=org.apache.zookeeper.ClientCnxnSocketNetty两项参数: java -Dzookeeper.client.secure=true -Dzookeeper.clientCnxnSocket=org.apache.zookeeper.ClientCnxnSocketNetty -cp $SPARK_HOME/jars/*:$SPARK_HOME/jars/hive/*:$SPARK_HOME/conf:/opt/female/SparkThriftServerJavaExample-1.0.jar com.huawei.bigdata.spark.examples.ThriftServerQueriesTest $SPARK_HOME/conf/hive-site.xml $SPARK_HOME/conf/spark-defaults.conf
  • 操作步骤 在Windows本地运行程序,需要配置https ssl证书。 登录集群任意节点,进入如下目录下载ca.crt文件。 cd ${BIGDATA_HOME}/om-agent_8.1.2.2/nodeagent/security/cert/subcert/certFile/ 将ca.crt文件下载到本地,以管理员的身份打开cmd。 输入如下命令: keytool -import -v -trustcacerts -alias ca -file "D:\xx\ca.crt" -storepass changeit -keystore "%JAVA_HOME%\jre\lib\security\cacerts" 其中“D:\xx\ca.crt”是实际ca.crt文件存放路径;“%JAVA_HOME% ”为jdk安装路径。 在开发环境中(例如IDEA中),右击OozieRestApiMain.java,单击“Run 'OozieRestApiMain.main()'”运行对应的应用程序工程。 使用Oozie客户端执行以下命令: oozie job -oozie https://Oozie业务IP:21003/oozie -config job.properties -run 其中需要提前将待使用样例工程目录“src\main\resources”中的“job.properties”文件复制到Oozie客户端所在目录。
  • 提交命令 假设用例代码打包后的jar包名为spark-hbaseContext-test-1.0.jar,并将jar包放在客户端“$SPARK_HOME”目录下,以下命令均在“$SPARK_HOME”目录执行。 yarn-client模式: java/scala版本(类名等请与实际代码保持一致,此处仅为示例) bin/spark-submit --master yarn --deploy-mode client --jars /opt/female/protobuf-java-2.5.0.jar --conf spark.yarn.user.classpath.first=true --class com.huawei.bigdata.spark.examples.datasources.HBaseSource SparkOnHbaseJavaExample-1.0.jar python版本(文件名等请与实际保持一致,此处仅为示例) bin/spark-submit --master yarn --deploy-mode client --conf spark.yarn.user.classpath.first=true --jars SparkOnHbaseJavaExample-1.0.jar,/opt/female/protobuf-java-2.5.0.jar HBaseSource.py yarn-cluster模式: java/scala版本(类名等请与实际代码保持一致,此处仅为示例) bin/spark-submit --master yarn --deploy-mode cluster --jars /opt/female/protobuf-java-2.5.0.jar --conf spark.yarn.user.classpath.first=true --class com.huawei.bigdata.spark.examples.datasources.HBaseSource SparkOnHbaseJavaExample-1.0.jar python版本(文件名等请与实际保持一致,此处仅为示例) bin/spark-submit --master yarn --deploy-mode cluster --conf spark.yarn.user.classpath.first=true --jars SparkOnHbaseJavaExample-1.0.jar,/opt/female/protobuf-java-2.5.0.jar HBaseSource.py
  • Python样例代码 下面代码片段仅为演示,具体代码参见SparkOnHbasePythonExample中的HBaseSource文件: # -*- coding:utf-8 -*-"""【说明】由于pyspark不提供Hbase相关api,本样例使用Python调用Java的方式实现"""from py4j.java_gateway import java_importfrom pyspark.sql import SparkSession# 创建SparkSessionspark = SparkSession\ .builder\ .appName("HBaseSourceExample")\ .getOrCreate()# 向sc._jvm中导入要运行的类java_import(spark._jvm, 'com.huawei.bigdata.spark.examples.datasources.HBaseSource')# 创建类实例并调用方法,传递sc._jsc参数spark._jvm.HBaseSource().execute(spark._jsc)# 停止SparkSessionspark.stop()
  • Python样例代码 下面代码片段仅为演示,具体代码参见SparkOnHbasePythonExample中HBaseBulkLoadPythonExample文件: # -*- coding:utf-8 -*-"""【说明】由于pyspark不提供Hbase相关api,本样例使用Python调用Java的方式实现"""from py4j.java_gateway import java_importfrom pyspark.sql import SparkSession# 创建SparkSessionspark = SparkSession\ .builder\ .appName("JavaHBaseBulkLoadExample")\ .getOrCreate()# 向sc._jvm中导入要运行的类java_import(spark._jvm, 'com.huawei.bigdata.spark.examples.HBaseBulkLoadPythonExample')# 创建类实例并调用方法,传递sc._jsc参数spark._jvm.HBaseBulkLoadPythonExample().hbaseBulkLoad(spark._jsc, sys.argv[1], sys.argv[2])# 停止SparkSessionspark.stop()
  • 打包项目 通过IDEA自带的Maven工具,打包项目,生成jar包。具体操作请参考在Linux环境中调测Spark应用。 将打包生成的jar包上传到Spark客户端所在服务器的任意目录(例如“$SPARK_HOME” )下。 若运行“Spark on HBase”样例程序,需要在Spark客户端的“spark-defaults.conf”配置文件中将配置项“spark.yarn.security.credentials.hbase.enabled”设置为“true”(该参数值默认为“false”,改为“true”后对已有业务没有影响。如果要卸载HBase服务,卸载前请将此参数值改回“false”),将配置项“spark.inputFormat.cache.enabled”设置为“false”。
  • 场景说明 用户可以在Spark应用程序中使用HBaseContext的方式去使用HBase,将要插入的数据的rowKey构造成rdd,然后通过HBaseContext的bulkLoad接口将rdd写入HFile中。将生成的HFile导入HBase表的操作采用如下格式的命令,不属于本接口范围,不在此进行详细说明: hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles {hfilePath} {tableName}
  • 提交命令 假设用例代码打包后的jar包名为spark-hbaseContext-test-1.0.jar,并将jar包放在客户端“$SPARK_HOME”目录下,以下命令均在“$SPARK_HOME”目录执行,Java接口对应的类名前有Java字样,请参考具体样例代码进行书写。 yarn-client模式: java/scala版本(类名等请与实际代码保持一致,此处仅为示例) bin/spark-submit --master yarn --deploy-mode client --class com.huawei.bigdata.spark.examples.hbasecontext.JavaHBaseBulkLoadExample SparkOnHbaseJavaExample-1.0.jar /tmp/hfile bulkload-table-test python版本(文件名等请与实际保持一致,此处仅为示例) bin/spark-submit --master yarn --deploy-mode client --jars SparkOnHbaseJavaExample-1.0.jar HBaseBulkLoadExample.py /tmp/hfile bulkload-table-test yarn-cluster模式: java/scala版本(类名等请与实际代码保持一致,此处仅为示例) bin/spark-submit --master yarn --deploy-mode cluster --class com.huawei.bigdata.spark.examples.hbasecontext.JavaHBaseBulkLoadExample SparkOnHbaseJavaExample-1.0.jar /tmp/hfile bulkload-table-test python版本(文件名等请与实际保持一致,此处仅为示例) bin/spark-submit --master yarn --deploy-mode cluster --jars SparkOnHbaseJavaExample-1.0.jar HBaseBulkLoadExample.py /tmp/hfile bulkload-table-test
  • 打包项目 通过IDEA自带的Maven工具,打包项目,生成jar包。具体操作请参考在Linux环境中调测Spark应用。 将打包生成的jar包上传到Spark客户端所在服务器的任意目录(例如“$SPARK_HOME” )下。 若运行“Spark on HBase”样例程序,需要在Spark客户端的“spark-defaults.conf”配置文件中将配置项“spark.yarn.security.credentials.hbase.enabled”设置为“true”(该参数值默认为“false”,改为“true”后对已有业务没有影响。如果要卸载HBase服务,卸载前请将此参数值改回“false”),将配置项“spark.inputFormat.cache.enabled”设置为“false”。
  • 场景说明 假定Hive的person表存储用户当天消费的金额信息,HBase的table2表存储用户历史消费的金额信息。 现person表有记录name=1,account=100,表示用户1在当天消费金额为100元。 table2表有记录key=1,cf:cid=1000,表示用户1的历史消息记录金额为1000元。 基于某些业务要求,要求开发Spark应用程序实现如下功能: 根据用户名累计用户的历史消费金额,即用户总消费金额=100(用户当天的消费金额) + 1000(用户历史消费金额)。 上例所示,运行结果table2表用户key=1的总消费金额为cf:cid=1100元。
  • 运行任务 进入Spark客户端目录,调用bin/spark-submit脚本运行代码,运行命令分别如下(类名与文件名等请与实际代码保持一致,此处仅为示例): 运行Java或Scala样例代码 bin/spark-submit --class com.huawei.bigdata.spark.examples.SparkHivetoHbase --master yarn --deploy-mode client /opt/female/SparkHivetoHbase-1.0.jar 运行Python样例程序 由于pyspark不提供Hbase相关api,本样例使用Python调用Java的方式实现。将所提供 Java代码使用maven打包成jar,并放在相同目录下,运行python程序时要使用--jars把jar包加载到classpath中。 bin/spark-submit --master yarn --deploy-mode client --jars /opt/female/SparkHivetoHbasePythonExample/SparkHivetoHbase-1.0.jar /opt/female/SparkHivetoHbasePythonExample/SparkHivetoHbasePythonExample.py
  • 数据规划 在开始开发应用前,需要创建Hive表,命名为person,并插入数据。同时,创建HBase table2表,用于将分析后的数据写入。 将原日志文件放置到HDFS系统中。 在本地新建一个空白的log1.txt文件,并在文件内写入如下内容: 1,100 在HDFS中新建一个目录/tmp/input,并将log1.txt文件上传至此目录。 在Linux系统HDFS客户端使用命令hadoop fs -mkdir /tmp/input(hdfs dfs命令有同样的作用),创建对应目录。 在Linux系统HDFS客户端使用命令hadoop fs -putlog1.txt /tmp/input,上传数据文件。 将导入的数据放置在Hive表里。 首先,确保JDBCServer已启动。然后使用Beeline工具,创建Hive表,并插入数据。 执行如下命令,创建命名为person的Hive表。 create table person ( name STRING, account INT ) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' ESCAPED BY '\\' STORED AS TEXTFILE; 执行如下命令插入数据。 load data inpath '/tmp/input/log1.txt' into table person; 创建HBase表。 确保JDBCServer已启动,然后使用Spark-beeline工具,创建HBase表,并插入数据。 执行如下命令,创建命名为table2的HBase表。 create table table2 ( key string, cid string ) using org.apache.spark.sql.hbase.HBaseSource options( hbaseTableName "table2", keyCols "key", colsMapping "cid=cf.cid"); 通过HBase插入数据,执行如下命令。 put 'table2', '1', 'cf:cid', '1000'
  • 场景说明 假定HBase的table1表存储用户当天消费的金额信息,table2表存储用户历史消费的金额信息。 现table1表有记录key=1,cf:cid=100,表示用户1在当天消费金额为100元。 table2表有记录key=1,cf:cid=1000,表示用户1的历史消息记录金额为1000元。 基于某些业务要求,要求开发Spark应用程序实现如下功能: 根据用户名累计用户的历史消费金额,即用户总消费金额=100(用户当天的消费金额) + 1000(用户历史消费金额)。 上例所示,运行结果table2表用户key=1的总消费金额为cf:cid=1100元。
  • 数据规划 使用Spark-Beeline工具创建Spark和HBase表table1、table2,并通过HBase插入数据。 确保JDBCServer已启动。登录Spark2x客户端节点。 使用Spark-Beeline工具创建Spark表table1。 create table table1 ( key string, cid string ) using org.apache.spark.sql.hbase.HBaseSource options( hbaseTableName "table1", keyCols "key", colsMapping "cid=cf.cid"); 通过HBase插入数据,命令如下: put 'table1', '1', 'cf:cid', '100' 使用Spark-Beeline工具创建Spark表table2。 create table table2 ( key string, cid string ) using org.apache.spark.sql.hbase.HBaseSource options( hbaseTableName "table2", keyCols "key", colsMapping "cid=cf.cid"); 通过HBase插入数据,命令如下: put 'table2', '1', 'cf:cid', '1000'
  • 运行任务 进入Spark客户端目录,调用bin/spark-submit脚本运行代码,运行命令分别如下(类名与文件名等请与实际代码保持一致,此处仅为示例): 运行Java或Scala样例代码 bin/spark-submit --jars --conf spark.yarn.user.classpath.first=true --class com.huawei.bigdata.spark.examples.SparkHbasetoHbase --master yarn --deploy-mode client /opt/female/SparkHbasetoHbase-1.0.jar 运行Python样例程序 由于pyspark不提供Hbase相关api,本样例使用Python调用Java的方式实现。将所提供 Java代码使用maven打包成jar,并放在相同目录下,运行python程序时要使用--jars把jar包加载到classpath中。 bin/spark-submit --master yarn --deploy-mode client --conf spark.yarn.user.classpath.first=true --jars /opt/female/SparkHbasetoHbasePythonExample/SparkHbasetoHbase-1.0.jar,/opt/female/protobuf-java-2.5.0.jar /opt/female/SparkHbasetoHbasePythonExample/SparkHbasetoHbasePythonExample.py
  • 回答 问题原因: 在IBM JDK下建立的JDBC connection时间超过登录用户的认证超时时间(默认一天),导致认证失败。 IBM JDK的机制跟Oracle JDK的机制不同,IBM JDK在认证登录后的使用过程中做了时间检查却没有检测外部的时间更新,导致即使显式调用relogin也无法得到刷新。 解决措施: 通常情况下,在发现JDBC connection不可用的时候,可以关闭该connection,重新创建一个connection继续执行。
  • 回答 第三方jar包(例如自定义udf)区分x86和TaiShan版本时,混合使用方案: 进入到服务端spark2x sparkResource的安装目录(这个集群安装过程中可能会安装在多个节点上,随便进入一个安装节点,cd到sparkResource的安装目录)。 准备好自己的jar包例如xx.jar的x86版本和TaiShan版本。将x86版本和TaiShan版本的xx.jar分别复制到当前目录的x86文件夹和TaiShan文件夹里面。 在当前目录下执行以下命令将jar包打包: zip -qDj spark-archive-2x-x86.zip x86/* zip -qDj spark-archive-2x-arm.zip arm/* 执行以下命令查看hdfs上的spark2x依赖的jar包: hdfs dfs -ls /user/spark2x/jars/8.1.0.1 8.1.0.1是版本号,不同版本不同。 执行以下命令移动hdfs上旧的jar包文件到其他目录,例如移动到“tmp”目录。 hdfs dfs -mv /user/spark2x/jars/8.1.0.1/spark-archive-2x-arm.zip /tmp hdfs dfs -mv /user/spark2x/jars/8.1.0.1/spark-archive-2x-x86.zip /tmp 上传3中打包的spark-archive-2x-arm.zip和spark-archive-2x-x86.zip到hdfs的/user/spark2x/jars/8.1.0.1目录下,上传命令如下: hdfs dfs -put spark-archive-2x-arm.zip /user/spark2x/jars/8.1.0.1/ hdfs dfs -put spark-archive-2x-x86.zip /user/spark2x/jars/8.1.0.1/ 上传完毕后删除本地的spark-archive-2x-arm.zip,spark-archive-2x-x86.zip文件。 对其他的SparkResource安装节点执行1~2。 进入WebUI重启Spark的JDBCServer实例。 重启后,需要更新客户端配置。按照客户端所在的机器类型(x86、TaiShan)复制xx.jar的相应版本到客户端的spark2x安装目录${install_home}/Spark2x/spark/jars文件夹中。${install_home}是用户的客户端安装路径,用户需要填写实际的安装目录;若本地的安装目录为/opt/hadoopclient,那么就复制相应版本xx.jar到/opt/hadoopclient/Spark2x/spark/jars文件夹里。
  • 解决方案 提交yarn-client模式的结构流任务时需要额外如下操作: 将Spark客户端目录下spark-default.conf文件中的spark.driver.extraClassPath配置复制出来,并将Kafka相关jar包路径追加到该配置项之后,提交结构流任务时需要通过--conf将该配置项给加上。例如:kafka相关jar包路径为“/kafkadir”,提交任务需要增加--conf spark.driver.extraClassPath=/opt/client/Spark2x/spark/conf/:/opt/client/Spark2x/spark/jars/*:/opt/client/Spark2x/spark/x86/*:/kafkadir/*。 提交yarn-cluster模式的结构流任务时需要额外如下操作: 将Spark客户端目录下spark-default.conf文件中的spark.yarn.cluster.driver.extraClassPath配置给复制出来,并将Kafka相关jar包相对路径追加到该配置项之后,提交结构流任务时需要通过--conf 将该配置项给加上。例如:kafka相关包为kafka-clients-x.x.x.jar,kafka_2.11-x.x.x.jar,提交任务需要增加--conf spark.yarn.cluster.driver.extraClassPath=/home/huawei/Bigdata/common/runtime/security:./kafka-clients-x.x.x.jar:./kafka_2.11-x.x.x.jar。 当前版本Spark结构流部分不再支持kafka2.x之前的版本,对于升级场景请继续使用旧的客户端。
  • REST接口 通过以下命令可跳过REST接口过滤器获取相应的应用信息。 普通模式下,JobHistory仅支持http协议,故在如下命令的url中请使用http协议。 获取JobHistory中所有应用信息: 命令: curl http://192.168.227.16:4040/api/v1/applications?mode=monitoring --insecure 其中192.168.227.16为JobHistory节点的业务IP,4040为JobHistory的端口号。 结果: [ { "id" : "application_1517290848707_0008", "name" : "Spark Pi", "attempts" : [ { "startTime" : "2018-01-30T15:05:37.433CST", "endTime" : "2018-01-30T15:06:04.625CST", "lastUpdated" : "2018-01-30T15:06:04.848CST", "duration" : 27192, "sparkUser" : "sparkuser", "completed" : true, "startTimeEpoch" : 1517295937433, "endTimeEpoch" : 1517295964625, "lastUpdatedEpoch" : 1517295964848 } ]}, { "id" : "application_1517290848707_0145", "name" : "Spark shell", "attempts" : [ { "startTime" : "2018-01-31T15:20:31.286CST", "endTime" : "1970-01-01T07:59:59.999CST", "lastUpdated" : "2018-01-31T15:20:47.086CST", "duration" : 0, "sparkUser" : "admintest", "completed" : false, "startTimeEpoch" : 1517383231286, "endTimeEpoch" : -1, "lastUpdatedEpoch" : 1517383247086 } ]}] 结果分析: 通过这个命令,可以查询当前集群中所有的Spark应用(包括正在运行的应用和已经完成的应用),每个应用的信息如下表1。 表1 应用常用信息 参数 描述 id 应用的ID name 应用的Name attempts 应用的尝试,包含了开始时间、结束时间、执行用户、是否完成等信息 获取JobHistory中某个应用的信息: 命令: curl http://192.168.227.16:4040/api/v1/applications/application_1517290848707_0008?mode=monitoring --insecure 其中192.168.227.16为JobHistory节点的业务IP,4040为JobHistory的端口号,application_1517290848707_0008为应用的id。 结果: { "id" : "application_1517290848707_0008", "name" : "Spark Pi", "attempts" : [ { "startTime" : "2018-01-30T15:05:37.433CST", "endTime" : "2018-01-30T15:06:04.625CST", "lastUpdated" : "2018-01-30T15:06:04.848CST", "duration" : 27192, "sparkUser" : "sparkuser", "completed" : true, "startTimeEpoch" : 1517295937433, "endTimeEpoch" : 1517295964625, "lastUpdatedEpoch" : 1517295964848 } ]} 结果分析: 通过这个命令,可以查询某个Spark应用的信息,显示的信息如表1所示。 获取正在执行的某个应用的Executor信息: 针对alive executor命令: curl http://192.168.169.84:8088/proxy/application_1478570725074_0046/api/v1/applications/application_1478570725074_0046/executors?mode=monitoring --insecure 针对全部executor(alive&dead)命令: curl http://192.168.169.84:8088/proxy/application_1478570725074_0046/api/v1/applications/application_1478570725074_0046/allexecutors?mode=monitoring --insecure 其中192.168.195.232为ResourceManager主节点的业务IP,8088为ResourceManager的端口号,application_1478570725074_0046为在YARN中的应用ID。 结果: [{ "id" : "driver", "hostPort" : "192.168.169.84:23886", "isActive" : true, "rddBlocks" : 0, "memoryUsed" : 0, "diskUsed" : 0, "activeTasks" : 0, "failedTasks" : 0, "completedTasks" : 0, "totalTasks" : 0, "totalDuration" : 0, "totalInputBytes" : 0, "totalShuffleRead" : 0, "totalShuffleWrite" : 0, "maxMemory" : 278019440, "executorLogs" : { }}, { "id" : "1", "hostPort" : "192.168.169.84:23902", "isActive" : true, "rddBlocks" : 0, "memoryUsed" : 0, "diskUsed" : 0, "totalCores" : 1, "maxTasks" : 1, "activeTasks" : 0, "failedTasks" : 0, "completedTasks" : 0, "totalTasks" : 0, "totalDuration" : 0, "totalGCTime" : 139, "totalInputBytes" : 0, "totalShuffleRead" : 0, "totalShuffleWrite" : 0, "maxMemory" : 555755765, "executorLogs" : { "stdout" : "https://XTJ-224:8044/node/containerlogs/container_1478570725074_0049_01_000002/admin/stdout?start=-4096", "stderr" : "https://XTJ-224:8044/node/containerlogs/container_1478570725074_0049_01_000002/admin/stderr?start=-4096" }} ] 结果分析: 通过这个命令,可以查询当前应用的所有Executor信息(包括Driver),每个Executor的信息包含如下表2所示的常用信息。 表2 Executor常用信息 参数 描述 id Executor的ID hostPort Executor所在节点的ip:端口 executorLogs Executor的日志查看路径
  • 功能简介 Spark的REST API以JSON格式展现Web UI的一些指标,提供用户一种更简单的方法去创建新的展示和监控的工具,并且支持查询正在运行的app和已经结束的app的相关信息。开源的Spark REST接口支持对Jobs、Stages、Storage、Environment和Executors的信息进行查询, FusionInsight 版本中添加了查询SQL、JDBC Server和Streaming的信息的REST接口。开源REST接口完整和详细的描述请参考官网上的文档以了解其使用方法:https://archive.apache.org/dist/spark/docs/3.3.1/monitoring.html#rest-api。
  • REST API增强 SQL相关的命令:获取所有SQL语句和执行时间最长的SQL语句 SparkUI命令: curl http://192.168.195.232:8088/proxy/application_1476947670799_0053/api/v1/applications/application_1476947670799_0053/SQL?mode=monitoring --insecure 其中192.168.195.232为ResourceManager主节点的业务IP,8088为ResourceManager的端口号,application_1476947670799_0053为在YARN中的应用ID。 JobHistory命令: curl http://192.168.227.16:4040/api/v1/applications/application_1478570725074_0004/SQL?mode=monitoring --insecure 其中192.168.227.16为JobHistory节点的业务IP,4040为JobHistory的端口号,application_1478570725074_0004为应用ID。 结果: SparkUI命令和JobHistory命令的查询结果均为: { "longestDurationOfCompletedSQL" : [ { "id" : 0, "status" : "COMPLETED", "description" : "getCallSite at SQLExecution.scala:48", "submissionTime" : "2016/11/08 15:39:00", "duration" : "2 s", "runningJobs" : [ ], "successedJobs" : [ 0 ], "failedJobs" : [ ] } ], "sqls" : [ { "id" : 0, "status" : "COMPLETED", "description" : "getCallSite at SQLExecution.scala:48", "submissionTime" : "2016/11/08 15:39:00", "duration" : "2 s", "runningJobs" : [ ], "successedJobs" : [ 0 ], "failedJobs" : [ ] }]} 结果分析: 通过这个命令,可以查询当前应用的所有SQL语句的信息(即结果中“sqls”的部分),执行时间最长的SQL语句的信息(即结果中“longestDurationOfCompletedSQL”的部分)。每个SQL语句的信息如下表3。 表3 SQL的常用信息 参数 描述 id SQL语句的ID status SQL语句的执行状态,有RUNNING、COMPLETED、FAILED三种 runningJobs SQL语句产生的job中,正在执行的job列表 successedJobs SQL语句产生的job中,执行成功的job列表 failedJobs SQL语句产生的job中,执行失败的job列表 JDBC Server相关的命令:获取连接数,正在执行的SQL数,所有session信息,所有SQL的信息 命令: curl http://192.168.195.232:8088/proxy/application_1476947670799_0053/api/v1/applications/application_1476947670799_0053/sqlserver?mode=monitoring --insecure 其中192.168.195.232为ResourceManager主节点的业务IP,8088为ResourceManager的端口号,application_1476947670799_0053为在YARN中的应用ID。 结果: { "sessionNum" : 1, "runningSqlNum" : 0, "sessions" : [ { "user" : "spark", "ip" : "192.168.169.84", "sessionId" : "9dfec575-48b4-4187-876a-71711d3d7a97", "startTime" : "2016/10/29 15:21:10", "finishTime" : "", "duration" : "1 minute 50 seconds", "totalExecute" : 1 } ], "sqls" : [ { "user" : "spark", "jobId" : [ ], "groupId" : "e49ff81a-230f-4892-a209-a48abea2d969", "startTime" : "2016/10/29 15:21:13", "finishTime" : "2016/10/29 15:21:14", "duration" : "555 ms", "statement" : "show tables", "state" : "FINISHED", "detail" : "== Parsed Logical Plan ==\nShowTablesCommand None\n\n== Analyzed Logical Plan ==\ntableName: string, isTemporary: boolean\nShowTablesCommand None\n\n== Cached Logical Plan ==\nShowTablesCommand None\n\n== Optimized Logical Plan ==\nShowTablesCommand None\n\n== Physical Plan ==\nExecutedCommand ShowTablesCommand None\n\nCode Generation: true" } ]} 结果分析: 通过这个命令,可以查询当前JDBC应用的session连接数,正在执行的SQL数,所有的session和SQL信息。每个session的信息如下表4,每个SQL的信息如下表5。 表4 session常用信息 参数 描述 user 该session连接的用户 ip session所在的节点IP sessionId session的ID startTime session开始连接的时间 finishTime session结束连接的时间 duration session连接时长 totalExecute 在该session上执行的SQL数 表5 sql常用信息 参数 描述 user SQL执行的用户 jobId SQL语句包含的job id列表 groupId SQL所在的group id startTime SQL开始时间 finishTime SQL结束时间 duration SQL执行时长 statement 对应的语句 detail 对应的逻辑计划,物理计划 Streaming相关的命令:获取平均输入频率,平均调度时延,平均执行时长,总时延平均值 命令: curl http://192.168.195.232:8088/proxy/application_1477722033672_0008/api/v1/applications/application_1477722033672_0008/streaming/statistics?mode=monitoring --insecure 其中192.168.195.232为ResourceManager主节点的业务IP,8088为ResourceManager的端口号,application_1477722033672_0008为在YARN中的应用ID。 结果: {"startTime" : "2018-12-25T08:58:10.836GMT", "batchDuration" : 1000, "numReceivers" : 1, "numActiveReceivers" : 1, "numInactiveReceivers" : 0, "numTotalCompletedBatches" : 373, "numRetainedCompletedBatches" : 373, "numActiveBatches" : 0, "numProcessedRecords" : 1, "numReceivedRecords" : 1, "avgInputRate" : 0.002680965147453083, "avgSchedulingDelay" : 14, "avgProcessingTime" : 47, "avgTotalDelay" : 62} 结果分析: 通过这个命令,可以查询当前Streaming应用的平均输入频率(events/sec),平均调度时延(ms),平均执行时长(ms),总时延平均值(ms)。
  • 常用接口 YARN常用的Java类有如下几个。 ApplicationClientProtocol 用于Client与ResourceManager之间。Client通过该协议可实现将应用程序提交到ResourceManager上,查询应用程序的运行状态或者中止应用程序等功能。 表1 ApplicationClientProtocol常用方法 方法 说明 forceKillApplication(KillApplicationRequest request) Client通过此接口请求RM中止一个已提交的任务。 getApplicationAttemptReport(GetApplicationAttemptReportRequest request) Client通过此接口从RM获取指定ApplicationAttempt的报告信息。 getApplicationAttempts(GetApplicationAttemptsRequest request) Client通过此接口从RM获取所有ApplicationAttempt的报告信息。 getApplicationReport(GetApplicationReportRequest request) Client通过此接口从RM获取某个应用的报告信息。 getApplications(GetApplicationsRequest request) Client通过此接口从RM获取满足一定过滤条件的应用的报告信息。 getClusterMetrics(GetClusterMetricsRequest request) Client通过此接口从RM获取集群的Metrics。 getClusterNodes(GetClusterNodesRequest request) Client通过此接口从RM获取集群中的所有节点信息。 getContainerReport(GetContainerReportRequest request) Client通过此接口从RM获取某个Container的报告信息。 getContainers(GetContainersRequest request) Client通过此接口从RM获取某个ApplicationAttemp的所有Container的报告信息。 getDelegationToken(GetDelegationTokenRequest request) Client通过此接口获取授权票据,用于container访问相应的service。 getNewApplication(GetNewApplicationRequest request) Client通过此接口获取一个新的应用ID号,用于提交新的应用。 getQueueInfo(GetQueueInfoRequest request) Client通过此接口从RM中获取队列的相关信息。 getQueueUserAcls(GetQueueUserAclsInfoRequest request) Client通过此接口从RM中获取当前用户的队列访问权限信息。 moveApplicationAcrossQueues(MoveApplicationAcrossQueuesRequest request) 移动一个应用到新的队列。 submitApplication(SubmitApplicationRequest request) Client通过此接口提交一个新的应用到RM。 ApplicationMasterProtocol 用于ApplicationMaster与ResourceManager之间。ApplicationMaster使用该协议向ResourceManager注册、申请资源、获取各个任务的运行情况等。 表2 ApplicationMasterProtocol常用方法 方法 说明 allocate(AllocateRequest request) AM通过此接口提交资源分配申请。 finishApplicationMaster(FinishApplicationMasterRequest request) AM通过此接口通知RM其运行成功或者失败。 registerApplicationMaster(RegisterApplicationMasterRequest request) AM通过此接口向RM进行注册。 ContainerManagementProtocol 用于ApplicationMaster与NodeManager之间。ApplicationMaster使用该协议要求NodeManager启动/中止Container或者查询Container的运行状态。 表3 ContainerManagementProtocol常用方法 方法 说明 getContainerStatuses(GetContainerStatusesRequest request) AM通过此接口向NM请求Containers的当前状态信息。 startContainers(StartContainersRequest request) AM通过此接口向NM提供需要启动的containers列表的请求。 stopContainers(StopContainersRequest request) AM通过此接口请求NM停止一系列已分配的Containers。
  • 操作步骤 编译JDBC样例程序: 在IDEA界面左下方单击“Terminal”进入终端,执行命令mvn clean package进行编译。 当输出“BUILD SUCCESS”,表示编译成功,如下图所示。编译成功后将会在样例工程的target下生成含有“-with-dependencies”字段的jar包。 运行JDBC样例程序: 在Linux上创建一个目录作为运行目录,如“/opt/jdbc_example”,将1中生成的“target”目录下包名中含有“-with-dependencies”字段的Jar包放进该路径下,并在该目录下创建子目录“src/main/resources”,将已获取的“hive-jdbc-example\src\main\resources”目录下的所有文件复制到“resources”下。 执行以下命令运行Jar包: chmod +x /opt/jdbc_example -R cd /opt/jdbc_example java -jar hive-jdbc-example-1.0-SNAPSHOT-jar-with-dependencies.jar 以上Jar包名称仅供参考,具体名称以实际生成为主。 在命令行终端查看样例代码中的HQL所查询出的结果,运行成功会显示如下信息: Create table success!_c00Delete table success!
共100000条
提示

您即将访问非华为云网站,请注意账号财产安全