华为云用户手册

  • 建立连接 创建连接时传入表1中配置的user和password作为认证凭据,ClickHouse会带着用户名和密码在服务端进行安全认证。 ClickHouseDataSource clickHouseDataSource =new ClickHouseDataSource(JDBC_PREFIX + serverList.get(tries - 1), clickHouseProperties);connection = clickHouseDataSource.getConnection(user, password); 父主题: 样例代码说明
  • Spark SQL动态分区插入优化之sortby 如下SQL中,p1和p2是target表的分区字段,使用sort by关键字来减少小文件的产生。 insert overwrite table target partition(p1,p2)select * from sourcedistribute by p1, p2sort by p1,p2 经过动态分区Shuffle优化之后,每一个Hive分区的数据都会集中在一个Spark Task中,但是由于Hive分区的数量远远大于Task数量,所以一个Task中会包含多个Hive分区的数据,即Task与Hive分区的关系是一对多。 每一个Task会将其包含的数据按照行顺序写入文件,文件所在的分区由该行数据中的分区字段值决定。Task在写入第一行数据时会创建一个新文件,随后写的每行数据都会判断该行数据的分区字段值与上一行数据的分区字段值是否相同,如果不相同就会新建一个文件并将该行数据写入,否则将该行数据写入上一条数据所在的文件。因此在Task写动态分区数据时,相邻两行数据如果分区字段值相同,就会写入同一个文件,否则就会写入不同的文件。假设Task有N行数据,在最坏情况下,所有相邻数据的分区字段值都不相同,那么Task将会写N个文件,每个文件只有一行数据。 为了将一个Task中相同分区的数据集中在一起,减少Task写的文件数量,需要将数据按照分区字段进行排序。假设一个Task中包含M个分区数据,排序之后,一个Task中相同分区的数据就会相邻,最终一个Task只会写M个文件。在Spark SQL中增加sort by关键词可完成排序功能。 在Spark SQL中使用动态分区写入数据,需要同时使用distribute by和sort by才能有效减少小文件数量。
  • 开发流程 开发流程中各阶段的说明如图1和表1所示。 图1 ClickHouse应用程序开发流程 表1 ClickHouse应用开发的流程说明 阶段 说明 参考文档 准备开发环境 在进行应用开发前,需首先准备开发环境,ClickHouse的应用程序支持多种语言开发,推荐使用Java语言,使用IntelliJ IDEA工具,同时完成JDK、Maven等初始配置。 准备开发环境 准备连接集群配置文件 应用程序开发或运行过程中,需通过集群相关配置文件信息连接 MRS 集群,配置文件通常包括集群信息文件以及用于安全认证的用户文件,可从已创建好的MRS集群中获取相关内容。 用于程序调测或运行的节点,需要与MRS集群内节点网络互通,同时配置hosts域名信息。 准备连接集群配置文件 配置并导入样例工程 ClickHouse提供了不同场景下的样例程序,用户可获取样例工程并导入本地开发环境中进行程序学习。 配置并导入样例工程 根据业务场景开发程序 提供样例工程,帮助用户快速了解ClickHouse各部件的编程接口。 开发程序 编译并运行程序 将开发好的程序编译运行,用户可在本地Windows开发环境中进行程序调测运行,也可以将程序编译为Jar包后,提交到Linux节点上运行。 在Windows下调测程序 在Linux下调测程序 父主题: 概述
  • RDD多次使用时,建议将RDD持久化 RDD在默认情况下的存储级别是StorageLevel.NONE,即既不存磁盘也不放在内存中,如果某个RDD需要多次使用,可以考虑将该RDD持久化,方法如下: 调用spark.RDD中的cache()、persist()、persist(newLevel: StorageLevel)函数均可将RDD持久化,cache()和persist()都是将RDD的存储级别设置为StorageLevel.MEMORY_ONLY,persist(newLevel: StorageLevel)可以为RDD设置其他存储级别,但是要求调用该方法之前RDD的存储级别为StorageLevel.NONE或者与newLevel相同,也就是说,RDD的存储级别一旦设置为StorageLevel.NONE之外的级别,则无法改变。 如果想要将RDD去持久化,那么可以调用unpersist(blocking: Boolean = true),将该RDD从持久化列表中移除,并将RDD的存储级别重新设置为StorageLevel.NONE。
  • Spark建议使用Commit V2算法 在Spark提交作业时配置参数spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version=2,使用commit v2算法,例如在 DataArts Studio 提交作业增加—conf配置项,值为spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version=2。 Spark默认采用Commit V1算法,该算法原理为:Task执行完毕后将数据写入了临时目录,Driver等待所有Task执行完毕后,串行的将每个Task的临时输出文件移动到最终的目录中。如果输出的小文件过多,Driver串行移动文件耗时会过长,最终导致Commit过程比较耗时。 将Commit算法修改为V2,使得每个Task在执行成功后将临时文件移动到最终目录,相当于Driver中串行的移动操作优化为Task并行的移动文件操作。V2算法的缺点在于Spark作业执行过程中,最终目录的文件是对外可见的,如果此时有其他程序读取了最终目录的数据,那么其他程序处理的数据出现不一致问题。 然而使用Spark写Hive表,Spark作业的最终目录也是一个临时目录,通过load操作将临时目录数据导入hive表,所以Hive表的目录才是真正的最终目录,外部作业是无法读取到中间临时生成目录,因此针对Spark写Hive场景推荐使用Commit V2算法。
  • Spark SQL动态分区插入优化之Distributeby 如下SQL中,p1和p2是target表的分区字段,使用distribute by关键字来减少小文件的产生。 insert overwrite table target partition(p1,p2)select * from sourcedistribute by p1, p2 Spark程序以动态分区的方式写入Hive表时,会出现了大量的小文件,导致最后移动文件到hive表目录非常耗时,这是因为在Shuffle时Hive多个分区的数据随机落到Spark的多个Task中,此时Task与Hive分区数据的关系是多对多,即每个Task会包含多个分区的部分数据,每个Task中包含的每个分区的数据都很少,最终会导致Task写多个分区文件,每个分区文件都比较小。 为了减少小文件的数量,需要将数据按照分区字段进行Shuffle,将各个分区的数据尽量各自集中在一个Task,在Spark SQL中就是通过distribute by关键字来完成这个功能的。 当使用distribute by关键字在后出现了数据倾斜,即有的分区数据多,有的分区数据少,也会导致spark 作业整体耗时长。需要在distribute by后面增加随机数,例如: insert overwrite table target partition(p1,p2)select * from sourcedistribute by p1, p2, cast(rand() * N as int) N值可以在文件数量和倾斜度之间做权衡。 在Spark SQL中使用动态分区写入数据,需要同时使用distribute by和sort by才能有效减少小文件数量。
  • 操作步骤 查看Spark应用运行结果数据。 结果数据存储路径和格式已经由Spark应用程序指定,可通过指定文件获取。 查看Spark应用程序运行情况。 Spark主要有两个Web页面。 Spark UI页面,用于展示正在执行的应用的运行情况。 页面主要包括了Jobs、Stages、Storage、Environment和Executors五个部分。Streaming应用会多一个Streaming标签页。 页面入口:在YARN的Web UI界面,查找到对应的Spark应用程序。单击应用信息的最后一列“ApplicationMaster”,即可进入SparkUI页面。 History Server页面,用于展示已经完成的和未完成的Spark应用的运行情况。 页面包括了应用ID、应用名称、开始时间、结束时间、执行时间、所属用户等信息。单击应用ID,页面将跳转到该应用的SparkUI页面。 查看Spark日志获取应用运行情况。 您可以查看Spark日志了解应用运行情况,并根据日志信息调整应用程序。相关日志信息可参考Spark2x日志介绍。
  • 设置属性 设置连接属性,如下样例代码设置socket超时时间为60s。 ClickHouseProperties clickHouseProperties = new ClickHouseProperties();clickHouseProperties.setSocketTimeout(60000); 如果配置并导入样例工程中的“clickhouse-example.properties”配置文件中“sslUsed”参数配置为“true”时,则需要设置如下连接属性: clickHouseProperties.setSsl(true);clickHouseProperties.setSslMode("none"); 父主题: 样例代码说明
  • 操作场景 在程序代码完成开发后,您可以上传至Linux客户端环境中运行应用。使用Scala或Java语言开发的应用程序在Spark客户端的运行步骤是一样的。 使用Python开发的Spark应用程序无需打包成jar,只需将样例工程拷贝到编译机器上即可。 用户需保证worker和driver的Python版本一致,否则将报错:"Python in worker has different version %s than that in driver %s."。 用户需保证Maven已配置华为镜像站中SDK的Maven镜像仓库,具体可参考华为开源镜像配置方式
  • 数据规划 在kafka中生成模拟数据(需要有Kafka权限用户)。 确保集群安装完成,包括安装HDFS、Yarn、Spark2x和Kafka服务。 将Kafka的Broker配置参数“allow.everyone.if.no.acl.found”的值修改为“true”。 创建Topic。 {zkQuorum}表示ZooKeeper集群信息,格式为IP:port。 $KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper {zkQuorum}/kafka --replication-factor 1 --partitions 1 --topic {Topic} 启动Kafka的Producer,向Kafka发送数据。 {ClassPath}表示工程jar包的存放路径,详细路径由用户指定,可参考编包并运行程序章节中导出jar包的操作步骤。 java -cp $SPARK_HOME/conf:$SPARK_HOME/jars/*:$SPARK_HOME/jars/streamingClient010/*:{ClassPath} com.huawei.bigdata.spark.examples.KafkaProducer {brokerlist} {topic} {number of events produce every 0.02s} 示例: java -cp /opt/client/Spark2x/spark/conf:/opt/StructuredStreamingState-1.0.jar:/opt/client/Spark2x/spark/jars/*:/opt/client/Spark2x/spark/jars/streamingClient010/* com.huawei.bigdata.spark.examples.KafkaProducer xxx.xxx.xxx.xxx:21005,xxx.xxx.xxx.xxx:21005,xxx.xxx.xxx.xxx:21005 mytopic 10
  • 打包项目 将user.keytab、krb5.conf 两个文件上传客户端所在服务器上。 通过IDEA自带的Maven工具,打包项目,生成jar包。具体操作请参考编包并运行程序。 编译打包前,样例代码中的user.keytab、krb5.conf文件路径需要修改为该文件所在客户端服务器的实际路径。例如:“/opt/female/user.keytab”,“/opt/female/krb5.conf”。 将打包生成的jar包上传到Spark客户端所在服务器的任意目录(例如“ /opt” )下。
  • 数据规划 将cluster2集群的所有Zookeeper节点和HBase节点的IP和主机名配置到cluster1集群的客户端节点的“/etc/hosts”文件中。 分别将cluster1和cluster2集群Spark2x客户端conf下的hbase-site.xml文件放到“/opt/example/A”,“/opt/example/B”两个目录下。 用spark-submit提交命令: 运行样例程序前,需要在Spark客户端的“spark-defaults.conf”配置文件中将配置项“spark.yarn.security.credentials.hbase.enabled”设置为“true”(该参数值默认为“false”,改为“true”后对已有业务没有影响。如果要卸载HBase服务,卸载前请将此参数值改回“false”)。 spark-submit --master yarn --deploy-mode client --files /opt/example/B/hbase-site.xml --keytab /opt/FIclient/user.keytab --principal sparkuser --class com.huawei.spark.examples.SparkOnMultiHbase /opt/example/SparkOnMultiHbase-1.0.jar
  • 数据规划 在kafka中生成模拟数据(需要有Kafka权限用户)。 java -cp $SPARK_HOME/conf:$SPARK_HOME/jars/*:$SPARK_HOME/jars/streamingClient010/*:{ClassPath} com.huawei.bigdata.spark.examples.KafkaADEventProducer {BrokerList} {timeOfProduceReqEvent} {eventTimeBeforeCurrentTime} {reqTopic} {reqEventCount} {showTopic} {showEventMaxDelay} {clickTopic} {clickEventMaxDelay} 确保集群安装完成,包括HDFS、Yarn、Spark2x和Kafka。 将Kafka的Broker配置参数“allow.everyone.if.no.acl.found”的值修改为“true”。 启动Kafka的Producer,向Kafka发送数据。 {ClassPath}表示工程jar包的存放路径,详细路径由用户指定,可参考编包并运行程序章节中导出jar包的操作步骤。 命令举例: java -cp /opt/client/Spark2x/spark/conf:/opt/StructuredStreamingADScalaExample-1.0.jar:/opt/client/Spark2x/spark/jars/*:/opt/client/Spark2x/spark/jars/streamingClient010/* com.huawei.bigdata.spark.examples.KafkaADEventProducer 10.132.190.170:21005,10.132.190.165:21005 2h 1h req 10000000 show 5m click 5m 此命令将在kafka上创建3个topic:req、show、click,在2h内生成1千万条请求事件数据,请求事件的时间取值范围为{当前时间-1h 至 当前时间},并为每条请求事件随机生成0-5条展示事件,展示事件的时间取值范围为{请求事件时间 至请求事件时间+5m },为每条展示事件随机生成0-5条点击事件,点击事件的时间取值范围为{展示事件时间 至展示事件时间+5m }
  • 场景说明 假定一个广告业务,存在广告请求事件、广告展示事件、广告点击事件,广告主需要实时统计有效的广告展示和广告点击数据。 已知: 终端用户每次请求一个广告后,会生成广告请求事件,保存到kafka的adRequest topic中。 请求一个广告后,可能用于多次展示,每次展示,会生成广告展示事件,保存到kafka的adShow topic中。 每个广告展示,可能会产生多次点击,每次点击,会生成广告点击事件,保存到kafka的adClick topic中。 广告有效展示的定义如下: 请求到展示的时长超过A分钟算无效展示。 A分钟内多次展示,每次展示事件为有效展示。 广告有效点击的定义如下: 展示到点击时长超过B分钟算无效点击。 B分钟内多次点击,仅首次点击事件为有效点击。 基于此业务场景,模拟简单的数据结构如下: 广告请求事件 数据结构:adID^reqTime 广告展示事件 数据结构:adID^showID^showTime 广告点击事件 数据结构:adID^showID^clickTime 数据关联关系如下: 广告请求事件与广告展示事件通过adID关联。 广告展示事件与广告点击事件通过adID+showID关联。 数据要求: 数据从产生到到达流处理引擎的延迟时间不超过2小时 广告请求事件、广告展示事件、广告点击事件到达流处理引擎的时间不能保证有序和时间对齐
  • 数据规划 StructuredStreaming样例工程的数据存储在Kafka组件中。向Kafka组件发送数据(需要有Kafka权限用户)。 确保集群安装完成,包括HDFS、Yarn、Spark和Kafka。 将Kafka的Broker配置参数“allow.everyone.if.no.acl.found”的值修改为“true”。 创建Topic。 {zkQuorum}表示ZooKeeper集群信息,格式为IP:port。 $KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper {zkQuorum}/kafka --replication-factor 1 --partitions 1 --topic {Topic} 启动Kafka的Producer,向Kafka发送数据。 {ClassPath}表示工程jar包的存放路径,详细路径由用户指定,可参考编包并运行程序章节中导出jar包的操作步骤。 java -cp $SPARK_HOME/jars/*:$SPARK_HOME/jars/streamingClient010/*:{ClassPath} com.huawei.bigdata.spark.examples.KafkaWordCountProducer {BrokerList} {Topic} {messagesPerSec} {wordsPerMessage}
  • 打包项目 将user.keytab、krb5.conf 两个文件上传客户端所在服务器上。 通过IDEA自带的Maven工具,打包项目,生成jar包。具体操作请参考编包并运行程序。 编译打包前,样例代码中的user.keytab、krb5.conf文件路径需要修改为该文件所在客户端服务器的实际路径。例如:“/opt/female/user.keytab”,“/opt/female/krb5.conf”。 将打包生成的jar包上传到Spark客户端所在服务器的任意目录(例如“ /opt” )下。
  • 场景说明 假定Hive的person表存储用户当天消费的金额信息,HBase的table2表存储用户历史消费的金额信息。 现person表有记录name=1,account=100,表示用户1在当天消费金额为100元。 table2表有记录key=1,cf:cid=1000,表示用户1的历史消息记录金额为1000元。 基于某些业务要求,要求开发Spark应用程序实现如下功能: 根据用户名累计用户的历史消费金额,即用户总消费金额=100(用户当天的消费金额) + 1000(用户历史消费金额)。 上例所示,运行结果table2表用户key=1的总消费金融为cf:cid=1100元。
  • 打包项目 将user.keytab、krb5.conf 两个文件上传客户端所在服务器上。 通过IDEA自带的Maven工具,打包项目,生成jar包。具体操作请参考编包并运行程序。 编译打包前,样例代码中的user.keytab、krb5.conf文件路径需要修改为该文件所在客户端服务器的实际路径。例如:“/opt/female/user.keytab”,“/opt/female/krb5.conf”。 运行样例程序前,需要在Spark客户端的“spark-defaults.conf”配置文件中将配置项“spark.yarn.security.credentials.hbase.enabled”设置为“true”(该参数值默认为“false”,改为“true”后对已有业务没有影响。如果要卸载HBase服务,卸载前请将此参数值改回“false”)。 将打包生成的jar包上传到Spark客户端所在服务器的任意目录(例如“ /opt/female/” )下。
  • 运行任务 进入Spark客户端目录,调用bin/spark-submit脚本运行代码,运行命令分别如下(类名与文件名等请与实际代码保持一致,此处仅为示例): 运行Java或Scala样例代码 bin/spark-submit --class com.huawei.bigdata.spark.examples.SparkHivetoHbase --master yarn --deploy-mode client /opt/female/SparkHivetoHbase-1.0.jar 运行Python样例程序 由于pyspark不提供Hbase相关api,本样例使用Python调用Java的方式实现。将所提供 Java代码使用maven打包成jar,并放在相同目录下,运行python程序时要使用--jars把jar包加载到classpath中。 由于Python样例代码中未给出认证信息,请在执行应用程序时通过配置项“--keytab”和“--principal”指定认证信息。 bin/spark-submit --master yarn --deploy-mode client --keytab /opt/FIclient/user.keytab --principal sparkuser --jars /opt/female/SparkHivetoHbasePythonExample/SparkHivetoHbase-1.0.jar /opt/female/SparkHivetoHbasePythonExample/SparkHivetoHbasePythonExample.py
  • 数据规划 Spark Streaming样例工程的数据存储在Kafka组件中。向Kafka组件发送数据(需要有Kafka权限用户)。 确保集群安装完成,包括HDFS、Yarn、Spark和Kafka。 本地新建文件“input_data1.txt”,将“log1.txt”的内容复制保存到“input_data1.txt”。 在客户端安装节点下创建文件目录:“/home/data”。将上述文件上传到此“/home/data”目录下。 将Kafka的Broker配置参数“allow.everyone.if.no.acl.found”的值修改为“true”。 创建Topic。 {zkQuorum}表示ZooKeeper集群信息,格式为IP:port。 $KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper {zkQuorum}/kafka --replication-factor 1 --partitions 3 --topic {Topic} 启动Kafka的Producer,向Kafka发送数据。 java -cp {ClassPath} com.huawei.bigdata.spark.examples.StreamingExampleProducer {BrokerList} {Topic} 其中,ClassPath除样例jar包路径外,还应包含Spark客户端Kafka jar包的绝对路径,例如:/opt/client/Spark2x/spark/jars/*:/opt/client/Spark2x/spark/jars/streamingClient010/*:{ClassPath}
  • 打包项目 将user.keytab、krb5.conf 两个文件上传客户端所在服务器上。 通过IDEA自带的Maven工具,打包项目,生成jar包。具体操作请参考编包并运行程序。 编译打包前,样例代码中的user.keytab、krb5.conf文件路径需要修改为该文件所在客户端服务器的实际路径。例如:“/opt/female/user.keytab”,“/opt/female/krb5.conf”。 将打包生成的jar包上传到Spark客户端所在服务器的任意目录(例如“ /opt” )下。
  • 场景说明 假定HBase的table1表存储用户当天消费的金额信息,table2表存储用户历史消费的金额信息。 现table1表有记录key=1,cf:cid=100,表示用户1在当天消费金额为100元。 table2表有记录key=1,cf:cid=1000,表示用户1的历史消息记录金额为1000元。 基于某些业务要求,要求开发Spark应用程序实现如下功能: 根据用户名累计用户的历史消费金额,即用户总消费金额=100(用户当天的消费金额) + 1000(用户历史消费金额)。 上例所示,运行结果table2表用户key=1的总消费金融为cf:cid=1100元。
  • 代码样例 由于pyspark不提供Hbase相关api,本样例使用Python调用Java的方式实现。 下面代码片段仅为演示,具体代码参见SparkHivetoHbasePythonExample: # -*- coding:utf-8 -*-from py4j.java_gateway import java_importfrom pyspark.sql import SparkSession# 创建SparkSessionspark = SparkSession\ .builder\ .appName("SparkHivetoHbase") \ .getOrCreate()# 向sc._jvm中导入要运行的类java_import(spark._jvm, 'com.huawei.bigdata.spark.examples.SparkHivetoHbase')# 创建类实例并调用方法spark._jvm.SparkHivetoHbase().hivetohbase(spark._jsc)# 停止SparkSessionspark.stop()
  • 数据规划 使用Spark-Beeline工具创建Spark和HBase表table1、table2,并通过HBase插入数据。 确保JD BCS erver已启动。然后在Spark2x客户端,使用Spark-Beeline工具执行如下操作。 使用Spark-beeline工具创建Spark表table1。 create table table1 ( key string, cid string ) using org.apache.spark.sql.hbase.HBaseSource options( hbaseTableName "table1", keyCols "key", colsMapping "cid=cf.cid"); 通过HBase插入数据,命令如下: put 'table1', '1', 'cf:cid', '100' 使用Spark-Beeline工具创建Spark表table2。 create table table2 ( key string, cid string ) using org.apache.spark.sql.hbase.HBaseSource options( hbaseTableName "table2", keyCols "key", colsMapping "cid=cf.cid"); 通过HBase插入数据,命令如下: put 'table2', '1', 'cf:cid', '1000'
  • 打包项目 将user.keytab、krb5.conf 两个文件上传客户端所在服务器上。 通过IDEA自带的Maven工具,打包项目,生成jar包。具体操作请参考编包并运行程序。 编译打包前,样例代码中的user.keytab、krb5.conf文件路径需要修改为该文件所在客户端服务器的实际路径。例如:“/opt/female/user.keytab”,“/opt/female/krb5.conf”。 运行样例程序前,需要在Spark客户端的“spark-defaults.conf”配置文件中将配置项“spark.yarn.security.credentials.hbase.enabled”设置为“true”(该参数值默认为“false”,改为“true”后对已有业务没有影响。如果要卸载HBase服务,卸载前请将此参数值改回“false”)。 将打包生成的jar包上传到Spark客户端所在服务器的任意目录(例如“ /opt/female/” )下。
  • 运行任务 进入Spark客户端目录,调用bin/spark-submit脚本运行代码,运行命令分别如下(类名与文件名等请与实际代码保持一致,此处仅为示例): 运行Java或Scala样例代码 bin/spark-submit --conf spark.yarn.user.classpath.first=true --class com.huawei.bigdata.spark.examples.SparkHbasetoHbase --master yarn --deploy-mode client /opt/female/SparkHbasetoHbase-1.0.jar 运行Python样例程序 由于pyspark不提供Hbase相关api,本样例使用Python调用Java的方式实现。将所提供 Java代码使用maven打包成jar,并放在相同目录下,运行python程序时要使用--jars把jar包加载到classpath中。 由于Python样例代码中未给出认证信息,请在执行应用程序时通过配置项“--keytab”和“--principal”指定认证信息。 bin/spark-submit --master yarn --deploy-mode client --keytab /opt/FIclient/user.keytab --principal sparkuser --conf spark.yarn.user.classpath.first=true --jars /opt/female/SparkHbasetoHbasePythonExample/SparkHbasetoHbase-1.0.jar,/opt/female/protobuf-java-2.5.0.jar /opt/female/SparkHbasetoHbasePythonExample/SparkHbasetoHbasePythonExample.py
  • 提交命令 假设用例代码打包后的jar包名为spark-hbaseContext-test-1.0.jar,并将jar包放在客户端“$SPARK_HOME”目录下,以下命令均在“$SPARK_HOME”目录执行,Java接口对应的类名前有Java字样,请参考具体样例代码进行书写。 yarn-client模式: java/scala版本(类名等请与实际代码保持一致,此处仅为示例),${ip}请使用实际执行nc -lk 9999的命令的机器ip bin/spark-submit --master yarn --deploy-mode client --class com.huawei.bigdata.spark.examples.streaming.JavaHBaseStreamingBulkPutExample SparkOnHbaseJavaExample.jar ${ip} 9999 streamingTable cf1 python版本(文件名等请与实际保持一致,此处仅为示例),假设对应的Java代码打包后包名为SparkOnHbaseJavaExample.jar,且放在当前提交目录。 bin/spark-submit --master yarn --deploy-mode client --jars SparkOnHbaseJavaExample.jar HBaseStreamingBulkPutExample.py ${ip} 9999 streamingTable cf1 yarn-cluster模式: java/scala版本(类名等请与实际代码保持一致,此处仅为示例),${ip}请使用实际执行nc -lk 9999的命令的机器ip bin/spark-submit --master yarn --deploy-mode cluster --class com.huawei.bigdata.spark.examples.streaming.JavaHBaseStreamingBulkPutExample --files /opt/user.keytab,/opt/krb5.conf SparkOnHbaseJavaExample.jar ${ip} 9999 streamingTable cf1 python版本(文件名等请与实际保持一致,此处仅为示例),假设对应的Java代码打包后包名为SparkOnHbaseJavaExample.jar,且放在当前提交目录。 bin/spark-submit --master yarn --deploy-mode cluster --files /opt/user.keytab,/opt/krb5.conf --jars SparkOnHbaseJavaExample.jar HBaseStreamingBulkPutExample.py ${ip} 9999 streamingTable cf1
  • Python样例代码 下面代码片段仅为演示,具体代码参见SparkOnHbasePythonExample中HBaseStreamingBulkPutExample文件: # -*- coding:utf-8 -*-"""【说明】(1)由于pyspark不提供Hbase相关api,本样例使用Python调用Java的方式实现(2)如果使用yarn-client模式运行,请确认Spark2x客户端Spark2x/spark/conf/spark-defaults.conf中 spark.yarn.security.credentials.hbase.enabled参数配置为true"""from py4j.java_gateway import java_importfrom pyspark.sql import SparkSession# 创建SparkSessionspark = SparkSession\ .builder\ .appName("JavaHBaseStreamingBulkPutExample")\ .getOrCreate()# 向sc._jvm中导入要运行的类java_import(spark._jvm, 'com.huawei.bigdata.spark.examples.streaming.JavaHBaseStreamingBulkPutExample')# 创建类实例并调用方法,传递sc._jsc参数spark._jvm.JavaHBaseStreamingBulkPutExample().execute(spark._jsc, sys.argv)# 停止SparkSessionspark.stop()
  • 代码样例 由于pyspark不提供Hbase相关api,本样例使用Python调用Java的方式实现。 下面代码片段仅为演示,具体代码参见SparkHbasetoHbasePythonExample: # -*- coding:utf-8 -*-from py4j.java_gateway import java_importfrom pyspark.sql import SparkSession# 创建SparkSession,设置kryo序列化spark = SparkSession\ .builder\ .appName("SparkHbasetoHbase") \ .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \ .config("spark.kryo.registrator", "com.huawei.bigdata.spark.examples.MyRegistrator") \ .getOrCreate()# 向sc._jvm中导入要运行的类java_import(spark._jvm, 'com.huawei.bigdata.spark.examples.SparkHbasetoHbase')# 创建类实例并调用方法spark._jvm.SparkHbasetoHbase().hbasetohbase(spark._jsc)# 停止SparkSessionspark.stop()
  • 提交命令 假设用例代码打包后的jar包名为spark-hbaseContext-test-1.0.jar,并将jar包放在客户端“$SPARK_HOME”目录下,以下命令均在“$SPARK_HOME”目录执行,Java接口对应的类名前有Java字样,请参考具体样例代码进行书写。 yarn-client模式: java/scala版本(类名等请与实际代码保持一致,此处仅为示例) bin/spark-submit --master yarn --deploy-mode client --class com.huawei.bigdata.spark.examples.hbasecontext.JavaHBaseDistributedScanExample SparkOnHbaseJavaExample.jar ExampleAvrotable python版本(文件名等请与实际保持一致,此处仅为示例),假设对应的Java代码打包后包名为SparkOnHbaseJavaExample.jar,且放在当前提交目录。 bin/spark-submit --master yarn --deploy-mode client --jars SparkOnHbaseJavaExample.jar HBaseDistributedScanExample.py ExampleAvrotable yarn-cluster模式: java/scala版本(类名等请与实际代码保持一致,此处仅为示例) bin/spark-submit --master yarn --deploy-mode cluster --class com.huawei.bigdata.spark.examples.hbasecontext.JavaHBaseDistributedScanExample --files /opt/user.keytab,/opt/krb5.conf SparkOnHbaseJavaExample.jar ExampleAvrotable python版本(文件名等请与实际保持一致,此处仅为示例),假设对应的Java代码打包后包名为SparkOnHbaseJavaExample.jar,且放在当前提交目录。 bin/spark-submit --master yarn --deploy-mode cluster --files /opt/user.keytab,/opt/krb5.conf --jars SparkOnHbaseJavaExample.jar HBaseDistributedScanExample.py ExampleAvrotable
共100000条