1.Transformation操作
主要做的是就是将一个已有的RDD生成另外一个RDD。Transformation具有lazy特性(延迟加载)。Transformation算子的代码不会真正被执行。只有当我们的程序里面遇到一个action算子的时候,代码才会真正的被执行对
1.1一个RDD进行转换操作
举例:对一个数组为{1, 2, 3, 3}的RDD进行基本的RDD转化操作
函数名 |
含义 |
示例 |
结果 |
---|---|---|---|
map(func) | 返回一个新的RDD,该RDD由每一个输入元素经过func函数转换后组成 | rdd.map(x => x + 1) | {2, 3, 4, 4} |
flatMap() | 将函数应用于RDD 中的每个元素,将返回的迭代器的所有内容构成新的RDD。通常用来切分单词 | rdd.flatMap(x->Arrays.asList(x.split(",")).iterator()) | {1, 2, 3,3} |
parallelize() |
使用已经存在的迭代器或者集合通过调用spark驱动程序提供的parallelize函数来创建并行集合 |
JavaRDD |
|
filter() | 返回一个由通过传给filter()的函数的元素组成的RDD | rdd.filter(x->x!=1) | {2,3,3,} |
distinct() | 去重 | rdd.distinct() | {1,2,3} |
sample(s,faraction,[seed]) | 对RDD采样,以及是否替换 | rdd.sample(false,0.5) | 非确定的 |
Java实现
|
1.1.1Map
单个RDDmap 折叠源码
|
执行结果:
----MapAPI start----
a!
b!
c!
----MapAPI end----
1.1.2FlatMap
单个RDDflatMap 折叠源码
|
执行结果:
----FlatMapAPI start----
a
b
cde
12
34
5
----FlatMapAPI end----
1.1.3Fliter
单个RDDFliter 折叠源码
|
输出:
----FliterAPI start----
12,34,5
----Fliter end----
1.1.4CountByValue
单个RDDFliter 折叠源码
|
----CountByValueAPI end----
2,1
3,2
1,1
----CountByValueAPI end----
1.1.5ReduceByKey
ReduceByKey 折叠源码
|
1.1.6GroupByKey
groupByKey 折叠源码
|
输出:
a 1
b 2
c 3
d 3
a 2
(d,[(d,3)])
(a,[(a,1), (a,2)])
(b,[(b,2)])
(c,[(c,3)])
1.2 两个RDD的转换操作
1.Java实现
JavaCoreAPI 折叠源码
|
Action操作
行动操作会把最终求得的结果返回到驱动器程序,或者写入外部存储系统中。它往往会触发转化RDD的运行。
调试小数据集的情况下,我们可以使用RDD的collection()函数,获取整个RDD中的数据。但这又一个前提,要单机driver内存能够成功存下这所有的数据才行。生产环境下这显然是不现实的。我们一般将数据写到HDFS或Amazon S3这种分布式的存储系统中。
函数名 |
目的 |
示例 |
结果 |
---|---|---|---|
collect() | 返回RDD中的所有元素 | rdd.collect() | {1,2,3,3} |
count() | RDD 中的元素个数 | rdd.count() | 4 |
countByValue() | 各元素在RDD中出现的次数 | rdd.countByValue() | {(1,1),(2,1),(3,2)} |
take() | 从RDD中返回的num个数 | rdd.take(2) | {1,2} |
reduce(func) | 并行整合RDD中的所有数据,例如Sum | rdd.reduce((x,y)=>x+y) | 9 |
fold(zero)(func) | 和reduce()一样,但是需要提供初始值 | rdd.fold(0).((x,y)=>x+y) | 9 |
aggregate(zeroValue)(seqOp,combOp) | 和reduce一样,但是通常返回不同类型的函数 | rdd.aggregate((0, 0)) ((x, y) =>(x._1 + y, x._2 + 1),(x, y) =>(x._1 + y._1, x._2 + y._2)) | (9,4) |
foreach(func) | 对RDD中的每个元素使用给定的函数 | rdd.foreach(func) | 无 |
Java 中的reduce()
Integer sum = rdd.reduce(new Function2() {
public Integer call(Integer x, Integer y) { return x + y; }
});
在Spark2.1的Java版中,使用JavaRDD来代替List<>类型的列表数组,里面可以盛放String,Integer,Long等类型
JavaPairRDD类似于一个用来存储某些不好详细定义key的数据,其value可以是键值对类型的数据。Spark 为包含键值对类型的 RDD 提供了一些专有的操作,称为 pair RDD。它们提供了并行操作各个键或跨节点重新进行数据分组 的操作接口。
通常从一个 RDD 中提取某些字段(例如代表事件时间、用户 ID 或者其他标识符的字段), 使用这些字段作为 pair RDD 操作中的键。
备注tuple._1()和tuple._1是一样
Stackflow一个转换方法
JxavaRDD> myRdd // This is my JavaRDD
JavaPairRDD pairRDD = JavaPairRDD.fromJavaRDD(myRdd);
键值对RDD
众所周知,Hadoop的MapReduce一般处理键值对数据。Map输出,Reduce输入和输出都是Key-Value形式。在Spark中我们也支持键值对形式。并且,Spark的键值对Rdd相对MapReduce的键值对来说,操作更为简单,形式更加复杂多样。Spark 为包含键值对类型的RDD 提供了一些专有的操作。这些RDD 被称为pair RDD。
创建键值对RDD
Java 没有自带的二元组类型,因此Spark 的Java API 让用户使用scala.Tuple2 类来创建二
元组。这个类很简单:Java 用户可以通过new Tuple2(elem1, elem2) 来创建一个新的二元
组,并且可以通过._1() 和._2() 方法访问其中的元素。
在Java 中使用第一个单词作为键创建出一个pair RDD
很多存储键值对的数据格式会 在读取时直接返回由其键值对数据组成的 pair RDD。 RDD 转 为 pair RDD 时,可以调用 map() 函数来实现,传递的函数需要返回键值对。
Java 没有自带的二元组类型,因此 Spark 的 Java API 让用户使用 scala.Tuple2 类来创建二 元组。可以通过 new Tuple2(elem1, elem2) 来创建一个新的二元 组,并且可以通过 ._1() 和 ._2() 方法访问其中的元素
测试Demo
打印JavaPairRDD内容 折叠源码
|