精选文章 Spark常用API

Spark常用API

作者:sinat_32176267 时间: 2020-04-18 10:16:08
sinat_32176267 2020-04-18 10:16:08

1.Transformation操作

主要做的是就是将一个已有的RDD生成另外一个RDD。Transformation具有lazy特性(延迟加载)。Transformation算子的代码不会真正被执行。只有当我们的程序里面遇到一个action算子的时候,代码才会真正的被执行对

1.1一个RDD进行转换操作

举例:对一个数组为{1, 2, 3, 3}的RDD进行基本的RDD转化操作

函数名

含义

示例

结果

map(func) 返回一个新的RDD,该RDD由每一个输入元素经过func函数转换后组成 rdd.map(x => x + 1) {2, 3, 4, 4}
flatMap() 将函数应用于RDD 中的每个元素,将返回的迭代器的所有内容构成新的RDD。通常用来切分单词 rdd.flatMap(x->Arrays.asList(x.split(",")).iterator()) {1, 2, 3,3}
parallelize()
使用已经存在的迭代器或者集合通过调用spark驱动程序提供的parallelize函数来创建并行集合
JavaRDD rdd = jsc.parallelize(Arrays.asList("haimie","jojo"));
 
filter() 返回一个由通过传给filter()的函数的元素组成的RDD rdd.filter(x->x!=1) {2,3,3,}
distinct() 去重 rdd.distinct() {1,2,3}
sample(s,faraction,[seed]) 对RDD采样,以及是否替换 rdd.sample(false,0.5) 非确定的

Java实现

 

import org.apache.spark.SparkConf;

import org.apache.spark.api.java.JavaSparkContext;

import org.apache.spark.sql.SparkSession;

 

/**

* @author sjy on 2019-12-19

* @version 1.0

*/

public class SparkCoreApiMaster {

private static JavaSparkContext jsc;

public static void main(String[] args) {

SparkConf conf = new SparkConf();

SparkSession session = new SparkSession.Builder()

.config(conf)

.appName("RunModifyiedFilm:shengjunyang@maoyan.com").master("local")

.getOrCreate();

jsc = new JavaSparkContext(session.sparkContext());

SparkApiCore master = new SparkApiCore();

master.testSparkCoreApiMap(jsc);

}

}


1.1.1Map

单个RDDmap 折叠源码

import org.apache.spark.api.java.JavaRDD;

import org.apache.spark.api.java.JavaSparkContext;

import org.apache.spark.api.java.function.Function;

import java.io.Serializable;

import java.util.Arrays;

 

/**

 * @author sjy on 2019-12-19

 * @version 1.0

 */

public class SparkApiCore implements Serializable {

     

    public void testSparkCoreApiMap(JavaSparkContext jsc) {

        JavaRDD rdd = jsc.parallelize(Arrays.asList("a","b","c"));// 构造数据源,String的数据

        //java表达式

        JavaRDD logData = rdd.map((Function) v1 -> v1+"!");

        //lamda形式 等效

        System.out.println("----MapAPI start----");

        logData.collect().stream().forEach(s-> System.out.println(s.toString()));

        System.out.println("----MapAPI end----");

         

        logData = rdd.map(p -> p + '!');

        System.out.println("----MapAPI start----");

        logData.collect().stream().forEach(s-> System.out.println(s.toString()));

        System.out.println("----MapAPI end----");

    }

}

执行结果:

----MapAPI start----

a!
b!
c!
----MapAPI end----

1.1.2FlatMap

单个RDDflatMap 折叠源码

import org.apache.spark.api.java.JavaRDD;

import org.apache.spark.api.java.JavaSparkContext;

import org.apache.spark.api.java.function.Function;

import java.io.Serializable;

import java.util.Arrays;

 

/**

 * @author sjy on 2019-12-19

 * @version 1.0

 */

public class SparkApiCore implements Serializable {

     

     /**

     * flatMap 一行变多行,多用于分词之类的操作

     * @param jsc

     */

    public void testSparkCoreAPiFlatMap(JavaSparkContext jsc) {

        JavaRDD rdd = jsc.parallelize(Arrays.asList("a,b,cde""12,34,5"));

 

        JavaRDD words = rdd.flatMap(new FlatMapFunction() {

            @Override

            public Iterator call(String s) throws Exception {

                List strings = Arrays.asList(s.split(","));

                return ((Iterable) strings).iterator();

            }

        });

        //java表达式

        System.out.println("----FlatMapAPI start----");

        words.collect().stream().forEach(s -> System.out.println(s));

        System.out.println("----FlatMapAPI end----");

        //lambda表达式

        words = rdd.flatMap(x -> Arrays.asList(x.split(",")).iterator());

        System.out.println("----FlatMapAPI start----");

        words.collect().stream().forEach(s -> System.out.println(s));

        System.out.println("----FlatMapAPI end----");

    }

}

执行结果:
----FlatMapAPI start----
a
b
cde
12
34
5
----FlatMapAPI end----


1.1.3Fliter

单个RDDFliter 折叠源码

import org.apache.spark.api.java.JavaRDD;

import org.apache.spark.api.java.JavaSparkContext;

import org.apache.spark.api.java.function.Function;

import java.io.Serializable;

import java.util.Arrays;

 

/**

 * @author sjy on 2019-12-19

 * @version 1.0

 */

public class SparkApiCore implements Serializable {

 /**

     * 过滤a就是  只要满足fliter条件的那项,输出该行的RDD

     * @param jsc

     */

    public void testSparkCoreApiFilter(JavaSparkContext jsc) {

        JavaRDD<String> rdd = jsc.parallelize(Arrays.asList("a,b,cde""12,34,5"));

        //java表达式

        JavaRDD<String> logData = rdd.filter((Function<StringBoolean>) s -> {

            if (s.contains("1")) {

                return true;

            }

            return false;

        });

        //lambda表达式版

        logData =    rdd.filter(p->p.contains("12"));

        System.out.println("----FliterAPI start----");

        logData.collect().stream().forEach(s -> System.out.println(s));

        System.out.println("----Fliter end----");

    }

 

 

}

 

输出:

----FliterAPI start----

12,34,5
----Fliter end----

1.1.4CountByValue

单个RDDFliter 折叠源码

import org.apache.spark.api.java.JavaRDD;

import org.apache.spark.api.java.JavaSparkContext;

import org.apache.spark.api.java.function.Function;

import java.io.Serializable;

import java.util.Arrays;

 

/**

 * @author sjy on 2019-12-19

 * @version 1.0

 */

public class SparkApiCore implements Serializable {

     /**

     * countByValue 各元素在RDD中出现的次数

     */

    public void testSparkCoreAPiCountByValue(JavaSparkContext jsc) {

        JavaRDD<String> rdd = jsc.parallelize(Arrays.asList("1","2","3","3"));

        Map<StringLong> logData = rdd.countByValue();

        System.out.println("----UnionAPI end----");

        logData.entrySet().stream().forEach(s-> System.out.println(s.getKey()+","+s.getValue()));

        System.out.println("----UnionAPI end----");

    }

}

----CountByValueAPI end----
2,1
3,2
1,1
----CountByValueAPI end----

1.1.5ReduceByKey

ReduceByKey 折叠源码

/**

    * reduceByKey 就是将key相同的键值对,按照Function进行计算最后得出各个key和个数的二元组 一行关系String

    * [(cde,1), (2,1),多个 (3,2), (1,1)]

    * 在对大数据进行复杂计算时,reduceByKey优于groupByKey。reduceByKey会在端前先对共用的key进行结合。

    * @param

    */

   public void testSparkCoreAPiReduce(JavaSparkContext jsc) {

       JavaRDD rdd = jsc.parallelize(Arrays.asList(1,2,3,1));

       rdd.foreach(a-> System.out.println(a.toString()));

       //1

       //2

       //3

       //1

       //reduce的需要是int类型,reduce 并行整合RDD中所有数据  输入1,2,3,4 输出10 ,将各个数累加,把多行变成一行

       Integer reduceResult = rdd.reduce((a, b) -> a + b);//7

       System.out.println(reduceResult);

       //reduceBykey

       //1.转换为k-v形式

       JavaPairRDD pairRdd = rdd.mapToPair(k -> new Tuple2<>(String.valueOf(k), 1));

       List list = pairRdd.reduceByKey((x, y) -> x + y).collect();

       System.out.println(list);//输出[(2,1), (3,1), (1,2)]

   }

 

1.1.6GroupByKey

groupByKey 折叠源码

/**

     * groupByKey  对pair中的

     * 进行按键值合并对pair中的key进行group by操作

     * pair1RDD数据集("a 1"),("b 2"),("a 3"),("b 4")

     * result=(a,[1,3])

     * (b,[2,4])

     */

    public static void testSparkCoreApiGroupByKey(JavaSparkContext jsc) {

        JavaRDD rdd = jsc.parallelize(Arrays.asList("a 1","b 2","c 3","d 3","a 2"));

        rdd.foreach(s -> System.out.println(s.toString()));

        JavaPairRDD pairRDD = rdd.mapToPair((PairFunction) s -> {

            String[] st = s.split(" ");

            return new Tuple2(st[0], Integer.valueOf(st[1]));

        });

        pairRDD.groupBy(p -> p._1()).collect().forEach(p -> System.out.println(p.toString()));

    }

输出:   

a 1
b 2
c 3
d 3
a 2

(d,[(d,3)])
(a,[(a,1), (a,2)])
(b,[(b,2)])
(c,[(c,3)])

1.2 两个RDD的转换操作

1.Java实现

JavaCoreAPI 折叠源码

/**

 * 测试map操作

 * @param rdd

 */

private static void testSparkCoreApiMap(JavaRDD rdd){

    JavaRDD logData1 =rdd.map(new Function() {

        @Override

        public String call(String s) throws Exception {

            return s;

        }

    });

    List list = logData1.collect();

    for (int i = 0; i < list.size(); i++) {

        System.out.println(list.get(i));

    }

}

 

/**

 * 过滤a就是  只要满足fliter条件的那项。

 * @param rdd

 */

private static void testSparkCoreApiFilter(JavaRDD rdd){

    JavaRDD logData1 = rdd.filter(new Function() {

        @Override

        public Boolean call(String s) throws Exception {

            return s.split(" ")[0].equals("a");

        }

    });

    List list = logData1.collect();

    for (int i = 0; i < list.size(); i++) {

        System.out.println(list.get(i));

    }

}

 

/**

 * flatMap 有问题

 * @param rdd

 */

private static void testSparkCoreAPiFlatMap(JavaRDD rdd) {

    JavaRDD words = rdd.flatMap(

            new FlatMapFunction() {

                @Override

                public Iterator call(String s) throws Exception {

                    return (Iterator) Arrays.asList(s.split(" "));

                }

            });

    List list = words.collect();

    for (int i = 0; i < list.size(); i++) {

        System.out.println(list.get(i));

    }

}

 

/**

 * 合并两个RDD

 * @param rdd

 */

private static void testSparkCoreAPiUnion(JavaRDD rdd){

    JavaRDD unionRdd = rdd.union(rdd);

    unionRdd.foreach(new VoidFunction() {

        @Override

        public void call(String s) throws Exception {

            System.out.println(s);

        }

    });

}

 

/**

 * pairRDD

 * @param rdd

 */

private static void testSparkCoreAPiMaptoPair(JavaRDD rdd){

    JavaPairRDD pairRdd = rdd.mapToPair(new PairFunction() {

        @Override

        public Tuple2 call(String s) throws Exception {

            String[] st = s.split(" ");

            return new Tuple2(st[0],st[1]);

        }

    });

    pairRdd.foreach(new VoidFunction>() {

        @Override

        public void call(Tuple2 t) throws Exception {

            System.out.println(t._2+":"+t._1);

        }

    });

}

/**

 进行按键值合并

 *

 */

private static void testSparkCoreApiGroupByKey(JavaRDD rdd){

    JavaPairRDD pairRDD = rdd.mapToPair(new PairFunction() {

        @Override

        public Tuple2 call(String s) throws Exception {

            String[] st = s.split(" ");

            return new Tuple2(st[0],Integer.valueOf(st[1]));

        }

    });

 

    JavaPairRDD pairRDD2 = pairRDD.union(pairRDD).reduceByKey(new Function2() {

        @Override

        public Integer call(Integer integer, Integer integer2) throws Exception {

            return integer+integer2;

        }

    }).sortByKey();

    pairRDD2.foreach(new VoidFunction>() {

        @Override

        public void call(Tuple2 stringIntegerTuple2) throws Exception {

            System.out.println(stringIntegerTuple2._2);

        }

    });

}

 

/**

 * 对RDD进行递归调用

 */

private static void testSParkRDDCoreAPiReduce(JavaRDD rdd){

    JavaRDD rdd1 = rdd.map(new Function() {

        @Override

        public Integer call(String s) throws Exception {

            return Integer.valueOf(s.split(" ")[1]);

        }

    });

    Integer a = rdd1.reduce(new Function2() {

        @Override

        public Integer call(Integer v1, Integer v2) throws Exception {

            return v1+v2;

        }

    });

    System.out.println(a);

}

Action操作

行动操作会把最终求得的结果返回到驱动器程序,或者写入外部存储系统中。它往往会触发转化RDD的运行。
调试小数据集的情况下,我们可以使用RDD的collection()函数,获取整个RDD中的数据。但这又一个前提,要单机driver内存能够成功存下这所有的数据才行。生产环境下这显然是不现实的。我们一般将数据写到HDFS或Amazon S3这种分布式的存储系统中。

函数名

目的

示例

结果

collect() 返回RDD中的所有元素 rdd.collect() {1,2,3,3}
count() RDD 中的元素个数 rdd.count() 4
countByValue() 各元素在RDD中出现的次数 rdd.countByValue() {(1,1),(2,1),(3,2)}
take() 从RDD中返回的num个数 rdd.take(2) {1,2}
reduce(func) 并行整合RDD中的所有数据,例如Sum rdd.reduce((x,y)=>x+y) 9
fold(zero)(func) 和reduce()一样,但是需要提供初始值 rdd.fold(0).((x,y)=>x+y) 9
aggregate(zeroValue)(seqOp,combOp) 和reduce一样,但是通常返回不同类型的函数 rdd.aggregate((0, 0)) ((x, y) =>(x._1 + y, x._2 + 1),(x, y) =>(x._1 + y._1, x._2 + y._2)) (9,4)
foreach(func) 对RDD中的每个元素使用给定的函数 rdd.foreach(func)

 

Java 中的reduce()

Integer sum = rdd.reduce(new Function2() {
public Integer call(Integer x, Integer y) { return x + y; }
});

在Spark2.1的Java版中,使用JavaRDD来代替List<>类型的列表数组,里面可以盛放String,Integer,Long等类型

JavaPairRDD类似于一个用来存储某些不好详细定义key的数据,其value可以是键值对类型的数据。Spark 为包含键值对类型的 RDD 提供了一些专有的操作,称为 pair RDD。它们提供了并行操作各个键或跨节点重新进行数据分组 的操作接口。
通常从一个 RDD 中提取某些字段(例如代表事件时间、用户 ID 或者其他标识符的字段), 使用这些字段作为 pair RDD 操作中的键。

备注tuple._1()和tuple._1是一样

Stackflow一个转换方法

JxavaRDD> myRdd // This is my JavaRDD
JavaPairRDD pairRDD = JavaPairRDD.fromJavaRDD(myRdd);

 

键值对RDD

众所周知,Hadoop的MapReduce一般处理键值对数据。Map输出,Reduce输入和输出都是Key-Value形式。在Spark中我们也支持键值对形式。并且,Spark的键值对Rdd相对MapReduce的键值对来说,操作更为简单,形式更加复杂多样。Spark 为包含键值对类型的RDD 提供了一些专有的操作。这些RDD 被称为pair RDD。

创建键值对RDD

Java 没有自带的二元组类型,因此Spark 的Java API 让用户使用scala.Tuple2 类来创建二
元组。这个类很简单:Java 用户可以通过new Tuple2(elem1, elem2) 来创建一个新的二元
组,并且可以通过._1() 和._2() 方法访问其中的元素。
在Java 中使用第一个单词作为键创建出一个pair RDD

很多存储键值对的数据格式会 在读取时直接返回由其键值对数据组成的 pair RDD。 RDD 转 为 pair RDD 时,可以调用 map() 函数来实现,传递的函数需要返回键值对。

Java 没有自带的二元组类型,因此 Spark 的 Java API 让用户使用 scala.Tuple2 类来创建二 元组。可以通过 new Tuple2(elem1, elem2) 来创建一个新的二元 组,并且可以通过 ._1() 和 ._2() 方法访问其中的元素

测试Demo

打印JavaPairRDD内容 折叠源码

SparkSession spark = SparkSession.builder().master("local").appName("JavaQuery").getOrCreate();

JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());

JavaRDD rdd = jsc.parallelize(Arrays.asList("haimie","jojo"));

//把javaRDD转换成JavaPairRDD

JavaPairRDD pairRDD = rdd.mapToPair(x->{

    return new Tuple2(x,x+"123");

});

//打印JavaPairRDD每一行数据

 List> tuple2s =   pairRDD.collect();

 for (Tuple2 tuple:tuple2s){

     System.out.println(String.format("tuple is:%s %s",tuple._1,tuple._2));

 }

打印结果:

tuple is:haimie haimie123

tuple is:jojo jojo123

例子

1.flatMap

https://www.jianshu.com/p/d588f98d336a

勿删,copyright占位
分享文章到微博
分享文章到朋友圈

上一篇:vue的methods,watch,computed

下一篇:git小技巧总结

您可能感兴趣

  • 前端路 - Webpack

    概述 本质 JavaScript 应用程序的静态模块打包器 核心 加载器(Loader)机制 工作流程 配置初始化 webpack 会首先读取配置文件,执行默认配置 编译前准备 webpack 会实例化 compiler,注册 plugins、resolverFactory、hooks。 reslove 前准备 webpack 实例化 compilation、NormalModuleFact...

  • Java异常面试题(2020最新版)

    文章目录 Java异常架构与异常关键字 Java异常简介 Java异常架构 1. Throwable 2. Error(错误) 3. Exception(异常) 运行时异常 编译时异常 4. 受检异常与非受检异常 受检异常 非受检异常 Java异常关键字 Java异常处理 声明异常 抛出异常 捕获异常 如何选择异常类型 常见异常处理方式 直接抛出异常 封装异常再抛出 捕获异常 自定义异常 t...

  • 爱了,3174页实战pdf集锦:Redis+多线程+Dubbo+JVM+kafka+MySQL

    写在前面 作为一名Java开发者,在现在这个信息化时代很快的时代,很少会有人停下脚步去思考以及去总结,忽略了很重要的一个步骤,没有反思和总结,只会用原来固有的想法去做事情,所以还是需要隔一段时间去总结。LZ今天总结了自己在平时会用到的一些: 01—Redis实战 在Redis诞生数年之后的今天,这个项目已经发生了显著的变化:我们现在拥有了一个更为健壮的系统,并且随着Redis 2.6的发布,...

  • 【STM32F429开发板用户手册】第38章 STM32F429的FMC总线应用之是32路高速IO扩展

    最新教程下载:http://www.armbbs.cn/forum.php?mod=viewthread&tid=93255 第38章 STM32F429的FMC总线应用之是32路高速IO扩展 本章教程为大家讲解利用STM32429的FMC总线扩展出32路高速IO,且使用简单,实际项目中也比较有实用价值。 目录 第38章 STM32F429的FMC总线应用之是32路高速IO扩展 38.1 初...

  • Web前端常见跨域解决方案

    一、通过Jsonp跨域 二、document.domain+iframe 跨域 三、location.hash+iframe 跨域 四、window.name+iframe 跨域 五、postMessage 跨域 六、跨域共享资源 (CORS) 七、nginx 代理跨域 八、node js中间件代理跨域 九、WebSocket 协议跨域 1、通过Jsonp跨域: 通常为了减轻Web服务器的负...

  • 为什么要使用Feign

    简介: Feign是一种负载均衡的HTTP客户端, 使用Feign调用API就像调用本地方法一样,从避免了调用目标微服务时,需要不断的解析/封装json 数据的繁琐。Feign集成了Ribbon。Ribbon+eureka是面向微服务编程,而Feign是面向接口编程。 Fegin是一个声明似的web服务客户端,它使得编写web服务客户端变得更加容易。使用Fegin创建一个接口并对它进行注解。...

  • WFC能带来多大的惊喜?WFC能否超越XRP成为下一个传奇?

    XRP在区块链+金融中,是实至名归的佼佼者,在数字货币市场的投资回报率一度逼近了5000%,许久以来众多区块链+金融的公链都声称要挑战XRP支付之王的地位,但目前还没有哪条公链能在市值上与其比肩。而随着WFC的出现越来越多的人将目光投想了WFC和XRP,甚至大多数人看好WFC必然可以稳妥超越未来的XRP。 为何WFC会有如此的潜力被人看好呢?接下来我们就一起分析下WFC的底气何在! 1.项目...

  • 关于微服务的一些了解

    1.什么叫微服务架构 微服务可以在“自己的程序”中运行,并通过“轻量级设备与HTTP型API进行沟通”。关键在于该服务可以在自己的程序中运行。通过这一点我们就可以将服务公开与微服务架构(在现有系统中分布一个API)区分开来。在服务公开中,许多服务都可以被内部独立进程所限制。如果其中任何一个服务需要增加某种功能,那么就必须缩小进程范围。在微服务架构中,只需要在特定的某种服务中增加所需功能,而不...

华为云40多款云服务产品0元试用活动

免费套餐,马上领取!
CSDN

CSDN

中国开发者社区CSDN (Chinese Software Developer Network) 创立于1999年,致力为中国开发者提供知识传播、在线学习、职业发展等全生命周期服务。