MAPREDUCE服务 MRS-Spark基本原理:Spark Streaming原理

时间:2023-11-07 14:09:49

Spark Streaming原理

Spark Streaming是一种构建在Spark上的实时计算框架,扩展了Spark处理大规模流式数据的能力。当前Spark支持两种数据处理方式:

  • Direct Streaming

    Direct Streaming方式主要通过采用Direct API对数据进行处理。以Kafka Direct接口为例,与启动一个Receiver来连续不断地从Kafka中接收数据并写入到WAL中相比,Direct API简单地给出每个batch区间需要读取的偏移量位置。然后,每个batch的Job被运行,而对应偏移量的数据在Kafka中已准备好。这些偏移量信息也被可靠地存储在checkpoint文件中,应用失败重启时可以直接读取偏移量信息。

    图4 Direct Kafka接口数据传输

    需要注意的是,Spark Streaming可以在失败后重新从Kafka中读取并处理数据段。然而,由于语义仅被处理一次,重新处理的结果和没有失败处理的结果是一致的。

    因此,Direct API消除了需要使用WAL和Receivers的情况,且确保每个Kafka记录仅被接收一次,这种接收更加高效。使得Spark Streaming和Kafka可以很好地整合在一起。总体来说,这些特性使得流处理管道拥有高容错性、高效性及易用性,因此推荐使用Direct Streaming方式处理数据。

  • Receiver

    在一个Spark Streaming应用开始时(也就是Driver开始时),相关的StreamingContext(所有流功能的基础)使用SparkContext启动Receiver成为长驻运行任务。这些Receiver接收并保存流数据到Spark内存中以供处理。用户传送数据的生命周期如图5所示:

    图5 数据传输生命周期
    1. 接收数据(蓝色箭头)

      Receiver将数据流分成一系列小块,存储到Executor内存中。另外,在启用预写日志(Write-ahead Log,简称WAL)以后,数据同时还写入到容错文件系统的预写日志中。

    2. 通知Driver(绿色箭头)

      接收块中的元数据被发送到Driver的StreamingContext。这个元数据包括:

      • 定位其在Executor内存中数据位置的块Reference ID。
      • 若启用了WAL,还包括块数据在日志中的偏移信息。
    3. 处理数据(红色箭头)

      对每个批次的数据,StreamingContext使用Block信息产生RDD及其Job。StreamingContext通过运行任务处理Executor内存中的Block来执行Job。

    4. 周期性的设置检查点(橙色箭头)

      为了容错的需要,StreamingContext会周期性的设置检查点,并保存到外部文件系统中。

容错性

Spark及其RDD允许无缝的处理集群中任何Worker节点的故障。鉴于Spark Streaming建立于Spark之上,因此其Worker节点也具备了同样的容错能力。然而,由于Spark Streaming的长正常运行需求,其应用程序必须也具备从Driver进程(协调各个Worker的主要应用进程)故障中恢复的能力。使Spark Driver能够容错是件很棘手的事情,因为可能是任意计算模式实现的任意用户程序。不过Spark Streaming应用程序在计算上有一个内在的结构:在每批次数据周期性地执行同样的Spark计算。这种结构允许把应用的状态(也叫做Checkpoint)周期性地保存到可靠的存储空间中,并在Driver重新启动时恢复该状态。

对于文件这样的源数据,这个Driver恢复机制足以做到零数据丢失,因为所有的数据都保存在了像HDFS这样的容错文件系统中。但对于像Kafka和Flume等其他数据源,有些接收到的数据还只缓存在内存中,尚未被处理,就有可能会丢失。这是由于Spark应用的分布操作方式引起的。当Driver进程失败时,所有在Cluster Manager中运行的Executor,连同在内存中的所有数据,也同时被终止。为了避免这种数据损失,Spark Streaming引进了WAL功能。

WAL通常被用于数据库和文件系统中,用来保证任何数据操作的持久性,即先将操作记入一个持久的日志,再对数据施加这个操作。若施加操作的过程中执行失败了,则通过读取日志并重新施加前面指定的操作,系统就得到了恢复。下面介绍了如何利用这样的概念保证接收到的数据的持久性。

Kafka数据源使用Receiver来接收数据,是Executor中的长运行任务,负责从数据源接收数据,并且在数据源支持时还负责确认收到数据的结果(收到的数据被保存在Executor的内存中,然后Driver在Executor中运行来处理任务)。

当启用了预写日志以后,所有收到的数据同时还保存到了容错文件系统的日志文件中。此时即使Spark Streaming失败,这些接收到的数据也不会丢失。另外,接收数据的正确性只在数据被预写到日志以后Receiver才会确认,已经缓存但还没有保存的数据可以在Driver重新启动之后由数据源再发送一次。这两个机制确保了零数据丢失,即所有的数据或者从日志中恢复,或者由数据源重发。

如果需要启用预写日志功能,可以通过如下动作实现:

  • 通过“streamingContext.checkpoint”设置checkpoint的目录,这个目录是一个HDFS的文件路径,既用作保存流的checkpoint,又用作保存预写日志。
  • 设置SparkConf的属性“spark.streaming.receiver.writeAheadLog.enable”“true”(默认值是“false”)。

在WAL被启用以后,所有Receiver都获得了能够从可靠收到的数据中恢复的优势。建议缓存RDD时不采取多备份选项,因为用于预写日志的容错文件系统很可能也复制了数据。

在启用了预写日志以后,数据接收吞吐率会有降低。由于所有数据都被写入容错文件系统,文件系统的写入吞吐率和用于数据复制的网络带宽,可能就是潜在的瓶颈了。在此情况下,建议创建更多的Recevier增加数据接收的并行度,或使用更好的硬件以增加容错文件系统的吞吐率。

恢复流程

当一个失败的Driver重启时,按如下流程启动:

图6 计算恢复流程
  1. 恢复计算(橙色箭头)

    使用checkpoint信息重启Driver,重新构造SparkContext并重启Receiver。

  2. 恢复元数据块(绿色箭头)

    为了保证能够继续下去所必备的全部元数据块都被恢复。

  3. 未完成作业的重新形成(红色箭头)

    由于失败而没有处理完成的批处理,将使用恢复的元数据再次产生RDD和对应的作业。

  4. 读取保存在日志中的块数据(蓝色箭头)

    在这些作业执行时,块数据直接从预写日志中读出。这将恢复在日志中可靠地保存的所有必要数据。

  5. 重发尚未确认的数据(紫色箭头)

    失败时没有保存到日志中的缓存数据将由数据源再次发送。因为Receiver尚未对其确认。

因此通过预写日志和可靠的Receiver,Spark Streaming就可以保证没有输入数据会由于Driver的失败而丢失。

support.huaweicloud.com/productdesc-mrs/mrs_08_000801.html