精选文章 Flink DataStream中CoGroup实现原理与三种 join 实现

Flink DataStream中CoGroup实现原理与三种 join 实现

作者:pucheung 时间: 2019-11-05 01:10:44
pucheung 2019-11-05 01:10:44

点击上方蓝

字关注~

       

CoGroup

CoGroup 表示联合分组,将两个不同的DataStream联合起来,在相同的窗口内按照相同的key分组处理,先通过一个demo了解其使用方式:


   
  1. case class Order(id:String, gdsId:String, amount:Double)

  2. case class Gds(id:String, name:String)

  3. case class RsInfo(orderId:String, gdsId:String, amount:Double, gdsName:String)

  4. object CoGroupDemo{

  5. def main(args:Array[String]):Unit={

  6. val env =StreamExecutionEnvironment.getExecutionEnvironment

  7. env.setParallelism(1)

  8. val kafkaConfig =newProperties();

  9. kafkaConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");

  10. kafkaConfig.put(ConsumerConfig.GROUP_ID_CONFIG,"test1");

  11. val orderConsumer =newFlinkKafkaConsumer011[String]("topic1",newSimpleStringSchema, kafkaConfig)

  12. val gdsConsumer =newFlinkKafkaConsumer011[String]("topic2",newSimpleStringSchema, kafkaConfig)

  13. val orderDs = env.addSource(orderConsumer)

  14. .map(x =>{

  15. val a = x.split(",")

  16. Order(a(0), a(1), a(2).toDouble)

  17. })

  18. val gdsDs = env.addSource(gdsConsumer)

  19. .map(x =>{

  20. val a = x.split(",")

  21. Gds(a(0), a(1))

  22. })

  23. orderDs.coGroup(gdsDs)

  24. .where(_.gdsId)// orderDs 中选择key

  25. .equalTo(_.id)//gdsDs中选择key

  26. .window(TumblingProcessingTimeWindows.of(Time.minutes(1)))

  27. .apply(newCoGroupFunction[Order,Gds,RsInfo]{

  28. overridedef coGroup(first: lang.Iterable[Order], second: lang.Iterable[Gds],out:Collector[RsInfo]):Unit={

  29. //得到两个流中相同key的集合

  30. }

  31. })

  32. env.execute()

  33. }}

从源码角度分析CoGrop的实现

  1. 两个DataStream进行CoGroup得到的是一个CoGroupedStreams类型,后面的where、equalTo、window、apply之间的一些转换,最终得到一个WithWindow类型,包含两个dataStream、key选择、where条件、window等属性

  2. 重点:WithWindow 的apply方法Flink DataStream中CoGroup实现原理与三种 join 实现1

  • 对两个DataStream打标签进行区分,得到TaggedUnion,TaggedUnion包含one、two两个属性,分别对应两个流

  • 将两个打标签后的流TaggedUnion 进行union操作合并为一个DataStream类型流unionStream

  • unionStream根据不同的流选择对应where/equalTo条件进行keyBy 得到KeyedStream流

  • 通过指定的window方式得到一个WindowedStream,然后apply一个被CoGroupWindowFunction包装之后的function,后续就是window的操作

    Flink DataStream中CoGroup实现原理与三种 join 实现2

到这里已经将一个CoGroup操作转换为window操作,接着看后续是如何将相同的key的两个流的数据如何组合在一起的

Flink DataStream中CoGroup实现原理与三种 join 实现3

1.  在用户定义CoGroupFunction 被CoGroupWindowFunction包装之后,会接着被InternalIterableWindowFunction包装,一个窗口相同key的所有数据都会在一个Iterable中, 会将其传给CoGroupWindowFunction

2.  在CoGroupWindowFunction中,会将不同流的数据区分开来得到两个list,传给用户自定义的CoGroupFunction中

JOIN

在理解了coGroup的实现后,join实现原理也就比较简单,DataStream join 同样表示连接两个流,也是基于窗口实现,其内部调用了CoGroup的调用链,使用姿势p与调用流程跟CoGroup及其相似,主要有以下两点不同:

  1. 不在使用CoGroupFunction,而是JoinFunction,在JoinFunction里面得到的是来自不同两个流的相同key的每一对数据

  2. 函数调用链

    Flink DataStream中CoGroup实现原理与三种 join 实现4

    中间增加了FlatJoinCoGroupFunction函数调用,使用嵌套遍历方式得到两个流的笛卡尔积传给用户自定义函数

    Flink DataStream中CoGroup实现原理与三种 join 实现5

Left/Right join实现分析

Flink 中DataStream 只提供了inner join 的实现,并未提供left join 与 right join 的实现,那么同样可以通过CoGroup来实现这两种join,以left join 为例,处理逻辑在CoGroupFunction中,实现如下:


   
  1. overridedef coGroup(first: lang.Iterable[Order], second: lang.Iterable[Gds],out:Collector[RsInfo]):Unit={

  2. first.foreach(x =>{

  3. if(!second.isEmpty){

  4. second.foreach(y=>{

  5. out.collect(newRsInfo(x.id,x.gdsId,x.amount,y.name))

  6. })

  7. }

  8. if(second.isEmpty){

  9. out.collect(newRsInfo(x.id,x.gdsId,x.amount,null))

  10. }

  11. })

  12. }

Flink DataStream中CoGroup实现原理与三种 join 实现6

Flink DataStream中CoGroup实现原理与三种 join 实现7

end

关注回复Flink获取更多信息~

Flink DataStream中CoGroup实现原理与三种 join 实现8

勿删,copyright占位
分享文章到微博
分享文章到朋友圈

上一篇:List-练习(11/4)

下一篇:利用Visual Studio提供的编译工具进行代码开发

您可能感兴趣

  • Spark--RDD

    一、RDD的概述 1.1 什么是RDD? RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是Spark中最基本的数据抽象,它代表一个不可变、可分区、里面的元素可并行计算的集合。RDD具有数据流模型的特点:自动容错、位置感知性调度和可伸缩性。RDD允许用户在执行多个查询时显式地将工作集缓存在内存中,后续的查询能够重用工作集,这极大地提升了查询速度。 R...

  • 日均百亿级日志处理:微博基于Flink的实时计算平台建设

    作者介绍 吕永卫,微博广告资深数据开发工程师,实时数据项目组负责人。 黄鹏,微博广告实时数据开发工程师,负责法拉第实验平台数据开发、实时数据关联平台、实时算法特征数据计算、实时数据仓库、实时数据清洗组件开发工作。 林发明,微博广告资深数据开发工程师,负责算法实时特征数据计算、实时数据关联平台、实时数据仓库、FlinkStream组件开发工作。 崔泽峰,微博广告资深数据开发工程师,负责实时算法...

  • flinkx架构

    FlinkX是在袋鼠云内部广泛使用的一个基于Flink的异构数据源离线同步工具,用于在多种数据源(MySQL、Oracle、SqlServer、Ftp、Hdfs,HBase、Hive、Elasticsearch等)之间进行高效稳定的数据同步。 FlinkX简化了数据同步任务的开发过程,用户只需提供一份数据同步任务的配置,FlinkX会将配置转化为Flink任务,并自动提交到Flink集群上执...

  • Spark学习之路 (九)SparkCore的调优之数据倾斜调优

    目录 调优概述 数据倾斜发生时的现象 数据倾斜发生的原理 如何定位导致数据倾斜的代码 某个task执行特别慢的情况 某个task莫名其妙内存溢出的情况 查看导致数据倾斜的key的数据分布情况 数据倾斜的解决方案 解决方案一:使用Hive ETL预处理数据 解决方案二:过滤少数导致倾斜的key 解决方案三:提高shuffle操作的并行度 解决方案四:两阶段聚合(局部聚合+全局聚合) 解决方案五...

  • BAT 大厂Java 面试题集锦之核心篇附参考答案

    核心篇 数据结构与算法 网路:TCP/IP, HTTP 操作系统, 文件, shell, CPU, IO, epoll, 非阻塞IO, 进程/线程/协程,锁 HashMap, ConcurrentHashMap实现原理, 链表, 红黑树 git maven 缓存:各种缓存, redis zset与跳跃表 高并发,高可用,降级,限流,容灾,弱依赖 分布式框架 架构设计, clean code,...

  • Spark之路(2、RDD)

    参考https://www.cnblogs.com/qingyunzong/p/8899715.html https://blog.csdn.net/lovehuangjiaju/article/details/48580863 一、RDD概述 1.1什么是RDD RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是Spark中最基本的数据抽象,它代表...

  • 37.DataSet API之Transformations

    flink 1.8 数据集转换DataSet Transformations 本文档深入研究了数据集上可用的转换。有关Flink Java API的一般介绍,请参阅编程指南 Programming Guide。 有关在具有密集索引的数据集中压缩元素,请参阅 Zip Elements Guide。 Map Map转换在DataSet的每个元素上应用用户定义的map函数。它实现了一对一的映射,也...

  • 36.DataSet API之Overview

    Flink 1.8 Flink DataSet API Programming Guide Flink中的DataSet程序是实现数据集转换的常规程序(例如,Filter,映射,连接,分组)。数据集最初是从某些source数据源创建的(例如,通过读取文件或从本地集合创建)。结果通过接收器sink返回,例如通过sink将数据写入(分布式)文件或标准输出(例如命令行终端)。Flink程序可以在各...

华为云40多款云服务产品0元试用活动

免费套餐,马上领取!
CSDN

CSDN

中国开发者社区CSDN (Chinese Software Developer Network) 创立于1999年,致力为中国开发者提供知识传播、在线学习、职业发展等全生命周期服务。