华为云用户手册

  • 操作步骤 获取运行在Yarn上的任务的具体信息。 命令: curl -k -i --negotiate -u : "https://10-120-85-2:8090/ws/v1/cluster/apps/" 其中10-120-85-2为ResourceManager主节点的hostname,8090为ResourceManager的端口号。 用户能看到哪个队列的任务,要看这个用户是否有这个队列的admin权限。 如果当前组件使用了Ranger进行权限控制,需基于Ranger配置相关策略进行权限管理。 运行结果: { "apps": { "app": [ { "id": "application_1461743120947_0001", "user": "spark", "name": "Spark-JD BCS erver", "queue": "default", "state": "RUNNING", "finalStatus": "UNDEFINED", "progress": 10, "trackingUI": "ApplicationMaster", "trackingUrl": "https://10-120-85-2:8090/proxy/application_1461743120947_0001/", "diagnostics": "AM is launched. ", "clusterId": 1461743120947, "applicationType": "SPARK", "applicationTags": "", "startedTime": 1461804906260, "finishedTime": 0, "elapsedTime": 6888848, "amContainerLogs": "https://10-120-85-2:8044/node/containerlogs/container_e12_1461743120947_0001_01_000001/spark", "amHostHttpAddress": "10-120-85-2:8044", "allocatedMB": 1024, "allocatedVCores": 1, "runningContainers": 1, "memorySeconds": 7053309, "vcoreSeconds": 6887, "preemptedResourceMB": 0, "preemptedResourceVCores": 0, "numNonAMContainerPreempted": 0, "numAMContainerPreempted": 0, "resourceRequests": [ { "capability": { "memory": 1024, "virtualCores": 1 }, "nodeLabelExpression": "", "numContainers": 0, "priority": { "priority": 0 }, "relaxLocality": true, "resourceName": "*" } ], "logAggregationStatus": "NOT_START", "amNodeLabelExpression": "" }, { "id": "application_1461722876897_0002", "user": "admin", "name": "QuasiMonteCarlo", "queue": "default", "state": "FINISHED", "finalStatus": "SUCCEEDED", "progress": 100, "trackingUI": "History", "trackingUrl": "https://10-120-85-2:8090/proxy/application_1461722876897_0002/", "diagnostics": "Attempt recovered after RM restart", "clusterId": 1461743120947, "applicationType": "MAPREDUCE", "applicationTags": "", "startedTime": 1461741052993, "finishedTime": 1461741079483, "elapsedTime": 26490, "amContainerLogs": "https://10-120-85-2:8044/node/containerlogs/container_e11_1461722876897_0002_01_000001/admin", "amHostHttpAddress": "10-120-85-2:8044", "allocatedMB": -1, "allocatedVCores": -1, "runningContainers": -1, "memorySeconds": 158664, "vcoreSeconds": 52, "preemptedResourceMB": 0, "preemptedResourceVCores": 0, "numNonAMContainerPreempted": 0, "numAMContainerPreempted": 0, "amNodeLabelExpression": "" } ] } } 结果分析: 通过这个接口,可以查询当前集群中Yarn上的任务,并且可以得到如下表1。 表1 常用信息 参数 参数描述 user 运行这个任务的用户。 applicationType 例如MAPREDUCE或者SPARK等。 finalStatus 可以知道任务是成功还是失败。 elapsedTime 任务运行的时间。 获取Yarn资源的总体信息。 命令: curl -k -i --negotiate -u : "https://10-120-85-102:8090/ws/v1/cluster/metrics" 运行结果: { "clusterMetrics": { "appsSubmitted": 2, "appsCompleted": 1, "appsPending": 0, "appsRunning": 1, "appsFailed": 0, "appsKilled": 0, "reservedMB": 0, "availableMB": 23552, "allocatedMB": 1024, "reservedVirtualCores": 0, "availableVirtualCores": 23, "allocatedVirtualCores": 1, "containersAllocated": 1, "containersReserved": 0, "containersPending": 0, "totalMB": 24576, "totalVirtualCores": 24, "totalNodes": 3, "lostNodes": 0, "unhealthyNodes": 0, "decommissionedNodes": 0, "rebootedNodes": 0, "activeNodes": 3, "rmMainQueueSize": 0, "schedulerQueueSize": 0, "stateStoreQueueSize": 0 } } 结果分析: 通过这个接口,可以查询当前集群中如表2。 表2 常用信息 参数 参数描述 appsSubmitted 已经提交的任务数。 appsCompleted 已经完成的任务数。 appsPending 正在挂起的任务数。 appsRunning 正在运行的任务数。 appsFailed 已经失败的任务数。 appsKilled 已经被kill的任务数。 totalMB Yarn资源总的内存。 totalVirtualCores Yarn资源总的VCore数。
  • 功能简介 通过调用“org.apache.hadoop.hbase.hindex.global.GlobalIndexAdmin”中的方法进行HBase全局二级索引的管理,该类中addIndices用于创建全局二级索引。 全局二级索引的创建需要指定索引列、覆盖列(可选)、索引表预分区(可选,建议指定)。 在已有存量数据的表上创建全局二级索引,需要创建索引预分区,防止索引表出现热点,索引表数据的rowkey由索引列构成,并且包含分隔符,格式为“\x01索引值\x00”,因此预分区需要指定成对应格式,例如,当使用id列和age列作为索引列时,两个列均为整数,使用id列完成预分区,可以指定索引表预分区点为: \x010,\x011,\x012....
  • 创建Doris连接 以下代码片段在“JDBCExample”类的“createConnection”方法中。 USER和PASSWD为在创建连接时用于进行安全认证的用户名和密码。 Class.forName(JDBC_DRIVER); String dbUrl = String.format(DB_URL_PATTERN, HOST, PORT); connection = DriverManager.getConnection(dbUrl, USER, PASSWD); 父主题: Doris JDBC接口调用样例程序
  • 准备开发环境 在进行二次开发时,要准备的开发和运行环境如表1所示。 表1 开发环境 准备项 说明 操作系统 开发环境:Windows系统,支持Windows 7以上版本。 运行环境:Windows系统或Linux系统。 如需在本地调测程序,运行环境需要和集群业务平面网络互通。 安装JDK 开发和运行环境的基本配置,版本要求如下: 服务端和客户端仅支持自带的OpenJDK,版本为1.8.0_272,不允许替换。 对于客户应用需引用SDK类的Jar包运行在客户应用进程中的。 X86客户端:Oracle JDK:支持1.8版本;IBM JDK:支持1.8.5.11版本。 TaiShan客户端:OpenJDK:支持1.8.0_272版本。 说明: 基于安全考虑,服务端只支持TLS V1.2及以上的加密协议。 IBM JDK默认只支持TLS V1.0,若使用IBM JDK,请配置启动参数“com.ibm.jsse2.overrideDefaultTLS”为“true”,设置后可以同时支持TLS V1.0/V1.1/V1.2,详情参见https://www.ibm.com/support/knowledgecenter/zh/SSYKE2_8.0.0/com.ibm.java.security.component.80.doc/security-component/jsse2Docs/matchsslcontext_tls.html#matchsslcontext_tls。 安装和配置IntelliJ IDEA 用于开发HBase应用程序的工具。版本要求:2019.1或其他兼容版本。 说明: 若使用IBM JDK,请确保IntelliJ IDEA中的JDK配置为IBM JDK。 若使用Oracle JDK,请确保IntelliJ IDEA中的JDK配置为Oracle JDK。 若使用Open JDK,请确保IntelliJ IDEA中的JDK配置为Open JDK。 安装Junit插件 开发环境的基本配置。 安装Maven 开发环境的基本配置。用于项目管理,贯穿软件开发生命周期。 7-zip 用于解压“*.zip”和“*.rar”文件。 支持7-Zip 16.04版本。
  • 代码样例 以下代码片段是查找用户的示例,在“rest”包的“UserManager”类的main方法中。 //访问Manager接口完成查找用户列表 operationName = "QueryUserList"; operationUrl = webUrl + QUERY_USER_LIST_URL; String responseLineContent = httpManager.sendHttpGetRequest(httpClient, operationUrl, operationName); LOG .info("The {} response is {}.", operationName, responseLineContent);
  • MRS 组件应用开发流程说明 通常MRS组件应用开发流程如下所示,各组件应用的开发编译操作可参考组件开发指南对应章节。 图1 MRS组件应用开发流程 表1 MRS组件应用开发流程说明 阶段 说明 准备开发环境 在进行应用开发前,需首先准备开发环境,推荐使用IntelliJ IDEA工具,同时本地需完成JDK、Maven等初始配置。 准备连接集群配置文件 应用程序开发或运行过程中,需通过集群相关配置文件信息连接MRS集群,配置文件通常包括用于安全认证的用户文件,可从已创建好的MRS集群中获取相关内容。 用于程序调测或运行的节点,需要与MRS集群内节点网络互通。 配置并导入样例工程 MRS提供了不同组件场景下的多种样例程序,用户可获取样例工程并导入本地开发环境中进行程序学习。 配置安全认证 连接开启了Kerberos认证的MRS集群时,应用程序中需配置具有相关资源访问权限的用户进行安全认证。 根据业务场景开发程序 根据实际业务场景开发程序,调用组件接口实现对应功能。 编译并运行程序 将开发好的程序编译运行,用户可在本地Windows开发环境中进行程序调测运行,也可以将程序编译为Jar包后,提交到Linux节点上运行。
  • Python样例代码 下面代码片段仅为演示,具体代码参见SparkOnHbasePythonExample中HBaseDistributedScanExample文件: # -*- coding:utf-8 -*- # -*- coding:utf-8 -*- """ 【说明】 由于pyspark不提供Hbase相关api,本样例使用Python调用Java的方式实现 """ from py4j.java_gateway import java_import from pyspark.sql import SparkSession # 创建SparkSession spark = SparkSession\ .builder\ .appName("JavaHBaseDistributedScan")\ .getOrCreate() # 向sc._jvm中导入要运行的类 java_import(spark._jvm, 'com.huawei.bigdata.spark.examples.hbasecontext.JavaHBaseDistributedScanExample') # 创建类实例并调用方法,传递sc._jsc参数 spark._jvm.JavaHBaseDistributedScan().execute(spark._jsc, sys.argv) # 停止SparkSession spark.stop()
  • 提交命令 假设用例代码打包后的jar包名为spark-hbaseContext-test-1.0.jar,并将jar包放在客户端$SPARK_HOME目录下,以下命令均在$SPARK_HOME目录执行,Java接口对应的类名前有Java字样,请参考具体样例代码进行书写。 yarn-client模式: java/scala版本(类名等请与实际代码保持一致,此处仅为示例) bin/spark-submit --master yarn --deploy-mode client --class com.huawei.bigdata.spark.examples.hbasecontext.JavaHBaseDistributedScanExample SparkOnHbaseJavaExample-1.0.jar ExampleAvrotable python版本(文件名等请与实际保持一致,此处仅为示例) bin/spark-submit --master yarn --deploy-mode client --jars SparkOnHbaseJavaExample-1.0.jar HBaseDistributedScanExample.py ExampleAvrotable yarn-cluster模式: java/scala版本(类名等请与实际代码保持一致,此处仅为示例) bin/spark-submit --master yarn --deploy-mode cluster --class com.huawei.bigdata.spark.examples.hbasecontext.JavaHBaseDistributedScanExample SparkOnHbaseJavaExample-1.0.jar ExampleAvrotable python版本(文件名等请与实际保持一致,此处仅为示例) bin/spark-submit --master yarn --deploy-mode cluster --jars SparkOnHbaseJavaExample-1.0.jar HBaseDistributedScanExample.py ExampleAvrotable
  • 打包项目 通过IDEA自带的Maven工具,打包项目,生成jar包。具体操作请参考在Linux环境中调测Spark应用。 将打包生成的jar包上传到Spark客户端所在服务器的任意目录(例如“$SPARK_HOME” )下。 若运行“Spark on HBase”样例程序,需要在Spark客户端的“spark-defaults.conf”配置文件中将配置项“spark.yarn.security.credentials.hbase.enabled”设置为“true”(该参数值默认为“false”,改为“true”后对已有业务没有影响。如果要卸载HBase服务,卸载前请将此参数值改回“false”),将配置项“spark.inputFormat.cache.enabled”设置为“false”。
  • 操作步骤 在Windows本地运行程序,需要配置https ssl证书。 登录集群任意节点,进入如下目录下载ca.crt文件。 cd ${BIGDATA_HOME}/om-agent_8.1.2.2/nodeagent/security/cert/subcert/certFile/ 将ca.crt文件下载到本地,以管理员的身份打开cmd。 输入如下命令: keytool -import -v -trustcacerts -alias ca -file "D:\xx\ca.crt" -storepass changeit -keystore "%JAVA_HOME%\jre\lib\security\cacerts" 其中“D:\xx\ca.crt”是实际ca.crt文件存放路径;“%JAVA_HOME% ”为jdk安装路径。 在开发环境中(例如IDEA中),右击OozieRestApiMain.java,单击“Run 'OozieRestApiMain.main()'”运行对应的应用程序工程。 使用Oozie客户端执行以下命令: oozie job -oozie https://Oozie业务IP:21003/oozie -config job.properties -run 其中需要提前将待使用样例工程目录“src\main\resources”中的“job.properties”文件复制到Oozie客户端所在目录。
  • 场景说明 用户可以在Spark应用程序中使用HBaseContext的方式去使用HBase,将要插入的数据的rowKey构造成rdd,然后通过HBaseContext的bulkLoad接口将rdd写入HFile中。将生成的HFile导入HBase表的操作采用如下格式的命令,不属于本接口范围,不在此进行详细说明: hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles {hfilePath} {tableName}
  • Python样例代码 下面代码片段仅为演示,具体代码参见SparkOnHbasePythonExample中HBaseBulkLoadPythonExample文件: # -*- coding:utf-8 -*- """ 【说明】 由于pyspark不提供Hbase相关api,本样例使用Python调用Java的方式实现 """ from py4j.java_gateway import java_import from pyspark.sql import SparkSession # 创建SparkSession spark = SparkSession\ .builder\ .appName("JavaHBaseBulkLoadExample")\ .getOrCreate() # 向sc._jvm中导入要运行的类 java_import(spark._jvm, 'com.huawei.bigdata.spark.examples.HBaseBulkLoadPythonExample') # 创建类实例并调用方法,传递sc._jsc参数 spark._jvm.HBaseBulkLoadPythonExample().hbaseBulkLoad(spark._jsc, sys.argv[1], sys.argv[2]) # 停止SparkSession spark.stop()
  • 提交命令 假设用例代码打包后的jar包名为spark-hbaseContext-test-1.0.jar,并将jar包放在客户端“$SPARK_HOME”目录下,以下命令均在“$SPARK_HOME”目录执行,Java接口对应的类名前有Java字样,请参考具体样例代码进行书写。 yarn-client模式: java/scala版本(类名等请与实际代码保持一致,此处仅为示例) bin/spark-submit --master yarn --deploy-mode client --class com.huawei.bigdata.spark.examples.hbasecontext.JavaHBaseBulkLoadExample SparkOnHbaseJavaExample-1.0.jar /tmp/hfile bulkload-table-test python版本(文件名等请与实际保持一致,此处仅为示例) bin/spark-submit --master yarn --deploy-mode client --jars SparkOnHbaseJavaExample-1.0.jar HBaseBulkLoadExample.py /tmp/hfile bulkload-table-test yarn-cluster模式: java/scala版本(类名等请与实际代码保持一致,此处仅为示例) bin/spark-submit --master yarn --deploy-mode cluster --class com.huawei.bigdata.spark.examples.hbasecontext.JavaHBaseBulkLoadExample SparkOnHbaseJavaExample-1.0.jar /tmp/hfile bulkload-table-test python版本(文件名等请与实际保持一致,此处仅为示例) bin/spark-submit --master yarn --deploy-mode cluster --jars SparkOnHbaseJavaExample-1.0.jar HBaseBulkLoadExample.py /tmp/hfile bulkload-table-test
  • 打包项目 通过IDEA自带的Maven工具,打包项目,生成jar包。具体操作请参考在Linux环境中调测Spark应用。 将打包生成的jar包上传到Spark客户端所在服务器的任意目录(例如“$SPARK_HOME” )下。 若运行“Spark on HBase”样例程序,需要在Spark客户端的“spark-defaults.conf”配置文件中将配置项“spark.yarn.security.credentials.hbase.enabled”设置为“true”(该参数值默认为“false”,改为“true”后对已有业务没有影响。如果要卸载HBase服务,卸载前请将此参数值改回“false”),将配置项“spark.inputFormat.cache.enabled”设置为“false”。
  • 准备本地应用开发环境 在进行二次开发时,要准备的开发和运行环境如表1所示。 表1 开发环境 准备项 说明 操作系统 Windows系统,支持Windows 7以上版本。 开发和运行环境需要和集群业务平面网络互通。 安装JDK 开发和运行环境的基本配置。版本要求如下: 服务端和客户端仅支持自带的OpenJDK,版本为1.8.0_272,不允许替换。 对于客户应用需引用SDK类的Jar包运行在客户应用进程中的。 X86客户端:Oracle JDK,支持1.8版本;IBM JDK,支持1.8.5.11版本。 TaiShan客户端:OpenJDK,支持1.8.0_272版本。 说明: 基于安全考虑,服务端只支持TLS V1.2及以上的加密协议。 安装和配置IDEA 开发环境的基本配置,建议使用2019.1或其他兼容版本。 说明: 若使用IBM JDK,请确保IDEA中的JDK配置为IBM JDK。 若使用Oracle JDK,请确保IDEA中的JDK配置为Oracle JDK。 若使用Open JDK,请确保IntelliJ IDEA中的JDK配置为Open JDK。 不同的IntelliJ IDEA不要使用相同的workspace和相同路径下的示例工程。 安装Maven 开发环境的基本配置。用于项目管理,贯穿软件开发生命周期。 准备开发用户 参考准备MRS应用开发用户进行操作,准备用于应用开发的集群用户并授予相应权限。 7-zip 用于解压“*.zip”和“*.rar”文件,支持7-zip 16.04版本。 父主题: 准备Oozie应用开发环境
  • 代码样例 以下代码片段在com.huawei.bigdata.hbase.examples包的“HBaseSample”类的testDelete方法中 public void testDelete() { LOG.info("Entering testDelete."); byte[] rowKey = Bytes.toBytes("012005000201"); Table table = null; try { // Instantiate an HTable object. table = conn.getTable(tableName); // Instantiate an Delete object. Delete delete = new Delete(rowKey); // Submit a delete request. table.delete(delete); LOG.info("Delete table successfully."); } catch (IOException e) { LOG.error("Delete table failed " ,e); } finally { if (table != null) { try { // Close the HTable object. table.close(); } catch (IOException e) { LOG.error("Close table failed " ,e); } } } LOG.info("Exiting testDelete."); } 如果被删除的cell所在的列族上设置了二级索引,也会同步删除索引数据。
  • 代码样例 以下代码片段在com.huawei.bigdata.hbase.examples包的“HBaseSample”类的dropTable方法中。 public void dropTable() { LOG.info("Entering dropTable."); Admin admin = null; try { admin = conn.getAdmin(); if (admin.tableExists(tableName)) { // Disable the table before deleting it. admin.disableTable(tableName); // Delete table. admin.deleteTable(tableName);//注[1] } LOG.info("Drop table successfully."); } catch (IOException e) { LOG.error("Drop table failed " ,e); } finally { if (admin != null) { try { // Close the Admin object. admin.close(); } catch (IOException e) { LOG.error("Close admin failed " ,e); } } } LOG.info("Exiting dropTable."); }
  • 解决步骤 检查工程conf目录下“producer.properties”中配置的“bootstrap.servers”配置值中访问的IP和端口是否正确: 如果IP与Kafka集群部署的业务IP不一致,那么需要修改为当前集群正确的IP地址。 如果配置中的端口为21007(Kafka安全模式端口),那么修改该端口为9092(Kafka普通模式端口)。 检查网络是否正常,确保当前机器能够正常访问Kafka集群。
  • 准备开发环境 在进行应用开发时,要准备的开发和运行环境如表1所示。 表1 开发环境 准备项 说明 操作系统 开发环境:Windows系统,支持Windows7以上版本。 运行环境:Linux系统。 如需在本地调测程序,运行环境需要和集群业务平面网络互通。 安装JDK 开发和运行环境的基本配置。版本要求如下: 服务端和客户端仅支持自带的OpenJDK,版本为1.8.0_272,不允许替换。 对于客户应用需引用SDK类的Jar包运行在客户应用进程中的。 X86客户端:Oracle JDK:支持1.8版本;IBM JDK:支持1.8.5.11版本。 TaiShan客户端:OpenJDK:支持1.8.0_272版本。 说明: 基于安全考虑,服务端只支持TLS V1.2及以上的加密协议。 IBM JDK默认只支持TLS V1.0,若使用IBM JDK,请配置启动参数“com.ibm.jsse2.overrideDefaultTLS”为“true”,设置后可以同时支持TLS V1.0/V1.1/V1.2,详情参见https://www.ibm.com/support/knowledgecenter/zh/SSYKE2_8.0.0/com.ibm.java.security.component.80.doc/security-component/jsse2Docs/matchsslcontext_tls.html#matchsslcontext_tls。 安装和配置IntelliJ IDEA 开发环境的基本配置,建议使用2019.1或其他兼容版本。 说明: 若使用IBM JDK,请确保IntelliJ IDEA中的JDK配置为IBM JDK。 若使用Oracle JDK,请确保IntelliJ IDEA中的JDK配置为Oracle JDK。 若使用Open JDK,请确保IntelliJ IDEA中的JDK配置为Open JDK。 不同的IntelliJ IDEA不要使用相同的workspace和相同路径下的示例工程。 安装Maven 开发环境的基本配置。用于项目管理,贯穿软件开发生命周期。 准备开发用户 准备用于应用开发的ClickHouse集群用户并授予相应权限。 7-zip 用于解压“*.zip”和“*.rar”文件,支持7-Zip 16.04版本。
  • 参数解释 MapReduce Action节点中包含的各参数及其含义,请参见表1。 表1 参数含义 参数 含义 name map-reduce action的名称 resourceManager MapReduce ResourceManager地址 name-node HDFS NameNode地址 queueName 任务处理时使用的MapReduce队列名 mapred.mapper.class Mapper类名 mapred.reducer.class Reducer类名 mapred.input.dir MapReduce处理数据的输入目录 mapred.output.dir MapReduce处理后结果数据输出目录 mapred.map.tasks MapReduce map任务个数 “${变量名}”表示:该值来自“job.properties”所定义。 例如:${nameNode}表示的就是“hdfs://hacluster”。(可参见job.properties)
  • SparkSQL常用接口 Spark SQL中在Python中重要的类有: pyspark.sql.SQLContext:是Spark SQL功能和DataFrame的主入口。 pyspark.sql.DataFrame:是一个以命名列方式组织的分布式数据集。 pyspark.sql.HiveContext:获取存储在Hive中数据的主入口。 pyspark.sql.DataFrameStatFunctions:统计功能中一些函数。 pyspark.sql.functions:DataFrame中内嵌的函数。 pyspark.sql.Window:sql中提供窗口功能。 表4 Spark SQL常用的Action 方法 说明 collect() 返回一个数组,包含DataFrame的所有列。 count() 返回DataFrame中的行数。 describe() 计算统计信息,包含计数,平均值,标准差,最小值和最大值。 first() 返回第一行。 head(n) 返回前n行。 show() 用表格形式显示DataFrame。 take(num) 返回DataFrame中的前num行。 表5 基本的DataFrame Functions 方法 说明 explain() 打印出SQL语句的逻辑计划和物理计划。 printSchema() 打印schema信息到控制台。 registerTempTable(name) 将DataFrame注册为一张临时表,命名为name,其周期和SQLContext绑定在一起。 toDF() 返回一个列重命名的DataFrame。
  • Spark Streaming常用接口 Spark Streaming中常见的类有: pyspark.streaming.StreamingContext:是Spark Streaming功能的主入口,负责提供创建DStreams的方法,入参中需要设置批次的时间间隔。 pyspark.streaming.DStream:是一种代表RDDs连续序列的数据类型,代表连续数据流。 dsteam.PariDStreamFunctions:键值对的DStream,常见的操作如groupByKey和reduceByKey。 对应的Spark Streaming的JAVA API是JavaStreamingContext,JavaDStream和JavaPairDStream。 Spark Streaming的常见方法与Spark Core类似,下表罗列了Spark Streaming特有的一些方法。 表3 Spark Streaming常用接口介绍 方法 说明 socketTextStream(hostname, port, storageLevel) 从TCP源主机:端口创建一个输入流。 start() 启动Spark Streaming计算。 awaitTermination(timeout) 当前进程等待终止,如Ctrl+C等。 stop(stopSparkContext, stopGraceFully) 终止Spark Streaming计算,stopSparkContext用于判断是否需要终止相关的SparkContext,StopGracefully用于判断是否需要等待所有接受到的数据处理完成。 UpdateStateByKey(func) 更新DStream的状态。使用此方法,需要定义State和状态更新函数。 window(windowLength, slideInterval) 根据源DStream的窗口批次计算得到一个新的DStream。 countByWindow(windowLength, slideInterval) 返回流中滑动窗口元素的个数。 reduceByWindow(func, windowLength, slideInterval) 当调用在DStream的KV对上,返回一个新的DStream的KV对,其中每个Key的Value根据滑动窗口中批次的reduce函数聚合得到。 join(other,numPartitions) 实现不同的Spark Streaming之间做合并操作。
  • Kafka样例工程介绍 MRS样例工程获取地址为https://github.com/huaweicloud/huaweicloud-mrs-example,切换分支为与MRS集群相匹配的版本分支,然后下载压缩包到本地后解压,即可获取各组件对应的样例代码工程。 当前MRS提供以下Kafka相关样例工程: 表1 Kafka相关样例工程 样例工程位置 描述 kafka-examples 单线程生产数据,相关样例请参考使用Producer API向安全Topic生产消息。 单线程消费数据,相关样例请参考使用Consumer API订阅安全Topic并消费。 多线程生产数据,相关样例请参考使用多线程Producer发送消息。 多线程消费数据,相关样例请参考使用多线程Consumer消费消息。 基于KafkaStreams实现WordCount,相关样例请参考使用KafkaStreams统计数据 springboot/kafka-examples 基于SpringBoot搭建的应用Demo,实现了生产数据的REST API以及消费数据的REST API。参考Kafka对接SpringBoot样例代码。 父主题: Kafka开发指南(安全模式)
  • 代码样例 如下是写文件的代码片段,详细代码请参考com.huawei.bigdata.hdfs.examples中的HdfsExample类。 /** * 创建目录 * * @throws java.io.IOException */ private void mkdir() throws IOException { Path destPath = new Path(DEST_PATH); if (!createPath(destPath)) { LOG.error("failed to create destPath " + DEST_PATH); return; } LOG.info("success to create path " + DEST_PATH); } /** * create file path * * @param filePath * @return * @throws java.io.IOException */ private boolean createPath(final Path filePath) throws IOException { if (!fSystem.exists(filePath)) { fSystem.mkdirs(filePath); } return true; }
  • 打包项目 通过IDEA自带的Maven工具,打包项目,生成jar包。具体操作请参考在Linux环境中调测Spark应用。 将打包生成的jar包上传到Spark客户端所在服务器的任意目录(例如“$SPARK_HOME” )下。 将user.keytab、krb5.conf 两个文件上传客户端所在服务器上(文件上传的路径需要和生成的jar包路径一致)。 若运行“Spark on HBase”样例程序,需要在Spark客户端的“spark-defaults.conf”配置文件中将配置项“spark.yarn.security.credentials.hbase.enabled”设置为“true”(该参数值默认为“false”,改为“true”后对已有业务没有影响。如果要卸载HBase服务,卸载前请将此参数值改回“false”),将配置项“spark.inputFormat.cache.enabled”设置为“false”。
  • Python样例代码 下面代码片段仅为演示,具体代码参见SparkOnHbasePythonExample中HBaseDistributedScanExample文件: # -*- coding:utf-8 -*- # -*- coding:utf-8 -*- """ 【说明】 (1)由于pyspark不提供Hbase相关api,本样例使用Python调用Java的方式实现 (2)如果使用yarn-client模式运行,请确认Spark2x客户端Spark2x/spark/conf/spark-defaults.conf中 spark.yarn.security.credentials.hbase.enabled参数配置为true """ from py4j.java_gateway import java_import from pyspark.sql import SparkSession # 创建SparkSession spark = SparkSession\ .builder\ .appName("JavaHBaseDistributedScan")\ .getOrCreate() # 向sc._jvm中导入要运行的类 java_import(spark._jvm, 'com.huawei.bigdata.spark.examples.hbasecontext.JavaHBaseDistributedScanExample') # 创建类实例并调用方法,传递sc._jsc参数 spark._jvm.JavaHBaseDistributedScan().execute(spark._jsc, sys.argv) # 停止SparkSession spark.stop()
  • 提交命令 假设用例代码打包后的jar包名为spark-hbaseContext-test-1.0.jar,并将jar包放在客户端“$SPARK_HOME”目录下,以下命令均在“$SPARK_HOME”目录执行,Java接口对应的类名前有Java字样,请参考具体样例代码进行书写。 yarn-client模式: java/scala版本(类名等请与实际代码保持一致,此处仅为示例) bin/spark-submit --master yarn --deploy-mode client --class com.huawei.bigdata.spark.examples.hbasecontext.JavaHBaseDistributedScanExample SparkOnHbaseJavaExample.jar ExampleAvrotable python版本(文件名等请与实际保持一致,此处仅为示例),假设对应的Java代码打包后包名为SparkOnHbaseJavaExample.jar,且放在当前提交目录。 bin/spark-submit --master yarn --deploy-mode client --jars SparkOnHbaseJavaExample.jar HBaseDistributedScanExample.py ExampleAvrotable yarn-cluster模式: java/scala版本(类名等请与实际代码保持一致,此处仅为示例) bin/spark-submit --master yarn --deploy-mode cluster --class com.huawei.bigdata.spark.examples.hbasecontext.JavaHBaseDistributedScanExample --files /opt/user.keytab,/opt/krb5.conf SparkOnHbaseJavaExample.jar ExampleAvrotable python版本(文件名等请与实际保持一致,此处仅为示例),假设对应的Java代码打包后包名为SparkOnHbaseJavaExample.jar,且放在当前提交目录。 bin/spark-submit --master yarn --deploy-mode cluster --files /opt/user.keytab,/opt/krb5.conf --jars SparkOnHbaseJavaExample.jar HBaseDistributedScanExample.py ExampleAvrotable
  • 提交命令 假设用例代码打包后的jar包名为spark-hbaseContext-test-1.0.jar,并将jar包放在客户端“$SPARK_HOME”目录下,以下命令均在“$SPARK_HOME”目录执行,Java接口对应的类名前有Java字样,请参考具体样例代码进行书写。 yarn-client模式: java/scala 版本(类名等请与实际代码保持一致,此处仅为示例) bin/spark-submit --master yarn --deploy-mode client --class com.huawei.bigdata.spark.examples.hbasecontext.JavaHBaseBulkGetExample SparkOnHbaseJavaExample-1.0.jar bulktable python版本(文件名等请与实际保持一致,此处仅为示例) bin/spark-submit --master yarn --deploy-mode client --jars SparkOnHbaseJavaExample-1.0.jar HBaseBulkGetExample.py bulktable yarn-cluster模式: java/scala 版本(类名等请与实际代码保持一致,此处仅为示例) bin/spark-submit --master yarn --deploy-mode cluster --class com.huawei.bigdata.spark.examples.hbasecontext.JavaHBaseBulkGetExample SparkOnHbaseJavaExample-1.0.jar bulktable python版本(文件名等请与实际保持一致,此处仅为示例) bin/spark-submit --master yarn --deploy-mode cluster --jars SparkOnHbaseJavaExample-1.0.jar HBaseBulkGetExample.py bulktable
  • Python样例代码 下面代码片段仅为演示,具体代码参见SparkOnHbasePythonExample中HBaseBulkGetExample文件: # -*- coding:utf-8 -*- """ 【说明】 由于pyspark不提供Hbase相关api,本样例使用Python调用Java的方式实现 """ from py4j.java_gateway import java_import from pyspark.sql import SparkSession # 创建SparkSession spark = SparkSession\ .builder\ .appName("JavaHBaseBulkGetExample")\ .getOrCreate() # 向sc._jvm中导入要运行的类 java_import(spark._jvm, 'com.huawei.bigdata.spark.examples.hbasecontext.JavaHBaseBulkGetExample') # 创建类实例并调用方法,传递sc._jsc参数 spark._jvm.JavaHBaseBulkGetExample().execute(spark._jsc, sys.argv) # 停止SparkSession spark.stop()
  • 打包项目 通过IDEA自带的Maven工具,打包项目,生成jar包。具体操作请参考在Linux环境中调测Spark应用。 将打包生成的jar包上传到Spark客户端所在服务器的任意目录(例如“$SPARK_HOME” )下。 若运行“Spark on HBase”样例程序,需要在Spark客户端的“spark-defaults.conf”配置文件中将配置项“spark.yarn.security.credentials.hbase.enabled”设置为“true”(该参数值默认为“false”,改为“true”后对已有业务没有影响。如果要卸载HBase服务,卸载前请将此参数值改回“false”),将配置项“spark.inputFormat.cache.enabled”设置为“false”。
共100000条