云服务器内容精选

  • 配置参数 安装Spark客户端。 详细操作请参考安装 MRS 客户端。 使用客户端安装用户登录Spark客户端节点。 在Spark客户端的“{客户端安装目录}/Spark/spark/conf/spark-defaults.conf”配置文件中进行设置。 参数 说明 取值示例 spark.sql.streaming.stateStore.providerClass 用于管理有状态流查询中的状态数据的类。此类必须是StateStoreProvider的子类,并且必须具有零参数构造函数。 配置参数值为org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider即可选择RocksDB作为状态后端。 org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider
  • 操作场景 状态信息存储在默认的HDFS BackedStateStore,随着状态数据的增多,垃圾回收器需要处理更多的对象,从而增加了垃圾回收的时间和开销,导致JVM GC占用大量时间。可以通过配置参数spark.sql.streaming.stateStore.providerClass,选择RocksDB作为状态后端。 RocksDB是一个嵌入式的键值存储引擎,它将数据存储在本地磁盘上,支持高效的读写操作。具有可定制的内存管理和压缩算法,能根据不同的应用场景进行优化。常用于存储大量的结构化或半结构化数据,能有效处理大规模的状态数据。
  • 回答 问题原因: ApplicationMaster进程中有1个Credential Refresh Thread会根据token renew周期 * 0.75的时间比例上传更新后的Credential文件到HDFS上。 Executor进程中有1个Credential Refresh Thread会根据token renew周期 *0.8的时间比例去HDFS上获取更新后的Credential文件,用来刷新UserGroupInformation中的token,避免token失效。 当Executor进程的Credential Refresh Thread发现当前时间已经超过Credential文件更新时间(即token renew周期 *0.8)时,会等待1分钟再去HDFS上面获取最新的Credential文件,以确保AM端已经将更新后的Credential文件放到HDFS上。 当“dfs.namenode.delegation.token.renew-interval”配置值小于60秒,Executor进程起来时发现当前时间已经超过Credential文件更新时间,等待1分钟再去HDFS上面获取最新的Credential文件,而此时token已经失效,task运行失败,然后在其他Executor上重试,由于重试时间都是在1分钟内完成,所以task在其他Executor上也运行失败,导致运行失败的Executor加入到黑名单,没有可用的Executor,应用退出。 修改方案: 在Spark使用场景下,需设置“dfs.namenode.delegation.token.renew-interval”大于80秒。“dfs.namenode.delegation.token.renew-interval”参数描述请参表1考。 表1 参数说明 参数 描述 取值示例 dfs.namenode.delegation.token.renew-interval 该参数为服务器端参数,设置token renew的时间间隔,单位为毫秒。 86400000
  • 数据规划 Spark Streaming样例工程的数据存储在Kafka组件中。向Kafka组件发送数据(需要有Kafka权限用户)。 确保集群安装完成,包括HDFS、Yarn、Spark和Kafka。 本地新建文件“input_data1.txt”,将“log1.txt”的内容复制保存到“input_data1.txt”。 在客户端安装节点下创建文件目录:“/home/data”。将上述文件上传到此“/home/data”目录下。 将Kafka的Broker配置参数“allow.everyone.if.no.acl.found”的值修改为“true”。 创建Topic。 {zkQuorum}表示ZooKeeper集群信息,格式为IP:port。 $KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper {zkQuorum}/kafka --replication-factor 1 --partitions 3 --topic {Topic} 启动Kafka的Producer,向Kafka发送数据。 java -cp {ClassPath} com.huawei.bigdata.spark.examples.StreamingExampleProducer {BrokerList} {Topic} 其中,ClassPath除样例jar包路径外,还应包含Spark客户端Kafka jar包的绝对路径,例如:/opt/client/Spark2x/spark/jars/*:/opt/client/Spark2x/spark/jars/streamingClient010/*:{ClassPath}
  • 打包项目 将user.keytab、krb5.conf 两个文件上传客户端所在服务器上。 通过IDEA自带的Maven工具,打包项目,生成jar包。具体操作请参考在Linux环境中编包并运行Spark程序。 编译打包前,样例代码中的user.keytab、krb5.conf文件路径需要修改为该文件所在客户端服务器的实际路径。例如:“/opt/female/user.keytab”,“/opt/female/krb5.conf”。 将打包生成的jar包上传到Spark客户端所在服务器的任意目录(例如“ /opt” )下。
  • 数据规划 Spark Streaming样例工程的数据存储在Kafka组件中。向Kafka组件发送数据(需要有kafka权限用户)。 确保集群安装完成,包括HDFS、Yarn、Spark和Kafka。 本地新建文件“input_data1.txt”,将“log1.txt”的内容复制保存到“input_data1.txt”。 在客户端安装节点下创建文件目录:“/home/data”。将上述文件上传到此“/home/data”目录下。 将kafka的Broker配置参数“allow.everyone.if.no.acl.found”的值修改为“true”(普通集群不需配置)。 创建Topic。 {zkQuorum}表示ZooKeeper集群信息,格式为IP:port。 $KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper {zkQuorum}/kafka --replication-factor 1 --partitions 3 --topic {Topic} 启动Kafka的Producer,向Kafka发送数据。 java -cp $SPARK_HOME/jars/*:$SPARK_HOME/jars/streamingClient010/*:$KAFKA_HOME/libs/*:{JAR_PATH} com.huawei.bigdata.spark.examples.StreamingExampleProducer {BrokerList} {Topic} JAR_PATH为程序jar包所在路径,BrokerList格式为brokerIp:9092。 需要修改程序SecurityKafkaWordCount类中kerberos.domain.name的值为$KAFKA_HOME/config/consumer.properties文件中kerberos.domain.name配置项的值。 若用户需要对接安全Kafka,则还需要在spark客户端的conf目录下的“jaas.conf”文件中增加“KafkaClient”的配置信息,示例如下: KafkaClient {com.sun.security.auth.module.Krb5LoginModule requireduseKeyTab=truekeyTab = "./user.keytab"principal="leoB@HADOOP.COM"useTicketCache=falsestoreKey=truedebug=true;}; 在Spark on YARN模式下,jaas.conf和user.keytab通过YARN分发到Spark on YARN的container目录下,因此KafkaClient中对于“keyTab”的配置路径必须为相对jaas.conf的所在路径,例如“./user.keytab”。principal修改为自己创建的用户名及集群 域名
  • 场景说明 假定用户有某个周末网民网购停留时间的日志文本,基于某些业务要求,要求开发Spark应用程序实现如下功能: 实时统计连续网购时间超过半个小时的女性网民信息。 周末两天的日志文件第一列为姓名,第二列为性别,第三列为本次停留时间,单位为分钟,分隔符为“,”。 log1.txt:周六网民停留日志 LiuYang,female,20YuanJing,male,10GuoYijun,male,5CaiXuyu,female,50Liyuan,male,20FangBo,female,50LiuYang,female,20YuanJing,male,10GuoYijun,male,50CaiXuyu,female,50FangBo,female,60 log2.txt:周日网民停留日志 LiuYang,female,20YuanJing,male,10CaiXuyu,female,50FangBo,female,50GuoYijun,male,5CaiXuyu,female,50Liyuan,male,20CaiXuyu,female,50FangBo,female,50LiuYang,female,20YuanJing,male,10FangBo,female,50GuoYijun,male,50CaiXuyu,female,50FangBo,female,60
  • 数据规划 Spark Streaming样例工程的数据存储在Kafka组件中(需要有Kafka权限用户)。 本地新建两个文本文件input_data1.txt和input_data2.txt,将log1.txt的内容复制保存到input_data1.txt,将log2.txt的内容复制保存到input_data2.txt。 在客户端安装节点下创建文件目录:“/home/data”。将上述两个文件上传到此“/home/data”目录下。 将Kafka的Broker配置参数“allow.everyone.if.no.acl.found”值设置为“true”(普通集群不需配置)。 启动样例代码的Producer,向Kafka发送数据。 java -cp $SPARK_HOME/jars/*:$SPARK_HOME/jars/streamingClient/*:{JAR_PATH} com.huawei.bigdata.spark.examples.StreamingExampleProducer {BrokerList} {Topic} JAR_PATH为程序jar包所在路径。 brokerlist格式为brokerIp:9092。
  • 配置场景 当Spark Streaming应用与Kafka对接,Spark Streaming应用异常终止并从checkpoint恢复重启后,对于进入Kafka数据的任务,系统默认优先处理应用终止前(A段时间)未完成的任务和应用终止到重启完成这段时间内(B段时间)进入Kafka数据生成的任务,最后再处理应用重启完成后(C段时间)进入Kafka数据生成的任务。并且对于B段时间进入Kafka的数据,Spark将按照终止时间(batch时间)生成相应个数的任务,其中第一个任务读取全部数据,其余任务可能不读取数据,造成任务处理压力不均匀。 若A段时间的任务和B段时间任务处理得较慢,则会影响C段时间任务的处理。针对上述场景,Spark提供Kafka后进先出功能。 图1 Spark Streaming应用重启时间轴 开启此功能后,Spark将优先调度C段时间内的任务,若存在多个C段任务,则按照任务产生的先后顺序调度执行,再执行A段时间和B段时间的任务。另外,对于B段时间进入Kafka的数据,Spark除了按照终止时间生成相应任务,还将这个期间进入Kafka的所有数据均匀分配到各个任务,避免任务处理压力不均匀。 约束条件: 目前该功能只适用于Spark Streaming中的Direct方式,且执行结果与上一个batch时间处理结果没有依赖关系(即无state操作,如updatestatebykey)。对多条数据输入流,需要相对独立无依赖的状态,否则可能导致数据切分后结果发生变化。 Kafka后进先出功能的开启要求应用只能对接Kafka输入源。 若提交应用的同时开启Kafka后进先出和流控功能,对于B段时间进入Kafka的数据,将不启动流控功能,以确保读取这些数据的任务调度优先级最低。应用重新启动后C段时间的任务启用流控功能。
  • 配置描述 在Spark Driver端的“spark-defaults.conf”配置文件中进行设置。 表1 参数说明 参数 说明 默认值 spark.streaming.kafka.direct.lifo 配置是否开启Kafka后进先出功能。 false spark.streaming.kafka010.inputstream.class 获取解耦在 FusionInsight 侧的类 org.apache.spark.streaming.kafka010.HWDirectKafkaInputDStream
  • 回答 Streaming Context启动时,若应用设置了checkpoint,则需要对应用中的DStream checkpoint对象进行序列化,序列化时会用到dstream.context。 dstream.context是Streaming Context启动时从output Streams反向查找所依赖的DStream,逐个设置context。若Spark Streaming应用创建1个输入流,但该输入流无输出逻辑时,则不会给它设置context。所以在序列化时报“NullPointerException”。 解决办法:应用中如果有无输出逻辑的输入流,则在代码中删除该输入流,或添加该输入流的相关输出逻辑。
  • 问题 Spark Streaming应用创建1个输入流,但该输入流无输出逻辑。应用从checkpoint恢复启动失败,报错如下: 17/04/24 10:13:57 ERROR Utils: Exception encounteredjava.lang.NullPointerExceptionat org.apache.spark.streaming.dstream.DStreamCheckpointData$$anonfun$writeObject$1.apply$mcV$sp(DStreamCheckpointData.scala:125)at org.apache.spark.streaming.dstream.DStreamCheckpointData$$anonfun$writeObject$1.apply(DStreamCheckpointData.scala:123)at org.apache.spark.streaming.dstream.DStreamCheckpointData$$anonfun$writeObject$1.apply(DStreamCheckpointData.scala:123)at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1195)at org.apache.spark.streaming.dstream.DStreamCheckpointData.writeObject(DStreamCheckpointData.scala:123)at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)at java.lang.reflect.Method.invoke(Method.java:498)at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028)at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)at java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:441)at org.apache.spark.streaming.dstream.DStream$$anonfun$writeObject$1.apply$mcV$sp(DStream.scala:515)at org.apache.spark.streaming.dstream.DStream$$anonfun$writeObject$1.apply(DStream.scala:510)at org.apache.spark.streaming.dstream.DStream$$anonfun$writeObject$1.apply(DStream.scala:510)at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1195)at org.apache.spark.streaming.dstream.DStream.writeObject(DStream.scala:510)at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)at java.lang.reflect.Method.invoke(Method.java:498)at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028)at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)at java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:441)at org.apache.spark.streaming.DStreamGraph$$anonfun$writeObject$1.apply$mcV$sp(DStreamGraph.scala:191)at org.apache.spark.streaming.DStreamGraph$$anonfun$writeObject$1.apply(DStreamGraph.scala:186)at org.apache.spark.streaming.DStreamGraph$$anonfun$writeObject$1.apply(DStreamGraph.scala:186)at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1195)at org.apache.spark.streaming.DStreamGraph.writeObject(DStreamGraph.scala:186)at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)at java.lang.reflect.Method.invoke(Method.java:498)at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028)at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)at org.apache.spark.streaming.Checkpoint$$anonfun$serialize$1.apply$mcV$sp(Checkpoint.scala:142)at org.apache.spark.streaming.Checkpoint$$anonfun$serialize$1.apply(Checkpoint.scala:142)at org.apache.spark.streaming.Checkpoint$$anonfun$serialize$1.apply(Checkpoint.scala:142)at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1230)at org.apache.spark.streaming.Checkpoint$.serialize(Checkpoint.scala:143)at org.apache.spark.streaming.StreamingContext.validate(StreamingContext.scala:566)at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:612)at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:611)at com.spark.test.kafka08LifoTwoInkfk$.main(kafka08LifoTwoInkfk.scala:21)at com.spark.test.kafka08LifoTwoInkfk.main(kafka08LifoTwoInkfk.scala)at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)at java.lang.reflect.Method.invoke(Method.java:498)at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:772)at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:183)at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:208)at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:123)at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
提示

您即将访问非华为云网站,请注意账号财产安全