华为云用户手册

  • 场景说明 假定用户有某个周末网民网购停留时间的日志文本,基于某些业务要求,要求开发MapReduce应用程序实现如下功能: 统计日志文件中本周末网购停留总时间超过2个小时的女性网民信息。 周末两天的日志文件第一列为姓名,第二列为性别,第三列为本次停留时间,单位为分钟,分隔符为“,”。 log1.txt:周六网民停留日志 LiuYang,female,20YuanJing,male,10GuoYijun,male,5CaiXuyu,female,50Liyuan,male,20FangBo,female,50LiuYang,female,20YuanJing,male,10GuoYijun,male,50CaiXuyu,female,50FangBo,female,60 log2.txt:周日网民停留日志 LiuYang,female,20YuanJing,male,10CaiXuyu,female,50FangBo,female,50GuoYijun,male,5CaiXuyu,female,50Liyuan,male,20CaiXuyu,female,50FangBo,female,50LiuYang,female,20YuanJing,male,10FangBo,female,50GuoYijun,male,50CaiXuyu,female,50FangBo,female,60
  • 数据规划 首先需要把原日志文件放置在HDFS系统里。 在Linux系统上新建两个文本文件,将log1.txt中的内容复制保存到input_data1.txt,将log2.txt中的内容复制保存到input_data2.txt。 在HDFS上建立一个文件夹“/tmp/input”,并上传input_data1.txt,input_data2.txt到此目录,命令如下: 登录HDFS客户端。 cd 客户端安装目录 source bigdata_env 在Linux系统HDFS客户端使用命令hdfs dfs -mkdir /tmp/input 在Linux系统HDFS客户端使用命令hdfs dfs -putlocal_filepath /tmp/input
  • 常用接口 YARN常用的Java类有如下几个。 ApplicationClientProtocol 用于Client与ResourceManager之间。Client通过该协议可实现将应用程序提交到ResourceManager上,查询应用程序的运行状态或者中止应用程序等功能。 表1 ApplicationClientProtocol常用方法 方法 说明 forceKillApplication(KillApplicationRequest request) Client通过此接口请求RM中止一个已提交的任务。 getApplicationAttemptReport(GetApplicationAttemptReportRequest request) Client通过此接口从RM获取指定ApplicationAttempt的报告信息。 getApplicationAttempts(GetApplicationAttemptsRequest request) Client通过此接口从RM获取所有ApplicationAttempt的报告信息。 getApplicationReport(GetApplicationReportRequest request) Client通过此接口从RM获取某个应用的报告信息。 getApplications(GetApplicationsRequest request) Client通过此接口从RM获取满足一定过滤条件的应用的报告信息。 getClusterMetrics(GetClusterMetricsRequest request) Client通过此接口从RM获取集群的Metrics。 getClusterNodes(GetClusterNodesRequest request) Client通过此接口从RM获取集群中的所有节点信息。 getContainerReport(GetContainerReportRequest request) Client通过此接口从RM获取某个Container的报告信息。 getContainers(GetContainersRequest request) Client通过此接口从RM获取某个ApplicationAttemp的所有Container的报告信息。 getDelegationToken(GetDelegationTokenRequest request) Client通过此接口获取授权票据,用于container访问相应的service。 getNewApplication(GetNewApplicationRequest request) Client通过此接口获取一个新的应用ID号,用于提交新的应用。 getQueueInfo(GetQueueInfoRequest request) Client通过此接口从RM中获取队列的相关信息。 getQueueUserAcls(GetQueueUserAclsInfoRequest request) Client通过此接口从RM中获取当前用户的队列访问权限信息。 moveApplicationAcrossQueues(MoveApplicationAcrossQueuesRequest request) 移动一个应用到新的队列。 submitApplication(SubmitApplicationRequest request) Client通过此接口提交一个新的应用到RM。 ApplicationMasterProtocol 用于ApplicationMaster与ResourceManager之间。ApplicationMaster使用该协议向ResourceManager注册、申请资源、获取各个任务的运行情况等。 表2 ApplicationMasterProtocol常用方法 方法 说明 allocate(AllocateRequest request) AM通过此接口提交资源分配申请。 finishApplicationMaster(FinishApplicationMasterRequest request) AM通过此接口通知RM运行成功或者失败。 registerApplicationMaster(RegisterApplicationMasterRequest request) AM通过此接口向RM进行注册。 ContainerManagementProtocol 用于ApplicationMaster与NodeManager之间。ApplicationMaster使用该协议要求NodeManager启动/中止Container或者查询Container的运行状态。 表3 ContainerManagementProtocol常用方法 方法 说明 getContainerStatuses(GetContainerStatusesRequest request) AM通过此接口向NM请求Containers的当前状态信息。 startContainers(StartContainersRequest request) AM通过此接口向NM提供需要启动的containers列表的请求。 stopContainers(StopContainersRequest request) AM通过此接口请求NM停止一系列已分配的Containers。
  • Structured Streaming不支持的功能 不支持多个流聚合。 不支持limit、first、take这些取N条Row的操作。 不支持Distinct。 只有当output mode为complete时才支持排序操作。 有条件地支持流和静态数据集之间的外连接。 不支持部分DataSet上立即运行查询并返回结果的操作: count():无法从流式Dataset返回单个计数,而是使用ds.groupBy().count()返回一个包含运行计数的streaming Dataset。 foreach():使用ds.writeStream.foreach(...)代替。 show():使用输出console sink代替。
  • Structured Streaming可靠性说明 Structured Streaming通过checkpoint和WAL机制,对可重放的sources,以及支持重复处理的幂等性sinks,可以提供端到端的exactly-once容错语义。 用户可在程序中设置option("checkpointLocation", "checkpoint路径")启用checkpoint。 从checkpoint恢复时,应用程序或者配置可能发生变更,有部分变更会导致从checkpoint恢复失败,具体限制如下: 不允许source的个数或者类型发生变化。 source的参数变化,这种情况是否能被支持,取决于source类型和查询语句,例如: 速率控制相关参数的添加、删除和修改,此种情况能被支持,如:spark.readStream.format("kafka").option("subscribe", "topic")变更为spark.readStream.format("kafka").option("subscribe", "topic").option("maxOffsetsPerTrigger", ...) 修改消费的topic/files可能会出现不可预知的问题,如:spark.readStream.format("kafka").option("subscribe", "topic")变更为spark.readStream.format("kafka").option("subscribe", "newTopic") sink的类型发生变化:允许特定的几个sink的组合,具体场景需要验证确认,例如: File sink允许变更为kafka sink,kafka中只处理新数据。 kafka sink不允许变更为file sink。 kafka sink允许变更为foreach sink,反之亦然。 sink的参数变化,这种情况是否能被支持,取决于sink类型和查询语句,例如: 不允许file sink的输出路径发生变更。 允许Kafka sink的输出topic发生变更。 允许foreach sink中的自定义算子代码发生变更,但是变更结果取决于用户代码。 Projection、filter和map-like操作变更,局部场景下能够支持,例如: 支持Filter的添加和删除,如:sdf.selectExpr("a")变更为sdf.where(...).selectExpr("a").filter(...) Output schema相同时,projections允许变更,如:sdf.selectExpr("stringColumn AS json").writeStream变更为sdf.select(to_json(...).as("json")).writeStream Output schema不相同时,projections在部分条件下允许变更,如:sdf.selectExpr("a").writeStream变更为sdf.selectExpr("b").writeStream,只有当sink支持“a”到“b”的schema转换时才不会出错。 状态操作的变更,在部分场景下会导致状态恢复失败: Streaming aggregation:如sdf.groupBy("a").agg(...)操作中,不允许分组键或聚合键的类型或者数量发生变化。 Streaming deduplication:如:sdf.dropDuplicates("a")操作中,不允许分组键或聚合键的类型或者数量发生变化。 Stream-stream join:如sdf1.join(sdf2, ...)操作中,关联键的schema不允许发生变化,join类型不允许发生变化,其他join条件的变更可能导致不确定性结果。 任意状态计算:如sdf.groupByKey(...).mapGroupsWithState(...)或者sdf.groupByKey(...).flatMapGroupsWithState(...)操作中,用户自定义状态的schema或者超时类型都不允许发生变化;允许用户自定义state-mapping函数变化,但是变更结果取决于用户代码;如果需要支持schema变更,用户可以将状态数据编码/解码成二进制数据以支持schema迁移。 Source的容错性支持列表 Sources 支持的Options 容错支持 说明 File source path:必填,文件路径 maxFilesPerTrigger:每次trigger最大文件数(默认无限大) latestFirst:是否有限处理新文件(默认值: false) fileNameOnly:是否以文件名作为新文件校验,而不是使用完整路径进行判断(默认值: false) 支持 支持通配符路径,但不支持以逗号分隔的多个路径。 文件必须以原子方式放置在给定的目录中,这在大多数文件系统中可以通过文件移动操作实现。 Socket Source host:连接的节点ip,必填 port:连接的端口,必填 不支持 - Rate Source rowsPerSecond:每秒产生的行数,默认值1 rampUpTime:在达到rowsPerSecond速度之前的上升时间 numPartitions:生成数据行的并行度 支持 - Kafka Source 参见https://archive.apache.org/dist/spark/docs/3.3.1/structured-streaming-kafka-integration.html 支持 - Sink的容错性支持列表 Sinks 支持的output模式 支持Options 容错性 说明 File Sink Append Path:必须指定 指定的文件格式,参见DataFrameWriter中的相关接口 exactly-once 支持写入分区表,按时间分区用处较大 Kafka Sink Append, Update, Complete 参见:https://archive.apache.org/dist/spark/docs/3.3.1/structured-streaming-kafka-integration.html at-least-once 参见https://archive.apache.org/dist/spark/docs/3.3.1/structured-streaming-kafka-integration.html Foreach Sink Append, Update, Complete None 依赖于ForeachWriter实现 参见https://archive.apache.org/dist/spark/docs/3.3.1/structured-streaming-programming-guide.html#using-foreach ForeachBatch Sink Append, Update, Complete None 依赖于算子实现 参见https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#using-foreach-and-foreachbatch Console Sink Append, Update, Complete numRows:每轮打印的行数,默认20 truncate:输出太长时是否清空,默认true 不支持容错 - Memory Sink Append, Complete None 不支持容错,在complete模式下,重启query会重建整个表 -
  • Structured Streaming支持的功能 支持对流式数据的ETL操作。 支持流式DataFrames或Datasets的schema推断和分区。 流式DataFrames或Datasets上的操作:包括无类型,类似SQL的操作(比如select、where、groupBy),以及有类型的RDD操作(比如map、filter、flatMap)。 支持基于Event Time的聚合计算,支持对迟到数据的处理。 支持对流式数据的去除重复数据操作。 支持状态计算。 支持对流处理任务的监控。 支持批流join,流流join。 当前JOIN操作支持列表如下: 左表 右表 支持的Join类型 说明 Static Static 全部类型 即使在流处理中,不涉及流数据的join操作也能全部支持 Stream Static Inner 支持,但是无状态 Left Outer 支持,但是无状态 Right Outer 不支持 Full Outer 不支持 Stream Stream Inner 支持,左右表可选择使用watermark或者时间范围进行状态清理 Left Outer 有条件的支持,左表可选择使用watermark进行状态清理,右表必须使用watermark+时间范围 Right Outer 有条件的支持,右表可选择使用watermark进行状态清理,左表必须使用watermark+时间范围 Full Outer 不支持
  • SparkSQL常用接口 Spark SQL中在Python中重要的类有: pyspark.sql.SQLContext:是Spark SQL功能和DataFrame的主入口。 pyspark.sql.DataFrame:是一个以命名列方式组织的分布式数据集。 pyspark.sql.HiveContext:获取存储在Hive中数据的主入口。 pyspark.sql.DataFrameStatFunctions:统计功能中一些函数。 pyspark.sql.functions:DataFrame中内嵌的函数。 pyspark.sql.Window:sql中提供窗口功能。 表4 Spark SQL常用的Action 方法 说明 collect() 返回一个数组,包含DataFrame的所有列。 count() 返回DataFrame中的行数。 describe() 计算统计信息,包含计数,平均值,标准差,最小值和最大值。 first() 返回第一行。 head(n) 返回前n行。 show() 用表格形式显示DataFrame。 take(num) 返回DataFrame中的前num行。 表5 基本的DataFrame Functions 方法 说明 explain() 打印出SQL语句的逻辑计划和物理计划。 printSchema() 打印schema信息到控制台。 registerTempTable(name) 将DataFrame注册为一张临时表,命名为name,其周期和SQLContext绑定在一起。 toDF() 返回一个列重命名的DataFrame。
  • Spark Streaming常用接口 Spark Streaming中常见的类有: pyspark.streaming.StreamingContext:是Spark Streaming功能的主入口,负责提供创建DStreams的方法,入参中需要设置批次的时间间隔。 pyspark.streaming.DStream:是一种代表RDDs连续序列的数据类型,代表连续数据流。 dsteam.PariDStreamFunctions:键值对的DStream,常见的操作如groupByKey和reduceByKey。 对应的Spark Streaming的JAVA API是JavaStreamingContext,JavaDStream和JavaPairDStream。 Spark Streaming的常见方法与Spark Core类似,下表罗列了Spark Streaming特有的一些方法。 表3 Spark Streaming常用接口介绍 方法 说明 socketTextStream(hostname, port, storageLevel) 从TCP源主机:端口创建一个输入流。 start() 启动Spark Streaming计算。 awaitTermination(timeout) 当前进程等待终止,如Ctrl+C等。 stop(stopSparkContext, stopGraceFully) 终止Spark Streaming计算,stopSparkContext用于判断是否需要终止相关的SparkContext,StopGracefully用于判断是否需要等待所有接收到的数据处理完成。 UpdateStateByKey(func) 更新DStream的状态。使用此方法,需要定义State和状态更新函数。 window(windowLength, slideInterval) 根据源DStream的窗口批次计算得到一个新的DStream。 countByWindow(windowLength, slideInterval) 返回流中滑动窗口元素的个数。 reduceByWindow(func, windowLength, slideInterval) 当调用在DStream的KV对上,返回一个新的DStream的KV对,其中每个Key的Value根据滑动窗口中批次的reduce函数聚合得到。 join(other,numPartitions) 实现不同的Spark Streaming之间做合并操作。
  • 场景说明 假定一个广告业务,存在广告请求事件、广告展示事件、广告点击事件,广告主需要实时统计有效的广告展示和广告点击数据。 已知: 终端用户每次请求一个广告后,会生成广告请求事件,保存到kafka的adRequest topic中。 请求一个广告后,可能用于多次展示,每次展示,会生成广告展示事件,保存到kafka的adShow topic中。 每个广告展示,可能会产生多次点击,每次点击,会生成广告点击事件,保存到kafka的adClick topic中。 广告有效展示的定义如下: 请求到展示的时长超过A分钟算无效展示。 A分钟内多次展示,每次展示事件为有效展示。 广告有效点击的定义如下: 展示到点击时长超过B分钟算无效点击。 B分钟内多次点击,仅首次点击事件为有效点击。 基于此业务场景,模拟简单的数据结构如下: 广告请求事件 数据结构:adID^reqTime 广告展示事件 数据结构:adID^showID^showTime 广告点击事件 数据结构:adID^showID^clickTime 数据关联关系如下: 广告请求事件与广告展示事件通过adID关联。 广告展示事件与广告点击事件通过adID+showID关联。 数据要求: 数据从产生到到达流处理引擎的延迟时间不超过2小时 广告请求事件、广告展示事件、广告点击事件到达流处理引擎的时间不能保证有序和时间对齐
  • 数据规划 在kafka中生成模拟数据(需要有Kafka权限用户)。 java -cp $SPARK_HOME/conf:$SPARK_HOME/jars/*:$SPARK_HOME/jars/streamingClient010/*:{ClassPath} com.huawei.bigdata.spark.examples.KafkaADEventProducer {BrokerList} {timeOfProduceReqEvent} {eventTimeBeforeCurrentTime} {reqTopic} {reqEventCount} {showTopic} {showEventMaxDelay} {clickTopic} {clickEventMaxDelay} 确保集群安装完成,包括HDFS、Yarn、Spark2x和Kafka。 启动Kafka的Producer,向Kafka发送数据。 {ClassPath}表示工程jar包的存放路径,详细路径由用户指定,可参考在Linux环境中编包并运行Spark程序章节中导出jar包的操作步骤。 命令举例: java -cp /opt/client/Spark2x/spark/conf:/opt/StructuredStreamingADScalaExample-1.0.jar:/opt/client/Spark2x/spark/jars/*:/opt/client/Spark2x/spark/jars/streamingClient010/* com.huawei.bigdata.spark.examples.KafkaADEventProducer 10.132.190.170:21005,10.132.190.165:21005 2h 1h req 10000000 show 5m click 5m 此命令将在kafka上创建3个topic:req、show、click,在2h内生成1千万条请求事件数据,请求事件的时间取值范围为{当前时间-1h 至 当前时间},并为每条请求事件随机生成0-5条展示事件,展示事件的时间取值范围为{请求事件时间 至请求事件时间+5m },为每条展示事件随机生成0-5条点击事件,点击事件的时间取值范围为{展示事件时间 至展示事件时间+5m }
  • 功能介绍 统计日志文件中本周末网购停留总时间超过2个小时的女性网民信息。 主要分为三个部分: 从原文件中筛选女性网民上网时间数据信息,通过类CollectionMapper继承Mapper抽象类实现。 汇总每个女性上网时间,并输出时间大于两个小时的女性网民信息,通过类CollectionReducer继承Reducer抽象类实现。 main方法提供建立一个MapReduce job,并提交MapReduce作业到hadoop集群。
  • 功能简介 本小节介绍了如何使用Impala SQL建内部表、外部表的基本操作。创建表主要有以下三种方式。 自定义表结构,以关键字EXTERNAL区分创建内部表和外部表。 内部表,如果对数据的处理都由Impala完成,则应该使用内部表。在删除内部表时,元数据和数据一起被删除。 外部表,如果数据要被多种工具共同处理,则应该使用外部表,可避免对该数据的误操作。删除外部表时,只删除掉元数据。 根据已有表创建新表,使用CREATE LIKE句式,完全复制原有的表结构,包括表的存储格式。 根据查询结果创建新表,使用CREATE AS SELECT句式。 这种方式比较灵活,可以在复制原表表结构的同时指定要复制哪些字段,不包括表的存储格式。
  • 操作步骤 控制台显示运行结果会有如下成功信息: cluset status is falseWarning: Could not get charToByteConverterClass!Workflow job submitted: 0000067-160729120057089-oozie-omm-WWorkflow job running ...0000067-160729120057089-oozie-omm-WWorkflow job running ...0000067-160729120057089-oozie-omm-WWorkflow job running ...0000067-160729120057089-oozie-omm-WWorkflow job running ...0000067-160729120057089-oozie-omm-WWorkflow job running ...0000067-160729120057089-oozie-omm-WWorkflow job running ...0000067-160729120057089-oozie-omm-WWorkflow job running ...0000067-160729120057089-oozie-omm-WWorkflow job completed ...0000067-160729120057089-oozie-omm-WWorkflow id[0000067-160729120057089-oozie-omm-W] status[SUCCEEDED]-----------finish Oozie ------------------- 同时在HDFS上生成目录“/user/developuser/examples/output-data/map-reduce”,包括如下两个文件: _SUC CES S part-00000 可以通过Hue的文件浏览器或者通过HDFS如下命令行查看: hdfs dfs -ls /user/developuser/examples/output-data/map-reduce 在Windows下面执行的时候可能会出现下面的异常,但是不影响业务: java.io.IOException: Could not locate executable null\bin\winutils.exe in the Hadoop binaries.
  • Impala SQL编写之不支持隐式类型转换 查询语句使用字段的值做过滤时,不支持使用Hive类似的隐式类型转换来编写Impala SQL: Impala示例: select * from default.tbl_src where id = 10001;select * from default.tbl_src where name = 'TestName'; Hive示例(支持隐式类型转换): select * from default.tbl_src where id = '10001';select * from default.tbl_src where name = TestName; 表tbl_src的id字段为Int类型,name字段为String类型。
  • 操作步骤 编译JDBC样例程序: 在IDEA界面左下方单击“Terminal”进入终端,执行命令mvn clean package进行编译。 当输出“BUILD SUCCESS”,表示编译成功,如下图所示。编译成功后将会在样例工程的target下生成含有“-with-dependencies”字段的jar包。 运行JDBC样例程序: 在Linux上创建一个目录作为运行目录,如“/opt/jdbc_example”,将1中生成的“target”目录下包名中含有“-with-dependencies”字段的Jar包放进该路径下,并在该目录下创建子目录“src/main/resources”,将已获取的“hive-jdbc-example\src\main\resources”目录下的所有文件复制到“resources”下。 执行以下命令运行Jar包: chmod +x /opt/jdbc_example -R cd /opt/jdbc_example java -jar hive-jdbc-example-1.0-SNAPSHOT-jar-with-dependencies.jar 以上Jar包名称仅供参考,具体名称以实际生成为主。 在命令行终端查看样例代码中的HQL所查询出的结果,运行成功会显示如下信息: Create table success!_c00Delete table success!
  • 回答 旧插件storm-kafka中的KafkaSpout使用的是Kafka的“SimpleConsumer”接口,需要自主管理offset,KafkaSpout中根据用户定义的字段将Topic中每个Patition的offset记录在ZooKeeper中,定义如下: public SpoutConfig(BrokerHosts hosts, String topic, String zkRoot, String id) { super(hosts, topic); this.zkRoot = zkRoot; this.id = id;} 其中“hosts”是ZooKeeper的连接串,如:192.168.0.1:2181/kafka,“topic”是待消费的Topic名,“zkRoot”表示在ZooKeeper中的存放数据的根路径,一般为:“/kafka/{topic}”,“id”表示应用的标示,如:app1。读取offset会有以下两种场景: 场景1 当拓扑运行后,KafkaSpout会将offset存放在ZooKeeper路径:“/{zkRoot}/{id}/{partitionId}”下,其中“zkRoot”和“id”是用户指定的,“partitionId”是自动获取的。默认情况下,拓扑在启动后会先从ZooKeeper上的offset存放路径读取历史的offset,用作本次的消费起点,因此只需要正确的指定“zkRoot”和“id”,就可以继承历史记录的offset,不用从头开始消费。 场景2 没有像场景1中那样设置固定的“zkRoot”或者“id”,导致无法读取历史的offset,如此一来每次提交拓扑都会把历史已经消费过的数据再消费一遍,这时需要通过如下方式手动指定: SpoutConfig spoutConfig = new SpoutConfig(hosts, inputTopicName, zkRoot, appId);spoutConfig.ignoreZkOffsets = true;spoutConfig.startOffsetTime = kafka.api.OffsetRequest.LatestTime(); 通过指定SpoutConfig中的“ignoreZkOffsets”和“startOffsetTime”来强制消费最新的数据。
  • 回答 由于checkpoint中包含了spark应用的对象序列化信息、task执行状态信息、配置信息等,因此,当存在以下问题时,从checkpoint恢复spark应用将会失败。 业务代码变更且变更类未明确指定SerialVersionUID。 spark内部类变更,且变更类未明确指定SerialVersionUID。 另外,由于checkpoint保存了部分配置项,因此可能导致业务修改了部分配置项后,从checkpoint恢复时,配置项依然保持为旧值的情况。当前只有以下部分配置会在从checkpoint恢复时重新加载。 "spark.yarn.app.id", "spark.yarn.app.attemptId", "spark.driver.host", "spark.driver.bindAddress", "spark.driver.port", "spark.master", "spark.yarn.jars", "spark.yarn.keytab", "spark.yarn.principal", "spark.yarn.credentials.file", "spark.yarn.credentials.renewalTime", "spark.yarn.credentials.updateTime", "spark.ui.filters", "spark.mesos.driver.frameworkId", "spark.yarn.jars"
  • 操作场景 在程序代码完成开发后,您可以上传至Linux客户端环境中运行应用。使用Scala或Java语言开发的应用程序在Spark客户端的运行步骤是一样的。 使用Python开发的Spark应用程序无需打包成jar,只需将样例工程复制到编译机器上即可。 用户需保证worker和driver的Python版本一致,否则将报错:"Python in worker has different version %s than that in driver %s."。 用户需保证Maven已配置华为镜像站中SDK的Maven镜像仓库,具体可参考配置华为开源镜像仓
  • 数据规划 StructuredStreaming样例工程的数据存储在Kafka组件中。向Kafka组件发送数据(需要有Kafka权限用户)。 确保集群安装完成,包括HDFS、Yarn、Spark和Kafka。 创建Topic。 {zkQuorum}表示ZooKeeper集群信息,格式为IP:port。 $KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper {zkQuorum}/kafka --replication-factor 1 --partitions 1 --topic {Topic} 启动Kafka的Producer,向Kafka发送数据。 {ClassPath}表示工程jar包的存放路径,详细路径由用户指定,可参考在Linux环境中编包并运行Spark程序章节中导出jar包的操作步骤。 java -cp $SPARK_HOME/jars/*:$SPARK_HOME/jars/streamingClient010/*:{ClassPath} com.huawei.bigdata.spark.examples.KafkaWordCountProducer {BrokerList} {Topic} {messagesPerSec} {wordsPerMessage}
  • 场景说明 假定Hive的person表存储用户当天消费的金额信息,HBase的table2表存储用户历史消费的金额信息。 现person表有记录name=1,account=100,表示用户1在当天消费金额为100元。 table2表有记录key=1,cf:cid=1000,表示用户1的历史消息记录金额为1000元。 基于某些业务要求,要求开发Spark应用程序实现如下功能: 根据用户名累计用户的历史消费金额,即用户总消费金额=100(用户当天的消费金额) + 1000(用户历史消费金额)。 上例所示,运行结果table2表用户key=1的总消费金额为cf:cid=1100元。
  • 打包项目 通过IDEA自带的Maven工具,打包项目,生成jar包。具体操作请参考在Linux环境中编包并运行Spark程序。 将打包生成的jar包上传到Spark客户端所在服务器的任意目录(例如“ /opt/female/” )下。 运行样例程序前,需要在Spark客户端的“spark-defaults.conf”配置文件中将配置项“spark.yarn.security.credentials.hbase.enabled”设置为“true”(该参数值默认为“false”,改为“true”后对已有业务没有影响。如果要卸载HBase服务,卸载前请将此参数值改回“false”)。
  • 数据规划 Spark Streaming样例工程的数据存储在Kafka组件中。向Kafka组件发送数据(需要有Kafka权限用户)。 确保集群安装完成,包括HDFS、Yarn、Spark和Kafka。 本地新建文件“input_data1.txt”,将“log1.txt”的内容复制保存到“input_data1.txt”。 在客户端安装节点下创建文件目录:“/home/data”。将上述文件上传到此“/home/data”目录下。 创建Topic。 {zkQuorum}表示ZooKeeper集群信息,格式为IP:port。 $KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper {zkQuorum}/kafka --replication-factor 1 --partitions 3 --topic {Topic} 启动Kafka的Producer,向Kafka发送数据。 java -cp {ClassPath} com.huawei.bigdata.spark.examples.StreamingExampleProducer {BrokerList} {Topic} 其中,ClassPath除样例工程jar包路径外,还应包含Spark客户端Kafka jar包的绝对路径,例如:/opt/client/Spark2x/spark/jars/*:/opt/client/Spark2x/spark/jars/streamingClient010/*:{ClassPath}
  • 数据规划 在开始开发应用前,需要创建Hive表,命名为person,并插入数据。同时,创建HBase table2表,用于将分析后的数据写入。 将原日志文件放置到HDFS系统中。 在本地新建一个空白的log1.txt文件,并在文件内写入如下内容: 1,100 在HDFS中新建一个目录/tmp/input,并将log1.txt文件上传至此目录。 在Linux系统HDFS客户端使用命令hadoop fs -mkdir /tmp/input(hdfs dfs命令有同样的作用),创建对应目录。 在Linux系统HDFS客户端使用命令hadoop fs -putlog1.txt /tmp/input,上传数据文件。 将导入的数据放置在Hive表里。 首先,确保JD BCS erver已启动。然后使用Beeline工具,创建Hive表,并插入数据。 执行如下命令,创建命名为person的Hive表。 create table person ( name STRING, account INT ) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' ESCAPED BY '\\' STORED AS TEXTFILE; 执行如下命令插入数据。 load data inpath '/tmp/input/log1.txt' into table person; 创建HBase表。 确保JDB CS erver已启动,然后使用Spark-beeline工具,创建HBase表,并插入数据。 执行如下命令,创建命名为table2的HBase表。 create table table2 ( key string, cid string ) using org.apache.spark.sql.hbase.HBaseSource options( hbaseTableName "table2", keyCols "key", colsMapping "cid=cf.cid"); 通过HBase插入数据,执行如下命令。 put 'table2', '1', 'cf:cid', '1000'
  • 运行任务 进入Spark客户端目录,调用bin/spark-submit脚本运行代码,运行命令分别如下(类名与文件名等请与实际代码保持一致,此处仅为示例): 运行Java或Scala样例代码 bin/spark-submit --class com.huawei.bigdata.spark.examples.SparkHivetoHbase --master yarn --deploy-mode client /opt/female/SparkHivetoHbase-1.0.jar 运行Python样例程序 由于pyspark不提供Hbase相关api,本样例使用Python调用Java的方式实现。将所提供 Java代码使用maven打包成jar,并放在相同目录下,运行python程序时要使用--jars把jar包加载到classpath中。 bin/spark-submit --master yarn --deploy-mode client --jars /opt/female/SparkHivetoHbasePythonExample/SparkHivetoHbase-1.0.jar /opt/female/SparkHivetoHbasePythonExample/SparkHivetoHbasePythonExample.py
  • 打包项目 通过IDEA自带的Maven工具,打包项目,生成jar包。具体操作请参考在Linux环境中编包并运行Spark程序。 将打包生成的jar包上传到Spark客户端所在服务器的任意目录(例如“$SPARK_HOME” )下。 若运行“Spark on HBase”样例程序,需要在Spark客户端的“spark-defaults.conf”配置文件中将配置项“spark.yarn.security.credentials.hbase.enabled”设置为“true”(该参数值默认为“false”,改为“true”后对已有业务没有影响。如果要卸载HBase服务,卸载前请将此参数值改回“false”),将配置项“spark.inputFormat.cache.enabled”设置为“false”。
  • 提交命令 假设用例代码打包后的jar包名为spark-hbaseContext-test-1.0.jar,并将jar包放在客户端“$SPARK_HOME”目录下,以下命令均在$SPARK_HOME目录执行,Java接口对应的类名前有Java字样,请参考具体样例代码进行书写。 yarn-client模式: java/scala版本(类名等请与实际代码保持一致,此处仅为示例) bin/spark-submit --master yarn --deploy-mode client --class com.huawei.bigdata.spark.examples.hbasecontext.JavaHBaseBulkPutExample SparkOnHbaseJavaExample-1.0.jar bulktable cf1 python版本(文件名等请与实际保持一致,此处仅为示例) bin/spark-submit --master yarn --deploy-mode client --jars SparkOnHbaseJavaExample-1.0.jar HBaseBulkPutExample.py bulktable cf1 yarn-cluster模式: java/scala版本(类名等请与实际代码保持一致,此处仅为示例) bin/spark-submit --master yarn --deploy-mode cluster --class com.huawei.bigdata.spark.examples.hbasecontext.JavaHBaseBulkPutExample SparkOnHbaseJavaExample-1.0.jar bulktable cf1 python版本(文件名等请与实际保持一致,此处仅为示例) bin/spark-submit --master yarn --deploy-mode cluster --jars SparkOnHbaseJavaExample-1.0.jar HBaseBulkPutExample.py bulktable cf1
  • 打包项目 通过IDEA自带的Maven工具,打包项目,生成jar包。具体操作请参考在Linux环境中编包并运行Spark程序。 将打包生成的jar包上传到Spark客户端所在服务器的任意目录(例如“$SPARK_HOME” )下。 若运行“Spark on HBase”样例程序,需要在Spark客户端的“spark-defaults.conf”配置文件中将配置项“spark.yarn.security.credentials.hbase.enabled”设置为“true”(该参数值默认为“false”,改为“true”后对已有业务没有影响。如果要卸载HBase服务,卸载前请将此参数值改回“false”),将配置项“spark.inputFormat.cache.enabled”设置为“false”。
  • Python样例代码 下面代码片段仅为演示,具体代码参见SparkOnHbasePythonExample中HBaseForEachPartitionExample文件: # -*- coding:utf-8 -*-"""【说明】由于pyspark不提供Hbase相关api,本样例使用Python调用Java的方式实现"""from py4j.java_gateway import java_importfrom pyspark.sql import SparkSession# 创建SparkSessionspark = SparkSession\ .builder\ .appName("JavaHBaseForEachPartitionExample")\ .getOrCreate()# 向sc._jvm中导入要运行的类java_import(spark._jvm, 'com.huawei.bigdata.spark.examples.hbasecontext.JavaHBaseForEachPartitionExample')# 创建类实例并调用方法,传递sc._jsc参数spark._jvm.JavaHBaseForEachPartitionExample().execute(spark._jsc, sys.argv)# 停止SparkSessionspark.stop()
  • Python样例代码 下面代码片段仅为演示,具体代码参见SparkOnHbasePythonExample中HBaseBulkPutExample文件: # -*- coding:utf-8 -*-"""【说明】由于pyspark不提供Hbase相关api,本样例使用Python调用Java的方式实现"""from py4j.java_gateway import java_importfrom pyspark.sql import SparkSession# 创建SparkSessionspark = SparkSession\ .builder\ .appName("JavaHBaseBulkPutExample")\ .getOrCreate()# 向sc._jvm中导入要运行的类java_import(spark._jvm, 'com.huawei.bigdata.spark.examples.hbasecontext.JavaHBaseBulkPutExample')# 创建类实例并调用方法,传递sc._jsc参数spark._jvm.JavaHBaseBulkPutExample().execute(spark._jsc, sys.argv)# 停止SparkSessionspark.stop()
  • 提交命令 假设用例代码打包后的jar包名为spark-hbaseContext-test-1.0.jar,并将jar包放在客户端“$SPARK_HOME”目录下,以下命令均在“$SPARK_HOME”目录执行,Java接口对应的类名前有Java字样,请参考具体样例代码进行书写。 yarn-client模式: java/scala版本(类名等请与实际代码保持一致,此处仅为示例) bin/spark-submit --master yarn --deploy-mode client --class com.huawei.bigdata.spark.examples.hbasecontext.JavaHBaseForEachPartitionExample SparkOnHbaseJavaExample-1.0.jar table2 cf1 python版本(文件名等请与实际保持一致,此处仅为示例) bin/spark-submit --master yarn --deploy-mode client --jars SparkOnHbaseJavaExample-1.0.jar HBaseForEachPartitionExample.py table2 cf1 yarn-cluster模式: java/scala版本(类名等请与实际代码保持一致,此处仅为示例) bin/spark-submit --master yarn --deploy-mode cluster --class com.huawei.bigdata.spark.examples.hbasecontext.JavaHBaseForEachPartitionExample SparkOnHbaseJavaExample-1.0.jar table2 cf1 python版本(文件名等请与实际保持一致,此处仅为示例) bin/spark-submit --master yarn --deploy-mode cluster --jars SparkOnHbaseJavaExample-1.0.jar HBaseForEachPartitionExample.py table2 cf1
共100000条
提示

您即将访问非华为云网站,请注意账号财产安全