今天给大家带来一篇关于Spark和RDD的博客,由于我也是初学者,所以没法带来那么深刻的东西,但是我希望用我的感性认知带给大家一点灵感,毕竟刚开始学习Spark的时候我对RDD概念、Spark流程是有很多困惑的,我觉得大家也可能存在这种问题。OK,接下来我将从以下几个角度来讲RDD和Spark:1、 Spark简介、对比hadoop、生态,2、 RDD概念
在Spark官网,可以看到官方对Spark的概述:
可以看到官网对Spark的定义就是一个大一统的框架,其中存在做结构化数据处理的组件Spark SQL,有用于机器学习的MLlib组件等等。在我实际学习的过程中可以感觉到组件间的关系就好像积木一样,需要的时候插上即可。
Spark对比hadoop最大的特点就是快,在官网上第一张图就摆出来Spark比hadoop快了百倍,Spark的运算是基于内存的,而hadoop则需要通过HDFS将数据持久化到磁盘,所以显然是快的,但是快多少还是要看实际生产环境吧。
可是除了这点就没了吗?其实还有的,在《大数据基础:Spark工作原理及基础概念》中给大家罗列出来了:
特点 | 说明 |
---|---|
spark 计算速度快 | spark将每个任务构建成DAG进行计算,内部的计算过程通过弹性式分布式数据集RDD在内存在进行计算,相比于hadoop的mapreduce效率提升了100倍。 |
易于使用 | spark 提供了大量的算子,开发只需调用相关api进行实现无法关注底层的实现原理。相较于以前离线任务采用mapreduce实现,实时任务采用storm实现,目前这些都可以通过spark来实现,降低来开发的成本。同时spark 通过spark SQL降低了用户的学习使用门槛,还提供了机器学习,图计算引擎等。 |
支持多种的资源管理模式 | 学习使用中可以采用local 模型进行任务的调试,在正式环境中又提供了standalone,yarn等模式,方便用户选择合适的资源管理模式进行适配。 |
社区支持 | spark 生态圈丰富,迭代更新快,成为大数据领域必备的计算引擎。 |
其实刚刚介绍Spark的时候已经讲了一点了,大家请看图:
这是我找到比较合理的一张图,它把不同的工作内容分层,结构比较清晰。
层 | 说明 |
---|---|
资源调度层 | 因为我们的任务是要提交到集群上运行的,不同的结点有不同的工作,所以需要对计算资源进行调度,而在这一层的资源调度方式就有很多:local模式、StandAlone模式、yarn模式、mesos模式等等。 |
计算层 | 计算层主要使用的是spark-core这个spark的核心库,其面向的是离线的计算,而R、Python这些就是所支持的语言。 |
存储层 | 存储层包括一系列的存储组件,最常见的比如有hadoop-HDFS、MySQL、HBASE、MongoDB、Redis等等,这些均是spark生态可以对接的存储组件,而右边的sparkSQL显然是支持这些数据源的,而下方的MLlib等等显然需要数据的支持。 |
数据流 | 在做实时计算的时候streaming可以对接flume、kafka等组件。 |
这两个话题涉及了很多因素,我感觉这一篇文章还是不可能讲的很清楚,但是我会用我能做到的最朴素的语言给大家感性的讲一讲。同时,我建议大家多做几个小案例来加深认识。
RDD是Spark中最重要的概念,其全称叫做Resilient Distributed Dataset (RDD),即弹性分布式数据集,是一种可容错的、可以被并行操作的元素集合,是Spark中处理所有数据的一种基本抽象。
光是看这一句还是不够的,我在源码中找来注释给大家看一下,我建议大家仔细看下源码的注释:
/**
* A Resilient Distributed Dataset (RDD), the basic abstraction in Spark. Represents an immutable,
* partitioned collection of elements that can be operated on in parallel. This class contains the
* basic operations available on all RDDs, such as `map`, `filter`, and `persist`.
* 一个弹性分布式数据集(RDD),是Spark里的基本抽象。
* 它代表了可以被并行操作的不可变的分区元素集合。这个类包含了各种RDD都支持的基本操作,比如map、filter、persist等。
*
* In addition,[[org.apache.spark.rdd.PairRDDFunctions]] contains operations available only on RDDs of key-value pairs, such as `groupByKey` and `join`;
* 此外,org.apache.spark.rdd.PairRDDFunctions里还包含了只有键值对(key-value)类型RDD可用的操作,比如groupByKey、join等。
*
* [[org.apache.spark.rdd.DoubleRDDFunctions]] contains operations available only on RDDs of Doubles;
* org.apache.spark.rdd.DoubleRDDFunctions 里包含了只有Double数据类型的RDD可用的操作。
*
* and [[org.apache.spark.rdd.SequenceFileRDDFunctions]] contains operations available on RDDs that can be saved as SequenceFiles.
* org.apache.spark.rdd.SequenceFileRDDFunctions 里包含了可以被序列化成文件的RDD所包含的操作。
*
* All operations are automatically available on any RDD of the right type (e.g. RDD[(Int, Int)] through implicit.
* 所有的操作都可以通过implicit来赋予。
*
* Internally, each RDD is characterized by five main properties:
* 在RDD内部,每一个RDD都由这五个主要特征来描述:
*
* - A list of partitions
* - 一系列分区
*
* - A function for computing each split
* - 对每一个分片做计算的函数
*
* - A list of dependencies on other RDDs
* - 一系列对其他RDD的依赖
*
* - Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
* - 视情况而定,一个作用于键值对RDD的分区器
*
* - Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file)
* - 视情况而定, 要计算每个分片的首选位置的列表
*
* All of the scheduling and execution in Spark is done based on these methods, allowing each RDD
* to implement its own way of computing itself. Indeed, users can implement custom RDDs (e.g. for
* reading data from a new storage system) by overriding these functions. Please refer to the
* <a href="http://people.csail.mit.edu/matei/papers/2012/nsdi_spark.pdf">Spark paper</a>
* for more details on RDD internals.
* Spark里的所有scheduling和execution都是基于这些方法(通过赋予RDD操作的方式)来实现其自身的计算方式,当然用户可以通过重写方法自定义RDD。
*/
最后注释中还贴心的给出了RDD的提出的论文:《Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing》。
RDD的操作分为两大类,Transformation、Action。
Transformation是对已有的RDD进行转换(记录下一步操作)然后生成新的RDD,采用的是lazy策略,不会立即计算出结果。
Action是让已有的RDD对数据执行它的操作。
表格来自:大数据之Spark简介及RDD说明
方法(算子) | 说明 |
---|---|
map(func) | 返回一个新的RDD,该RDD由每一个输入元素经过func函数转换后组成 |
filter(func) | 返回一个新的RDD,该RDD由经过func函数计算后返回值为true的输入元素组成 |
flatMap(func) | 类似于map,但是每一个输入元素可以被映射为0或多个输出元素(所以func应该返回一个序列,而不是单一元素) |
mapPartitions(func) | 类似于map,但独立地在RDD的每一个分片上运行,因此在类型为T的RDD上运行时,func的函数类型必须是Iterator[T] => Iterator[U] |
mapPartitionsWithIndex(func) | 类似于mapPartitions,但func带有一个整数参数表示分片的索引值,因此在类型为T的RDD上运行时,func的函数类型必须是(Int, Interator[T]) => Iterator[U] |
sample(withReplacement, fraction, seed) | 根据fraction指定的比例对数据进行采样,可以选择是否使用随机数进行替换,seed用于指定随机数生成器种子 |
union(otherDataset) | 对源RDD和参数RDD求并集后返回一个新的RDD |
intersection(otherDataset) | 对源RDD和参数RDD求交集后返回一个新的RDD |
distinct([numTasks])) | 对源RDD进行去重后返回一个新的RDD |
groupByKey([numTasks]) | 在一个(K,V)的RDD上调用,返回一个(K, Iterator[V])的RDD |
reduceByKey(func, [numTasks]) | 在一个(K,V)的RDD上调用,返回一个(K,V)的RDD,使用指定的reduce函数,将相同key的值聚合到一起,与groupByKey类似,reduce任务的个数可以通过第二个可选的参数来设置 |
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]) | — |
sortByKey([ascending], [numTasks]) | 在一个(K,V)的RDD上调用,K必须实现Ordered接口,返回一个按照key进行排序的(K,V)的RDD |
sortBy(func,[ascending], [numTasks]) | 与sortByKey类似,但是更灵活 |
join(otherDataset, [numTasks]) | 在类型为(K,V)和(K,W)的RDD上调用,返回一个相同key对应的所有元素对在一起的(K,(V,W))的RDD |
cogroup(otherDataset, [numTasks]) | 在类型为(K,V)和(K,W)的RDD上调用,返回一个(K,(Iterable,Iterable))类型的RDD |
cartesian(otherDataset) | 笛卡尔积 |
pipe(command, [envVars]) | — |
coalesce(numPartitions) | — |
repartition(numPartitions) | — |
repartitionAndSortWithinPartitions(partitioner) | — |
方法(算子) | 说明 |
---|---|
reduce(func) | 通过func函数聚集RDD中的所有元素,这个功能必须是课交换且可并联的 |
collect() | 在驱动程序中,以数组的形式返回数据集的所有元素 |
count() | 返回RDD的元素个数 |
first() | 返回RDD的第一个元素(类似于take(1)) |
take(n) | 返回一个由数据集的前n个元素组成的数组 |
takeSample(withReplacement,num, [seed]) | 返回一个数组,该数组由从数据集中随机采样的num个元素组成,可以选择是否用随机数替换不足的部分,seed用于指定随机数生成器种子 |
takeOrdered(n, [ordering]) | — |
saveAsTextFile(path) | 将数据集的元素以textfile的形式保存到HDFS文件系统或者其他支持的文件系统,对于每个元素,Spark将会调用toString方法,将它装换为文件中的文本 |
saveAsSequenceFile(path) | 将数据集中的元素以Hadoop sequencefile的格式保存到指定的目录下,可以使HDFS或者其他Hadoop支持的文件系统。 |
saveAsObjectFile(path) | — |
countByKey() | 针对(K,V)类型的RDD,返回一个(K,Int)的map,表示每一个key对应的元素个数。 |
foreach(func) | 在数据集的每一个元素上,运行函数func进行更新。 |
刚开始我对RDD也是很迷惑,它是在哪里体现了并行化计算的?但是当我真正正正做一个完整的案例时,我才对他有那么一点理解。
大家可以想一个完整的离线计算案例,比如:
我们需要计算美团上外卖的标签,那么我们会有类似以下数据集:
商品ID | 用户ID | 评价(String) |
---|---|---|
109283 | yyyyxxx | 味道还不错,就是有点贵 |
109283 | swssim | 虽然有点贵,但是分量足 |
… | … | … |
109284 | swssim | 好难吃! |
我们的目标是针对商品做标签,依据是商品出现最多的5个评价标签。
请大家注意第3步,我们的程序放到集群中,而集群中显然不止一台worker,即显然不止一个executor,所以我们整个spark集群中每一个executor拿到的只是整个数据集的一部分(第一台拿0 - n-1行,第二台拿n - 2n-1行类似这样),但是我们的操作是写在一份程序里面,如何对不同机器中的数据集做统一的操作呢?
这显然就是RDD的作用,程序提交时会经过cluster manager分配资源、通过driver提交代码到executor,然后经过各种scheduler把程序进行分析,分成多个stage,每一个stage代表了不需要跨机器执行的操作的集合(比如map、filter),而当出现要跨机器操作(比如collect、reduce)时,则会把数据集中到一台机器去操作。
解释1 : 因为每一台机器都知道哪几步本机器不需要依靠别人可以自己做(stage),所以可以先做,不需要看别人脸色,而遇到大家统一的操作时通过网络把数据合并由一台机器做。RDD就是定义这些操作的对象,RDD操作的对象就是分布在不同机器上的同一格式的数据集。
解释2 : 数据集分布在不同机器中,RDD定义了各个机器对这份数据的同一操作(先做什么再做什么)。就好像你安排你的小弟,去不同银行,插入银行卡,输入密码,取5000块钱,然后拿回来,最后给你汇总一样。
Spark毫无疑问是个非常优秀的框架,其中的组件就仿佛积木一般随时插拔。RDD作为Spark的最重要的概念,对Spark整个框架起着至关重要的作用。RDD的操作分为Transoformation和Action两种,其核心理念是定义一个抽象的数据操作,从而方便每个分区针对各自所管理的数据做统一的操作。今天这篇博客可能还有很多没法讲清楚的地方,接下来我会继续把Spark的其他概念、RDD涉及的相关概念更详细的给大家理清楚。
文章浏览阅读2.5w次,点赞6次,收藏50次。官方解释是,docker 容器是机器上的沙盒进程,它与主机上的所有其他进程隔离。所以容器只是操作系统中被隔离开来的一个进程,所谓的容器化,其实也只是对操作系统进行欺骗的一种语法糖。_docker菜鸟教程
文章浏览阅读5.7k次,点赞3次,收藏14次。该如何避免的,今天小编给大家推荐两个下载Windows系统官方软件的资源网站,可以杜绝软件捆绑等行为。该站提供了丰富的Windows官方技术资源,比较重要的有MSDN技术资源文档库、官方工具和资源、应用程序、开发人员工具(Visual Studio 、SQLServer等等)、系统镜像、设计人员工具等。总的来说,这两个都是非常优秀的Windows系统镜像资源站,提供了丰富的Windows系统镜像资源,并且保证了资源的纯净和安全性,有需要的朋友可以去了解一下。这个非常实用的资源网站的创建者是国内的一个网友。_msdn我告诉你
文章浏览阅读1.2k次。vue2封装对话框el-dialog组件_
文章浏览阅读4.7k次,点赞5次,收藏6次。MFC 文本框换行 标签: it mfc 文本框1.将Multiline属性设置为True2.换行是使用"\r\n" (宽字符串为L"\r\n")3.如果需要编辑并且按Enter键换行,还要将 Want Return 设置为 True4.如果需要垂直滚动条的话将Vertical Scroll属性设置为True,需要水平滚动条的话将Horizontal Scroll属性设_c++ mfc同一框内输入二行怎么换行
文章浏览阅读832次。检查Linux是否是否开启所需端口,默认为6379,若未打开,将其开启:以root用户执行iptables -I INPUT -p tcp --dport 6379 -j ACCEPT如果还是未能解决,修改redis.conf,修改主机地址:bind 192.168.85.**;然后使用该配置文件,重新启动Redis服务./redis-server redis.conf..._redis-server doesn't support auth command or ismisconfigured. try
文章浏览阅读4.9k次。济大数电实验报告_数据选择器及其应用
文章浏览阅读236次。1研究内容消费在生产中占据十分重要的地位,是生产的最终目的和动力,是保持省内经济稳定快速发展的核心要素。预测河南省社会消费品零售总额,是进行宏观经济调控和消费体制改变创新的基础,是河南省内人民对美好的全面和谐社会的追求的要求,保持河南省经济稳定和可持续发展具有重要意义。本文建立灰色预测模型,利用MATLAB软件,预测出2019年~2023年河南省社会消费品零售总额预测值分别为21881...._灰色预测模型用什么软件
文章浏览阅读1.2k次。12.4-在Qt中使用Log4Qt输出Log文件,看这一篇就足够了一、为啥要使用第三方Log库,而不用平台自带的Log库二、Log4j系列库的功能介绍与基本概念三、Log4Qt库的基本介绍四、将Log4qt组装成为一个单独模块五、使用配置文件的方式配置Log4Qt六、使用代码的方式配置Log4Qt七、在Qt工程中引入Log4Qt库模块的方法八、获取示例中的源代码一、为啥要使用第三方Log库,而不用平台自带的Log库首先要说明的是,在平时开发和调试中开发平台自带的“打印输出”已经足够了。但_log4qt
文章浏览阅读786次。全局观思维模型,一个教我们由点到线,由线到面,再由面到体,不断的放大格局去思考问题的思维模型。_计算机中对于全局观的
文章浏览阅读330次。一、CountDownLatch介绍CountDownLatch采用减法计算;是一个同步辅助工具类和CyclicBarrier类功能类似,允许一个或多个线程等待,直到在其他线程中执行的一组操作完成。二、CountDownLatch俩种应用场景: 场景一:所有线程在等待开始信号(startSignal.await()),主流程发出开始信号通知,既执行startSignal.countDown()方法后;所有线程才开始执行;每个线程执行完发出做完信号,既执行do..._countdownluach于cyclicbarrier的用法
文章浏览阅读508次。Prometheus 算是一个全能型选手,原生支持容器监控,当然监控传统应用也不是吃干饭的,所以就是容器和非容器他都支持,所有的监控系统都具备这个流程,_-自动化监控系统prometheus&grafana实战
文章浏览阅读4.7k次。输入关键字,可以通过键盘的搜索按钮完成搜索功能。_react search