Spark
中间结果保存在内存,延迟小
task以线程方式,任务启动快
核心概念
RDD
弹性分布式数据集(只封装计算逻辑,不保存数据)
不可变
可分区
并行计算
spark最基本的数据抽象
这个抽象的数据模型,让使用者可以不必关心底层数据是分布式的
只需关心如何把应用逻辑转化为一系列转换函数,进而实现管道化,
从而避免了存储中间结果,大大降低数据复制、磁盘IO的开销
有很多分片(分布式),可以指定分片数
不存储数据,只是记录数据位置,转换关系(调用什么方法)
惰性执行原因:action时对RDD操作形成DAG进行stage的划分和并行优化
宽窄依赖
区别:
父RDD数据是否进入不同的子RDD
设计原因:
窄依赖分区可以并行,一个分区丢失重新计算对应分区即可
宽依赖(介于两个stage之间),需要等待上一个stage计算完才可以计算
RDD缓存
- persist
- cache(调用了persist)
存储级别
Checkpoint
更可靠的数据持久化
开发经验:
对使用频繁且重要的数据,先持久化,再checkpoint
持久化和checkpoint的区别:
持久化保存在本地磁盘或内存,checkpoint保存在HDFS这类可靠的存储
持久化会在程序结束时被清除,或是手动调用unpersist清除缓存,checkpoint在程序结束后依然存在
累加器和广播变量
RDD转化为dataframe的流程
- 定义Schema(表结构)
- 将RDD映射为Row对象(一行是什么)
- 转换为dataframe(使用createDataFrame方法)
DAG
有向无环图,记录RDD执行流程
边界:
开始—sparkcontext创建的RDD
结束—触发action
一个job一个DAG
构建流程
读取HDFS文件,创建RDD对象
DAGScheduler计算RDD之间的依赖关系
从后往前划分stage
- 遇到宽依赖,断开
- 遇到窄依赖,加入stage
依赖关系形成了DAG
DStream
一连串的batch,操作单元是每个batch里的RDD
spark-submit
官方shell脚本
用于提交任务,为spark作业申请资源
1 | bin/spark-submit |
executor-cores不是指物理核心和线程,只是一个虚拟概念
就算只有300个物理核心,但分配的executor-core总数为400,操作系统会通过时间片轮转等调度算法模拟更多的核心执行,可能会性能下降
application、job、stage、task
初始化一个spark context就是生成一个application
由于shuffle存在,不同stage是不能并行计算的,因为stage的计算需要依赖前面stage的shuffle结果
stage是由一组完全独立的计算任务(task)组成,每个task运算逻辑相同,只不过每个task只会处理自己对应的partition
task
- 被送到某个executor上的工作单元
- 每个task处理一个rdd分区的数据
- 如果每个excutor有2个cpu core、4个task的话,会是先执行2个再执行2个
- 一个线程
为什么划分stage
减少磁盘I/O:比如map操作的输出可以直接作为filter操作的输入,无需将中间结果存储到磁盘
广播变量
创建方式
Broadcast<> 变量名 = sc.broadcast(需要传输的数据)
作用
map join
序列化
算子外的代码在driver运行
算子内的代码在executor运行
算子内经常会用到算子外的数据,需要序列化后传递
方案
一、Java序列化
比较重(字节多)
二、kryo序列化
端口号
当前任务运行情况:4040
历史服务器:18080
yarn任务运行情况:8088
job:4040
代码的组织结构
入口:SparkSession
SparkSession.builder()
数据切分与任务划分
输入分片 — task —组成— RDD
内存管理
spark1.6
- 之前,静态
- 之后,统一
可以使用spark.memory.useLegacyMode
参数启用
一、静态内存管理机制
spark的内存被分为两个区域
存储内存(storage memory)
- 存储计算中间数据、RDD数据
执行内存(execution memory)
- 存储shuffle数据
静态是指spark不会动态调整二者比例,而是让用户预先配置
二、统一内存管理机制
工作组件
driver
驱动器节点
执行main方法
不参与计算
负责
- 将用户程序(application)转化为作业(job)
- 在executor之间调度任务(task)
- 跟踪executor的执行情况
- 通过UI展示运行情况
executor
参与具体计算
集群中工作节点(worker)的一个JVM进程
负责
- 运行具体任务(task),将结果返回给driver
- 通过自身的块管理器(block manager)为RDD提供内存缓存,缓存在executor进程中
executor是一个独立的jvm进程
使用多线程模型可以并发运行多个task,每个task为一个线程
cluster manager
blockmanager
管理内存和磁盘上的数据块
各个节点(executor)都有一个BlockManager实例
工作流程
- spark-submit提交一个spark作业
- 启动一个driver进程,可能在本地,可能在集群上某个节点,这个要看具体的部署模式(deploy-mode),向集群管理器申请该作业需要的资源(executor进程)
- 在各个工作节点上,启动一定数量的executor进程
- driver将作业代码拆分为多个stage,每个stage负责一部分代码,每个stage包含一批并行的有相同的计算逻辑的task,只是处理的数据不同而已,这些task被放到executor中执行,被executor中的cpu core计算
- 一个stage所有task执行完后,结果写入各个节点的本地磁盘文件,作为下一个stage的输入,串行执行
spark在运行一个任务时,yarn会分配对应executor数量的container跑任务,实际是一个jvm进程
且以yarn-cluster模式运行时,会多分配一个container用来跑applicationMaster进程,用于调控spark任务,该进程内存大小以spark.driver.memory控制
Shuffle
一、Hash Shuffle
二、Sort Shuffle
spark调优
基础调优
最简单
增加资源
一、调节最优的资源配置(分配更多的资源)
spark-submit用于给spark作业分配资源
sparkconf设置spark作业具体参数
1 | 调节spark-submit这个shell脚本的参数 |
举例:一次spark任务就要消耗160个cpu core、500g内存
增加num-executor、增加executor-cores =
- 增加并行处理的task个数
如原20个executor,每个2cpu core,就是40个task并行
增加executor-memory =
15. 减少磁盘I/O
- 需要对RDD进行cache时,大内存可以写入更多,也就不需要写入磁盘
- shuffle的reduce端需要进行数据存放和聚合,大内存可以更少写入磁盘或不写入磁盘
16. 减少垃圾回收次数
- task执行可能会创建很多对象,内存较小可能会导致JVM堆频繁存满、频繁垃圾回收
对着调好的资源配置,进行接下来的调优
二、调节并行度
1 | SparkConf conf = new SparkConf().set(""spark.default.parallelism", "500") |
调节并行度实际就是充分利用集群计算资源
并行度,也就是task数量
- 最少:等于总cpu core数量
- 推荐:2~3倍总cpu core数量
因为实际情况,有的task快,有的task慢,要是数量相等,还是会造成一些资源浪费
设置成倍数后,一个task运行完,另一个task能立刻补上来,尽量让cpu core不空闲
大部分情况,资源和并行度到位了,spark作业就很快了,几分钟
三、RDD架构重构及RDD持久化
RDD重复计算带来的问题:HDFS-RDD1-RDD2需要走两遍,从15分钟变30分钟
RDD架构重构:尽量复用RDD,差不多的,可以抽取为一个公共RDD,供反复使用
纯内存方式持久化公共RDD时,可以考虑序列化
将RDD的每个partition数据,序列化成一个大的字节数组,就一个对象,大大减少了内存task占用
缺点是需要反序列化
序列化纯内存后还是OOM内存溢出,那就考虑磁盘+内存的方式,再往上加可以将其序列化
内存资源极度充足时,可以使用持久化的双副本机制提高可靠性
四、广播大的变量
五、改用Kyro序列化
六、使用fastutil优化数据格式
七、调节数据本地化等待时长
JVM调优
minor gc / full gc都会导致[[JVM]]工作线程停止工作
一、降低cache操作的内存占比
二、调节 executor堆外内存、连接等待时长
shuffle调优
spark.sql.shuffle.partitions
分区数
默认200
优化:
小数据集:减少,比如50
大数据集:增加,比如1000或更多,取决于数据量和集群规模
spark.shuffle.file.buffer
缓冲区大小
默认32k
优化:
增大以减少磁盘I/O,如64k
spark.shuffle.io.retryWait
网络延迟等待时间
默认5s
优化:
在高负载或网络抖动时增加,以便有足够的恢复时间
一、合并map端输出文件
开启命令
1 | new SparkConf().set("spark.shuffle.consolidateFiles", "true") |
对比
1000个task,分配给100个executor
每个executor有2个cpu core(每次并行处理2个task),需要处理10个task
每个task处理出2个文件
每次处理产生4个文件,处理5次,共产生20个文件(1个executor)
合并前
二、调节map端内存缓冲大小、reduce端内存占比
map端处理的数据量比较大时,可能出现缓冲数据频繁spill溢写到磁盘文件中(磁盘I/O)
默认缓冲大小32K,每个task处理640KB的数据,就会发生640/32=20次溢写
配置方法
1 | val conf = new SparkConf().set("spark.shuffle.file.buffer", "64") |
三、HashShuffleManager、SortShuffleManager
算子调优
一、使用MapPartitions提升Map操作类性能
由一个元素执行一次function改为一个分区执行一次function
优点:当需要把数据通过JDBC写入时,map需要每个元素创建一个数据库连接,而mapPartitions只需在一个分区中创建一个数据库连接
缺点:
map处理数据时内存不足,可以垃圾回收掉
mapPartitions无法回收内存,可能OOM(内存溢出)
适用情况:
数据量不是特别大(估算RDD数据量、每个partition数据量、分配给每个executor的内存量,资源允许,可以考虑代替)
二、filter后使用coalesce减少分区数量
三、使用foreachPartition优化写数据库性能
同调优方案一
将foreach改为foreachPartition
一次处理一个分区
一个分区的数据只需创建一个数据库连接
只需向数据库发送一次sql语句和多组参数
可能造成OOM(某个分区数据量非常大)
四、使用repartition解决spark sql低并行度的性能问题
五、使用reduceByKey的本地聚合特性优化性能
数据倾斜
本质
数据分布不均匀
某些key对应的数据远高于其他key
一个key对应一个task任务,导致某些task任务非常重
定位方法
Spark Web UI查看当前运行到了第几个stage
看一下当前这个stage各个task分配的数据量,从而确定是否因task分配的数据不均匀导致了数据倾斜
根据stage划分原理,推算出来发生倾斜的stage对应代码中的哪一部分,这部分代码中肯定会有一个[[shuffle|shuffle]]类算子
实例
将session粒度的数据与用户信息数据join
这一步可能会出现数据倾斜,适合用map join替换掉reduce join,也就是将user信息作为广播变量广播出去,然后再利用userId2SessionInfo在调用mapToPair算子的时候取出[[广播变量]]的值进行聚合user信息
可能触发数据倾斜的算子
- ByKey类
- join
解决方案
一、Hive ETL预处理
适用场景
Hive表中的数据本身很不均匀,且业务场景需要频繁对Hive表执行分析操作
本质
将Spark作业的shuffle操作提前到了Hive ETL中
实现思路
在Hive ETL预先对数据按照key进行聚合,或者是预先和其他表进行join
在Spark作业中针对的数据源就是预处理后的Hive表
那么在Spark作业中也就不需要使用原先的shuffle类算子执行这类操作了
优缺点
简单、效果好
完全规避了数据倾斜,Spark作业性能大幅提升
治标不治本,Hive ETL中还是会发生数据倾斜
二、过滤导致倾斜的key
适用场景
业务需求需要能接受某些数据舍弃掉
比如100万个key,只有两个key数据量达到了10万,其他key对应的都是几十,那就抛弃这两个key
实现思路
从Hive表查数据的时候,直接用where过滤
三、提高shuffle操作的reduce并行度
方案一二不适合,就方案三
调用shuffle类算子时传入并行度参数(提高并行度)
shuffle时会创建指定数量的reduce task
每个reduce task分配到更少的数据
没有从根本上解决数据倾斜,只是尽量减轻shuffle reduce task的压力
实际经验
最理想
减轻后可忽略不计
不太理想
稍微快一点
以前某个task需要5小时,现在需要4小时
以前某个task直接oom,现在不会了,但也很慢,需要5小时
不太理想的话,就换后四种方案
四、使用随机key实现双重聚合
针对groupByKey、reduceByKey
某个task的value太多,比如(1,1)(1,2)(1,3)(1,4)
给key加上随机前缀(map),比如形成(1_1,1)(2_1,2)(1_1,3)(2_1,4)
此时一个大task就被拆开为多个小的task(逻辑上)
进行局部聚合
再使用map去掉key前缀
进行全局聚合
五、将reduce join转换为map join
针对join
六、sample采样倾斜key进行两次join
针对join
七、使用随机数、扩容表进行join
针对join
常见问题
yarn-cluster和yarn-client区别
yarn模式:spark客户端直接连接yarn
driver运行节点有区别
- cluster在yarn,用于生产
- client在本地,用于本地测试,有网络激增问题,本地可以看到log,方便调试
函数和算子的区别✅
函数/方法:本地对象API
算子:分布式对象API
对spark懒执行的理解
如果每个转换算子都要执行,那会导致在没有执行算子的时候,中间结果不会被输出,就相当于是黑盒,可能导致计算资源的浪费(白算了)
比如在饭店点菜,只有付款了厨师才会开始做,要是不付款就做,做完了不要了,那就是浪费
流批一体原理
- 相同的编程模型,实际都是对rdd进行操作
- 相同的api,用的算子几乎一样
- 对流的操作实际为微批操作,不过在2.3后有个structured streaming,实现和flink一样真正的流处理
面对mr的优势与劣势
优势
DAG计算模型(速度快的根本原因)
- 将多个操作合并成一个阶段(stage),某些任务可以在一个物理节点的内存中完成,减少了大量的shuffle和数据落盘的次数
- 比如map或是filter,都是可以直接基于内存计算
可以将反复用到的数据cache到内存中,减少数据加载耗时
支持实时流处理,为企业提供了统一的大数据流批处理平台
劣势
稳定性不如mapreduce
- 大量数据被缓存在内存中,java回收垃圾缓慢的情况严重,导致spark性能不稳定
- mapreduce就算运行慢,但是可以运行完
对技术要求高一点 - 因为基于内存计算,面对超大数据(一次操作10亿以上),没有调优的话,容易出问题,比如OOM内存溢出
spark不能完全替代mapreduce - 实际生产中可能因为内存资源不够导致任务失败,使用mapreduce更好
实战问题
对一天的数据统计每十分钟用户的一个活跃度(活跃量)(可能用到Spark的一些算子)
- 按每十分钟的时间窗口对数据分组
- 计算每个时间窗口内的用户活跃量
转载请注明来源,欢迎对文章中的引用来源进行考证,欢迎指出任何有错误或不够清晰的表达。可以在下面评论区评论,也可以邮件至 jaytp@qq.com