华为云用户手册

  • 操作步骤 Spark CBO的设计思路是,基于表和列的统计信息,对各个操作算子(Operator)产生的中间结果集大小进行估算,最后根据估算的结果来选择最优的执行计划。 设置配置项。 在“spark-defaults.conf”配置文件中增加配置项“spark.sql.cbo”,将其设置为true,默认为false。 在客户端执行SQL语句set spark.sql.cbo=true进行配置。 执行统计信息生成命令,得到统计信息。 此步骤只需在运行所有SQL前执行一次。如果数据集发生了变化(插入、更新或删除),为保证CBO的优化效果,需要对有变化的表或者列再次执行统计信息生成命令重新生成统计信息,以得到最新的数据分布情况。 表:执行COMPUTE STATS FOR TABLE src命令计算表的统计信息,统计信息包括记录条数、文件数和物理存储总大小。 列: 执行COMPUTE STATS FOR TABLE src ON COLUMNS命令计算所有列的统计信息。 执行COMPUTE STATS FOR TABLE src ON COLUMNS name,age命令计算表中name和age两个字段的统计信息。 当前列的统计信息支持四种类型:数值类型、日期类型、时间类型和字符串类型。对于数值类型、日期类型和时间类型,统计信息包括:Max、Min、不同值个数(Number of Distinct Value,NDV)、空值个数(Number of Null)和Histogram(支持等宽、等高直方图);对于字符串类型,统计信息包括:Max、Min、Max Length、Average Length、不同值个数(Number of Distinct Value,NDV)、空值个数(Number of Null)和Histogram(支持等宽直方图)。 CBO调优 自动优化:用户根据自己的业务场景,输入SQL语句查询,程序会自动去判断输入的SQL语句是否符合优化的场景,从而自动选择Join优化算法。 手动优化:用户可以通过DESC FORMATTED src命令查看统计信息,根据统计信息的分布,人工优化SQL语句。
  • 操作步骤 配置Driver内存。 Driver负责任务的调度,和Executor、AM之间的消息通信。当任务数变多,任务平行度增大时,Driver内存都需要相应增大。 您可以根据实际任务数量的多少,为Driver设置一个合适的内存。 将“spark-defaults.conf”中的“spark.driver.memory”配置项或者“spark-env.sh”中的“SPARK_DRIVER_MEMORY”配置项设置为合适大小。 在使用spark-submit命令时,添加“--driver-memory MEM”参数设置内存。 配置Executor个数。 每个Executor每个核同时能跑一个task,所以增加了Executor的个数相当于增大了任务的并发度。在资源充足的情况下,可以相应增加Executor的个数,以提高运行效率。 将“spark-defaults.conf”中的“spark.executor.instance”配置项或者“spark-env.sh”中的“SPARK_EXECUTOR_INSTAN CES ”配置项设置为合适大小。您还可以设置动态资源调度功能进行优化。 在使用spark-submit命令时,添加“--num-executors NUM”参数设置Executor个数。 配置Executor核数。 每个Executor多个核同时能跑多个task,相当于增大了任务的并发度。但是由于所有核共用Executor的内存,所以要在内存和核数之间做好平衡。 将“spark-defaults.conf”中的“spark.executor.cores”配置项或者“spark-env.sh”中的“SPARK_EXECUTOR_CORES”配置项设置为合适大小。 在使用spark-submit命令时,添加“--executor-cores NUM”参数设置核数。 配置Executor内存。 Executor的内存主要用于任务执行、通信等。当一个任务很大的时候,可能需要较多资源,因而内存也可以做相应的增加;当一个任务较小运行较快时,就可以增大并发度减少内存。 将“spark-defaults.conf”中的“spark.executor.memory”配置项或者“spark-env.sh”中的“SPARK_EXECUTOR_MEMORY”配置项设置为合适大小。 在使用spark-submit命令时,添加“--executor-memory MEM”参数设置内存。
  • 操作场景 Spark on YARN模式下,有Driver、ApplicationMaster、Executor三种进程。在任务调度和运行的过程中,Driver和Executor承担了很大的责任,而ApplicationMaster主要负责container的启停。 因而Driver和Executor的参数配置对spark应用的执行有着很大的影响意义。用户可通过如下操作对Spark集群性能做优化。
  • 示例 在执行spark wordcount计算中。1.6T数据,250个executor。 在默认参数下执行失败,出现Futures timed out和OOM错误。 因为数据量大,task数多,而wordcount每个task都比较小,完成速度快。当task数多时driver端相应的一些对象就变大了,而且每个task完成时executor和driver都要通信,这就会导致由于内存不足,进程之间通信断连等问题。 当把Driver的内存设置到4g时,应用成功跑完。 使用ThriftServer执行TPC-DS测试套,默认参数配置下也报了很多错误:Executor Lost等。而当配置Driver内存为30g,executor核数为2,executor个数为125,executor内存为6g时,所有任务才执行成功。
  • 配置文件介绍 登录HDFS时会使用到如表1所示的配置文件。这些文件均已导入到“hdfs-example”工程的“conf”目录。 表1 配置文件 文件名称 作用 获取地址 core-site.xml 配置HDFS详细参数。 MRS _Services_ClientConfig\HDFS\config\core-site.xml hdfs-site.xml 配置HDFS详细参数。 MRS_Services_ClientConfig\HDFS\config\hdfs-site.xml user.keytab 对于Kerberos安全认证提供HDFS用户信息。 如果是安全模式集群,您可以联系管理员获取相应账号对应权限的keytab文件和krb5文件。 krb5.conf Kerberos server配置信息。 不同集群的“user.keytab”、“krb5.conf”不能共用。 “conf”目录下的“log4j.properties”文件可根据自己的需要进行配置。
  • 代码样例 如下是代码片段,详细代码请参考com.huawei.bigdata.hdfs.examples中的HdfsMain类。 在Linux客户端运行应用的初始化代码,代码样例如下所示。 /** * 初始化,获取一个FileSystem实例 * * @throws IOException */ private void init() throws IOException { confLoad(); authentication(); instanceBuild(); } /** * * 如果程序运行在Linux上,则需要core-site.xml、hdfs-site.xml的路径, * 修改为在Linux下客户端文件的绝对路径。 * */ private void confLoad() throws IOException { conf = new Configuration(); // conf file conf.addResource(new Path(PATH_TO_HDFS_SITE_XML)); conf.addResource(new Path(PATH_TO_CORE_SITE_XML)); } /** * kerberos security authentication * 如果程序运行在Linux上,则需要krb5.conf和keytab文件的路径, * 修改为在Linux下客户端文件的绝对路径。并且需要将样例代码中的keytab文件和principal文件 * 分别修改为当前用户的keytab文件名和用户名。 * */ private void authentication() throws IOException { // 安全模式 if ("kerberos".equalsIgnoreCase(conf.get("hadoop.security.authentication"))) { System.setProperty("java.security.krb5.conf", PATH_TO_KRB5_CONF); LoginUtil.login(PRNCIPAL_NAME, PATH_TO_KEYTAB, PATH_TO_KRB5_CONF, conf); } } /** * build HDFS instance */ private void instanceBuild() throws IOException { // get filesystem fSystem = FileSystem.get(conf); }
  • 场景说明 假定用户有某个周末网民网购停留时间的日志文本,基于某些业务要求,要求开发Spark应用程序实现如下功能: 实时统计连续网购时间超过半个小时的女性网民信息。 周末两天的日志文件第一列为姓名,第二列为性别,第三列为本次停留时间,单位为分钟,分隔符为“,”。 log1.txt:周六网民停留日志 LiuYang,female,20YuanJing,male,10GuoYijun,male,5CaiXuyu,female,50Liyuan,male,20FangBo,female,50LiuYang,female,20YuanJing,male,10GuoYijun,male,50CaiXuyu,female,50FangBo,female,60 log2.txt:周日网民停留日志 LiuYang,female,20YuanJing,male,10CaiXuyu,female,50FangBo,female,50GuoYijun,male,5CaiXuyu,female,50Liyuan,male,20CaiXuyu,female,50FangBo,female,50LiuYang,female,20YuanJing,male,10FangBo,female,50GuoYijun,male,50CaiXuyu,female,50FangBo,female,60
  • 数据规划 Spark Streaming样例工程的数据存储在Kafka组件中(需要有Kafka权限用户)。 本地新建两个文本文件input_data1.txt和input_data2.txt,将log1.txt的内容复制保存到input_data1.txt,将log2.txt的内容复制保存到input_data2.txt。 在客户端安装节点下创建文件目录:“/home/data”。将上述两个文件上传到此“/home/data”目录下。 将Kafka的Broker配置参数“allow.everyone.if.no.acl.found”值设置为“true”(普通集群不需配置)。 启动样例代码的Producer,向Kafka发送数据。 java -cp $SPARK_HOME/jars/*:$SPARK_HOME/jars/streamingClient/*:{JAR_PATH} com.huawei.bigdata.spark.examples.StreamingExampleProducer {BrokerList} {Topic} JAR_PATH为程序jar包所在路径。 brokerlist格式为brokerIp:9092。
  • 开发思路 数据准备。 创建三张表,雇员信息表“employees_info”、雇员联络信息表“employees_contact”、雇员信息扩展表“employees_info_extended”。 雇员信息表“employees_info”的字段为雇员编号、姓名、支付薪水币种、薪水金额、缴税税种、工作地、入职时间,其中支付薪水币种“R”代表人民币,“D”代表美元。 雇员联络信息表“employees_contact”的字段为雇员编号、电话号码、e-mail。 雇员信息扩展表“employees_info_extended”的字段为雇员编号、姓名、电话号码、e-mail、支付薪水币种、薪水金额、缴税税种、工作地,分区字段为入职时间。 创建表代码实现请见创建Hive表。 加载雇员信息数据到雇员信息表“employees_info”中。 加载数据代码实现请见加载Hive数据。 雇员信息数据如表1所示。 表1 雇员信息数据 编号 姓名 支付薪水币种 薪水金额 缴税税种 工作地 入职时间 1 Wang R 8000.01 personal income tax&0.05 China:Shenzhen 2014 3 Tom D 12000.02 personal income tax&0.09 America:NewYork 2014 4 Jack D 24000.03 personal income tax&0.09 America:Manhattan 2014 6 Linda D 36000.04 personal income tax&0.09 America:NewYork 2014 8 Zhang R 9000.05 personal income tax&0.05 China:Shanghai 2014 加载雇员联络信息数据到雇员联络信息表“employees_contact”中。 雇员联络信息数据如表2所示。 表2 雇员联络信息数据 编号 电话号码 e-mail 1 135 XXXX XXXX xxxx@xx.com 3 159 XXXX XXXX xxxxx@xx.com.cn 4 186 XXXX XXXX xxxx@xx.org 6 189 XXXX XXXX xxxx@xxx.cn 8 134 XXXX XXXX xxxx@xxxx.cn 数据分析。 数据分析代码实现,请见查询Hive数据。 查看薪水支付币种为美元的雇员联系方式。 查询入职时间为2014年的雇员编号、姓名等字段,并将查询结果加载进表employees_info_extended中的入职时间为2014的分区中。 统计表employees_info中有多少条记录。 查询使用以“cn”结尾的邮箱的员工信息。 提交数据分析任务,统计表employees_info中有多少条记录。实现请见分析Hive数据。
  • 功能介绍 本小节介绍了如何使用HQL创建内部表、外部表的基本操作。创建表主要有以下三种方式。 自定义表结构,以关键字EXTERNAL区分创建内部表和外部表。 内部表,如果对数据的处理都由Hive完成,则应该使用内部表。在删除内部表时,元数据和数据一起被删除。 外部表,如果数据要被多种工具共同处理,则应该使用外部表,可避免对该数据的误操作。删除外部表时,只删除掉元数据。 根据已有表创建新表,使用CREATE LIKE句式,完全复制原有的表结构,包括表的存储格式。 根据查询结果创建新表,使用CREATE AS SELECT句式。 这种方式比较灵活,可以在复制原表表结构的同时指定要复制哪些字段,不包括表的存储格式。
  • 样例代码 -- 查看薪水支付币种为美元的雇员联系方式. SELECT a.name, b.tel_phone, b.email FROM employees_info a JOIN employees_contact b ON(a.id = b.id) WHERE usd_flag='D'; -- 查询入职时间为2014年的雇员编号、姓名等字段,并将查询结果加载进表employees_info_extended中的入职时间为2014的分区中. INSERT OVERWRITE TABLE employees_info_extended PARTITION (entrytime = '2014') SELECT a.id, a.name, a.usd_flag, a.salary, a.deductions, a.address, b.tel_phone, b.email FROM employees_info a JOIN employees_contact b ON (a.id = b.id) WHERE a.entrytime = '2014'; -- 使用Hive中已有的函数COUNT(),统计表employees_info中有多少条记录. SELECT COUNT(*) FROM employees_info; -- 查询使用以“cn”结尾的邮箱的员工信息. SELECT a.name, b.tel_phone FROM employees_info a JOIN employees_contact b ON (a.id = b.id) WHERE b.email like '%cn';
  • 代码样例 如下是删除文件的代码片段,详细代码请参考com.huawei.bigdata.hdfs.examples中的HdfsMain类。 /** * 删除文件 * * @throws IOException */ private void delete() throws IOException { Path beDeletedPath = new Path(DEST_PATH + File.separator + FILE_NAME); fSystem.deleteOnExit(beDeletedPath); System.out.println("succeed to delete the file " + DEST_PATH + File.separator + FILE_NAME); }
  • 扩展使用 配置Hive中间过程的 数据加密 指定表的格式为RCFile(推荐使用)或SequenceFile,加密算法为ARC4Codec。SequenceFile是Hadoop特有的文件格式,RCFile是Hive优化的文件格式。RCFile优化了列存储,在对大表进行查询时,综合性能表现比SequenceFile更优。 set hive.exec.compress.output=true; set hive.exec.compress.intermediate=true; set hive.intermediate.compression.codec=org.apache.hadoop.io.encryption.arc4.ARC4Codec; 自定义函数,具体内容请参见开发Hive用户自定义函数。
  • Hive应用开发环境简介 在进行应用开发时,要准备的本地开发环境如表1所示。同时需要准备运行调测的Linux环境,用于验证应用程序运行正常。 表1 开发环境 准备项 说明 操作系统 开发环境:Windows系统,推荐Windows7以上版本。 运行环境:Linux系统。 安装JDK 开发和运行环境的基本配置。版本要求如下: MRS集群的服务端和客户端仅支持自带的Oracle JDK(版本为1.8),不允许替换。 对于客户应用需引用SDK类的Jar包运行在客户应用进程中的,支持Oracle JDK和IBM JDK。 Oracle JDK:支持1.7和1.8版本。 IBM JDK:推荐1.7.8.10、1.7.9.40和1.8.3.0版本。 说明: 在HCatalog的开发环境中,基于安全考虑,MRS服务端只支持TLS 1.1和TLS 1.2加密协议,IBM JDK默认TLS只支持1.0,若使用IBM JDK,请配置启动参数“com.ibm.jsse2.overrideDefaultTLS”为“true”,设置后可以同时支持TLS1.0/1.1/1.2。 详情请参见:https://www.ibm.com/support/knowledgecenter/zh/SSYKE2_8.0.0/com.ibm.java.security.component.80.doc/security-component/jsse2Docs/matchsslcontext_tls.html#matchsslcontext_tls。 安装和配置Eclipse 用于开发Hive应用程序的工具。版本要求如下: JDK使用1.7版本,Eclipse使用3.7.1及以上版本。 JDK使用1.8版本,Eclipse使用4.3.2及以上版本。 说明: 若使用IBM JDK,请确保Eclipse中的JDK配置为IBM JDK。 若使用Oracle JDK,请确保Eclipse中的JDK配置为Oracle JDK。 不同的Eclipse不要使用相同的workspace和相同路径下的示例工程。 网络 确保客户端与Hive服务主机在网络上互通。 父主题: 准备Hive应用开发环境
  • 提交SparkLauncher应用程序 在工程目录下执行mvn package命令生成jar包,在工程目录target目录下获取,比如:FemaleInfoCollection.jar 将生成的Jar包(如CollectFemaleInfo.jar)拷贝到Spark运行环境下(即Spark客户端),如“/opt/female”。开启Kerberos认证的安全集群下把从准备Spark应用开发用户中获取的user.keytab和krb5.conf文件拷贝到Spark客户端conf目录下,如:/opt/client/Spark/spark/conf;未开启Kerberos认证集群可不必拷贝user.keytab和krb5.conf文件。 提交SparkLauncher应用程序。 在Spark任务运行过程中禁止重启HDFS服务或者重启所有DataNode实例,否则可能会导致任务失败,并可能导致JobHistory部分数据丢失。 运行程序时可根据需要选择运行模式: --deploy-mode client:driver进程在客户端运行,运行结果在程序运行后直接输出。 --deploy-mode cluster:driver进程在Yarn的ApplicationMaster(AM)中运行,运行结果和日志在Yarn的WebUI界面输出。 java -cp $SPARK_HOME/jars/*:{JAR_PATH} com.huawei.bigdata.spark.examples.SparkLauncherExample yarn-client {TARGET_JAR_PATH} { TARGET_JAR_MAIN_CLASS} {args} JAR_PATH为SparkLauncher应用程序jar包所在路径。 TARGET_JAR_PATH为待提交的spark application应用程序jar包所在路径。 args为待提交的spark application应用程序的参数。
  • 运行“通过JDBC访问Spark SQL”样例程序 在工程目录下执行mvn package命令生成jar包,在工程目录target目录下获取,比如:FemaleInfoCollection.jar 将生成的Jar包(如CollectFemaleInfo.jar)拷贝到Spark运行环境下(即Spark客户端),如“/opt/female”。开启Kerberos认证的安全集群下把从准备Spark应用开发用户中获取的user.keytab和krb5.conf文件拷贝到Spark客户端conf目录下,如:/opt/client/Spark/spark/conf;未开启Kerberos认证集群可不必拷贝user.keytab和krb5.conf文件。 运行“通过JDBC访问Spark SQL”样例程序(Scala和Java语言)。 在Spark任务运行过程中禁止重启HDFS服务或者重启所有DataNode实例,否则可能会导致任务失败,并可能导致JobHistory部分数据丢失。 运行程序时可根据需要选择运行模式: --deploy-mode client:driver进程在客户端运行,运行结果在程序运行后直接输出。 --deploy-mode cluster:driver进程在Yarn的ApplicationMaster(AM)中运行,运行结果和日志在Yarn的WebUI界面输出。 进入Spark客户端目录,使用java -cp命令运行代码。 java -cp ${SPARK_HOME}/jars/*:${SPARK_HOME}/conf:/opt/female/SparkThriftServerJavaExample-*.jar com.huawei.bigdata.spark.examples.ThriftServerQueriesTest ${SPARK_HOME}/conf/hive-site.xml ${SPARK_HOME}/conf/spark-defaults.conf 普通集群需要注释掉安全配置部分代码,详情请参见2和2。 上面的命令行中,您可以根据不同样例工程,最小化选择其对应的运行依赖包。样例工程对应的运行依赖包详情,请参见1。
  • Spark Streaming常用接口 Spark Streaming中常见的类有: StreamingContext:是Spark Streaming功能的主入口,负责提供创建DStreams的方法,入参中需要设置批次的时间间隔。 dstream.DStream:是一种代表RDDs连续序列的数据类型,代表连续数据流。 dstream.PariDStreamFunctions:键值对的DStream,常见的操作如groupByKey和reduceByKey。 对应的Spark Streaming的JAVA API是JavaSteamingContext,JavaDStream和JavaPairDStream。 Spark Streaming的常见方法与Spark Core类似,下表罗列了Spark Streaming特有的一些方法。 表3 Spark Streaming方法介绍 方法 说明 socketTextStream(hostname: String, port: Int, storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2): ReceiverInputDStream[String] 用TCP协议(源主机:端口)创建一个输入流。 start():Unit 启动Streaming计算。 awaitTermination(timeout: long):Unit 当前进程等待终止,如Ctrl+C等。 stop(stopSparkContext: Boolean, stopGracefully: Boolean): Unit 终止Streaming计算。 transform[T](dstreams: Seq[DStream[_]], transformFunc: (Seq[RDD[_]], Time) ? RDD[T])(implicit arg0: ClassTag[T]): DStream[T] 对每一个RDD应用function操作得到一个新的DStream。 UpdateStateByKey(func) 更新DStream的状态。使用此方法,需要定义状态和状态更新函数。 window(windowLength, slideInterval) 根据源DStream的窗口批次计算得到一个新的DStream。 countByWindow(windowLength, slideInterval) 返回流中滑动窗口元素的个数。 reduceByWindow(func, windowLength, slideInterval) 当调用在DStream的KV对上,返回一个新的DStream的KV对,其中每个Key的Value根据滑动窗口中批次的reduce函数聚合得到。 join(otherStream, [numTasks]) 实现不同的Spark Streaming之间做合并操作。 DStreamKafkaWriter.writeToKafka() 支持将DStream中的数据批量写入到Kafka。 DStreamKafkaWriter.writeToKafkaBySingle() 支持将DStream中的数据逐条写入到Kafka。 表4 Streaming增强特性接口 方法 说明 DStreamKafkaWriter.writeToKafka() 支持将DStream中的数据批量写入到Kafka。 DStreamKafkaWriter.writeToKafkaBySingle() 支持将DStream中的数据逐条写入到Kafka。
  • SparkSQL常用接口 Spark SQL中常用的类有: SQLContext:是Spark SQL功能和DataFrame的主入口。 DataFrame:是一个以命名列方式组织的分布式数据集。 HiveContext:获取存储在Hive中数据的主入口。 表5 常用的Actions方法 方法 说明 collect(): Array[Row] 返回一个数组,包含DataFrame的所有列。 count(): Long 返回DataFrame中的行数。 describe(cols: String*): DataFrame 计算统计信息,包含计数,平均值,标准差,最小值和最大值。 first(): Row 返回第一行。 Head(n:Int): Row 返回前n行。 show(numRows: Int, truncate: Boolean): Unit 用表格形式显示DataFrame。 take(n:Int): Array[Row] 返回DataFrame中的前n行。 表6 基本的DataFrame Functions 方法 说明 explain(): Unit 打印出SQL语句的逻辑计划和物理计划。 printSchema(): Unit 打印schema信息到控制台。 registerTempTable(tableName: String): Unit 将DataFrame注册为一张临时表,其周期和SQLContext绑定在一起。 toDF(colNames: String*): DataFrame 返回一个列重命名的DataFrame。
  • Eclipse代码样例 创建Topology。 private static final String DEFAULT_FS_URL = "obs://mybucket";public static void main(String[] args) throws Exception { TopologyBuilder builder = new TopologyBuilder(); // 分隔符格式,当前采用“|”代替默认的“,”对tuple中的field进行分隔 // HdfsBolt必选参数 RecordFormat format = new DelimitedRecordFormat() .withFieldDelimiter("|"); // 同步策略,每1000个tuple对文件系统进行一次同步 // HdfsBolt必选参数 SyncPolicy syncPolicy = new CountSyncPolicy(1000); // 文件大小循环策略,当文件大小到达5M时,从头开始写 // HdfsBolt必选参数 FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(5.0f, Units.KB); // 写入HDFS的目的文件 // HdfsBolt必选参数 FileNameFormat fileNameFormat = new DefaultFileNameFormat() .withPath("/user/foo/"); //创建HdfsBolt HdfsBolt bolt = new HdfsBolt() .withFsUrl(DEFAULT_FS_URL) .withFileNameFormat(fileNameFormat) .withRecordFormat(format) .withRotationPolicy(rotationPolicy) .withSyncPolicy(syncPolicy); //Spout生成随机语句 builder.setSpout("spout", new RandomSentenceSpout(), 1); builder.setBolt("split", new SplitSentence(), 1).shuffleGrouping("spout"); builder.setBolt("count", bolt, 1).fieldsGrouping("split", new Fields("word")); Config conf = new Config(); //命令行提交拓扑 StormSubmitter.submitTopology(args[0], conf, builder.createTopology()); }
  • 部署运行及结果查看 在Storm示例代码根目录执行如下命令打包:"mvn package"。执行成功后,将会在target目录生成storm-examples-1.0.jar。 执行命令提交拓扑。 提交命令示例(拓扑名为obs-test)。 storm jar /opt/jartarget/storm-examples-1.0.jar com.huawei.storm.example.obs.SimpleOBSTopology obs://my-bucket obs-test 拓扑提交成功后请登录OBS Browser查看。
  • Spark Streaming常用接口 Spark Streaming中常见的类有: pyspark.streaming.StreamingContext:是Spark Streaming功能的主入口,负责提供创建DStreams的方法,入参中需要设置批次的时间间隔。 pyspark.streaming.DStream:是一种代表RDDs连续序列的数据类型,代表连续数据流。 dstream.PariDStreamFunctions:键值对的DStream,常见的操作如groupByKey和reduceByKey。 对应的Spark Streaming的JAVA API是JavaSteamingContext,JavaDStream和JavaPairDStream。 Spark Streaming的常见方法与Spark Core类似,下表罗列了Spark Streaming特有的一些方法。 表3 Spark Streaming常用接口介绍 方法 说明 socketTextStream(hostname, port, storageLevel) 从TCP源主机:端口创建一个输入流。 start() 启动Streaming计算。 awaitTermination(timeout) 当前进程等待终止,如Ctrl+C等。 stop(stopSparkContext, stopGraceFully) 终止Streaming计算,stopSparkContext用于判断是否需要终止相关的SparkContext,StopGracefully用于判断是否需要等待所有接收到的数据处理完成。 UpdateStateByKey(func) 更新DStream的状态。使用此方法,需要定义State和状态更新函数。 window(windowLength, slideInterval) 根据源DStream的窗口批次计算得到一个新的DStream。 countByWindow(windowLength, slideInterval) 返回流中滑动窗口元素的个数。 reduceByWindow(func, windowLength, slideInterval) 当调用在DStream的KV对上,返回一个新的DStream的KV对,其中每个Key的Value根据滑动窗口中批次的reduce函数聚合得到。 join(other,numPartitions) 实现不同的Spark Streaming之间做合并操作。
  • 代码样例 以下代码片段在com.huawei.bigdata.hbase.examples包的“HIndexExample”类的scanDataByHIndex方法中。 样例代码获取方式请参考获取MRS应用开发样例工程。 代码样例: public void scanDataByHIndex() { LOG .info("Entering HIndex-based Query."); Table table = null; ResultScanner rScanner = null; try { table = conn.getTable(tableName); // Create a filter for indexed column. SingleColumnValueFilter filter = new SingleColumnValueFilter(Bytes.toBytes("info"), Bytes.toBytes("age"), CompareOp.GREATER_OR_EQUAL, Bytes.toBytes("26")); filter.setFilterIfMissing(true); Scan scan = new Scan(); scan.setFilter(filter); rScanner = table.getScanner(scan); // Scan the data LOG.info("Scan data using indices.."); for (Result result : rScanner) { LOG.info("Scanned row is:"); for (Cell cell : result.rawCells()) { LOG.info(Bytes.toString(CellUtil.cloneRow(cell)) + ":" + Bytes.toString(CellUtil.cloneFamily(cell)) + "," + Bytes.toString(CellUtil.cloneQualifier(cell)) + "," + Bytes.toString(CellUtil.cloneValue(cell))); } } LOG.info("Successfully scanned data using indices for table " + tableName + "."); } catch (IOException e) { LOG.error("Failed to scan data using indices for table " + tableName + "." + e); } finally { if (rScanner != null) { rScanner.close(); } if (table != null) { try { table.close(); } catch (IOException e) { LOG.error("failed to close table, ", e); } } } LOG.info("Entering HIndex-based Query."); }
  • SparkSQL常用接口 Spark SQL中在Python中重要的类有: pyspark.sql.SQLContext:是Spark SQL功能和DataFrame的主入口。 pyspark.sql.DataFrame:是一个以命名列方式组织的分布式数据集。 pyspark.sql.HiveContext:获取存储在Hive中数据的主入口。 pyspark.sql.DataFrameStatFunctions:统计功能中一些函数。 pyspark.sql.functions:DataFrame中内嵌的函数。 pyspark.sql.Window:sql中提供窗口功能。 表4 Spark SQL常用的Action 方法 说明 collect() 返回一个数组,包含DataFrame的所有列。 count() 返回DataFrame中的行数。 describe() 计算统计信息,包含计数,平均值,标准差,最小值和最大值。 first() 返回第一行。 head(n) 返回前n行。 show() 用表格形式显示DataFrame。 take(num) 返回DataFrame中的前num行。 表5 基本的DataFrame Functions 方法 说明 explain() 打印出SQL语句的逻辑计划和物理计划。 printSchema() 打印schema信息到控制台。 registerTempTable(name) 将DataFrame注册为一张临时表,命名为name,其周期和SQLContext绑定在一起。 toDF() 返回一个列重命名的DataFrame。
  • Spark SQL常用接口 Spark SQL中重要的类有: SQLContext:是Spark SQL功能和DataFrame的主入口。 DataFrame:是一个以命名列方式组织的分布式数据集 DataFrameReader:从外部存储系统加载DataFrame的接口。 DataFrameStatFunctions:实现DataFrame的统计功能。 UserDefinedFunction:用户自定义的函数。 常见的Actions方法有: 表5 Spark SQL方法介绍 方法 说明 Row[] collect() 返回一个数组,包含DataFrame的所有列。 long count() 返回DataFrame的行数。 DataFrame describe(java.lang.String... cols) 计算统计信息,包含计数,平均值,标准差,最小值和最大值。 Row first() 返回第一行。 Row[] head(int n) 返回前n行。 void show() 用表格形式显示DataFrame的前20行。 Row[] take(int n) 返回DataFrame中的前n行。 表6 基本的DataFrame Functions介绍 方法 说明 void explain(boolean extended) 打印出SQL语句的逻辑计划和物理计划。 void printSchema() 打印schema信息到控制台。 registerTempTable 将DataFrame注册为一张临时表,其周期和SQLContext绑定在一起。 DataFrame toDF(java.lang.String... colNames) 返回一个列重命名的DataFrame。 DataFrame sort(java.lang.String sortCol,java.lang.String... sortCols) 根据不同的列,按照升序或者降序排序。 GroupedData rollup(Column... cols) 对当前的DataFrame特定列进行多维度的回滚操作。
  • 数据库配置—Derby数据库配置过程 首先应下载一个数据库,可根据具体场景选择最适合的数据库。 该任务以Derby数据库为例。Derby是一个小型的,java编写的,易于使用却适合大多数应用程序的开放源码数据库。 Derby数据库的获取。在官网下载最新版的Derby数据库(本示例使用10.14.1.0),通过WinScp等工具传入Linux客户端,并解压。 在Derby的安装目录下,进入bin目录,输入如下命令。 export DERBY_INSTALL=/opt/db-derby-10.14.1.0-bin export CLASSPATH=$DERBY_INSTALL/lib/derbytools.jar:$DERBY_INSTALL\lib\derbynet.jar:. export DERBY_HOME=/opt/db-derby-10.14.1.0-bin . setNetworkServerCP ./startNetworkServer -h 主机名 执行./ij命令,输入connect 'jdbc:derby://主机名:1527/example;create=true';,建立连接。 执行./ij命令前,需要确保已配置java_home,可通过which java命令检查是否已配置。 数据库建立好后,可以执行sql语句进行操作,需要建立两张表ORIGINAL和GOAL,并向ORIGINAL中插入一组数据,命令如下:(表名仅供参考,可自行设定) CREATE TABLE GOAL(WORD VARCHAR(12),COUNT INT ); CREATE TABLE ORIGINAL(WORD VARCHAR(12),COUNT INT ); INSERT INTO ORIGINAL VALUES('orange',1),('pineapple',1),('banana',1),('watermelon',1);
  • 操作场景 本文档主要说明如何使用开源Storm-JDBC工具包,完成Storm和JDBC之间的交互。Storm-JDBC中包含两类Bolt:JdbcInsertBolt和JdbcLookupBolt。其中,JdbcLookupBolt主要负责从数据库中查数据,JdbcInsertBolt主要向数据库中存数据。当然,JdbcLookupBolt和JdbcInsertBolt中也可以增加处理逻辑对数据进行处理。 本章节只适用于MRS产品Storm与JDBC组件间的访问。本章中描述的jar包的具体版本信息请以实际情况为准。
  • 部署运行 在Storm示例代码根目录执行如下命令打包:"mvn package"。执行成功后,将会在target目录生成storm-examples-1.0.jar。 执行命令提交拓扑。提交命令示例(拓扑名为jdbc-test)。 storm jar /opt/jartarget/storm-examples-1.0.jar com.huawei.storm.example.jdbc.SimpleJDBCTopology jdbc-test
  • 操作场景 Broadcast(广播)可以把数据集合分发到每一个节点上,Spark任务在执行过程中要使用这个数据集合时,就会在本地查找Broadcast过来的数据集合。如果不使用Broadcast,每次任务需要数据集合时,都会把数据序列化到任务里面,不但耗时,还使任务变得很大。 每个任务分片在执行中都需要同一份数据集合时,就可以把公共数据集Broadcast到每个节点,让每个节点在本地都保存一份。 大表和小表做join操作时可以把小表Broadcast到各个节点,从而就可以把join操作转变成普通的操作,减少了shuffle操作。
  • 操作步骤 一个简单的流处理系统由以下三部分组件组成:数据源 + 接收器 + 处理器。数据源为Kafka,接收器为Streaming中的Kafka数据源接收器,处理器为Streaming。 对Streaming调优,就必须使三个部件的性能都最优化。 数据源调优 在实际的应用场景中,数据源为了保证数据的容错性,会将数据保存在本地磁盘中,而Streaming的计算结果往往全部在内存中完成,数据源很有可能成为流式系统的最大瓶颈点。 对Kafka的性能调优,有以下几个点: 使用Kafka-0.8.2以后版本,可以使用异步模式的新Producer接口。 配置多个Broker的目录,设置多个IO线程,配置Topic合理的Partition个数。 详情请参见Kafka开源文档中的“性能调优”部分:http://kafka.apache.org/documentation.html。 接收器调优 Streaming中已有多种数据源的接收器,例如Kafka、Flume、MQTT、ZeroMQ等,其中Kafka的接收器类型最多,也是最成熟一套接收器。 Kafka包括三种模式的接收器API: KafkaReceiver:直接接收Kafka数据,进程异常后,可能出现数据丢失。 ReliableKafkaReceiver:通过ZooKeeper记录接收数据位移。 DirectKafka:直接通过RDD读取Kafka每个Partition中的数据,数据高可靠。 从实现上来看,DirectKafka的性能会是最好的,实际测试上来看,DirectKafka也确实比其他两个API性能好。因此推荐使用DirectKafka的API实现接收器。 数据接收器作为一个Kafka的消费者,对于它的配置优化,请参见Kafka开源文档:http://kafka.apache.org/documentation.html。 处理器调优 Streaming的底层由Spark执行,因此大部分对于Spark的调优措施,都可以应用在Streaming之中,例如: 数据序列化 配置内存 设置并行度 使用External Shuffle Service提升性能 在做Spark Streaming的性能优化时需注意一点,越追求性能上的优化,Streaming整体的可靠性会越差。例如: “spark.streaming.receiver.writeAheadLog.enable”配置为“false”的时候,会明显减少磁盘的操作,提高性能,但由于缺少WAL机制,会出现异常恢复时,数据丢失。 因此,在调优Streaming的时候,这些保证数据可靠性的配置项,在生产环境中是不能关闭的。
  • 操作场景 Spark系统在运行含shuffle过程的应用时,Executor进程除了运行task,还要负责写shuffle数据,给其他Executor提供shuffle数据。当Executor进程任务过重,导致GC而不能为其他Executor提供shuffle数据时,会影响任务运行。 External shuffle Service是长期存在于NodeManager进程中的一个辅助服务。通过该服务来抓取shuffle数据,减少了Executor的压力,在Executor GC的时候也不会影响其他Executor的任务运行。
共100000条
提示

您即将访问非华为云网站,请注意账号财产安全