精选文章 flink源码解析3 ExecutionGraph的形成与物理执行

flink源码解析3 ExecutionGraph的形成与物理执行

作者:代码届彭于晏 时间: 2019-11-04 08:10:29
代码届彭于晏 2019-11-04 08:10:29

flink在client端形成jobGraph之后会提交给JobMaster ,在这里会形成ExecutionGraph

JobMaster的构造函数中有这么一句话:

this.executionGraph = this.createAndRestoreExecutionGraph(this.jobManagerJobMetricGroup);

一直追踪导EecutionGraphBulider#buildGraph

这个方法中有比较重要的一句话

  // 根据JobVertex列表,生成execution graph
        executionGraph.attachJobGraph(sortedTopology);

 根据jobGraph生成executionGraph的大部分逻辑都在这个方法中

for (JobVertex jobVertex : topologiallySorted) {

   if (jobVertex.isInputVertex() && !jobVertex.isStoppable()) {
      this.isStoppable = false;
   }

   // create the execution job vertex and attach it to the graph
   ExecutionJobVertex ejv = new ExecutionJobVertex(
      this,
      jobVertex,
      1,
      rpcTimeout,
      globalModVersion,
      createTimestamp);

   ejv.connectToPredecessors(this.intermediateResults);

   ExecutionJobVertex previousTask = this.tasks.putIfAbsent(jobVertex.getID(), ejv);
   if (previousTask != null) {
      throw new JobException(String.format("Encountered two job vertices with ID %s : previous=[%s] / new=[%s]",
            jobVertex.getID(), ejv, previousTask));
   }

看上面这段代码的逻辑:

首先会遍历所有的JobVertex,根据每一个JobVertex生成一个ExecutionJobVertex。重点在ExecutionJobVertex的构造函数中:

重要的代码片段:

this.producedDataSets = new IntermediateResult[jobVertex.getNumberOfProducedIntermediateDataSets
for (int i = 0; i < jobVertex.getProducedDataSets().size(); i++) {
   final IntermediateDataSet result = jobVertex.getProducedDataSets().get(i);

   this.producedDataSets[i] = new IntermediateResult(
         result.getId(),
         this,
         numTaskVertices,
         result.getResultType());
}

首先会创建一个producedDataSets列表,然后根据JobVertext中的ProducedDataSet变量,给produceDatSets列表赋值

for (int i = 0; i < numTaskVertices; i++) {
   ExecutionVertex vertex = new ExecutionVertex(
         this,
         i,
         producedDataSets,
         timeout,
         initialGlobalModVersion,
         createTimestamp,
         maxPriorAttemptsHistoryLength);

   this.taskVertices[i] = vertex;
}

这里则是根据并行度,创建一个ExecutionVertex,每个并行度就是一个ExecutionVertex,然后放在taskVertices数组中。

ExecutionJobVertex创建完毕之后会进入
ejv.connectToPredecessors(this.intermediateResults);

追踪导ExectionVertex#connectSource

public void connectSource(int inputNumber, IntermediateResult source, JobEdge edge, int consumerNumber) {

   final DistributionPattern pattern = edge.getDistributionPattern();
   final IntermediateResultPartition[] sourcePartitions = source.getPartitions();

   ExecutionEdge[] edges;

   switch (pattern) {
      case POINTWISE:
         edges = connectPointwise(sourcePartitions, inputNumber);
         break;

      case ALL_TO_ALL:
         edges = connectAllToAll(sourcePartitions, inputNumber);
         break;

      default:
         throw new RuntimeException("Unrecognized distribution pattern.");

   }

这里根据switch case 有两个分支,先看第一个分支

private ExecutionEdge[] connectPointwise(IntermediateResultPartition[] sourcePartitions, int inputNumber) {
   final int numSources = sourcePartitions.length;
   final int parallelism = getTotalNumberOfParallelSubtasks();

   // simple case same number of sources as targets
   if (numSources == parallelism) {
      return new ExecutionEdge[] { new ExecutionEdge(sourcePartitions[subTaskIndex], this, inputNumber) };
   }
   else if (numSources < parallelism) {

      int sourcePartition;

      // check if the pattern is regular or irregular
      // we use int arithmetics for regular, and floating point with rounding for irregular
      if (parallelism % numSources == 0) {
         // same number of targets per source
         int factor = parallelism / numSources;
         sourcePartition = subTaskIndex / factor;
      }
      else {
         // different number of targets per source
         float factor = ((float) parallelism) / numSources;
         sourcePartition = (int) (subTaskIndex / factor);
      }

      return new ExecutionEdge[] { new ExecutionEdge(sourcePartitions[sourcePartition], this, inputNumber) };
   }
   else {
      if (numSources % parallelism == 0) {
         // same number of targets per source
         int factor = numSources / parallelism;
         int startIndex = subTaskIndex * factor;

         ExecutionEdge[] edges = new ExecutionEdge[factor];
         for (int i = 0; i < factor; i++) {
            edges[i] = new ExecutionEdge(sourcePartitions[startIndex + i], this, inputNumber);
         }
         return edges;
      }
      else {
         float factor = ((float) numSources) / parallelism;

         int start = (int) (subTaskIndex * factor);
         int end = (subTaskIndex == getTotalNumberOfParallelSubtasks() - 1) ?
               sourcePartitions.length :
               (int) ((subTaskIndex + 1) * factor);

         ExecutionEdge[] edges = new ExecutionEdge[end - start];
         for (int i = 0; i < edges.length; i++) {
            edges[i] = new ExecutionEdge(sourcePartitions[start + i], this, inputNumber);
         }

         return edges;
      }
   }
}

逻辑比较复杂,描述一下:

这里会获取到ExecutionVertex的并行度和上游的IntermediateResultPartition的数目来执行不同的策略:

(1) 如果并发数等于partition数,则一对一进行连接。如下图所示:
numSources == parallelism

flink源码解析3 ExecutionGraph的形成与物理执行1

(2) 如果并发数大于partition数,则一对多进行连接。如下图所示:
numSources < parallelism,且parallelism % numSources == 0

flink源码解析3 ExecutionGraph的形成与物理执行2

(3) 如果并发数小于partition数,则多对一进行连接。如下图所示:
numSources > parallelism,且numSources % parallelism == 0

flink源码解析3 ExecutionGraph的形成与物理执行3

 

再看conectAlltoAll,全连接模式

        ExecutionEdge[] edges = new ExecutionEdge[sourcePartitions.length];
 
        for (int i = 0; i < sourcePartitions.length; i++) {
            IntermediateResultPartition irp = sourcePartitions[i];
            edges[i] = new ExecutionEdge(irp, this, inputNumber);
        }
 
        return edges;

这就有点类似sql中的join操作的笛卡尔积模式

ExecutionVertex有两个不同的输入:输入A和B。其中输入A的partition=1, 输入B的partition=8,那么这个二维数组inputEdges如下(为简短,以irp代替IntermediateResultPartition)

[ ExecutionEdge[ A.irp[0]] ]
[ ExecutionEdge[ B.irp[0], B.irp[1], ..., B.irp[7] ]

------------------------------------

接着看物理执行图

找到ExecutionGraph#scheduleForExecution方法

通常都是

case EAGER:
   newSchedulingFuture = scheduleEager(slotProvider, allocationTimeout);

这种模式

private CompletableFuture scheduleEager(SlotProvider slotProvider, final Time timeout) {
   checkState(state == JobStatus.RUNNING, "job is not running currently");

   // Important: reserve all the space we need up front.
   // that way we do not have any operation that can fail between allocating the slots
   // and adding them to the list. If we had a failure in between there, that would
   // cause the slots to get lost
   final boolean queued = allowQueuedScheduling;

   // collecting all the slots may resize and fail in that operation without slots getting lost
   final ArrayList> allAllocationFutures = new ArrayList<>(getNumberOfExecutionJobVertices());

   // allocate the slots (obtain all their futures
   for (ExecutionJobVertex ejv : getVerticesTopologically()) {
      // these calls are not blocking, they only return futures
      Collection> allocationFutures = ejv.allocateResourcesForAll(
         slotProvider,
         queued,
         LocationPreferenceConstraint.ALL,
         allocationTimeout);

      allAllocationFutures.addAll(allocationFutures);
   }

   // this future is complete once all slot futures are complete.
   // the future fails once one slot future fails.
   final ConjunctFuture> allAllocationsFuture = FutureUtils.combineAll(allAllocationFutures);

   final CompletableFuture currentSchedulingFuture = allAllocationsFuture
      .thenAccept(
         (Collection executionsToDeploy) -> {
            for (Execution execution : executionsToDeploy) {
               try {
                  execution.deploy();
               } catch (Throwable t) {
                  throw new CompletionException(
                     new FlinkException(
                        String.format("Could not deploy execution %s.", execution),
                        t));
               }
            }
         })
      // Generate a more specific failure message for the eager scheduling
      .exceptionally(
         (Throwable throwable) -> {
            final Throwable strippedThrowable = ExceptionUtils.stripCompletionException(throwable);
            final Throwable resultThrowable;

            if (strippedThrowable instanceof TimeoutException) {
               int numTotal = allAllocationsFuture.getNumFuturesTotal();
               int numComplete = allAllocationsFuture.getNumFuturesCompleted();
               String message = "Could not allocate all requires slots within timeout of " +
                  timeout + ". Slots required: " + numTotal + ", slots allocated: " + numComplete;

               resultThrowable = new NoResourceAvailableException(message);
            } else {
               resultThrowable = strippedThrowable;
            }

            throw new CompletionException(resultThrowable);
         });

   return currentSchedulingFuture;
}

这段代码大量使用jdk8新增的CompleteFuture特性,这里不做介绍,网上有大量文章介绍这个组件

这里会根据

ExecutionJobVertices的数量创建异步任务。并且给每个ExecutionJobVertices分配适当的slot,然后调用
execution.deploy();方法

上面方法进入之后截取主要的几句话:

final TaskDeploymentDescriptor deployment = vertex.createDeploymentDescriptor(
   attemptId,
   slot,
   taskRestore,
   attemptNumber);
final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();

final CompletableFuture submitResultFuture = taskManagerGateway.submitTask(deployment, rpcTimeout);

第一句话

包含了从Execution Graph到真正物理执行图的转换。如将IntermediateResultPartition转化成ResultPartition,ExecutionEdge转成InputChannelDeploymentDescriptor(最终会在执行时转化成InputGate)。

最后通过RPC方法提交task,实际会调用到TaskExecutor.submitTask方法中。
这个方法会创建真正的Task,然后调用task.startTaskThread();开始task的执行。

在Task构造函数中,会根据输入的参数,创建InputGate, ResultPartition, ResultPartitionWriter等。

startTaskThread方法,则会执行executingThread.start,从而调用Task.run方法。

进入TaskExecutor#submitTask

找到task.startTaskThread();

进入之后找到

executingThread.start();

我们要看executingThread的run方法

找到几句核心代码:

invokable = loadAndInstantiateInvokable(userCodeClassLoader, nameOfInvokableClass, env);

这里的invokable即为operator对象实例,通过反射创建

那么用户真正写的逻辑代码在哪里呢?比如word count中的Tokenizer,去了哪里呢?
OneInputStreamTask的基类StreamTask,包含了headOperator和operatorChain。当我们调用dataStream.flatMap(new Tokenizer())的时候,会生成一个StreamFlatMap的operator,这个operator是一个AbstractUdfStreamOperator,而用户的代码new Tokenizer,即为它的userFunction。

所以再串回来,以OneInputStreamTask为例,Task的核心执行代码即为OneInputStreamTask.invoke方法,它会调用StreamTask.run方法,这是个抽象方法,最终会调用其派生类的run方法,即OneInputStreamTask, SourceStreamTask等。

OneInputStreamTask的run方法代码如下:

final OneInputStreamOperator operator = this.headOperator;
    final StreamInputProcessor inputProcessor = this.inputProcessor;
    final Object lock = getCheckpointLock();
        
    while (running && inputProcessor.processInput(operator, lock)) {
        // all the work happens in the "processInput" method
    }

就是一直不停地循环调用inputProcessor.processInput(operator, lock)方法,即StreamInputProcessor.processInput方法:

   public boolean processInput(OneInputStreamOperator streamOperator, final Object lock) throws Exception {
     // ...
     
        while (true) {
            if (currentRecordDeserializer != null) {
           // ...
           
                if (result.isFullRecord()) {
                    StreamElement recordOrMark = deserializationDelegate.getInstance();
                    
              // 处理watermark,则框架处理
                    if (recordOrMark.isWatermark()) {
                       // watermark处理逻辑
                       // ...
                        continue;
                    } else if(recordOrMark.isLatencyMarker()) {
                        // 处理latency mark,也是由框架处理
                        synchronized (lock) {
                            streamOperator.processLatencyMarker(recordOrMark.asLatencyMarker());
                        }
                        continue;
                    } else {
                        // ***** 这里是真正的用户逻辑代码 *****
                        StreamRecord record = recordOrMark.asRecord();
                        synchronized (lock) {
                            numRecordsIn.inc();
                            streamOperator.setKeyContextElement1(record);
                            streamOperator.processElement(record);
                        }
                        return true;
                    }
                }
            }

        // 其他处理逻辑
        // ...
        }
    }

上面的代码中,streamOperator.processElement(record);才是真正处理用户逻辑的代码,以StreamFlatMap为例,即为它的processElement方法:

 public void processElement(StreamRecord element) throws Exception {
        collector.setTimestamp(element);
        userFunction.flatMap(element.getValue(), collector);
    }
勿删,copyright占位
分享文章到微博
分享文章到朋友圈

上一篇:多线程处理集合

下一篇:(转载)用 VUE.JS 做一套 增删改查 CRUD

您可能感兴趣

  • 单日课程超10万节!VIPKID 如何通过实时计算提升上课体验?

    行业解决方案、产品招募中!想赚钱就来传!>>> 摘要:对于以互联网形式的在线教育来说,实时计算应用的场景非常之多,如何通过数据计算来更快、更高效地反馈学习效果保证课程质量是在线教育领域不断探索的主题。本文将从以下四个部分分享,与大家探讨在直播上课过程中如何通过实时计算来提高人效以及系统处理能力。 VIPKID 介绍 核心业务场景 技术实现 总结 Tips: 查看更多最佳实践文章可点击文末链接...

  • 63个Linux基础自测题

    目录 第1部分 1、什么是Linux内核,有什么作用? 2、什么是shell,有什么作用? 3、安装Linux至少需要几个分区? 4、Swap分区的作用是什么? 5、什么是运行级别?Linux有几个运行级别? 6、如何更改默认启动级别? 第2部分 1、什么是Linux终端?Linux终端又称为什么? 2、默认情况下,Linux有几个虚拟终端?如何在不同终端之间切换?如何在X Window与终...

  • JAVA面试(全)

    Java 八大基本数据类型 八大基本类型 Byte,short,long,int,double,float,boolean,char 占用大小及其长度 数据类型 空间(字节B) 取值范围 byte 1 -2^7 ~ 2^7-1 short 2 -2^15~ 2^15-1 char 2 0 ~ 2^16-1 char无需符号位 int 4 -2^31 ~ 2^31-1 float 4 -2^3...

  • 人工智能那么火~如今AI的应用场景都有哪些?

    作者:新智元 链接:https://www.zhihu.com/question/282715644/answer/1329782546 来源:知乎 著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。 【未来5年AI应用报告】谷歌、DeepMind、英伟达科学家支招企业AI应用 ReWork的一份最新AI落地应用报告,阐述了企业该如何使用AI技术。谷歌的Ian GoodFe...

  • 新职业教育的三节课,凭什么做到今天这样

    历时7天、翻遍15个平台渠道、访谈25位参与课程的从业者、挖掘了136条推文的标题和内容,我们得到了12500字的拆解。可以点击右上角☝:收藏、分享、在看,不用担心看一半,找不到文章。 本文信息公开来源:三节课官方公众号、虎嗅网、36氪、深网、东方财富网、新榜、知乎、简书、增长黑盒、短书··· 我们认为,这可能比任何官方复盘更能诠释:「三节课」是如何在3年内,做到互联网职业教育(Almost...

  • 研报 | 区块链新基建:物联网+区块链如何打造差异化竞争优势?

    感谢分布式资本提供研究支持,以及摩联科技等代表性企业的交流分享。 基于区块链的物联网市场前景:万物互联时代,数据价值越发重要,物联网+区块链的融合创新将成为新的行业趋势。当前物联网模组厂商都在快速铺量,未来如何打造差异化竞争之路?利用物联网终端设备安全可信执行环境,可以将物联网设备可信上链,从而解决物联网终端身份确认与数据确权的问题,保证链上数据与应用场景深度绑定。区块链能够确保数据安全与隐...

  • Netflix 微服务架构设计解析

    1 概述 数年来,Netflix 一直是全球体验最好的在线订阅制视频流媒体服务,其流量占全球互联网带宽容量的 15%以上。 在过去的2019 年,Netflix 已经有 1.67 亿名订阅用户,平均每个季度新增 500 万订户,服务覆盖全球 200 多个国家 / 地区。 Netflix 用户每天在 4000 多部电影和 47000 集电视剧上花费超过 1.65 亿小时的时间。从工程角度看,这...

  • jvm总结(1)

    jvm总体结构 1、类加载子系统 (1)加载阶段 会将.java文件动态编译为.class文件,然后通过类的全限定名,将类的二进制字节流的静态存储结构转化为方法区里的动态数据结构,在内存中生成一个代表该类的java.lang.Class,并提供数据访问入口。 (2)连接阶段 验证:主要确保加载类的正确性 准备:为类分配内存,并且为其静态变量赋默认值 解析:将常量池中的符号引用替换为直接引用(...

华为云40多款云服务产品0元试用活动

免费套餐,马上领取!
CSDN

CSDN

中国开发者社区CSDN (Chinese Software Developer Network) 创立于1999年,致力为中国开发者提供知识传播、在线学习、职业发展等全生命周期服务。