精选文章 flink源码解读2 jobGraph的形成

flink源码解读2 jobGraph的形成

作者:代码届彭于晏 时间: 2019-11-04 03:24:56
代码届彭于晏 2019-11-04 03:24:56

接着上一篇:

LocalStrteamEnviroment#execute()

JobGraph jobGraph = streamGraph.getJobGraph(); 追踪导

StreamingJobGraphGenerator#createJobGraph

private JobGraph createJobGraph() {

   // make sure that all vertices start immediately
   jobGraph.setScheduleMode(ScheduleMode.EAGER);

   // Generate deterministic hashes for the nodes in order to identify them across
   // submission iff they didn't change.
   Map hashes = defaultStreamGraphHasher.traverseStreamGraphAndGenerateHashes(streamGraph);

   // Generate legacy version hashes for backwards compatibility
   List> legacyHashes = new ArrayList<>(legacyStreamGraphHashers.size());
   for (StreamGraphHasher hasher : legacyStreamGraphHashers) {
      legacyHashes.add(hasher.traverseStreamGraphAndGenerateHashes(streamGraph));
   }

   Map>> chainedOperatorHashes = new HashMap<>();

   setChaining(hashes, legacyHashes, chainedOperatorHashes);

   setPhysicalEdges();

   setSlotSharingAndCoLocation();

   configureCheckpointing();

   JobGraphGenerator.addUserArtifactEntries(streamGraph.getEnvironment().getCachedFiles(), jobGraph);

   // set the ExecutionConfig last when it has been finalized
   try {
      jobGraph.setExecutionConfig(streamGraph.getExecutionConfig());
   }
   catch (IOException e) {
      throw new IllegalConfigurationException("Could not serialize the ExecutionConfig." +
            "This indicates that non-serializable types (like custom serializers) were registered");
   }

   return jobGraph;
}

这段代码主要进行了四个大逻辑:

1.遍历stream graph

2。生成operatorChain

3.设置物理边

4.设置SlotSharing Group

先看第一段逻辑:

map hashes = defaultStreamGraphHasher.traverseStreamGraphAndGenerateHashes(streamGraph);

遍历StreamGraph 会从source开始遍历求每一个StreamNode的hash码,在计算的时候,一定会确保一个StremNode的所有输入Node都已经计算过了之后才会计算当前的StreamNode

第二段逻辑

StreamingJobGraphGenerator#setChaining()

for (Integer sourceNodeId : streamGraph.getSourceIDs()) {
   createChain(sourceNodeId, sourceNodeId, hashes, legacyHashes, 0, chainedOperatorHashes);
}

这里是一个for 循环调用createChain。循环次数根据整个streamGraph的source个数,我们这边只有一个

这边穿插一个介绍

opearor chain 及作用

在StreamGraph中可以知道一个Operator对应一个StreamNode, 考虑一个日常经常遇到的问题,一个DataStream.map().filter() 这个关系中map和filter Operator会组成不同的StreamNode,最后生成Task, 如果这两个Task不在同一个Slot或在不同一个TaskManager中,数据会经过网络从map传到filter,执行性能会很差,考虑到这一点,flink引入 operator chain的概念, 一个operator chain 代表一组可以在同一个Slot执行的Operator串

 什么样的情况可以chain在一起

根据源码信息,如果一个上游opeartor A与下游满足以下关系则可以串在一起

  • 下游的input只有一个即上游
  • 属于同一个SlotSharingGroup
  • 允许Chain打开
  • Partitioner 为ForwardPartitioner
  • 并行度一致
  • ChainingStrategy允许chain在起

当然一个chain可以chain多个operator,只要连续的两个operator满足以下关系

public static boolean isChainable(StreamEdge edge, StreamGraph streamGraph) {
   StreamNode upStreamVertex = edge.getSourceVertex();
   StreamNode downStreamVertex = edge.getTargetVertex();

   StreamOperator headOperator = upStreamVertex.getOperator();
   StreamOperator outOperator = downStreamVertex.getOperator();

   return downStreamVertex.getInEdges().size() == 1
         && outOperator != null
         && headOperator != null
         && upStreamVertex.isSameSlotSharingGroup(downStreamVertex)
         && outOperator.getChainingStrategy() == ChainingStrategy.ALWAYS
         && (headOperator.getChainingStrategy() == ChainingStrategy.HEAD ||
            headOperator.getChainingStrategy() == ChainingStrategy.ALWAYS)
         && (edge.getPartitioner() instanceof ForwardPartitioner)
         && upStreamVertex.getParallelism() == downStreamVertex.getParallelism()
         && streamGraph.isChainingEnabled();
}

接着走上面的代码。进入createChain方法

for (StreamEdge outEdge : streamGraph.getStreamNode(currentNodeId).getOutEdges()) {
   if (isChainable(outEdge, streamGraph)) {
      chainableOutputs.add(outEdge);
   } else {
      nonChainableOutputs.add(outEdge);
   }
}

这里会遍历streamNode的输出边,然后用上面说的方法判断是不是可以合并在一起,分别放在

chainableOutputs和nonChainableOutputs两个集合中
for (StreamEdge chainable : chainableOutputs) {
   transitiveOutEdges.addAll(
         createChain(startNodeId, chainable.getTargetId(), hashes, legacyHashes, chainIndex + 1, chainedOperatorHashes));
}

for (StreamEdge nonChainable : nonChainableOutputs) {
   transitiveOutEdges.add(nonChainable);
   createChain(nonChainable.getTargetId(), nonChainable.getTargetId(), hashes, legacyHashes, 0, chainedOperatorHashes);
}

这里会根据两个集合中的数据情况递归调用createChain方法。最终会把形成的chain放到

chainedOperatorHashes中

。回到

createJobGraph方法。

flink源码解读2 jobGraph的形成1

可以看到这里被合并成了三个节点。

setPhysicalEdges();

这行代码会构建物理边。

setSlotSharingAndCoLocation();

这里会设置

SlotSharing Group

总结一下:

从当前StreamNode开始,一直遍历到结点不能与其串在一起(从代码逻辑上看,StreamNode与其本身是永远可以串在一起), 记录这些能串在一起的结点,递归翻译当前结点的输出后, 然后将保存下来可以串在一起的StreamNode生成一个JobVertex, 最后将JobVertex的输出设置成之前已经翻译的输出JobVertex。

可以发现JobGraph相对于StreamGraph的最主要区别是将一些StreamNode合并成一个JobVertex, 而JobVertex通过JobEdge(物理边)相连, 最大程度的优化了StreamGraph

看一下形成的jobGraph

flink源码解读2 jobGraph的形成2

flink源码解读2 jobGraph的形成3 第二张图中可以看到第二个chain中包含了两个算子。回想下之前的spark源码,是不是有点类似rdd合并成一个stage?

 

勿删,copyright占位
分享文章到微博
分享文章到朋友圈

上一篇:spark中的jvm调优

下一篇:(4)springcloud创建服务消费者(Ribbon)

您可能感兴趣

  • 阿里巴巴高级技术专家章剑锋:大数据发展的 8 个要点

    【与数据同行】已开通综合、数据仓库、数据分析、产品经理、数据治理及机器学习六大专业群,新开招聘交流群,为数据专业求职者和招聘方提供连接机会,扫描以下二维码入群。 正文开始 笔者从 2008 年开始工作到现在也有 11 个年头了,一路走来都在和数据打交道,做过大数据底层框架内核的开发(Hadoop,Pig,Tez,Spark,Livy),也做过上层大数据应用开发(写 MapReduce Job...

  • Flink 新场景:OLAP 引擎性能优化及应用案例

    摘要:本文由阿里巴巴技术专家贺小令(晓令)分享,主要介绍 Apache Flink 新场景 OLAP 引擎,内容分为以下四部分: 背景介绍 Flink OLAP 引擎 案例介绍 未来计划 Tips:点击「阅读原文」可下载作者分享 PPT~ 一、背景介绍 1.OLAP 及其分类 OLAP 是一种让用户可以用从不同视角方便快捷的分析数据的计算方法。主流的 OLAP 可以分为3类:多维 OLAP ...

  • Apache Flink 的迁移之路,2 年处理效果提升 5 倍

    一、背景与痛点 在 2017 年上半年以前,TalkingData 的 App Analytics 和 Game Analytics 两个产品,流式框架使用的是自研的 td-etl-framework。该框架降低了开发流式任务的复杂度,对于不同的任务只需要实现一个 changer 链即可,并且支持水平扩展,性能尚可,曾经可以满足业务需求。 但是到了 2016 年底和 2017 年上半年,发现...

  • SLA 99.99%以上!饿了么实时计算平台3年演进历程

    http://www.dataguru.cn/article-13319-1.html 作者介绍 倪增光,饿了么BDI-大数据平台研发高级技术经理,曾先后就职于PPTV、唯品会。15年加入饿了么,组建数据架构team,整体负责离线平台、实时平台、平台工具的开发和运维,先后经历了唯品会、饿了么数据平台从无到有到不断完善的过程。 一、背景 饿了么BDI-大数据平台研发团队目前共有20人左右,主要...

  • Flink state、checkPoint

    状态(State) 我们前面写的word count的例子,没有包含状态管理,如果一个task在处理过程中挂掉了,那么它在内存中的状态都会丢失,所有的数据都需要重新计算,从容错和消息处理的语义上,flink引入了state和checkpoint. 首先取分两个概念 1)state一般指一个具体的task/operator的状态【state数据默认保存在java的堆内存中】 2)而checkp...

  • Apache Flink 零基础入门(一):基础概念解析

    一、Apache Flink 的定义、架构及原理 Apache Flink 是一个分布式大数据处理引擎,可对有限数据流和无限数据流进行有状态或无状态的计算,能够部署在各种集群环境,对各种规模大小的数据进行快速计算。 1. Flink Application 了解 Flink 应用开发需要先理解 Flink 的 Streams、State、Time 等基础处理语义以及 Flink 兼顾灵活性和...

  • 日均百亿级日志处理:微博基于Flink的实时计算平台建设

    作者介绍 吕永卫,微博广告资深数据开发工程师,实时数据项目组负责人。 黄鹏,微博广告实时数据开发工程师,负责法拉第实验平台数据开发、实时数据关联平台、实时算法特征数据计算、实时数据仓库、实时数据清洗组件开发工作。 林发明,微博广告资深数据开发工程师,负责算法实时特征数据计算、实时数据关联平台、实时数据仓库、FlinkStream组件开发工作。 崔泽峰,微博广告资深数据开发工程师,负责实时算法...

  • SpringBoot系列-整合Mybatis(注解方式)

    点击上方“Java碎碎念”,关注公众号 优质文章,第一时间送达 上一篇文章《SpringBoot系列-整合Mybatis(XML配置方式)》介绍了XML配置方式整合的过程,本文介绍下SpringBoot通过注解方式整合Mybatis的过程。 本文目录 一、常用注解说明二、实战三、测试四、注意事项 一、常用注解说明 @Mapper 可以给接口自动生成一个实现类,让spring对mapper接口...

华为云40多款云服务产品0元试用活动

免费套餐,马上领取!
CSDN

CSDN

中国开发者社区CSDN (Chinese Software Developer Network) 创立于1999年,致力为中国开发者提供知识传播、在线学习、职业发展等全生命周期服务。