Spark 是一种基于内存的快速、通用、可扩展的大数据分析引擎。
Hadoop 的历史
2003,2004 Google 2 篇论文
(1)2011 年发布 1.0 版本
(2)2012 年发布稳定版
·MR 的缺点:
mr 基于数据集的计算,所以面向数据。
① 基本运算规则:从存储介质中获取(采集)数据,然后进行计算,最后将结果存储到介质中,所以主要应用于一次性计算,不适合数据挖掘和机器学习这样迭代计算和图形挖掘计算。
② MR 基于文件存储介质的操作,所以性能非常的慢。
③ MR 和 hadoop 紧密耦合一起,无法动态替换。
(3)2013 年 10月发布 2.X 版本(Yarn)
Spark 历史
2009 年诞生于加州大学伯克利分校 AMPLab,项目采用 Scala 编写。
2010 年开源;
2013 年 6 月成为 Apache 孵化项目
2014 年 2 月成为 Apache 顶级项目。
Spark 基于 hadoop1.X 架构思想,采用自己的方式改善 Hadoop1.X 中的问题。
Spark 计算基于内存(多任务) ,并且基于 Scala 语言开发,所以天生适合迭代计算。
Spark Core
实现了 Spark 的基本功能,包括任务调度、内存管理、错误恢复、与存储系统交互等模块。Spark Core 中还包括对弹性分布数据集(Resilent Distributed DataSet,简称 RDD)的 API 定义。
Spark SQL
是 Spark 用来操作结构化数据的程序包。通过 Spark SQL,我们可以使用 SQL 或者 Apache Hive 版本的 SQL 方言(HQL)来查询数据。Spark SQL 支持多种数据源,比如 Hive 表、Parquet 以及 JSON 等。
Spark Streaming
是 Spark 提供的对实时数据进行流式计算的组件。提供了用来操作数据流的 API,并且与 Spark Core 中的 RDD API 高度对应。
Spark Mlib
提供常见的机器学习(ML)功能的程序库。包括分类、回归、聚类、协同过滤等,还提供了模型评估、数据导入等额外支持功能。
集群管理器
Spark 设计可以高效地在一个计算节点到数千个计算节点之间伸缩计算。为了实现这样的要求,同时获得最大灵活性,Spark 支持在各种集群管理器(Cluster Manager)上运行,包括 Hadoop YARN、Apache Mesos,以及 Spark 自带的一个简易调度器,叫作独立调度器。
速度快
与 Hadoop 的 MapReduce 相比,Spark 基于内存的运算要快 100 倍以上,基于硬盘的运算也要快 10 倍以上。Spark 实现了高效的 DAG 执行引擎,可以通过基于内存来高效处理数据流。计算的中间结果是存在于内存中的。
易用
Spark 支持 Java、Python 和 Scala 的 API,还支持超过 80 种高级算法,使用户可以快速构建不同的应用。而且 Spark 支持交互式的 Python 和 Scala 的 Shell,可以方便地在这些 Shell 中使用 Spark 集群来验证解决问题的方法。
通用
Spark 提供了统一的解决方案。Spark 可以用于批处理、交互式查询(Spark SQL)、实时流处理(Spark Streaming)、机器学习(Spark Mlib)和图计算(GraphX)。这些不同类型的处理都可以在同一应用中无缝使用。减少了开发和维护的人力成本和部署平台的物力成本。
兼容性
Spark 可以非常方便地与其他的开源产品进行融合。比如,Spark 可以使用 Hadoop YARN 和 Apache Mesos 作为它的资源管理和调度器,并且可以处理所有 Hadoop 支持的数据,包括 HDFS、HBase 等。这对于已经部署 Hadoop 集群的用户特别重要,因为不需要做任何数据迁移就可以使用 Spark 的强大处理能力。
Spark 的驱动器是执行开发程序中 main 方法的进程。它负责开发人员编写的用来创建 SparkContent、创建 RDD,以及进行 RDD 的转化操作和行动操作代码的执行。当启动 Spark shell 的时候,系统后台自启动了一个 Spark 驱动程序,就是在 Spark shell 中预加载了一个叫作 sc 的 SparkContent 对象。如果驱动器程序终止,那么 Spark 应用也结束了。
Driver 主要负责:
Spark Executor 是一个工作进程,负责在 Spark 作业中运行任务,任务之间相互独立。Spark 应用启动时,Executor 节点被同时启动,并且始终伴随着整个 Spark 应用的生命周期而存在。如果有 Executor 节点发生了故障或崩溃,Spark 应用也可以继续进行,因为会将出错节点上的任务调度到其他 Executor 节点上继续运行。
Executor 主要负责:
Local 模式就是运行在一台计算机上的模式,通常就是用于在本机上练手和测试。
它可以通过以下几种方式设置 Master。
local
所有计算都运行在一个线程当中,没有任何并行计算,通常我们在本机执行一些测试代码,或者练手,就用这种模式;
local[K]
指定使用几个线程来运行计算,比如 local[4 ]就是运行 4 个Worker线程。通常我们的 Cpu 有几个 Core,就指定几个线程,最大化利用 Cpu 的计算能力;
local[*]
这种模式直接帮你按照 Cpu 最多 Cores 来设置线程数了。
tar -zxvf spark-2.3.1-bin-hadoop2.7.tgz -C /hadoop/
mv spark-2.3.1-bin-hadoop2.7/ spark-2.3.1
vim /etc/profile
在文件中添加以下内容:
#SPARK
export SPARK_HOME=/hadoop/spark-2.3.1
export PATH=$PATH:$SPARK_HOME/bin
使配置文件生效:
source /etc/profile
spark-submit \
--class org.apache.spark.examples.SparkPi \
--executor-memory 1G \
--total-executor-cores 2 \
/hadoop/spark-2.3.1/examples/jars/spark-examples_2.11-2.3.1.jar \
100
① 基本语法
spark-submit \
--class <main-class>
--master <master-url> \
--deploy-mode <deploy-mode> \
--conf <key>=<value> \
... # other options
<application-jar> \
[application-arguments]
② 参数说明:
参数 | 说明 |
---|---|
class | 你的应用的启动类 (如 org.apache.spark.examples.SparkPi) |
master | 指定 Master 的地址,默认为 Local |
deploy-mode | 是否发布你的驱动到 worker 节点(cluster)或者作为一个本地客户端 (client) (default: client)* |
conf | 任意的 Spark 配置属性, 格式 key=value. 如果值包含空格,可以加引号 “key=value” |
application-jar | 打包好的应用 jar,包含依赖,这个 URL 在集群中全局可见。 比如 hdfs:// 共享存储系统, 如果是 file:// path, 那么所有的节点的 path 都包含同样的 jar |
application-arguments | 传给 main() 方法的参数 |
executor-memory 1G | 指定每个 executor 可用内存为 1 G |
total-executor-cores 2 | 指定每个 executor 使用的 cup 核数为 2 个 |
mkdir input
hello Word
hello Scala
Hello Spark
spark-shell
查看 spark 状态
① 开启另一个窗口
jps
② 浏览器输入 master:4040 查看程序运行
sc.textFile("/hadoop/spark-2.3.1/input").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect
查看结果
① 控制台
② web 页面
WordCount 案例分析
① 提交任务分析:
② 数据流分析:
sc.textFile("/hadoop/spark-2.3.1/input").flatMap(.split(" ")).map((,1)).reduceByKey(+).collect
textFile("/hadoop/spark-2.3.1/input"):读取本地 input 文件夹数据;
flatMap(_.split(" ")):压平操作,按照空格分割符将一行数据映射成一个个单词;
map((_,1)):对每一个元素操作,将单词映射为元组;
reduceByKey(+):按照key将值进行聚合,相加;
collect:将数据收集到Driver端展示。
构建一个由 Master+Slave 构成的 Spark 集群,Spark 运行在集群中。
修改 spark 的配置文件(/hadoop/spark-2.3.1/conf 目录下)
① 修改配置文件名称
mv slaves.template slaves
mv spark-env.sh.template spark-env.sh
② 配置 slave 文件,添加 work 节点
master
slave1
slave2
③ 修改 spark-env.sh 文件
SPARK_MASTER_HOST=master
SPARK_MASTER_PORT=7077
export JAVA_HOME=/usr/local/java/jdk1.8.0_151
xsync spark-2.3.1/
xsync /etc/profile
source /etc/profile
sbin/start-all.sh
查看
① 查看 jps
② 网页查看:master:8080
sbin/start-all.sh
spark-submit \
--class org.apache.spark.examples.SparkPi \
--master spark://master:7077 \
--executor-memory 1G \
--total-executor-cores 2 \
/hadoop/spark-2.3.1/examples/jars/spark-examples_2.11-2.3.1.jar \
100
spark-shell \
--master spark://master:7077 \
--executor-memory 1g \
--total-executor-cores 2
参数:–master spark://master:7077指定要连接的集群的 master
sc.textFile("/hadoop/spark-2.3.1/input").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect
mv spark-defaults.conf.template spark-defaults.conf
spark.eventLog.enabled true
spark.eventLog.dir hdfs://hadoop102:9000/directory
hadoop fs -mkdir /directory
# - SPARK_HISTORY_OPTS, to set config properties only for the history server (e.g. "-Dx=y")
SPARK_HISTORY_OPTS="-Dspark.history.ui.port=18080
-Dspark.history.retainedApplications=30
-Dspark.history.fs.logDirectory=hdfs://master:9000/directory"
xsync spark-defaults.conf
xsync spark-env.sh
sbin/start-history-server.sh
spark-submit \
--class org.apache.spark.examples.SparkPi \
--master spark://master:7077 \
--executor-memory 1G \
--total-executor-cores 2 \
/hadoop/spark-2.3.1/examples/jars/spark-examples_2.11-2.3.1.jar \
100
查看历史服务
master:18080
#SPARK_MASTER_HOST=hadoop102
#SPARK_MASTER_PORT=7077
SPARK_DAEMON_JAVA_OPTS="
-Dspark.deploy.recoveryMode=ZOOKEEPER
-Dspark.deploy.zookeeper.url=master,slave1,slave2
-Dspark.deploy.zookeeper.dir=/spark"
xsync spark-env.sh
sbin/start-all.sh
sbin/start-master.sh
spark-shell \
--master spark://master:7077,slave1:7077 \
--executor-memory 2g \
--total-executor-cores 2
Spark 客户端直接连接 Yarn,不需要额外构建 Spark 集群。有 yarn-client 和 yarn-cluster 两种模式,主要区别在于:Driver 程序的运行节点。
① yarn-client
Driver 程序运行在客户端,适用于交互、调试,希望立即看到 app 的输出。
② yarn-cluster
Driver 程序运行在由 RM(ResourceManager)启动的 AP(APPMaster)适用于生产环境。
YARN 运行模式介绍:
<!--是否启动一个线程检查每个任务正使用的物理内存量,如果任务超出分配值,则直接将其杀掉,默认是true -->
<property>
<name>yarn.nodemanager.pmem-check-enabled</name>
<value>false</value>
</property>
<!--是否启动一个线程检查每个任务正使用的虚拟内存量,如果任务超出分配值,则直接将其杀掉,默认是true -->
<property>
<name>yarn.nodemanager.vmem-check-enabled</name>
<value>false</value>
</property>
xsync yarn-site.xml
# - YARN_CONF_DIR, to point Spark towards YARN configuration files when you use YARN
YARN_CONF_DIR=/hadoop/hadoop-2.7.7/etc/hadoop
xsync spark-env.sh
spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode client \
/hadoop/spark-2.3.1/examples/jars/spark-examples_2.11-2.3.1.jar \
100
spark-shell --master yarn
参数:–master spark://master:7077指定要连接的集群的 master
sc.textFile("/hadoop/spark-2.3.1/input").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect
修改配置文件 spark-defaults.conf
添加如下内容:
spark.yarn.historyServer.address=master:18080
spark.history.ui.port=18080
sbin/stop-history-server.sh
sbin/start-history-server.sh
3 提交任务到 Yarn 执行
spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode client \
/hadoop/spark-2.3.1/examples/jars/spark-examples_2.11-2.3.1.jar \
100
模式 | Spark安装机器数 | 需启动的进程 | 所属者 |
---|---|---|---|
Local | 1 | 无 | Spark |
Standalone | 3 | Master 及 Worker | Spark |
Yarn | 1 | Yarn 及 HDFS | Hadoop |
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.3.1</version>
</dependency>
</dependencies>
<build>
<finalName>WordCount</finalName>
<plugins>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.2</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
创建 scala 源码目录
创建 input 文件夹,并创建 word.txt
hello Scala
hello Spark
hello World
hello Hadoop
hello Scala
package spark
import org.apache.spark.{
SparkConf, SparkContext}
object WordCount {
def main(args: Array[String]): Unit = {
//1.创建SparkConf
val conf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
//2.创建SparkContext
val sc = new SparkContext(conf)
//3.使用sc创建RDD并执行相应的transformation和action
// sc.textFile("input").flatMap(_.split(" ")).map((_, 1)).reduceByKey(_+_).collect()
// 读取文件,将文件的内容一行一行的读出来
val lines = sc.textFile("spark/input")
// 将一行一行的数据分解一个一个的单词
val words = lines.flatMap(_.split(" "))
// 将单词的数据进行结构的转换
val wordToOne = words.map((_, 1))
// 对转换结构后的数据进行分组聚合
val wordToSum = wordToOne.reduceByKey(_ + _)
// 将统计结果采集后打印到控制台
val result = wordToSum.collect()
result.foreach(println)
//4.关闭连接
sc.stop()
}
}
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.0.0</version>
<configuration>
<archive>
<manifest>
<mainClass>WordCount</mainClass>
</manifest>
</archive>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
文章浏览阅读825次。DROP PROCEDURE IF EXISTS `create_appeal`;DELIMITER $$CREATE PROCEDURE `create_appeal`(IN userId INT,IN userName VARCHAR(20),IN historyId VARCHAR(20),IN productId INT,IN tourneyId INT,IN roundId TINY..._mysql8 创建存储过程返回 -1
文章浏览阅读950次。将参数中有关xss攻击的非法字符全部处理掉。_xss过滤器
文章浏览阅读58次。有问题请留言:[email protected]
文章浏览阅读545次。项目运行环境配置:Jdk1.8 + Tomcat7.0 + Mysql + HBuilderX(Webstorm也行)+ Eclispe(IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持)。项目技术:SSM + mybatis + Maven + Vue 等等组成,B/S模式 + Maven管理等等。环境需要1.运行环境:最好是java jdk 1.8,我们在这个平台上运行的。其他版本理论上也可以。2.IDE环境:IDEA,Eclipse,Myeclipse都可以。_智慧消防 代码货栈
文章浏览阅读6.6k次,点赞11次,收藏79次。1、.netCore的执行过程2、如何在controller中注入service?在Config Services方法中配置这个service在Controller的构造函数中 添加这个依赖注入3、.netCore比.net更具优势的地方是什么?跨平台,可以运行在 Windows 、Linux 和 MAC 系统上对框架本身安装没有依赖,所有依赖都和程序本身在一起.netCore处理请求的效率更高,进而可以处理更多的请求具有更多的安装配置方法4、.netCore主要的特性有哪些?依赖注入_.netcore
文章浏览阅读1k次,点赞25次,收藏27次。如何在Mac上恢复误删除的文件?在日常使用Mac电脑时,无论是工作还是娱乐,我们都会创建和处理大量的文件。然而,有时候可能会不小心删除一些重要的文件,这无疑会给我们带来一些麻烦。那么,要在Mac电脑上恢复误删除的文件,我们可以采取以下4种方法,本文将详细介绍这些方法。_easyrecovery crack
文章浏览阅读971次,点赞15次,收藏16次。在MetaGPT看来,可以将智能体想象成环境中的数字人,其中智能体 = 大语言模型(LLM) + 观察 + 思考 + 行动 + 记忆这个公式概括了智能体的功能本质。举例说明:智能体为某后开发工程师角色那多智能体意味着什么呢如图例说明:你是一个软件公司的老板以上多智能体意味着一句话实现一个小型应用的开发:样例(由GPT-4生成)例如,如果你执行,将会得到包括数据类型&API设计在内的输出生成一个包含分析和设计内容的样例的成本约为(使用GPT-4),而生成一个完整项目的成本约为。在这里插入图片描述。_metagpt调用qwen
文章浏览阅读282次。其次,引入粒子群算法,结合含有抽水蓄能机组的混合发电系统的调峰经济调度模型,提出双层储能容量优化配置的方法。而将粒子群算法与含有抽水蓄能机组的混合发电系统的调峰经济调度模型相结合,可以获得储能的最优容量配置,进一步提高混合发电系统的经济性和效益。通过介绍遗传算法和粒子群算法的原理及其在该问题中的应用,并进行实验验证,展示了双层储能容量优化配置的有效性和优越性。本人亲子编写,可修改,上层用遗传算法出容量配置,下层粒子群算法出运行调度计划,以成本最低得到含抽水蓄能机组的混合发电系统的调峰经济调度模型。
文章浏览阅读5.4k次。此文首发于我的个人博客:Python-PyCharm 报错解决:ImportError: cannot import name ‘InteractiveConsole’ from ‘code’ — zhang0peter的个人博客早上在用PyCharm跑Python代码时遇到报错:Traceback (most recent call last): File "C:\Users\pete..._importerror: cannot import name 'interactiveconsole' from 'code
文章浏览阅读645次。Layui下拉框_layui刷新下拉框
文章浏览阅读806次,点赞16次,收藏20次。通过npm全局安装@vue/cli脚手架 ,@3.10表示下载某个指定版本,如果不写,则默认下载最新版本。如果已经安装了 vue-cli (1.x或者2.x) ,需要先卸载,再重新全局安装vue-cli。(首次使用安装 vue 脚手架可跳过此步骤)_npm 安装脚手架
文章浏览阅读1.2w次。3DES又称Triple DES,是DES加密算法的一种模式,它使用3条56位的密钥对数据进行三次加密。3DES(即Triple DES)是DES向AES过渡的加密算法(1999年,NIST将3-DES指定为过渡的加密标准),加密算法,其具体实现如下:设Ek()和Dk()代表DES算法的加密和解密过程,K代表DES算法使用的密钥,M代表明文,C代表密文,这样:3DES加密过程为:C_3des的加密过程为des加密-