RDD到底是什么?RDD的API_rdd bank-程序员宅基地

技术标签: Spark  分布式  大数据  

RDD到底是什么?RDD的API

大家好,我是W

今天给大家带来一篇关于Spark和RDD的博客,由于我也是初学者,所以没法带来那么深刻的东西,但是我希望用我的感性认知带给大家一点灵感,毕竟刚开始学习Spark的时候我对RDD概念、Spark流程是有很多困惑的,我觉得大家也可能存在这种问题。OK,接下来我将从以下几个角度来讲RDD和Spark:1、 Spark简介、对比hadoop、生态,2、 RDD概念

1、 Spark简介、对比hadoop、生态

1.1 Spark简介

Spark官网,可以看到官方对Spark的概述:

Spark Overview
Apache Spark is a unified analytics engine for large-scale data processing. It provides high-level APIs in Java, Scala, Python and R, and an optimized engine that supports general execution graphs. It also supports a rich set of higher-level tools including Spark SQL for SQL and structured data processing, MLlib for machine learning, GraphX for graph processing, and Structured Streaming for incremental computation and stream processing.
Apache的Spark是一个用于大规模数据处理的统一分析引擎。它提供了一系列Java、Scaala、Python的高级API以及优化引擎,所以支持统一的操作。它同样的提供了一系列丰富的高阶工具,包括用于SQL查询、结构化数据处理的Spark SQL,用于机器学习的MLlib库,用于图处理的GraphX库,以及用于增量计算和流处理的Streaming库。

可以看到官网对Spark的定义就是一个大一统的框架,其中存在做结构化数据处理的组件Spark SQL,有用于机器学习的MLlib组件等等。在我实际学习的过程中可以感觉到组件间的关系就好像积木一样,需要的时候插上即可。

1.2 Spark对比Hadoop

Spark对比hadoop最大的特点就是快,在官网上第一张图就摆出来Spark比hadoop快了百倍,Spark的运算是基于内存的,而hadoop则需要通过HDFS将数据持久化到磁盘,所以显然是快的,但是快多少还是要看实际生产环境吧。

可是除了这点就没了吗?其实还有的,在《大数据基础:Spark工作原理及基础概念》中给大家罗列出来了:

特点 说明
spark 计算速度快 spark将每个任务构建成DAG进行计算,内部的计算过程通过弹性式分布式数据集RDD在内存在进行计算,相比于hadoop的mapreduce效率提升了100倍。
易于使用 spark 提供了大量的算子,开发只需调用相关api进行实现无法关注底层的实现原理。相较于以前离线任务采用mapreduce实现,实时任务采用storm实现,目前这些都可以通过spark来实现,降低来开发的成本。同时spark 通过spark SQL降低了用户的学习使用门槛,还提供了机器学习,图计算引擎等。
支持多种的资源管理模式 学习使用中可以采用local 模型进行任务的调试,在正式环境中又提供了standalone,yarn等模式,方便用户选择合适的资源管理模式进行适配。
社区支持 spark 生态圈丰富,迭代更新快,成为大数据领域必备的计算引擎。

1.3 Spark生态圈

其实刚刚介绍Spark的时候已经讲了一点了,大家请看图:

在这里插入图片描述

这是我找到比较合理的一张图,它把不同的工作内容分层,结构比较清晰。

说明
资源调度层 因为我们的任务是要提交到集群上运行的,不同的结点有不同的工作,所以需要对计算资源进行调度,而在这一层的资源调度方式就有很多:local模式、StandAlone模式、yarn模式、mesos模式等等。
计算层 计算层主要使用的是spark-core这个spark的核心库,其面向的是离线的计算,而R、Python这些就是所支持的语言。
存储层 存储层包括一系列的存储组件,最常见的比如有hadoop-HDFS、MySQL、HBASE、MongoDB、Redis等等,这些均是spark生态可以对接的存储组件,而右边的sparkSQL显然是支持这些数据源的,而下方的MLlib等等显然需要数据的支持。
数据流 在做实时计算的时候streaming可以对接flume、kafka等组件。

2、 RDD的概念(RDD到底是什么)、Spark的工作流程

这两个话题涉及了很多因素,我感觉这一篇文章还是不可能讲的很清楚,但是我会用我能做到的最朴素的语言给大家感性的讲一讲。同时,我建议大家多做几个小案例来加深认识。

2.1 RDD的概念

2.1.1 官方的定义

RDD是Spark中最重要的概念,其全称叫做Resilient Distributed Dataset (RDD),即弹性分布式数据集,是一种可容错的、可以被并行操作元素集合,是Spark中处理所有数据的一种基本抽象。

光是看这一句还是不够的,我在源码中找来注释给大家看一下,我建议大家仔细看下源码的注释

/**
 * A Resilient Distributed Dataset (RDD), the basic abstraction in Spark. Represents an immutable,
 * partitioned collection of elements that can be operated on in parallel. This class contains the
 * basic operations available on all RDDs, such as `map`, `filter`, and `persist`.
 * 一个弹性分布式数据集(RDD),是Spark里的基本抽象。
 * 它代表了可以被并行操作的不可变的分区元素集合。这个类包含了各种RDD都支持的基本操作,比如map、filter、persist等。
 * 
 * In addition,[[org.apache.spark.rdd.PairRDDFunctions]] contains operations available only on RDDs of key-value pairs, such as `groupByKey` and `join`;
 * 此外,org.apache.spark.rdd.PairRDDFunctions里还包含了只有键值对(key-value)类型RDD可用的操作,比如groupByKey、join等。
 * 
 * [[org.apache.spark.rdd.DoubleRDDFunctions]] contains operations available only on RDDs of Doubles; 
 * org.apache.spark.rdd.DoubleRDDFunctions 里包含了只有Double数据类型的RDD可用的操作。
 * 
 * and [[org.apache.spark.rdd.SequenceFileRDDFunctions]] contains operations available on RDDs that can be saved as SequenceFiles.
 * org.apache.spark.rdd.SequenceFileRDDFunctions 里包含了可以被序列化成文件的RDD所包含的操作。
 * 
 * All operations are automatically available on any RDD of the right type (e.g. RDD[(Int, Int)] through implicit.
 * 所有的操作都可以通过implicit来赋予。
 * 
 * Internally, each RDD is characterized by five main properties:
 * 在RDD内部,每一个RDD都由这五个主要特征来描述:
 * 
 *  - A list of partitions
 *  - 一系列分区
 *  
 *  - A function for computing each split
 *  - 对每一个分片做计算的函数
 *  
 *  - A list of dependencies on other RDDs
 *  - 一系列对其他RDD的依赖
 *  
 *  - Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
 *  - 视情况而定,一个作用于键值对RDD的分区器
 *  
 *  - Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file)
 *  - 视情况而定, 要计算每个分片的首选位置的列表
 *
 * All of the scheduling and execution in Spark is done based on these methods, allowing each RDD
 * to implement its own way of computing itself. Indeed, users can implement custom RDDs (e.g. for
 * reading data from a new storage system) by overriding these functions. Please refer to the
 * <a href="http://people.csail.mit.edu/matei/papers/2012/nsdi_spark.pdf">Spark paper</a>
 * for more details on RDD internals.
 * Spark里的所有scheduling和execution都是基于这些方法(通过赋予RDD操作的方式)来实现其自身的计算方式,当然用户可以通过重写方法自定义RDD。
 */

最后注释中还贴心的给出了RDD的提出的论文:《Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing》

RDD的操作分为两大类,Transformation、Action。

Transformation是对已有的RDD进行转换(记录下一步操作)然后生成新的RDD,采用的是lazy策略,不会立即计算出结果。

Action是让已有的RDD对数据执行它的操作。

表格来自:大数据之Spark简介及RDD说明

Transformation
方法(算子) 说明
map(func) 返回一个新的RDD,该RDD由每一个输入元素经过func函数转换后组成
filter(func) 返回一个新的RDD,该RDD由经过func函数计算后返回值为true的输入元素组成
flatMap(func) 类似于map,但是每一个输入元素可以被映射为0或多个输出元素(所以func应该返回一个序列,而不是单一元素)
mapPartitions(func) 类似于map,但独立地在RDD的每一个分片上运行,因此在类型为T的RDD上运行时,func的函数类型必须是Iterator[T] => Iterator[U]
mapPartitionsWithIndex(func) 类似于mapPartitions,但func带有一个整数参数表示分片的索引值,因此在类型为T的RDD上运行时,func的函数类型必须是(Int, Interator[T]) => Iterator[U]
sample(withReplacement, fraction, seed) 根据fraction指定的比例对数据进行采样,可以选择是否使用随机数进行替换,seed用于指定随机数生成器种子
union(otherDataset) 对源RDD和参数RDD求并集后返回一个新的RDD
intersection(otherDataset) 对源RDD和参数RDD求交集后返回一个新的RDD
distinct([numTasks])) 对源RDD进行去重后返回一个新的RDD
groupByKey([numTasks]) 在一个(K,V)的RDD上调用,返回一个(K, Iterator[V])的RDD
reduceByKey(func, [numTasks]) 在一个(K,V)的RDD上调用,返回一个(K,V)的RDD,使用指定的reduce函数,将相同key的值聚合到一起,与groupByKey类似,reduce任务的个数可以通过第二个可选的参数来设置
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks])
sortByKey([ascending], [numTasks]) 在一个(K,V)的RDD上调用,K必须实现Ordered接口,返回一个按照key进行排序的(K,V)的RDD
sortBy(func,[ascending], [numTasks]) 与sortByKey类似,但是更灵活
join(otherDataset, [numTasks]) 在类型为(K,V)和(K,W)的RDD上调用,返回一个相同key对应的所有元素对在一起的(K,(V,W))的RDD
cogroup(otherDataset, [numTasks]) 在类型为(K,V)和(K,W)的RDD上调用,返回一个(K,(Iterable,Iterable))类型的RDD
cartesian(otherDataset) 笛卡尔积
pipe(command, [envVars])
coalesce(numPartitions)
repartition(numPartitions)
repartitionAndSortWithinPartitions(partitioner)
Action
方法(算子) 说明
reduce(func) 通过func函数聚集RDD中的所有元素,这个功能必须是课交换且可并联的
collect() 在驱动程序中,以数组的形式返回数据集的所有元素
count() 返回RDD的元素个数
first() 返回RDD的第一个元素(类似于take(1))
take(n) 返回一个由数据集的前n个元素组成的数组
takeSample(withReplacement,num, [seed]) 返回一个数组,该数组由从数据集中随机采样的num个元素组成,可以选择是否用随机数替换不足的部分,seed用于指定随机数生成器种子
takeOrdered(n, [ordering])
saveAsTextFile(path) 将数据集的元素以textfile的形式保存到HDFS文件系统或者其他支持的文件系统,对于每个元素,Spark将会调用toString方法,将它装换为文件中的文本
saveAsSequenceFile(path) 将数据集中的元素以Hadoop sequencefile的格式保存到指定的目录下,可以使HDFS或者其他Hadoop支持的文件系统。
saveAsObjectFile(path)
countByKey() 针对(K,V)类型的RDD,返回一个(K,Int)的map,表示每一个key对应的元素个数。
foreach(func) 在数据集的每一个元素上,运行函数func进行更新。
2.1.2 我的感性认识

刚开始我对RDD也是很迷惑,它是在哪里体现了并行化计算的?但是当我真正正正做一个完整的案例时,我才对他有那么一点理解。

大家可以想一个完整的离线计算案例,比如:

我们需要计算美团上外卖的标签,那么我们会有类似以下数据集:

商品ID 用户ID 评价(String)
109283 yyyyxxx 味道还不错,就是有点贵
109283 swssim 虽然有点贵,但是分量足
109284 swssim 好难吃!

我们的目标是针对商品做标签,依据是商品出现最多的5个评价标签。

  • 1、 首先我们通过sparkContext读取数据
  • 2、 因为我们拿到的是评价String,所以做分词,这里假设分词调包成功,评价此时不再是一个长长的话,而是:评价1,评价2
  • 3、 接下来,提取出商品ID,评价
  • 4、 根据商品ID聚类,即groupByKey
  • 5、 对后面标签做操作…

请大家注意第3步,我们的程序放到集群中,而集群中显然不止一台worker,即显然不止一个executor,所以我们整个spark集群中每一个executor拿到的只是整个数据集的一部分(第一台拿0 - n-1行,第二台拿n - 2n-1行类似这样),但是我们的操作是写在一份程序里面,如何对不同机器中的数据集做统一的操作呢?

这显然就是RDD的作用,程序提交时会经过cluster manager分配资源、通过driver提交代码到executor,然后经过各种scheduler把程序进行分析,分成多个stage每一个stage代表了不需要跨机器执行的操作的集合(比如map、filter),而当出现要跨机器操作(比如collect、reduce)时,则会把数据集中到一台机器去操作。

说了那么多,RDD到底是什么呢?

解释1 : 因为每一台机器都知道哪几步本机器不需要依靠别人可以自己做(stage),所以可以先做,不需要看别人脸色,而遇到大家统一的操作时通过网络把数据合并由一台机器做。RDD就是定义这些操作的对象,RDD操作的对象就是分布在不同机器上的同一格式的数据集。

解释2 : 数据集分布在不同机器中,RDD定义了各个机器对这份数据的同一操作(先做什么再做什么)。就好像你安排你的小弟,去不同银行,插入银行卡,输入密码,取5000块钱,然后拿回来,最后给你汇总一样。

参考

总结

Spark毫无疑问是个非常优秀的框架,其中的组件就仿佛积木一般随时插拔。RDD作为Spark的最重要的概念,对Spark整个框架起着至关重要的作用。RDD的操作分为Transoformation和Action两种,其核心理念是定义一个抽象的数据操作,从而方便每个分区针对各自所管理的数据做统一的操作。今天这篇博客可能还有很多没法讲清楚的地方,接下来我会继续把Spark的其他概念、RDD涉及的相关概念更详细的给大家理清楚。

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/Alian_W/article/details/109770644

智能推荐

Docker 快速上手学习入门教程_docker菜鸟教程-程序员宅基地

文章浏览阅读2.5w次,点赞6次,收藏50次。官方解释是,docker 容器是机器上的沙盒进程,它与主机上的所有其他进程隔离。所以容器只是操作系统中被隔离开来的一个进程,所谓的容器化,其实也只是对操作系统进行欺骗的一种语法糖。_docker菜鸟教程

电脑技巧:Windows系统原版纯净软件必备的两个网站_msdn我告诉你-程序员宅基地

文章浏览阅读5.7k次,点赞3次,收藏14次。该如何避免的,今天小编给大家推荐两个下载Windows系统官方软件的资源网站,可以杜绝软件捆绑等行为。该站提供了丰富的Windows官方技术资源,比较重要的有MSDN技术资源文档库、官方工具和资源、应用程序、开发人员工具(Visual Studio 、SQLServer等等)、系统镜像、设计人员工具等。总的来说,这两个都是非常优秀的Windows系统镜像资源站,提供了丰富的Windows系统镜像资源,并且保证了资源的纯净和安全性,有需要的朋友可以去了解一下。这个非常实用的资源网站的创建者是国内的一个网友。_msdn我告诉你

vue2封装对话框el-dialog组件_<el-dialog 封装成组件 vue2-程序员宅基地

文章浏览阅读1.2k次。vue2封装对话框el-dialog组件_

MFC 文本框换行_c++ mfc同一框内输入二行怎么换行-程序员宅基地

文章浏览阅读4.7k次,点赞5次,收藏6次。MFC 文本框换行 标签: it mfc 文本框1.将Multiline属性设置为True2.换行是使用"\r\n" (宽字符串为L"\r\n")3.如果需要编辑并且按Enter键换行,还要将 Want Return 设置为 True4.如果需要垂直滚动条的话将Vertical Scroll属性设置为True,需要水平滚动条的话将Horizontal Scroll属性设_c++ mfc同一框内输入二行怎么换行

redis-desktop-manager无法连接redis-server的解决方法_redis-server doesn't support auth command or ismis-程序员宅基地

文章浏览阅读832次。检查Linux是否是否开启所需端口,默认为6379,若未打开,将其开启:以root用户执行iptables -I INPUT -p tcp --dport 6379 -j ACCEPT如果还是未能解决,修改redis.conf,修改主机地址:bind 192.168.85.**;然后使用该配置文件,重新启动Redis服务./redis-server redis.conf..._redis-server doesn't support auth command or ismisconfigured. try

实验四 数据选择器及其应用-程序员宅基地

文章浏览阅读4.9k次。济大数电实验报告_数据选择器及其应用

随便推点

灰色预测模型matlab_MATLAB实战|基于灰色预测河南省社会消费品零售总额预测-程序员宅基地

文章浏览阅读236次。1研究内容消费在生产中占据十分重要的地位,是生产的最终目的和动力,是保持省内经济稳定快速发展的核心要素。预测河南省社会消费品零售总额,是进行宏观经济调控和消费体制改变创新的基础,是河南省内人民对美好的全面和谐社会的追求的要求,保持河南省经济稳定和可持续发展具有重要意义。本文建立灰色预测模型,利用MATLAB软件,预测出2019年~2023年河南省社会消费品零售总额预测值分别为21881...._灰色预测模型用什么软件

log4qt-程序员宅基地

文章浏览阅读1.2k次。12.4-在Qt中使用Log4Qt输出Log文件,看这一篇就足够了一、为啥要使用第三方Log库,而不用平台自带的Log库二、Log4j系列库的功能介绍与基本概念三、Log4Qt库的基本介绍四、将Log4qt组装成为一个单独模块五、使用配置文件的方式配置Log4Qt六、使用代码的方式配置Log4Qt七、在Qt工程中引入Log4Qt库模块的方法八、获取示例中的源代码一、为啥要使用第三方Log库,而不用平台自带的Log库首先要说明的是,在平时开发和调试中开发平台自带的“打印输出”已经足够了。但_log4qt

100种思维模型之全局观思维模型-67_计算机中对于全局观的-程序员宅基地

文章浏览阅读786次。全局观思维模型,一个教我们由点到线,由线到面,再由面到体,不断的放大格局去思考问题的思维模型。_计算机中对于全局观的

线程间控制之CountDownLatch和CyclicBarrier使用介绍_countdownluach于cyclicbarrier的用法-程序员宅基地

文章浏览阅读330次。一、CountDownLatch介绍CountDownLatch采用减法计算;是一个同步辅助工具类和CyclicBarrier类功能类似,允许一个或多个线程等待,直到在其他线程中执行的一组操作完成。二、CountDownLatch俩种应用场景: 场景一:所有线程在等待开始信号(startSignal.await()),主流程发出开始信号通知,既执行startSignal.countDown()方法后;所有线程才开始执行;每个线程执行完发出做完信号,既执行do..._countdownluach于cyclicbarrier的用法

自动化监控系统Prometheus&Grafana_-自动化监控系统prometheus&grafana实战-程序员宅基地

文章浏览阅读508次。Prometheus 算是一个全能型选手,原生支持容器监控,当然监控传统应用也不是吃干饭的,所以就是容器和非容器他都支持,所有的监控系统都具备这个流程,_-自动化监控系统prometheus&grafana实战

React 组件封装之 Search 搜索_react search-程序员宅基地

文章浏览阅读4.7k次。输入关键字,可以通过键盘的搜索按钮完成搜索功能。_react search