华为云用户手册

  • 操作场景 在程序代码完成开发后,您可以上传至Linux客户端环境中运行应用。使用Scala或Java语言开发的应用程序在Spark客户端的运行步骤是一样的。 使用Python开发的Spark应用程序无需打包成jar,只需将样例工程拷贝到编译机器上即可。 用户需保证worker和driver的Python版本一致,否则将报错:"Python in worker has different version %s than that in driver %s."。 用户需保证Maven已配置华为镜像站中SDK的Maven镜像仓库,具体可参考华为开源镜像配置方式
  • 简介 JD BCS erver是Hive中的HiveServer2的另外一个实现,它底层使用了Spark SQL来处理SQL语句,从而比Hive拥有更高的性能。 JDB CS erver是一个JDBC接口,用户可以通过JDBC连接JDBCServer来访问SparkSQL的数据。JDBCServer在启动的时候,会启动一个sparkSQL的应用程序,而通过JDBC连接进来的客户端共同分享这个sparkSQL应用程序的资源,也就是说不同的用户之间可以共享数据。JDBCServer启动时还会开启一个侦听器,等待JDBC客户端的连接和提交查询。所以,在配置JDBCServer的时候,至少要配置JDBCServer的主机名和端口,如果要使用hive数据的话,还要提供hive metastore的uris。 JDBCServer默认在安装节点上的10000端口起一个JDBC服务,可以通过Beeline或者JDBC客户端代码来连接它,从而执行SQL命令。 如果您需要了解JDBCServer的其他信息,请参见Spark官网:http://spark.apache.org/docs/3.1.1/sql-programming-guide.html#distributed-sql-engine。
  • SparkSQL常用接口 Spark SQL中在Python中重要的类有: pyspark.sql.SQLContext:是Spark SQL功能和DataFrame的主入口。 pyspark.sql.DataFrame:是一个以命名列方式组织的分布式数据集。 pyspark.sql.HiveContext:获取存储在Hive中数据的主入口。 pyspark.sql.DataFrameStatFunctions:统计功能中一些函数。 pyspark.sql.functions:DataFrame中内嵌的函数。 pyspark.sql.Window:sql中提供窗口功能。 表4 Spark SQL常用的Action 方法 说明 collect() 返回一个数组,包含DataFrame的所有列。 count() 返回DataFrame中的行数。 describe() 计算统计信息,包含计数,平均值,标准差,最小值和最大值。 first() 返回第一行。 head(n) 返回前n行。 show() 用表格形式显示DataFrame。 take(num) 返回DataFrame中的前num行。 表5 基本的DataFrame Functions 方法 说明 explain() 打印出SQL语句的逻辑计划和物理计划。 printSchema() 打印schema信息到控制台。 registerTempTable(name) 将DataFrame注册为一张临时表,命名为name,其周期和SQLContext绑定在一起。 toDF() 返回一个列重命名的DataFrame。
  • Spark Streaming常用接口 Spark Streaming中常见的类有: pyspark.streaming.StreamingContext:是Spark Streaming功能的主入口,负责提供创建DStreams的方法,入参中需要设置批次的时间间隔。 pyspark.streaming.DStream:是一种代表RDDs连续序列的数据类型,代表连续数据流。 dsteam.PariDStreamFunctions:键值对的DStream,常见的操作如groupByKey和reduceByKey。 对应的Spark Streaming的JAVA API是JavaStreamingContext,JavaDStream和JavaPairDStream。 Spark Streaming的常见方法与Spark Core类似,下表罗列了Spark Streaming特有的一些方法。 表3 Spark Streaming常用接口介绍 方法 说明 socketTextStream(hostname, port, storageLevel) 从TCP源主机:端口创建一个输入流。 start() 启动Spark Streaming计算。 awaitTermination(timeout) 当前进程等待终止,如Ctrl+C等。 stop(stopSparkContext, stopGraceFully) 终止Spark Streaming计算,stopSparkContext用于判断是否需要终止相关的SparkContext,StopGracefully用于判断是否需要等待所有接受到的数据处理完成。 UpdateStateByKey(func) 更新DStream的状态。使用此方法,需要定义State和状态更新函数。 window(windowLength, slideInterval) 根据源DStream的窗口批次计算得到一个新的DStream。 countByWindow(windowLength, slideInterval) 返回流中滑动窗口元素的个数。 reduceByWindow(func, windowLength, slideInterval) 当调用在DStream的KV对上,返回一个新的DStream的KV对,其中每个Key的Value根据滑动窗口中批次的reduce函数聚合得到。 join(other,numPartitions) 实现不同的Spark Streaming之间做合并操作。
  • Spark Streaming常用接口 Spark Streaming中常见的类有: StreamingContext:是Spark Streaming功能的主入口,负责提供创建DStreams的方法,入参中需要设置批次的时间间隔。 dstream.DStream:是一种代表RDDs连续序列的数据类型,代表连续数据流。 dstream.PariDStreamFunctions:键值对的DStream,常见的操作如groupByKey和reduceByKey。 对应的Spark Streaming的JAVA API是JavaStreamingContext,JavaDStream和JavaPairDStream。 Spark Streaming的常见方法与Spark Core类似,下表罗列了Spark Streaming特有的一些方法。 表4 Spark Streaming方法介绍 方法 说明 socketTextStream(hostname: String, port: Int, storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2): ReceiverInputDStream[String] 从TCP源主机:端口创建一个输入流。 start():Unit 启动Spark Streaming计算。 awaitTermination(timeout: long):Unit 当前进程等待终止,如Ctrl+C等。 stop(stopSparkContext: Boolean, stopGracefully: Boolean): Unit 终止Spark Streaming计算。 transform[T](dstreams: Seq[DStream[_]], transformFunc: (Seq[RDD[_]], Time) ? RDD[T])(implicit arg0: ClassTag[T]): DStream[T] 对每一个RDD应用function操作得到一个新的DStream。 UpdateStateByKey(func) 更新DStream的状态。使用此方法,需要定义状态和状态更新函数。 window(windowLength, slideInterval) 根据源DStream的窗口批次计算得到一个新的DStream。 countByWindow(windowLength, slideInterval) 返回流中滑动窗口元素的个数。 reduceByWindow(func, windowLength, slideInterval) 当调用在DStream的KV对上,返回一个新的DStream的KV对,其中每个Key的Value根据滑动窗口中批次的reduce函数聚合得到。 join(otherStream, [numTasks]) 实现不同的Spark Streaming之间做合并操作。 DStreamKafkaWriter.writeToKafka() 支持将DStream中的数据批量写入到Kafka。 DStreamKafkaWriter.writeToKafkaBySingle() 支持将DStream中的数据逐条写入到Kafka。 表5 Spark Streaming增强特性接口 方法 说明 DStreamKafkaWriter.writeToKafka() 支持将DStream中的数据批量写入到Kafka。 DStreamKafkaWriter.writeToKafkaBySingle() 支持将DStream中的数据逐条写入到Kafka。
  • SparkSQL常用接口 Spark SQL中常用的类有: SQLContext:是Spark SQL功能和DataFrame的主入口。 DataFrame:是一个以命名列方式组织的分布式数据集。 HiveContext:获取存储在Hive中数据的主入口。 表6 常用的Actions方法 方法 说明 collect(): Array[Row] 返回一个数组,包含DataFrame的所有列。 count(): Long 返回DataFrame中的行数。 describe(cols: String*): DataFrame 计算统计信息,包含计数,平均值,标准差,最小值和最大值。 first(): Row 返回第一行。 Head(n:Int): Row 返回前n行。 show(numRows: Int, truncate: Boolean): Unit 用表格形式显示DataFrame。 take(n:Int): Array[Row] 返回DataFrame中的前n行。 表7 基本的DataFrame Functions 方法 说明 explain(): Unit 打印出SQL语句的逻辑计划和物理计划。 printSchema(): Unit 打印schema信息到控制台。 registerTempTable(tableName: String): Unit 将DataFrame注册为一张临时表,其周期和SQLContext绑定在一起。 toDF(colNames: String*): DataFrame 返回一个列重命名的DataFrame。
  • 问题 使用运行的Spark Streaming任务回写Kafka时,Kafka上接收不到回写的数据,且Kafka日志报错信息如下: 2016-03-02 17:46:19,017 | INFO | [kafka-network-thread-21005-1] | Closing socket connection to /10.91.8.208 due to invalid request: Request of length 122371301 is not valid, it is larger than the maximum size of 104857600 bytes. | kafka.network.Processor (Logging.scala:68)2016-03-02 17:46:19,155 | INFO | [kafka-network-thread-21005-2] | Closing socket connection to /10.91.8.208. | kafka.network.Processor (Logging.scala:68)2016-03-02 17:46:19,270 | INFO | [kafka-network-thread-21005-0] | Closing socket connection to /10.91.8.208 due to invalid request: Request of length 122371301 is not valid, it is larger than the maximum size of 104857600 bytes. | kafka.network.Processor (Logging.scala:68)2016-03-02 17:46:19,513 | INFO | [kafka-network-thread-21005-1] | Closing socket connection to /10.91.8.208 due to invalid request: Request of length 122371301 is not valid, it is larger than the maximum size of 104857600 bytes. | kafka.network.Processor (Logging.scala:68)2016-03-02 17:46:19,763 | INFO | [kafka-network-thread-21005-2] | Closing socket connection to /10.91.8.208 due to invalid request: Request of length 122371301 is not valid, it is larger than the maximum size of 104857600 bytes. | kafka.network.Processor (Logging.scala:68)53393 [main] INFO org.apache.hadoop.mapreduce.Job - Counters: 50
  • 回答 Spark部署时,如下jar包存放在客户端的“${SPARK_HOME}/jars/streamingClient010”目录以及服务端的“${BIGDATA_HOME}/ FusionInsight _Spark2x_8.1.0.1/install/FusionInsight-Spark2x-3.1.1/spark/jars/streamingClient010”目录: kafka-clients-xxx.jar kafka_2.12-xxx.jar spark-streaming-kafka-0-10_2.12-3.1.1-hw-ei-311001-SNAPSHOT.jar spark-token-provider-kafka-0-10_2.12-3.1.1-hw-ei-311001-SNAPSHOT.jar 由于“$SPARK_HOME/jars/streamingClient010/*”默认没有添加到classpath,所以需要手动配置。 在提交应用程序运行时,在命令中添加如下参数即可,详细示例可参考编包并运行程序。 --jars $SPARK_CLIENT_HOME/jars/streamingClient010/kafka-client-2.4.0.jar,$SPARK_CLIENT_HOME/jars/streamingClient010/kafka_2.12-*.jar,$SPARK_CLIENT_HOME/jars/streamingClient010/spark-streaming-kafka-0-10_2.12-3.1.1-hw-ei-311001-SNAPSHOT.jar 用户自己开发的应用程序以及样例工程都可使用上述命令提交。 但是Spark开源社区提供的KafkaWordCount等样例程序,不仅需要添加--jars参数,还需要配置其他,否则会报“ClassNotFoundException”错误,yarn-client和yarn-cluster模式下稍有不同。 yarn-client模式下 在除--jars参数外,在客户端“spark-defaults.conf”配置文件中,将“spark.driver.extraClassPath”参数值中添加客户端依赖包路径,如“$SPARK_HOME/jars/streamingClient010/*”。 yarn-cluster模式下 除--jars参数外,还需要配置其他,有三种方法任选其一即可,具体如下: 在客户端spark-defaults.conf配置文件中,在“spark.yarn.cluster.driver.extraClassPath”参数值中添加服务端的依赖包路径,如“${BIGDATA_HOME}/FusionInsight_Spark2x_8.1.0.1/install/FusionInsight-Spark2x-3.1.1/spark/jars/streamingClient010/*”。 将各服务端节点的“original-spark-examples_2.12-3.1.1-xxx.jar”包删除。 在客户端“spark-defaults.conf”配置文件中,修改或增加配置选项“spark.driver.userClassPathFirst” = “true”。
  • 回答 如下图所示,Spark Streaming应用中定义的逻辑为,从Kafka中读取数据,执行对应处理之后,然后将结果数据回写至Kafka中。 例如:Spark Streming中定义了批次时间,如果数据传入Kafka的速率为10MB/s,而Spark Streaming中定义了每60s一个批次,回写数据总共为600MB。而Kafka中定义了接收数据的阈值大小为500MB。那么此时回写数据已超出阈值。此时,会出现上述错误。 图1 应用场景 解决措施: 方式一:推荐优化Spark Streaming应用程序中定义的批次时间,降低批次时间,可避免超过Kafka定义的阈值。一般建议以5-10秒/次为宜。 方式二:将Kafka的阈值调大,建议在FusionInsight Manager中的Kafka服务进行参数设置,将socket.request.max.bytes参数值根据应用场景,适当调整。
  • 回答 用户尝试收集大量数据到Driver端,如果Driver端的内存不足以存放这些数据,那么就会抛出OOM(OutOfMemory)的异常,然后Driver端一直在进行GC,尝试回收垃圾来存放返回的数据,导致应用长时间挂起。 解决措施: 如果用户需要在OOM场景下强制将应用退出,那么可以在启动Spark Core应用时,在客户端配置文件“$SPARK_HOME/conf/spark-defaults.conf”中的配置项“spark.driver.extraJavaOptions”中添加如下内容: -XX:OnOutOfMemoryError='kill -9 %p'
  • 问题 执行Spark Core应用,尝试收集大量数据到Driver端,当Driver端内存不足时,应用挂起不退出,日志内容如下。 16/04/19 15:56:22 ERROR Utils: Uncaught exception in thread task-result-getter-2java.lang.OutOfMemoryError: Java heap spaceat java.lang.reflect.Array.newArray(Native Method)at java.lang.reflect.Array.newInstance(Array.java:75)at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1671)at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1345)at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1707)at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1345)at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:71)at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:91)at org.apache.spark.scheduler.DirectTaskResult.value(TaskResult.scala:94)at org.apache.spark.scheduler.TaskResultGetter$$anon$3$$anonfun$run$1.apply$mcV$sp(TaskResultGetter.scala:66)at org.apache.spark.scheduler.TaskResultGetter$$anon$3$$anonfun$run$1.apply(TaskResultGetter.scala:57)at org.apache.spark.scheduler.TaskResultGetter$$anon$3$$anonfun$run$1.apply(TaskResultGetter.scala:57)at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1716)at org.apache.spark.scheduler.TaskResultGetter$$anon$3.run(TaskResultGetter.scala:56)at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)at java.lang.Thread.run(Thread.java:745)Exception in thread "task-result-getter-2" java.lang.OutOfMemoryError: Java heap spaceat java.lang.reflect.Array.newArray(Native Method)at java.lang.reflect.Array.newInstance(Array.java:75)at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1671)at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1345)at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1707)at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1345)at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:71)at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:91)at org.apache.spark.scheduler.DirectTaskResult.value(TaskResult.scala:94)at org.apache.spark.scheduler.TaskResultGetter$$anon$3$$anonfun$run$1.apply$mcV$sp(TaskResultGetter.scala:66)at org.apache.spark.scheduler.TaskResultGetter$$anon$3$$anonfun$run$1.apply(TaskResultGetter.scala:57)at org.apache.spark.scheduler.TaskResultGetter$$anon$3$$anonfun$run$1.apply(TaskResultGetter.scala:57)at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1716)at org.apache.spark.scheduler.TaskResultGetter$$anon$3.run(TaskResultGetter.scala:56)at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)at java.lang.Thread.run(Thread.java:745)
  • 回答 导致这个问题的主要原因是,yarn-client和yarn-cluster模式在提交任务时setAppName的执行顺序不同导致,yarn-client中setAppName是在向yarn注册Application之前读取,yarn-cluser模式则是在向yarn注册Application之后读取,这就导致yarn-cluster模式设置的应用名不生效。 解决措施: 在spark-submit脚本提交任务时用--name设置应用名和sparkconf.setAppName(appname)里面的应用名一样。 比如我们代码里设置的应用名为Spark Pi,用yarn-cluster模式提交应用时可以这样设置,在--name后面添加应用名,执行的命令如下: ./spark-submit --class org.apache.spark.examples.SparkPi --master yarn --deploy-mode cluster --name SparkPi jars/original-spark-examples*.jar 10
  • Spark SQL常用接口 Spark SQL中重要的类有: SQLContext:是Spark SQL功能和DataFrame的主入口。 DataFrame:是一个以命名列方式组织的分布式数据集 DataFrameReader:从外部存储系统加载DataFrame的接口。 DataFrameStatFunctions:实现DataFrame的统计功能。 UserDefinedFunction:用户自定义的函数。 常见的Actions方法有: 表6 Spark SQL方法介绍 方法 说明 Row[] collect() 返回一个数组,包含DataFrame的所有列。 long count() 返回DataFrame的行数。 DataFrame describe(java.lang.String... cols) 计算统计信息,包含计数,平均值,标准差,最小值和最大值。 Row first() 返回第一行。 Row[] head(int n) 返回前n行。 void show() 用表格形式显示DataFrame的前20行。 Row[] take(int n) 返回DataFrame中的前n行。 表7 基本的DataFrame Functions介绍 方法 说明 void explain(boolean extended) 打印出SQL语句的逻辑计划和物理计划。 void printSchema() 打印schema信息到控制台。 registerTempTable 将DataFrame注册为一张临时表,其周期和SQLContext绑定在一起。 DataFrame toDF(java.lang.String... colNames) 返回一个列重命名的DataFrame。 DataFrame sort(java.lang.String sortCol,java.lang.String... sortCols) 根据不同的列,按照升序或者降序排序。 GroupedData rollup(Column... cols) 对当前的DataFrame特定列进行多维度的回滚操作。
  • 回答 问题原因: 在IBM JDK下建立的JDBC connection时间超过登录用户的认证超时时间(默认一天),导致认证失败。 IBM JDK的机制跟Oracle JDK的机制不同,IBM JDK在认证登录后的使用过程中做了时间检查却没有检测外部的时间更新,导致即使显式调用relogin也无法得到刷新。 解决措施: 通常情况下,在发现JDBC connection不可用的时候,可以关闭该connection,重新创建一个connection继续执行。
  • 回答 原因分析:显示该异常是因为“recoverFromCheckpointLocation”的值判定为false,但却配置了checkpoint目录。 参数“recoverFromCheckpointLocation”的值为代码中“outputMode == OutputMode.Complete()”语句的判断结果(outputMode的默认输出方式为“append”)。 处理方法:编写应用时,用户可以根据具体情况修改数据的输出方式。 将输出方式修改为“complete”,“recoverFromCheckpointLocation”的值会判定为true。此时配置了checkpoint目录时就不会显示异常。
  • 问题 Structured Streaming的cluster模式,在数据处理过程中终止ApplicationManager,执行应用时显示如下异常。 2017-05-09 20:46:02,393 | INFO | main | client token: Token { kind: YARN_CLIENT_TOKEN, service: } diagnostics: User class threw exception: org.apache.spark.sql.AnalysisException: This query does not support recovering from checkpoint location. Delete hdfs://hacluster/structuredtest/checkpoint/offsets to start over.; ApplicationMaster host: 10.96.101.170 ApplicationMaster RPC port: 0 queue: default start time: 1494333891969 final status: FAILED tracking URL: https://9-96-101-191:8090/proxy/application_1493689105146_0052/ user: spark2x | org.apache.spark.internal.Logging$class.logInfo(Logging.scala:54)Exception in thread "main" org.apache.spark.SparkException: Application application_1493689105146_0052 finished with failed status
  • 回答 Spark任务在运行过程中,driver会创建一个spark-开头的本地临时目录,用于存放业务jar包,配置文件等,同时在本地创建一个blockmgr-开头的本地临时目录,用于存放block data。此两个目录会在Spark应用运行结束时自动删除。 此两个目录的存放路径优先通过SPARK_LOCAL_DIRS环境变量指定,若不存在该环境变量,则设置为spark.local.dir的值,若此配置还不存在,则使用java.io.tmpdir的值。客户端默认配置中spark.local.dir被设置为/tmp,因此默认使用系统/tmp目录。 但存在一些特殊情况,如driver进程未正常退出,比如被kill -9命令结束进程,或者Java虚拟机直接崩溃等场景,导致driver的退出流程未正常执行,则可能导致该部分目录无法被正常清理,残留在系统中。 当前只有yarn-client模式和local模式的driver进程会产生上述问题,在yarn-cluster模式中,已将container内进程的临时目录设置为container临时目录,当container退出时,由container自动清理该目录,因此yarn-cluster模式不存在此问题。
  • 回答 由于checkpoint中包含了spark应用的对象序列化信息、task执行状态信息、配置信息等,因此,当存在以下问题时,从checkpoint恢复spark应用将会失败。 业务代码变更且变更类未明确指定SerialVersionUID。 spark内部类变更,且变更类未明确指定SerialVersionUID。 另外,由于checkpoint保存了部分配置项,因此可能导致业务修改了部分配置项后,从checkpoint恢复时,配置项依然保持为旧值的情况。当前只有以下部分配置会在从checkpoint恢复时重新加载。 "spark.yarn.app.id", "spark.yarn.app.attemptId", "spark.driver.host", "spark.driver.bindAddress", "spark.driver.port", "spark.master", "spark.yarn.jars", "spark.yarn.keytab", "spark.yarn.principal", "spark.yarn.credentials.file", "spark.yarn.credentials.renewalTime", "spark.yarn.credentials.updateTime", "spark.ui.filters", "spark.mesos.driver.frameworkId", "spark.yarn.jars"
  • Structured Streaming不支持的功能 不支持多个流聚合。 不支持limit、first、take这些取N条Row的操作。 不支持Distinct。 只有当output mode为complete时才支持排序操作。 有条件地支持流和静态数据集之间的外连接。 不支持部分DataSet上立即运行查询并返回结果的操作: count():无法从流式Dataset返回单个计数,而是使用ds.groupBy().count()返回一个包含运行计数的streaming Dataset。 foreach():使用ds.writeStream.foreach(...)代替。 show():使用输出console sink代替。
  • Structured Streaming支持的功能 支持对流式数据的ETL操作。 支持流式DataFrames或Datasets的schema推断和分区。 流式DataFrames或Datasets上的操作:包括无类型,类似SQL的操作(比如select、where、groupBy),以及有类型的RDD操作(比如map、filter、flatMap)。 支持基于Event Time的聚合计算,支持对迟到数据的处理。 支持对流式数据的去除重复数据操作。 支持状态计算。 支持对流处理任务的监控。 支持批流join,流流join。 当前JOIN操作支持列表如下: 左表 右表 支持的Join类型 说明 Static Static 全部类型 即使在流处理中,不涉及流数据的join操作也能全部支持 Stream Static Inner 支持,但是无状态 Left Outer 支持,但是无状态 Right Outer 不支持 Full Outer 不支持 Stream Stream Inner 支持,左右表可选择使用watermark或者时间范围进行状态清理 Left Outer 有条件的支持,左表可选择使用watermark进行状态清理,右表必须使用watermark+时间范围 Right Outer 有条件的支持,右表可选择使用watermark进行状态清理,左表必须使用watermark+时间范围 Full Outer 不支持
  • Structured Streaming可靠性说明 Structured Streaming通过checkpoint和WAL机制,对可重放的sources,以及支持重复处理的幂等性sinks,可以提供端到端的exactly-once容错语义。 用户可在程序中设置option("checkpointLocation", "checkpoint路径")启用checkpoint。 从checkpoint恢复时,应用程序或者配置可能发生变更,有部分变更会导致从checkpoint恢复失败,具体限制如下: 不允许source的个数或者类型发生变化。 source的参数变化,这种情况是否能被支持,取决于source类型和查询语句,例如: 速率控制相关参数的添加、删除和修改,此种情况能被支持,如:spark.readStream.format("kafka").option("subscribe", "topic")变更为spark.readStream.format("kafka").option("subscribe", "topic").option("maxOffsetsPerTrigger", ...) 修改消费的topic/files可能会出现不可预知的问题,如:spark.readStream.format("kafka").option("subscribe", "topic")变更为spark.readStream.format("kafka").option("subscribe", "newTopic") sink的类型发生变化:允许特定的几个sink的组合,具体场景需要验证确认,例如: File sink允许变更为kafka sink,kafka中只处理新数据。 kafka sink不允许变更为file sink。 kafka sink允许变更为foreach sink,反之亦然。 sink的参数变化,这种情况是否能被支持,取决于sink类型和查询语句,例如: 不允许file sink的输出路径发生变更。 允许Kafka sink的输出topic发生变更。 允许foreach sink中的自定义算子代码发生变更,但是变更结果取决于用户代码。 Projection、filter和map-like操作变更,局部场景下能够支持,例如: 支持Filter的添加和删除,如:sdf.selectExpr("a")变更为sdf.where(...).selectExpr("a").filter(...) Output schema相同时,projections允许变更,如:sdf.selectExpr("stringColumn AS json").writeStream变更为sdf.select(to_json(...).as("json")).writeStream Output schema不相同时,projections在部分条件下允许变更,如:sdf.selectExpr("a").writeStream变更为sdf.selectExpr("b").writeStream,只有当sink支持“a”到“b”的schema转换时才不会出错。 状态操作的变更,在部分场景下会导致状态恢复失败: Streaming aggregation:如sdf.groupBy("a").agg(...)操作中,不允许分组键或聚合键的类型或者数量发生变化。 Streaming deduplication:如:sdf.dropDuplicates("a")操作中,不允许分组键或聚合键的类型或者数量发生变化。 Stream-stream join:如sdf1.join(sdf2, ...)操作中,关联键的schema不允许发生变化,join类型不允许发生变化,其他join条件的变更可能导致不确定性结果。 任意状态计算:如sdf.groupByKey(...).mapGroupsWithState(...)或者sdf.groupByKey(...).flatMapGroupsWithState(...)操作中,用户自定义状态的shema或者超时类型都不允许发生变化;允许用户自定义state-mapping函数变化,但是变更结果取决于用户代码;如果需要支持schema变更,用户可以将状态数据编码/解码成二进制数据以支持schema迁移。 Source的容错性支持列表 Sources 支持的Options 容错支持 说明 File source path:必填,文件路径 maxFilesPerTrigger:每次trigger最大文件数(默认无限大) latestFirst:是否有限处理新文件(默认值: false) fileNameOnly:是否以文件名作为新文件校验,而不是使用完整路径进行判断(默认值: false) 支持 支持通配符路径,但不支持以逗号分隔的多个路径。 文件必须以原子方式放置在给定的目录中,这在大多数文件系统中可以通过文件移动操作实现。 Socket Source host:连接的节点ip,必填 port:连接的端口,必填 不支持 - Rate Source rowsPerSecond:每秒产生的行数,默认值1 rampUpTime:在达到rowsPerSecond速度之前的上升时间 numPartitions:生成数据行的并行度 支持 - Kafka Source 参见https://spark.apache.org/docs/3.1.1/structured-streaming-kafka-integration.html 支持 - Sink的容错性支持列表 Sinks 支持的output模式 支持Options 容错性 说明 File Sink Append Path:必须指定 指定的文件格式,参见DataFrameWriter中的相关接口 exactly-once 支持写入分区表,按时间分区用处较大 Kafka Sink Append, Update, Complete 参见:https://spark.apache.org/docs/3.1.1/structured-streaming-kafka-integration.html at-least-once 参见https://spark.apache.org/docs/3.1.1/structured-streaming-kafka-integration.html Foreach Sink Append, Update, Complete None 依赖于ForeachWriter实现 参见https://spark.apache.org/docs/3.1.1/structured-streaming-programming-guide.html#using-foreach ForeachBatch Sink Append, Update, Complete None 依赖于算子实现 参见https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#using-foreach-and-foreachbatch Console Sink Append, Update, Complete numRows:每轮打印的行数,默认20 truncate:输出太长时是否清空,默认true 不支持容错 - Memory Sink Append, Complete None 不支持容错,在complete模式下,重启query会重建整个表 -
  • 回答 第三方jar包(例如自定义udf)区分x86和TaiShan版本时,混合使用方案: 进入到服务端Spark2x SparkResource的安装目录(集群安装时,SparkResource可能会安装在多个节点上,登录任意一个SparkResource节点,进入到SparkResource的安装目录)。 准备好自己的jar包,例如xx.jar的x86版本和TaiShan版本。将x86版本和TaiShan版本的xx.jar分别复制到当前目录的x86文件夹和TaiShan文件夹里面。 在当前目录下执行以下命令将jar包打包: zip -qDj spark-archive-2x-x86.zip x86/* zip -qDj spark-archive-2x-arm.zip arm/* 执行以下命令查看hdfs上的spark2x依赖的jar包: hdfs dfs -ls /user/spark2x/jars/8.1.0.1 8.1.0.1是版本号,不同版本不同。 执行以下命令移动hdfs上旧的jar包文件到其他目录,例如移动到“tmp”目录。 hdfs dfs -mv /user/spark2x/jars/8.1.0.1/spark-archive-2x-arm.zip /tmp hdfs dfs -mv /user/spark2x/jars/8.1.0.1/spark-archive-2x-x86.zip /tmp 上传3中打包的spark-archive-2x-arm.zip和spark-archive-2x-x86.zip到hdfs的/user/spark2x/jars/8.1.0.1目录下,上传命令如下: hdfs dfs -put spark-archive-2x-arm.zip /user/spark2x/jars/8.1.0.1/ hdfs dfs -put spark-archive-2x-x86.zip /user/spark2x/jars/8.1.0.1/ 上传完毕后删除本地的spark-archive-2x-arm.zip,spark-archive-2x-x86.zip文件。 对其他的sparkResource安装节点执行1~2。 进入webUI重启spark2x的jdbcServer实例。 重启后,需要更新客户端配置。按照客户端所在的机器类型(x86、TaiShan)复制xx.jar的相应版本到客户端的Spark2x安装目录“${install_home}/Spark2x/spark/jars”文件夹中。${install_home}是用户的客户端安装路径,用户需要填写实际的安装目录;若本地的安装目录为“/opt/hadoopclient”,那么就复制相应版本xx.jar到“/opt/hadoopclient/Spark2x/spark/jars”文件夹里。
  • 解决方案 提交yarn-client模式的结构流任务时需要额外如下操作: 将Spark客户端目录下spark-default.conf文件中的spark.driver.extraClassPath配置复制出来,并将Kafka相关jar包路径追加到该配置项之后,提交结构流任务时需要通过--conf将该配置项给加上。例如:kafka相关jar包路径为“/kafkadir”,提交任务需要增加--conf spark.driver.extraClassPath=/opt/client/Spark2x/spark/conf/:/opt/client/Spark2x/spark/jars/*:/opt/client/Spark2x/spark/x86/*:/kafkadir/*。 提交yarn-cluster模式的结构流任务时需要额外如下操作: 将Spark客户端目录下spark-default.conf文件中的spark.yarn.cluster.driver.extraClassPath配置给复制出来,并将Kafka相关jar包相对路径追加到该配置项之后,提交结构流任务时需要通过--conf 将该配置项给加上。例如:kafka相关包为kafka-clients-x.x.x.jar,kafka_2.11-x.x.x.jar,提交任务需要增加--conf spark.yarn.cluster.driver.extraClassPath=/home/huawei/Bigdata/common/runtime/security:./kafka-clients-x.x.x.jar:./kafka_2.11-x.x.x.jar。 当前版本Spark结构流部分不再支持kafka2.x之前的版本,对于升级场景请继续使用旧的客户端。
  • 数据规划 运行样例程序前,需要在Spark客户端的“spark-defaults.conf”配置文件中将配置项“spark.yarn.security.credentials.hbase.enabled”设置为“true”(该参数值默认为“false”,改为“true”后对已有业务没有影响。如果要卸载HBase服务,卸载前请将此参数值改回“false”)。 创建HBase表,构造数据,列需要包含key,modify_time,valid。其中每条数据key值全表唯一,modify_time代表修改时间,valid代表是否为有效数据(该样例中'1'为有效,'0'为无效数据)。 示例:进入hbase shell,执行如下命令: create 'hbase_table','key','info' put 'hbase_table','1','info:modify_time','2019-11-22 23:28:39' put 'hbase_table','1','info:valid','1' put 'hbase_table','2','info:modify_time','2019-11-22 23:28:39' put 'hbase_table','2','info:valid','1' put 'hbase_table','3','info:modify_time','2019-11-22 23:28:39' put 'hbase_table','3','info:valid','0' put 'hbase_table','4','info:modify_time','2019-11-22 23:28:39' put 'hbase_table','4','info:valid','1' 上述数据的modify_time列可设置为当前时间之前的值。 put 'hbase_table','5','info:modify_time','2021-03-03 15:20:39' put 'hbase_table','5','info:valid','1' put 'hbase_table','6','info:modify_time','2021-03-03 15:20:39' put 'hbase_table','6','info:valid','1' put 'hbase_table','7','info:modify_time','2021-03-03 15:20:39' put 'hbase_table','7','info:valid','0' put 'hbase_table','8','info:modify_time','2021-03-03 15:20:39' put 'hbase_table','8','info:valid','1' put 'hbase_table','4','info:valid','0' put 'hbase_table','4','info:modify_time','2021-03-03 15:20:39' 上述数据的modify_time列可设置为样例程序启动后30分钟内的时间值(此处的30分钟为样例程序默认的同步间隔时间,可修改)。 put 'hbase_table','9','info:modify_time','2021-03-03 15:32:39' put 'hbase_table','9','info:valid','1' put 'hbase_table','10','info:modify_time','2021-03-03 15:32:39' put 'hbase_table','10','info:valid','1' put 'hbase_table','11','info:modify_time','2021-03-03 15:32:39' put 'hbase_table','11','info:valid','0' put 'hbase_table','12','info:modify_time','2021-03-03 15:32:39' put 'hbase_table','12','info:valid','1' 上述数据的modify_time列可设置为样例程序启动后30分钟到60分钟内的时间值,即第二次同步周期。 在sparksql中创建HBase的hive外表,命令如下: create table external_hbase_table(key string ,modify_time STRING, valid STRING) using org.apache.spark.sql.hbase.HBaseSource options(hbaseTableName "hbase_table", keyCols "key", colsMapping "modify_time=info.modify_time,valid=info.valid"); 在sparksql中创建CarbonData表: create table carbon01(key string,modify_time STRING, valid STRING) stored as carbondata; 初始化加载当前hbase表中所有数据到CarbonData表; insert into table carbon01 select * from external_hbase_table where valid='1'; 用spark-submit提交命令: spark-submit --master yarn --deploy-mode client --class com.huawei.bigdata.spark.examples.HBaseExternalHivetoCarbon /opt/example/HBaseExternalHivetoCarbon-1.0.jar
  • 运行任务 登录Spark客户端节点,执行如下命令: source 客户端安装目录/bigdata_env source 客户端安装目录/Hudi/component_env 编译构建样例代码后可以使用spark-submit提交命令,执行命令后会依次执行写入、更新、查询、删除等操作: 运行Java样例程序: spark-submit --class com.huawei.bigdata.hudi.examples.HoodieWriteClientExample /opt/example/hudi-java-examples-1.0.jar hdfs://hacluster/tmp/example/hoodie_java hoodie_java 其中:“/opt/example/hudi-java-examples-1.0.jar”为jar包路径,“hdfs://hacluster/tmp/example/hoodie_java”为Hudi表的存储路径,“ hoodie_java”为Hudi表的表名。 运行Scala样例程序: spark-submit --class com.huawei.bigdata.hudi.examples.HoodieDataSourceExample /opt/example/hudi-scala-examples-1.0.jar hdfs://hacluster/tmp/example/hoodie_scala hoodie_scala 其中:“/opt/example/hudi-scala-examples-1.0.jar”为jar包路径,“hdfs://hacluster/tmp/example/hoodie_scala”为Hudi表的存储路径,“ hoodie_Scala”为Hudi表的表名。 运行Python样例程序: spark-submit /opt/example/HudiPythonExample.py hdfs://hacluster/tmp/huditest/example/python hudi_trips_cow 其中:“hdfs://hacluster/tmp/huditest/example/python”为Hudi表的存储路径,“ hudi_trips_cow”为Hudi表的表名。
  • Client HDFS Client主要包括五种方式:JAVA API、C API、Shell、HTTP REST API、WEB UI五种方式,可参考常用API介绍、Shell命令介绍。 JAVA API 提供HDFS文件系统的应用接口,本开发指南主要介绍如何使用Java API进行HDFS文件系统的应用开发。 C API 提供HDFS文件系统的应用接口,使用C语言开发的用户可参考C接口的描述进行应用开发。 Shell 提供shell命令完成HDFS文件系统的基本操作。 HTTP REST API 提供除Shell、Java API和C API以外的其他接口,可通过此接口监控HDFS状态等信息。 WEB UI 提供Web可视化组件管理界面。
  • NameNode 用于管理文件系统的命名空间、目录结构、元数据信息以及提供备份机制等,分为: Active NameNode:主NameNode,管理文件系统的命名空间、维护文件系统的目录结构树以及元数据信息;记录写入的每个“数据块”与其归属文件的对应关系。 Standby NameNode:备NameNode,与主NameNode中的数据保持同步;随时准备在主NameNode出现异常时接管其服务。
  • Hive简介 Hive是一个开源的,建立在Hadoop上的 数据仓库 框架,提供类似SQL的HQL语言操作结构化数据,其基本原理是将HQL语言自动转换成MapReduce任务或Spark任务,从而完成对Hadoop集群中存储的海量数据进行查询和分析。 Hive主要特点如下: 通过HQL语言非常容易的完成数据提取、转换和加载(ETL)。 通过HQL完成海量结构化数据分析。 灵活的数据存储格式,支持JSON、CSV、TEXTFILE、RCFILE、ORCFILE、SEQUENCEFILE等存储格式,并支持自定义扩展。 多种客户端连接方式,支持JDBC接口。 Hive的主要应用于海量数据的离线分析(如 日志分析 ,集群状态分析)、大规模的数据挖掘(用户行为分析,兴趣分区,区域展示)等场景下。 为保证Hive服务的高可用性、用户数据的安全及访问服务的可控制,在开源社区的Hive-3.1.0版本基础上,Hive新增如下特性: 数据文件加密机制。 开源社区的Hive特性,请参见https://cwiki.apache.org/confluence/display/hive/designdocs。
  • HDFS样例工程介绍 MRS 样例工程获取地址为https://github.com/huaweicloud/huaweicloud-mrs-example,切换分支为与MRS集群相匹配的版本分支,然后下载压缩包到本地后解压,即可获取各组件对应的样例代码工程。 当前MRS提供以下HDFS相关样例工程: 表1 HDFS相关样例工程 样例工程位置 描述 hdfs-example-normal HDFS文件操作的Java示例程序。 本工程主要给出了创建HDFS文件夹、写文件、追加文件内容、读文件和删除文件/文件夹等相关接口操作示例。相关业务场景介绍请参见样例代码说明。 hdfs-c-example HDFS C语言开发代码样例。 本示例提供了基于C语言的HDFS文件系统连接、文件操作如创建文件、读写文件、追加文件、删除文件等。相关业务场景介绍请参见C API。 父主题: 概述
  • 开发流程 开发流程中各阶段的说明如图1和表1所示。 图1 Hive应用程序开发流程 表1 Hive应用开发的流程说明 阶段 说明 参考文档 准备开发环境 在进行应用开发前,需首先准备开发环境,推荐使用Java语言进行开发,使用IntelliJ IDEA工具,同时完成JDK、Maven等初始配置。 准备开发环境 准备连接集群配置文件 应用程序开发或运行过程中,需通过集群相关配置文件信息连接MRS集群,配置文件通常包括集群组件信息文件,可从已创建好的MRS集群中获取相关内容。 用于程序调测或运行的节点,需要与MRS集群内节点网络互通,同时配置hosts域名信息。 准备连接集群配置文件 配置并导入样例工程 Hive提供了不同场景下的多种样例程序,用户可获取样例工程并导入本地开发环境中进行程序学习。 配置并导入样例工程 根据业务场景开发程序 根据实际业务场景开发程序,调用组件接口实现对应功能。 开发程序 编译并运行程序 开发好的程序编译运行,用户可在本地Windows开发环境中进行程序调测运行,也可以将程序编译为Jar包后,提交到Linux节点上运行。 调测程序 父主题: 概述
共100000条