[快速入门]Mapreduce框架
MapReduce服务定价

MapReduce服务 定价与计费 价格计算器 1对1咨询 计费项 购买MRS集群的费用包含两个部分: 1、MRS服务管理费用 2、IaaS基础设施资源费用(弹性云服务器,云硬盘,弹性IP/带宽等) MRS服务管理费用详情,请参见产品价格详情。您可以通过MRS提供的价格计算器,选

MapReduce服务入门

购买 文档 专家咨询 4步快速使用MapReduce服务 4步快速使用MapReduce服务 上传数据 如果您没有购买集群,请上传程序和数据到对象存储服务(OBS)中;如果您已经购买了集群,请上传程序和数据到HDFS。 购买集群 进入MapReduce管理控制台,单击“购买集群”并配

MapReduce服务学习与资源

MapReduce服务 MRS 资源 MapReduce服务 MRS 资源 提供Hudi、Doris、Spark、HBase、Flink、Clickhouse、Hadoop等开源大数据组件,支持湖仓一体、灵活的弹性控制能力 提供Hudi、Doris、Spark、HBase、Fli

云数据迁移 CDM

景 支持近20种常用数据源,满足数据在云上和云下的不同迁移场景 迁移效率高 基于分布式计算框架进行数据任务执行和数据传输优化,并针对特定数据源写入做了专项优化,迁移效率高 基于分布式计算框架进行数据任务执行和数据传输优化,并针对特定数据源写入做了专项优化,迁移效率高 简单易用 免

华为云数据湖探索服务 DLI

统一的管理机制 使用统一的IAM管理用户(无需单独创建DLI用户),支持IAM细粒度授权 搭配使用 MapReduce服务 MRS 基因行业 基因数据处理 现在基因行业有很多基于Spark分布式框架的第三方分析库,如ADAM、Hail等 优势 支持自定义镜像 支持基于基础镜像打包ADA

数据治理中心

服务,对象存储等30+同构/异构数据源,基于分布式计算框架,稳定高效地对海量数据进行迁移,帮助用户快速完成数据入湖开发 统一集成工具 入湖统一配置:支持全量迁移+增量集成以及实时同步任务 高可靠高稳定:分布式数据迁移同步框架,计算资源Serverless化 全域数据类型 支持丰富多样的数据源,快速实现数据自由流动

对象存储服务 OBS功能-BigData Pro

存算分离资源利用率更高 存算分离资源利用率更高 OBS具备海量数据存储能力,结合华为云MapReduce服务,为美图提供BigData Pro大数据解决方案 OBS具备海量数据存储能力,结合华为云MapReduce服务,为美图提供BigData Pro大数据解决方案 典型业务场景-大数据离线分析

华为云Astro企业应用

支持领域驱动设计的建模,以及复杂关系建模,生成架构规范的代码框架,降低开发门槛 安全可靠,内置业界领先的华为10+安全防护能力 内置业界优秀开发规范实践,内置编码规范、API规范、数据规范,提升企业研发质量 框架组件:提供标准框架能力,开发人员聚焦业务开发 框架组件:提供标准框架能力,开发人员聚焦业务开发 中

应用管理与运维平台 ServiceStage-产品功能

微服务管理 ServiceStage微服务引擎支持主流微服务框架接入和治理,您可以灵活选择最适合的微服务技术,快速开发云应用,适应复杂多变的业务需求。 1、支持原生ServiceComb微服务框架:使用ServiceComb框架开发的微服务,可以无缝接入微服务引擎。微服务引擎采用的注册发现中心Apache

[相关产品]Mapreduce框架
云框架-软件系统框架

使用云框架的两个常见原因: 在表格中安排数据,从而用来呈现数据间的关系;或者在ap上组织图形和文本,也就是用于app布局。1.增大系统容量。我们的业务量越来越大,而要能应对越来越大的业务量,普通框架的性能已经无法满足了,我们需要专业的框架才能应对大规模的应用场景。所以,我们需要垂

应用使能框架专业服务

业务一站式服务。(4)提供个人协同服务、团队协同服务、会议协同服务服务、业务协同服务等协同办公服务。2、开发框架使能服务:(1)提供移动框架服务、能力开放服务、微服务框架服务、中间件服务、通用基础服务等通用基础服务。(2)提供事件监控域服务、指标体系域服务、专题分析域服务、协同办

UbuntuPHP5.6运行环境(ThinkPHP框架)

ThinkPHP是一个快速、简单的基于MVC和面向对象的轻量级PHP开发框架。ThinkPHP是一个快速、简单的基于MVC和面向对象的轻量级PHP开发框架,遵循Apache2开源协议发布,从诞生以来一直秉承简洁实用的设计原则,在保持出色的性能和至简的代码的同时,尤其注重开发体验和

北交联合云分布式服务框架系统

北交联合云分布式服务框架帮助企业构建 SOA 化的服务平台,同时快速积累企业软件资产,提升企业的业务能力。不仅是企业架构、开放服务、集中运维平台,更是从开发到运维的全生命周期的服务技术平台,帮助企业留下软件资产,保证服务能力、保证业务共享和业务的灵活性。产品功能服务发布及管理原有

hadoop生态组件

rk:spark是个开源的数据 分析集群计算框架,最初由加州大学伯克利分校AMPLab,建立于HDFS之上。spark与hadoop一样,用于构建大规模,延迟低的数据分析应用。spark采用Scala语言实现,使用Scala作为应用框架。spark采用基于内存的分布式数据集,优化

智慧城市运营中心(IOC)共享交换模块解决方案

3、统一建设,资源整合 4、稳定可靠,安全运行 5、面向服务,规范管理国家电子政务总体框架由服务与应用系统、信息资源、基础设施、法律法规与标准化体系、管理体制等部分组成。政务信息资源目录体系与交换体系是国家电子政务总体框架中的重要组成部分,是电子政务的基础设施之一。政务信息资源目录与交换体系在

科蓝分布式批量任务调度平台配套服务

志。 3. 支持多种定时触发策略,CRON、固定频率、固定延迟、API、手动触发等; 4. 支持多种调度模式,单机、广播、Map、MapReduce; 5. 可视化配置任务组-工作流,可拖拉拽的方式完成任务组的编排;支持简单判断逻辑及嵌套子任务组;支持上下游任务间的参数传递; 6

大数据集群搭建

CDH大数据集群安装,包括HDFS、MapReduce、Hive、Hbase、Zookeeper、Sqoop,Kafka简化了大数据平台的安装、使用难度。1. CDH简介      在商业应用中,对于企业成百上千的机器集群进行安装hadoop一系列组件费时费力,而且hadoop各

科蓝分布式批量任务调度平台

看日志。3. 支持多种定时触发策略,CRON、固定频率、固定延迟、API、手动触发等;4. 支持多种调度模式,单机、广播、Map、MapReduce;5. 可视化配置任务组-工作流,可拖拉拽的方式完成任务组的编排;支持简单判断逻辑及嵌套子任务组;支持上下游任务间的参数传递;6. 支持多种执行器,Spring

[相似文章]Mapreduce框架
MapReduce工作原理_MapReduce是什么意思_MapReduce流程_MRS_华为云

站式运维能力。 MapReduce相关精选推荐 MapReduce服务 MapReduce服务入门 MapReduce服务定价 MapReduce服务学习与资源 MapReduce 使用Mapreduce MapReduce Action 使用MapReduce 查看更多 收起

深圳MES系统_MES框架_MES集成

深圳MES系统 深圳MES系统 聚焦行业化,深度专业化的MES系统,满足规上企业、中大型企业、专精特新企业所需的“一站式”数字化工厂解决方案。 聚焦行业化,深度专业化的MES系统,满足规上企业、中大型企业、专精特新企业所需的“一站式”数字化工厂解决方案。 欧软云MES立即购买 免费试用

MapReduce服务_什么是MapReduce服务_什么是HBase

使用Hive客户端创建外部表 MapReduce服务 MRS 03:44 MapReduce服务 MRS 安装及使用MRS客户端 MapReduce服务 MRS 03:22 MapReduce服务 MRS 使用HBase客户端创建表 MapReduce服务 MRS 04:20 MapReduce服务 MRS

MapReduce服务_什么是Yarn_如何使用Yarn

会处理此应用的资源申请。其优先级从高到低依次为:本地资源的申请、同机架的申请,任意机器的申请。 Yarn原理 新的Hadoop MapReduce框架被命名为MRv2或Yarn。Yarn主要包括ResourceManager、ApplicationMaster与NodeManager三个部分。

MRS备份恢复_MapReduce备份_数据备份

MRS精选文章推荐 大数据分析是什么_使用MapReduce_创建MRS服务 什么是Manager_Manager的功能_MRS运维管理 MapReduce工作原理_MapReduce是什么意思_MapReduce流程 MapReduce服务_什么是HetuEngine_如何使用HetuEngine

OA协同办公_OA数据库_OA框架

OA协同办公 OA协同办公 浩辰云图CAD网页版软件,包括后端各种API能力+前端JS SDK,专门为企业量身定制的CAD在线看图产品,无需安装任何插件,可无缝集成到各类BS系统、APP软件、微信小程序中,实现企业图纸管理、协同办公等私有化。 浩辰云图CAD网页版软件,包括后端各种API能力+前端JS

MapReduce服务_如何使用MapReduce服务_MRS集群客户端安装与使用

使用Hive客户端创建外部表 MapReduce服务 MRS 03:44 MapReduce服务 MRS 安装及使用MRS客户端 MapReduce服务 MRS 03:22 MapReduce服务 MRS 使用HBase客户端创建表 MapReduce服务 MRS 04:20 MapReduce服务 MRS

MapReduce服务_什么是HetuEngine_如何使用HetuEngine

个文件最大值、日志归档的最大保留数目等。 MRS精选文章推荐 大数据分析是什么_使用MapReduce_创建MRS服务 MapReduce工作原理_MapReduce是什么意思_MapReduce流程 ECS-服务器-云服务器-华为ECS-弹性云服务器试用 免费云服务器_个人免费

MapReduce服务_什么是Hue_如何使用Hue

MRS精选文章推荐 MRS优势_什么是MRS_MRS功能 MapReduce工作原理_MapReduce是什么意思_MapReduce流程 MapReduce服务_什么是HetuEngine_如何使用HetuEngine MRS备份恢复_MapReduce备份_数据备份 怎样选择弹性云服务器_ECS哪家强_华为ECS

Mapreduce框架

简介

Spark是基于内存的分布式计算框架。在迭代计算的场景下,数据处理过程中的数据可以存储在内存中,提供了比MapReduce高10到100倍的计算能力。Spark可以使用HDFS作为底层存储,使用户能够快速地从MapReduce切换到Spark计算平台上去。Spark提供一站式数据分析能力,包括小批量流式处理、离线批处理、SQL查询、数据挖掘等,用户可以在同一个应用中无缝结合使用这些能力。Spark2x的开源新特性请参考Spark2x开源新特性说明

更多关于Spark2x组件操作指导,请参考使用Spark/Spark2x

Spark的特点如下:

  • 通过分布式内存计算和DAG(无回路有向图)执行引擎提升数据处理能力,比MapReduce性能高10倍到100倍。
  • 提供多种语言开发接口(Scala/Java/Python),并且提供几十种高度抽象算子,可以很方便构建分布式的数据处理应用。
  • 结合SQLStreaming等形成数据处理栈,提供一站式数据处理能力。
  • 支持契合Hadoop生态环境,Spark应用可以运行在Standalone、Mesos或者YARN上,能够接入HDFS、HBase、Hive等多种数据源,支持MapReduce程序平滑转接。

结构

Spark的架构如图1所示,各模块的说明如表1所示。

图1 Spark架构
表1 基本概念说明

模块

说明

Cluster Manager

集群管理器,管理集群中的资源。Spark支持多种集群管理器,Spark自带的Standalone集群管理器、Mesos或YARN。Spark集群默认采用YARN模式。

Application

Spark应用,由一个Driver Program和多个Executor组成。

Deploy Mode

部署模式,分为cluster和client模式。cluster模式下,Driver会在集群内的节点运行;而在client模式下,Driver在客户端运行(集群外)。

Driver Program

是Spark应用程序的主进程,运行Application的main()函数并创建SparkContext。负责应用程序的解析、生成Stage并调度Task到Executor上。通常SparkContext代表Driver Program。

Executor

在Work Node上启动的进程,用来执行Task,管理并处理应用中使用到的数据。一个Spark应用一般包含多个Executor,每个Executor接收Driver的命令,并执行一到多个Task。

Worker Node

集群中负责启动并管理Executor以及资源的节点。

Job

一个Action算子(比如collect算子)对应一个Job,由并行计算的多个Task组成。

Stage

每个Job由多个Stage组成,每个Stage是一个Task集合,由DAG分割而成。

Task

承载业务逻辑的运算单元,是Spark平台上可执行的最小工作单元。一个应用根据执行计划以及计算量分为多个Task。

Spark原理

Spark的应用运行架构如图2所示,运行流程如下所示:

  1. 应用程序(Application)是作为一个进程的集合运行在集群上的,由Driver进行协调。
  2. 在运行一个应用时,Driver会去连接集群管理器(Standalone、Mesos、YARN)申请运行Executor资源,并启动ExecutorBackend。然后由集群管理器在不同的应用之间调度资源。Driver同时会启动应用程序DAG调度、Stage划分、Task生成。
  3. 然后Spark会把应用的代码(传递给SparkContext的JAR或者Python定义的代码)发送到Executor上。
  4. 所有的Task执行完成后,用户的应用程序运行结束。
图2 Spark应用运行架构

Spark采用Master和Worker的模式,如图3所示。用户在Spark客户端提交应用程序,调度器将Job分解为多个Task发送到各个Worker中执行,各个Worker将计算的结果上报给Driver(即Master),Driver聚合结果返回给客户端。

图3 Spark的Master和Worker

在此结构中,有几个说明点:

  • 应用之间是独立的。

    每个应用有自己的executor进程,Executor启动多个线程,并行地执行任务。无论是在调度方面,或者是executor方面。各个Driver独立调度自己的任务;不同的应用任务运行在不同的JVM上,即不同的Executor。

  • 不同Spark应用之间是不共享数据的,除非把数据存储在外部的存储系统上(比如HDFS)。
  • 因为Driver程序在集群上调度任务,所以Driver程序需要和worker节点比较近,比如在一个相同的局部网络内。

Spark on YARN有两种部署模式:

  • YARN-Cluster模式下,Spark的Driver会运行在YARN集群内的ApplicationMaster进程中,ApplicationMaster已经启动之后,提交任务的客户端退出也不会影响任务的运行。
  • YARN-Client模式下,Driver启动在客户端进程内,ApplicationMaster进程只用来向YARN集群申请资源。

Spark Streaming原理

Spark Streaming是一种构建在Spark上的实时计算框架,扩展了Spark处理大规模流式数据的能力。当前Spark支持两种数据处理方式:Direct Streaming和Receiver方式。

Direct Streaming计算流程

Direct Streaming方式主要通过采用Direct API对数据进行处理。以Kafka Direct接口为例,与启动一个Receiver来连续不断地从Kafka中接收数据并写入到WAL中相比,Direct API简单地给出每个batch区间需要读取的偏移量位置。然后,每个batch的Job被运行,而对应偏移量的数据在Kafka中已准备好。这些偏移量信息也被可靠地存储在checkpoint文件中,应用失败重启时可以直接读取偏移量信息。

图4 Direct Kafka接口数据传输

需要注意的是,Spark Streaming可以在失败后重新从Kafka中读取并处理数据段。然而,由于语义仅被处理一次,重新处理的结果和没有失败处理的结果是一致的。

因此,Direct API消除了需要使用WAL和Receivers的情况,且确保每个Kafka记录仅被接收一次,这种接收更加高效。使得Spark Streaming和Kafka可以很好地整合在一起。总体来说,这些特性使得流处理管道拥有高容错性、高效性及易用性,因此推荐使用Direct Streaming方式处理数据。

Receiver计算流程

在一个Spark Streaming应用开始时(也就是Driver开始时),相关的StreamingContext(所有流功能的基础)使用SparkContext启动Receiver成为长驻运行任务。这些Receiver接收并保存流数据到Spark内存中以供处理。用户传送数据的生命周期如图5所示:

图5 数据传输生命周期
  1. 接收数据(蓝色箭头)

    Receiver将数据流分成一系列小块,存储到Executor内存中。另外,在启用预写日志(Write-ahead Log,简称WAL)以后,数据同时还写入到容错文件系统的预写日志中。

  2. 通知Driver(绿色箭头)

    接收块中的元数据(Metadata)被发送到Driver的StreamingContext。这个元数据包括:

    • 定位其在Executor内存中数据位置的块Reference ID。
    • 若启用了WAL,还包括块数据在日志中的偏移信息。
  3. 处理数据(红色箭头)

    对每个批次的数据,StreamingContext使用Block信息产生RDD及其Job。StreamingContext通过运行任务处理Executor内存中的Block来执行Job。

  4. 周期性地设置检查点(橙色箭头)
  5. 为了容错的需要,StreamingContext会周期性地设置检查点,并保存到外部文件系统中。

容错性

Spark及其RDD允许无缝地处理集群中任何Worker节点的故障。鉴于Spark Streaming建立于Spark之上,因此其Worker节点也具备了同样的容错能力。然而,由于Spark Streaming的长正常运行需求,其应用程序必须也具备从Driver进程(协调各个Worker的主要应用进程)故障中恢复的能力。使Spark Driver能够容错是件很棘手的事情,因为可能是任意计算模式实现的任意用户程序。不过Spark Streaming应用程序在计算上有一个内在的结构:在每批次数据周期性地执行同样的Spark计算。这种结构允许把应用的状态(亦称Checkpoint)周期性地保存到可靠的存储空间中,并在Driver重新启动时恢复该状态。

对于文件这样的源数据,这个Driver恢复机制足以做到零数据丢失,因为所有的数据都保存在了像HDFS这样的容错文件系统中。但对于像Kafka和Flume等其他数据源,有些接收到的数据还只缓存在内存中,尚未被处理,就有可能会丢失。这是由于Spark应用的分布操作方式引起的。当Driver进程失败时,所有在Cluster Manager中运行的Executor,连同在内存中的所有数据,也同时被终止。为了避免这种数据损失,Spark Streaming引进了WAL功能。

WAL通常被用于 数据库 和文件系统中,用来保证任何数据操作的持久性,即先将操作记入一个持久的日志,再对数据施加这个操作。若施加操作的过程中执行失败了,则通过读取日志并重新施加前面指定的操作,系统就得到了恢复。下面介绍了如何利用这样的概念保证接收到的数据的持久性。

Kafka数据源使用Receiver来接收数据,是Executor中的长运行任务,负责从数据源接收数据,并且在数据源支持时还负责确认收到数据的结果(收到的数据被保存在Executor的内存中,然后Driver在Executor中运行来处理任务)。

当启用了预写日志以后,所有收到的数据同时还保存到了容错文件系统的日志文件中。此时即使Spark Streaming失败,这些接收到的数据也不会丢失。另外,接收数据的正确性只在数据被预写到日志以后Receiver才会确认,已经缓存但还没有保存的数据可以在Driver重新启动之后由数据源再发送一次。这两个机制确保了零数据丢失,即所有的数据或者从日志中恢复,或者由数据源重发。

如果需要启用预写日志功能,可以通过如下动作实现:

  • 通过“streamingContext.checkpoint”(path-to-directory)设置checkpoint的目录,这个目录是一个HDFS的文件路径,既用作保存流的checkpoint,又用作保存预写日志。
  • 设置SparkConf的属性“spark.streaming.receiver.writeAheadLog.enable”“true”(默认值是“false”)。

在WAL被启用以后,所有Receiver都获得了能够从可靠收到的数据中恢复的优势。建议缓存RDD时不采取多备份选项,因为用于预写日志的容错文件系统很可能也复制了数据。

在启用了预写日志以后,数据接收吞吐率会有降低。由于所有数据都被写入容错文件系统,文件系统的写入吞吐率和用于 数据复制 的网络带宽,可能就是潜在的瓶颈了。在此情况下,需要创建更多的Receiver增加数据接收的并行度,或使用更好的硬件以增加容错文件系统的吞吐率。

恢复流程

当一个失败的Driver重启时,按如下流程启动:
图6 计算恢复流程
  1. 恢复计算(橙色箭头)

    使用checkpoint信息重启Driver,重新构造SparkContext并重启Receiver。

  2. 恢复元数据块(绿色箭头)

    为了保证能够继续下去所必备的全部元数据块都被恢复。

  3. 未完成作业的重新形成(红色箭头)

    由于失败而没有处理完成的批处理,将使用恢复的元数据再次产生RDD和对应的作业。

  4. 读取保存在日志中的块数据(蓝色箭头)

    在这些作业执行时,块数据直接从预写日志中读出。这将恢复在日志中可靠地保存的所有必要数据。

  5. 重发尚未确认的数据(紫色箭头)

    失败时没有保存到日志中的缓存数据将由数据源再次发送。因为Receiver尚未对其确认。

因此通过预写日志和可靠的Receiver,Spark Streaming就可以保证没有输入数据会由于Driver的失败而丢失。

SparkSQL和DataSet原理

SparkSQL

图7 SparkSQL和DataSet

Spark SQL是Spark中用于结构化数据处理的模块。在Spark应用中,可以无缝地使用SQL语句亦或是DataSet API对结构化数据进行查询。

Spark SQL以及DataSet还提供了一种通用的访问多数据源的方式,可访问的数据源包括Hive、 CS V、Parquet、ORC、JSON和JDBC数据源,这些不同的数据源之间也可以实现互相操作。Spark SQL复用了Hive的前端处理逻辑和元数据处理模块,使用Spark SQL可以直接对已有的Hive数据进行查询。

另外,SparkSQL还提供了诸如API、CLI、JDBC等诸多接口,对客户端提供多样接入形式。

Spark SQL Native DDL/DML

Spark1.5将很多DDL/DML命令下压到Hive执行,造成了与Hive的耦合,且在一定程度上不够灵活(比如报错不符合预期、结果与预期不一致等)。

Spark2x实现了命令的本地化,使用Spark SQL Native DDL/DML取代Hive执行DDL/DML命令。一方面实现和Hive的解耦,另一方面可以对命令进行定制化。

DataSet

DataSet是一个由特定域的对象组成的强类型集合,可通过功能或关系操作并行转换其中的对象。 每个Dataset还有一个非类型视图,即由多个列组成的DataSet,称为DataFrame。

DataFrame是一个由多个列组成的结构化的分布式数据集合,等同于关系数据库中的一张表,或者是R/Python中的data frame。DataFrame是Spark SQL中的最基本的概念,可以通过多种方式创建,例如结构化的数据集、Hive表、外部数据库或者是RDD。

可用于DataSet的操作分为Transformation和Action。

  • Transformation操作可生成新的DataSet。

    如map、filter、select和aggregate (groupBy)。

  • Action操作可触发计算及返回记结果。

    如count、show或向文件系统写数据。

通常使用两种方法创建一个DataSet:

  • 最常见的方法是通过使用SparkSession上的read函数将Spark指向存储系统上的某些文件。
    val people = spark.read.parquet("...").as[Person]  // Scala
    DataSet<Person> people = spark.read().parquet("...").as(Encoders.bean(Person.class));//Java
  • 还可通过已存在的DataSet上可用的transformation操作来创建数据集。 例如,在已存在的DataSet上应用map操作来创建新的DataSet:
    val names = people.map(_.name)  // 使用Scala语言,且names为一个Dataset
    Dataset<String> names = people.map((Person p) -> p.name, Encoders.STRING)); // Java

CLI和JD BCS erver

除了API编程接口之外,Spark SQL还对外提供CLI/JDBC接口:

  • spark-shell和spark-sql脚本均可以提供CLI,以便于调试。
  • JDBCServer提供JDBC接口,外部可直接通过发送JDBC请求来完成结构化数据的计算和解析。

SparkSession原理

SparkSession是Spark2x编程的统一API,也可看作是读取数据的统一入口。SparkSession提供了一个统一的入口点来执行以前分散在多个类中的许多操作,并且还为那些较旧的类提供了访问器方法,以实现最大的兼容性。

使用构建器模式创建SparkSession。如果存在SparkSession,构建器将自动重用现有的SparkSession;如果不存在则会创建一个SparkSession。 在I/O期间,在构建器中设置的配置项将自动同步到Spark和Hadoop。

import org.apache.spark.sql.SparkSession
val sparkSession = SparkSession.builder
  .master("local")
  .appName("my-spark-app")
  .config("spark.some.config.option", "config-value")
  .getOrCreate()
  • SparkSession可以用于对数据执行SQL查询,将结果返回为DataFrame。
    sparkSession.sql("select * from person").show
  • SparkSession可以用于设置运行时的配置项,这些配置项可以在SQL中使用变量替换。
    sparkSession.conf.set("spark.some.config", "abcd")
    sparkSession.conf.get("spark.some.config")
    sparkSession.sql("select ${spark.some.config}")
  • SparkSession包括一个“catalog”方法,其中包含使用Metastore(即数据目录)的方法。方法返回值为数据集,可以使用相同的Dataset API来运行。
    val tables = sparkSession.catalog.listTables()
    val columns = sparkSession.catalog.listColumns("myTable")
  • 底层SparkContext可以通过SparkSession的SparkContext API访问。
    val sparkContext = sparkSession.sparkContext

Structured Streaming原理

Structured Streaming是构建在Spark SQL引擎上的流式数据处理引擎,用户可以使用Scala、Java、Python或R中的Dataset/DataFrame API进行流数据聚合运算、按事件时间窗口计算、流流Join等操作。当流数据连续不断地产生时,Spark SQL将会增量的、持续不断地处理这些数据并将结果更新到结果集中。同时,系统通过checkpoint和Write Ahead Logs确保端到端的完全一次性容错保证。

Structured Streaming的核心是将流式的数据看成一张不断增加的数据库表,这种流式的数据处理模型类似于数据块处理模型,可以把静态数据库表的一些查询操作应用在流式计算中,Spark执行标准的SQL查询,从不断增加的无边界表中获取数据。
图8 Structured Streaming无边界表

每一条查询的操作都会产生一个结果集Result Table。每一个触发间隔,当新的数据新增到表中,都会最终更新Result Table。无论何时结果集发生了更新,都能将变化的结果写入一个外部的存储系统。

图9 Structured Streaming数据处理模型

Structured Streaming在OutPut阶段可以定义不同的存储方式,有如下3种:

  • Complete Mode:整个更新的结果集都会写入外部存储。整张表的写入操作将由外部存储系统的连接器完成。
  • Append Mode:当时间间隔触发时,只有在Result Table中新增加的数据行会被写入外部存储。这种方式只适用于结果集中已经存在的内容不希望发生改变的情况下,如果已经存在的数据会被更新,不适合使用此种方式。
  • Update Mode:当时间间隔触发时,只有在Result Table中被更新的数据才会被写入外部存储系统。注意,和Complete Mode方式的不同之处是不更新的结果集不会写入外部存储。

基本概念

  • RDD

    即弹性分布数据集(Resilient Distributed Dataset),是Spark的核心概念。指的是一个只读的,可分区的分布式数据集,这个数据集的全部或部分可以缓存在内存中,在多次计算间重用。

    RDD的生成:

    • 从HDFS输入创建,或从与Hadoop兼容的其他存储系统中输入创建。
    • 从父RDD转换得到新RDD。
    • 从数据集合转换而来,通过编码实现。

    RDD的存储:

    • 用户可以选择不同的存储级别缓存RDD以便重用(RDD有11种存储级别)。
    • 当前RDD默认是存储于内存,但当内存不足时,RDD会溢出到磁盘中。
  • Dependency(RDD的依赖)

    RDD的依赖分别为:窄依赖和宽依赖。

    图10 RDD的依赖
    • 窄依赖:指父RDD的每一个分区最多被一个子RDD的分区所用。
    • 宽依赖:指子RDD的分区依赖于父RDD的所有分区。

    窄依赖对优化很有利。逻辑上,每个RDD的算子都是一个fork/join(此join非上文的join算子,而是指同步多个并行任务的barrier):把计算fork到每个分区,算完后join,然后fork/join下一个RDD的算子。如果直接翻译到物理实现,是很不经济的:一是每一个RDD(即使是中间结果)都需要物化到内存或存储中,费时费空间;二是join作为全局的barrier,是很昂贵的,会被最慢的那个节点拖死。如果子RDD的分区到父RDD的分区是窄依赖,就可以实施经典的fusion优化,把两个fork/join合为一个;如果连续的变换算子序列都是窄依赖,就可以把很多个fork/join并为一个,不但减少了大量的全局barrier,而且无需物化很多中间结果RDD,这将极大地提升性能。Spark把这个叫做流水线(pipeline)优化。

  • Transformation和Action(RDD的操作)

    对RDD的操作包含Transformation(返回值还是一个RDD)和Action(返回值不是一个RDD)两种。RDD的操作流程如图11所示。其中Transformation操作是Lazy的,也就是说从一个RDD转换生成另一个RDD的操作不是马上执行,Spark在遇到Transformations操作时只会记录需要这样的操作,并不会去执行,需要等到有Actions操作的时候才会真正启动计算过程进行计算。Actions操作会返回结果或把RDD数据写到存储系统中。Actions是触发Spark启动计算的动因。

    图11 RDD操作示例

    RDD看起来与Scala集合类型没有太大差别,但数据和运行模型大相迥异。

    val file = sc.textFile("hdfs://...")
    val errors = file.filter(_.contains("ERROR"))
    errors.cache()
    errors.count()
    1. textFile算子从HDFS读取日志文件,返回file(作为RDD)。
    2. filter算子筛出带“ERROR”的行,赋给errors(新RDD)。filter算子是一个Transformation操作。
    3. cache算子缓存下来以备未来使用。
    4. count算子返回errors的行数。count算子是一个Action操作。
    Transformation操作可以分为如下几种类型:
    • 视RDD的元素为简单元素。

      输入输出一对一,且结果RDD的分区结构不变,主要是map。

      输入输出一对多,且结果RDD的分区结构不变,如flatMap(map后由一个元素变为一个包含多个元素的序列,然后展平为一个个的元素)。

      输入输出一对一,但结果RDD的分区结构发生了变化,如union(两个RDD合为一个,分区数变为两个RDD分区数之和)、coalesce(分区减少)。

      从输入中选择部分元素的算子,如filter、distinct(去除重复元素)、subtract(本RDD有、其他RDD无的元素留下来)和sample(采样)。

    • 视RDD的元素为Key-Value对。

      对单个RDD做一对一运算,如mapValues(保持源RDD的分区方式,这与map不同);

      对单个RDD重排,如sort、partitionBy(实现一致性的分区划分,这个对数据本地性优化很重要);

      对单个RDD基于key进行重组和reduce,如groupByKey、reduceByKey;

      对两个RDD基于key进行join和重组,如join、cogroup。

      后三种操作都涉及重排,称为shuffle类操作。

    Action操作可以分为如下几种:

    • 生成标量,如count(返回RDD中元素的个数)、reduce、fold/aggregate(返回几个标量)、take(返回前几个元素)。
    • 生成Scala集合类型,如collect(把RDD中的所有元素导入Scala集合类型)、lookup(查找对应key的所有值)。
    • 写入存储,如与前文textFile对应的saveAsTextFile。
    • 还有一个检查点算子checkpoint。当Lineage特别长时(这在图计算中时常发生),出错时重新执行整个序列要很长时间,可以主动调用checkpoint把当前数据写入稳定存储,作为检查点。
  • Shuffle

    Shuffle是MapReduce框架中的一个特定的phase,介于Map phase和Reduce phase之间,当Map的输出结果要被Reduce使用时,每一条输出结果需要按key哈希,并且分发到对应的Reducer上去,这个过程就是shuffle。由于shuffle涉及到了磁盘的读写和网络的传输,因此shuffle性能的高低直接影响到了整个程序的运行效率。

    下图清晰地描述了MapReduce算法的整个流程。

    图12 算法流程

    概念上shuffle就是一个沟通数据连接的桥梁,实际上shuffle这一部分是如何实现的呢,下面就以Spark为例讲解shuffle在Spark中的实现。

    Shuffle操作将一个Spark的Job分成多个Stage,前面的stages会包括一个或多个ShuffleMapTasks,最后一个stage会包括一个或多个ResultTask。

  • Spark Application的结构

    Spark Application的结构可分为两部分:初始化SparkContext和主体程序。

    • 初始化SparkContext:构建Spark Application的运行环境。

      构建SparkContext对象,如:

      new SparkContext(master, appName, [SparkHome], [jars])

      参数介绍:

      master:连接字符串,连接方式有local、yarn-cluster、yarn-client等。

      appName:构建的Application名称。

      SparkHome:集群中安装Spark的目录。

      jars:应用程序代码和依赖包。

    • 主体程序:处理数据

    提交Application的描述请参见:https://archive.apache.org/dist/spark/docs/3.1.1/submitting-applications.html

  • Spark shell命令

    Spark基本shell命令,支持提交Spark应用。命令为:

    ./bin/spark-submit \
      --class <main-class> \
      --master <master-url> \
      ... # other options
      <application-jar> \
      [application-arguments]

    参数解释:

    --class:Spark应用的类名。

    --master:Spark用于所连接的master,如yarn-client,yarn-cluster等。

    application-jar:Spark应用的jar包的路径。

    application-arguments:提交Spark应用的所需要的参数(可以为空)。

  • Spark JobHistory Server

    用于监控正在运行的或者历史的Spark作业在Spark框架各个阶段的细节以及提供日志显示,帮助用户更细粒度地去开发、配置和调优作业。

Mapreduce框架常见问题

更多常见问题 >>
  • MapReduce是Hadoop的核心,是Google提出的一个软件架构,用于大规模数据集(大于1TB)的并行运算。概念“Map(映射)”和“Reduce(化简)”,及他们的主要思想,都是从函数式编程语言借来的,还有从矢量编程语言借来的特性。

  • 框架管理器协同TBE为神经网络生成可执行的离线模型。在神经网络执行之前,框架管理器与昇腾AI处理器紧密结合生成硬件匹配的高性能离线模型,并拉通了流程编排器和运行管理器使得离线模型和昇腾AI处理器进行深度融合。

  • MapReduce服务(MRS)打造了高可靠、高安全、易使用的运行维护平台,对外提供大容量的数据存储和分析能力,可解决各大企业的数据存储和处理需求

  • GaussDB是华为自主创新研发的分布式关系型数据库。具备企业级复杂事务混合负载能力,同时支持分布式事务,同城跨AZ部署,数据0丢失,支持1000+节点的扩展能力,PB级海量存储。

  • MapReduce服务(MapReduce Service)提供租户完全可控的企业级大数据集群云服务,轻松运行Hadoop、Spark、HBase、KafKa、Storm等大数据组件。

  • 学完本课程后,您将能够:描述深度学习框架是什么;列举主流深度学习框架有哪些;了解Pytorch的特点;了解TensorFlow的特点;区别TensorFlow 1.X与2.X版本;掌握TensorFlow 2的基本语法与常用模块;掌握MNIST手写体数字识别实验的流程。