Typography

活版印字


  • Home
  • Archive
  •  

© 2025 John Doe

Theme Typography by Makito

Proudly published with Hexo

Posted at 2025-06-21

Spark

中间结果保存在内存,延迟小

task以线程方式,任务启动快

核心概念

RDD

弹性分布式数据集(只封装计算逻辑,不保存数据)

  • 不可变

  • 可分区

  • 并行计算

spark最基本的数据抽象

这个抽象的数据模型,让使用者可以不必关心底层数据是分布式的
只需关心如何把应用逻辑转化为一系列转换函数,进而实现管道化,
从而避免了存储中间结果,大大降低数据复制、磁盘IO的开销

有很多分片(分布式),可以指定分片数

不存储数据,只是记录数据位置,转换关系(调用什么方法)

惰性执行原因:action时对RDD操作形成DAG进行stage的划分和并行优化

宽窄依赖

区别:
父RDD数据是否进入不同的子RDD

设计原因:
窄依赖分区可以并行,一个分区丢失重新计算对应分区即可
宽依赖(介于两个stage之间),需要等待上一个stage计算完才可以计算

‍

RDD缓存

  1. persist
  2. cache(调用了persist)

存储级别

image

Checkpoint

更可靠的数据持久化

开发经验:
对使用频繁且重要的数据,先持久化,再checkpoint

持久化和checkpoint的区别:
持久化保存在本地磁盘或内存,checkpoint保存在HDFS这类可靠的存储
持久化会在程序结束时被清除,或是手动调用unpersist清除缓存,checkpoint在程序结束后依然存在

‍

累加器和广播变量

‍

RDD转化为dataframe的流程

  1. 定义Schema(表结构)
  2. 将RDD映射为Row对象(一行是什么)
  3. 转换为dataframe(使用createDataFrame方法)

‍

DAG

有向无环图,记录RDD执行流程

边界:
开始—sparkcontext创建的RDD
结束—触发action

一个job一个DAG

image

构建流程

  1. 读取HDFS文件,创建RDD对象

  2. DAGScheduler计算RDD之间的依赖关系

  3. 从后往前划分stage

    1. 遇到宽依赖,断开
    2. 遇到窄依赖,加入stage
  4. 依赖关系形成了DAG

DStream

一连串的batch,操作单元是每个batch里的RDD

spark-submit

官方shell脚本
用于提交任务,为spark作业申请资源

1
2
3
4
5
6
7
bin/spark-submit
-- master 指定运行模式
-- num-executors 50-100
-- executor-cores 2-4
-- executor-memory 4-8G
-- driver-cores
-- driver-memory 通常不设置,或1G,使用collect算子时注意保证够大

executor-cores不是指物理核心和线程,只是一个虚拟概念
就算只有300个物理核心,但分配的executor-core总数为400,操作系统会通过时间片轮转等调度算法模拟更多的核心执行,可能会性能下降

application、job、stage、task

初始化一个spark context就是生成一个application

由于shuffle存在,不同stage是不能并行计算的,因为stage的计算需要依赖前面stage的shuffle结果

stage是由一组完全独立的计算任务(task)组成,每个task运算逻辑相同,只不过每个task只会处理自己对应的partition

task

  • 被送到某个executor上的工作单元
  • 每个task处理一个rdd分区的数据
  • 如果每个excutor有2个cpu core、4个task的话,会是先执行2个再执行2个
  • 一个线程

为什么划分stage

减少磁盘I/O:比如map操作的输出可以直接作为filter操作的输入,无需将中间结果存储到磁盘

广播变量

创建方式

​Broadcast<> 变量名 = sc.broadcast(需要传输的数据)​

作用

map join

序列化

算子外的代码在driver运行
算子内的代码在executor运行

算子内经常会用到算子外的数据,需要序列化后传递

方案

一、Java序列化
比较重(字节多)

二、kryo序列化

端口号

当前任务运行情况:4040

历史服务器:18080

yarn任务运行情况:8088

job:4040

代码的组织结构

入口:SparkSession

​SparkSession.builder()​

‍

数据切分与任务划分

输入分片 — task —组成— RDD

‍

内存管理

spark1.6

  • 之前,静态
  • 之后,统一

可以使用spark.memory.useLegacyMode​参数启用

一、静态内存管理机制

spark的内存被分为两个区域

  • 存储内存(storage memory)

    • 存储计算中间数据、RDD数据
  • 执行内存(execution memory)

    • 存储shuffle数据

静态是指spark不会动态调整二者比例,而是让用户预先配置

二、统一内存管理机制

‍

工作组件

driver

驱动器节点

执行main方法
不参与计算

负责

  • 将用户程序(application)转化为作业(job)
  • 在executor之间调度任务(task)
  • 跟踪executor的执行情况
  • 通过UI展示运行情况

executor

参与具体计算

集群中工作节点(worker)的一个JVM进程

负责

  • 运行具体任务(task),将结果返回给driver
  • 通过自身的块管理器(block manager)为RDD提供内存缓存,缓存在executor进程中

executor是一个独立的jvm进程
使用多线程模型可以并发运行多个task,每个task为一个线程

cluster manager

blockmanager

管理内存和磁盘上的数据块
各个节点(executor)都有一个BlockManager实例

‍

工作流程

  1. spark-submit提交一个spark作业
  2. 启动一个driver进程,可能在本地,可能在集群上某个节点,这个要看具体的部署模式(deploy-mode),向集群管理器申请该作业需要的资源(executor进程)
  3. 在各个工作节点上,启动一定数量的executor进程
  4. driver将作业代码拆分为多个stage,每个stage负责一部分代码,每个stage包含一批并行的有相同的计算逻辑的task,只是处理的数据不同而已,这些task被放到executor中执行,被executor中的cpu core计算
  5. 一个stage所有task执行完后,结果写入各个节点的本地磁盘文件,作为下一个stage的输入,串行执行

spark在运行一个任务时,yarn会分配对应executor数量的container跑任务,实际是一个jvm进程
且以yarn-cluster模式运行时,会多分配一个container用来跑applicationMaster进程,用于调控spark任务,该进程内存大小以spark.driver.memory控制

‍

Shuffle

一、Hash Shuffle

二、Sort Shuffle

‍

spark调优

基础调优

最简单
增加资源

一、调节最优的资源配置(分配更多的资源)

spark-submit用于给spark作业分配资源
sparkconf设置spark作业具体参数

1
2
3
4
5
调节spark-submit这个shell脚本的参数
- num-executors *executor的数量* 一般是50-100(80)
- executor-cores *executor的cpu core数量* 一般是2-3(2)
- executor-memory *每个executor的内存大小* 一般是6-10(6g)
- driver-memory *driver的内存大小(影响不大)* 一般是1-5 (5g)

举例:一次spark任务就要消耗160个cpu core、500g内存

增加num-executor、增加executor-cores =

  • 增加并行处理的task个数
    如原20个executor,每个2cpu core,就是40个task并行

增加executor-memory =
15. 减少磁盘I/O
- 需要对RDD进行cache时,大内存可以写入更多,也就不需要写入磁盘
- shuffle的reduce端需要进行数据存放和聚合,大内存可以更少写入磁盘或不写入磁盘
16. 减少垃圾回收次数
- task执行可能会创建很多对象,内存较小可能会导致JVM堆频繁存满、频繁垃圾回收

对着调好的资源配置,进行接下来的调优

二、调节并行度

1
SparkConf conf = new SparkConf().set(""spark.default.parallelism", "500")

调节并行度实际就是充分利用集群计算资源

并行度,也就是task数量

  • 最少:等于总cpu core数量
  • 推荐:2~3倍总cpu core数量

因为实际情况,有的task快,有的task慢,要是数量相等,还是会造成一些资源浪费
设置成倍数后,一个task运行完,另一个task能立刻补上来,尽量让cpu core不空闲

大部分情况,资源和并行度到位了,spark作业就很快了,几分钟

三、RDD架构重构及RDD持久化

RDD重复计算带来的问题:HDFS-RDD1-RDD2需要走两遍,从15分钟变30分钟

RDD架构重构:尽量复用RDD,差不多的,可以抽取为一个公共RDD,供反复使用

纯内存方式持久化公共RDD时,可以考虑序列化
将RDD的每个partition数据,序列化成一个大的字节数组,就一个对象,大大减少了内存task占用
缺点是需要反序列化
序列化纯内存后还是OOM内存溢出,那就考虑磁盘+内存的方式,再往上加可以将其序列化
内存资源极度充足时,可以使用持久化的双副本机制提高可靠性

四、广播大的变量

五、改用Kyro序列化

六、使用fastutil优化数据格式

七、调节数据本地化等待时长

JVM调优

minor gc / full gc都会导致[[JVM]]工作线程停止工作

一、降低cache操作的内存占比

二、调节 executor堆外内存、连接等待时长

shuffle调优

spark.sql.shuffle.partitions
分区数
默认200
优化:
小数据集:减少,比如50
大数据集:增加,比如1000或更多,取决于数据量和集群规模

spark.shuffle.file.buffer
缓冲区大小
默认32k
优化:
增大以减少磁盘I/O,如64k

spark.shuffle.io.retryWait
网络延迟等待时间
默认5s
优化:
在高负载或网络抖动时增加,以便有足够的恢复时间

一、合并map端输出文件

开启命令

1
new SparkConf().set("spark.shuffle.consolidateFiles", "true")

对比
1000个task,分配给100个executor
每个executor有2个cpu core(每次并行处理2个task),需要处理10个task
每个task处理出2个文件
每次处理产生4个文件,处理5次,共产生20个文件(1个executor)
合并前

二、调节map端内存缓冲大小、reduce端内存占比

map端处理的数据量比较大时,可能出现缓冲数据频繁spill溢写到磁盘文件中(磁盘I/O)

默认缓冲大小32K,每个task处理640KB的数据,就会发生640/32=20次溢写

配置方法

1
val conf = new SparkConf().set("spark.shuffle.file.buffer", "64")

三、HashShuffleManager、SortShuffleManager

算子调优

一、使用MapPartitions提升Map操作类性能

由一个元素执行一次function改为一个分区执行一次function

优点:当需要把数据通过JDBC写入时,map需要每个元素创建一个数据库连接,而mapPartitions只需在一个分区中创建一个数据库连接

缺点:
map处理数据时内存不足,可以垃圾回收掉
mapPartitions无法回收内存,可能OOM(内存溢出)

适用情况:
数据量不是特别大(估算RDD数据量、每个partition数据量、分配给每个executor的内存量,资源允许,可以考虑代替)

二、filter后使用coalesce减少分区数量

三、使用foreachPartition优化写数据库性能

同调优方案一

将foreach改为foreachPartition
一次处理一个分区
一个分区的数据只需创建一个数据库连接
只需向数据库发送一次sql语句和多组参数

可能造成OOM(某个分区数据量非常大)

四、使用repartition解决spark sql低并行度的性能问题

五、使用reduceByKey的本地聚合特性优化性能

数据倾斜

本质

数据分布不均匀
某些key对应的数据远高于其他key
一个key对应一个task任务,导致某些task任务非常重

定位方法

Spark Web UI查看当前运行到了第几个stage
看一下当前这个stage各个task分配的数据量,从而确定是否因task分配的数据不均匀导致了数据倾斜
根据stage划分原理,推算出来发生倾斜的stage对应代码中的哪一部分,这部分代码中肯定会有一个[[shuffle|shuffle]]类算子

实例

将session粒度的数据与用户信息数据join
这一步可能会出现数据倾斜,适合用map join替换掉reduce join,也就是将user信息作为广播变量广播出去,然后再利用userId2SessionInfo在调用mapToPair算子的时候取出[[广播变量]]的值进行聚合user信息

可能触发数据倾斜的算子

  1. ByKey类
  2. join

解决方案

一、Hive ETL预处理

适用场景
Hive表中的数据本身很不均匀,且业务场景需要频繁对Hive表执行分析操作

本质
将Spark作业的shuffle操作提前到了Hive ETL中

实现思路
在Hive ETL预先对数据按照key进行聚合,或者是预先和其他表进行join
在Spark作业中针对的数据源就是预处理后的Hive表
那么在Spark作业中也就不需要使用原先的shuffle类算子执行这类操作了

优缺点
简单、效果好
完全规避了数据倾斜,Spark作业性能大幅提升
治标不治本,Hive ETL中还是会发生数据倾斜

二、过滤导致倾斜的key

适用场景
业务需求需要能接受某些数据舍弃掉
比如100万个key,只有两个key数据量达到了10万,其他key对应的都是几十,那就抛弃这两个key
实现思路
从Hive表查数据的时候,直接用where过滤

三、提高shuffle操作的reduce并行度

方案一二不适合,就方案三

调用shuffle类算子时传入并行度参数(提高并行度)
shuffle时会创建指定数量的reduce task
每个reduce task分配到更少的数据

没有从根本上解决数据倾斜,只是尽量减轻shuffle reduce task的压力

实际经验
最理想
减轻后可忽略不计
不太理想
稍微快一点
以前某个task需要5小时,现在需要4小时
以前某个task直接oom,现在不会了,但也很慢,需要5小时

不太理想的话,就换后四种方案

四、使用随机key实现双重聚合

针对groupByKey、reduceByKey

某个task的value太多,比如(1,1)(1,2)(1,3)(1,4)
给key加上随机前缀(map),比如形成(1_1,1)(2_1,2)(1_1,3)(2_1,4)
此时一个大task就被拆开为多个小的task(逻辑上)
进行局部聚合
再使用map去掉key前缀
进行全局聚合

五、将reduce join转换为map join

针对join

六、sample采样倾斜key进行两次join

针对join

七、使用随机数、扩容表进行join

针对join

常见问题

yarn-cluster和yarn-client区别

yarn模式:spark客户端直接连接yarn

driver运行节点有区别

  • cluster在yarn,用于生产
  • client在本地,用于本地测试,有网络激增问题,本地可以看到log,方便调试

函数和算子的区别✅

函数/方法:本地对象API

算子:分布式对象API

对spark懒执行的理解

如果每个转换算子都要执行,那会导致在没有执行算子的时候,中间结果不会被输出,就相当于是黑盒,可能导致计算资源的浪费(白算了)

比如在饭店点菜,只有付款了厨师才会开始做,要是不付款就做,做完了不要了,那就是浪费

流批一体原理

  1. 相同的编程模型,实际都是对rdd进行操作
  2. 相同的api,用的算子几乎一样
  3. 对流的操作实际为微批操作,不过在2.3后有个structured streaming,实现和flink一样真正的流处理

面对mr的优势与劣势

优势

DAG计算模型(速度快的根本原因)

  • 将多个操作合并成一个阶段(stage),某些任务可以在一个物理节点的内存中完成,减少了大量的shuffle和数据落盘的次数
  • 比如map或是filter,都是可以直接基于内存计算

可以将反复用到的数据cache到内存中,减少数据加载耗时

支持实时流处理,为企业提供了统一的大数据流批处理平台

劣势

稳定性不如mapreduce

  • 大量数据被缓存在内存中,java回收垃圾缓慢的情况严重,导致spark性能不稳定
  • mapreduce就算运行慢,但是可以运行完
    对技术要求高一点
  • 因为基于内存计算,面对超大数据(一次操作10亿以上),没有调优的话,容易出问题,比如OOM内存溢出
    spark不能完全替代mapreduce
  • 实际生产中可能因为内存资源不够导致任务失败,使用mapreduce更好

实战问题

对一天的数据统计每十分钟用户的一个活跃度(活跃量)(可能用到Spark的一些算子)

  1. 按每十分钟的时间窗口对数据分组
  2. 计算每个时间窗口内的用户活跃量

Share 

 Previous post: flink原理 Next post: Hello World xusu 

© 2025 John Doe

Theme Typography by Makito

Proudly published with Hexo