1. Spark
  2. 核心概念
    1. RDD
      1. 宽窄依赖
      2. RDD缓存
      3. Checkpoint
      4. 累加器和广播变量
      5. RDD转化为dataframe的流程
    2. DAG
    3. DStream
    4. spark-submit
    5. application、job、stage、task
      1. 为什么划分stage
    6. 广播变量
    7. 序列化
    8. 端口号
  3. 代码的组织结构
  4. 数据切分与任务划分
  5. 内存管理
    1. 一、静态内存管理机制
    2. 二、统一内存管理机制
  6. 工作组件
    1. driver
    2. executor
    3. cluster manager
    4. blockmanager
  7. 工作流程
  8. Shuffle
  9. spark调优
    1. 基础调优
      1. 一、调节最优的资源配置(分配更多的资源)
      2. 二、调节并行度
      3. 三、RDD架构重构及RDD持久化
      4. 四、广播大的变量
      5. 五、改用Kyro序列化
      6. 六、使用fastutil优化数据格式
      7. 七、调节数据本地化等待时长
    2. JVM调优
      1. 一、降低cache操作的内存占比
      2. 二、调节 executor堆外内存、连接等待时长
    3. shuffle调优
      1. 一、合并map端输出文件
      2. 二、调节map端内存缓冲大小、reduce端内存占比
      3. 三、HashShuffleManager、SortShuffleManager
    4. 算子调优
      1. 一、使用MapPartitions提升Map操作类性能
      2. 二、filter后使用coalesce减少分区数量
      3. 三、使用foreachPartition优化写数据库性能
      4. 四、使用repartition解决spark sql低并行度的性能问题
      5. 五、使用reduceByKey的本地聚合特性优化性能
  • 数据倾斜
    1. 本质
    2. 定位方法
    3. 实例
    4. 可能触发数据倾斜的算子
    5. 解决方案
      1. 一、Hive ETL预处理
      2. 二、过滤导致倾斜的key
      3. 三、提高shuffle操作的reduce并行度
      4. 四、使用随机key实现双重聚合
      5. 五、将reduce join转换为map join
      6. 六、sample采样倾斜key进行两次join
      7. 七、使用随机数、扩容表进行join
  • 常见问题
    1. yarn-cluster和yarn-client区别
    2. 函数和算子的区别✅
    3. 对spark懒执行的理解
    4. 流批一体原理
    5. 面对mr的优势与劣势
    6. 实战问题
  • Spark

    中间结果保存在内存,延迟小

    task以线程方式,任务启动快

    核心概念

    RDD

    弹性分布式数据集(只封装计算逻辑,不保存数据)

    • 不可变

    • 可分区

    • 并行计算

    spark最基本的数据抽象

    这个抽象的数据模型,让使用者可以不必关心底层数据是分布式的
    只需关心如何把应用逻辑转化为一系列转换函数,进而实现管道化,
    从而避免了存储中间结果,大大降低数据复制、磁盘IO的开销

    有很多分片(分布式),可以指定分片数

    不存储数据,只是记录数据位置,转换关系(调用什么方法)

    惰性执行原因:action时对RDD操作形成DAG进行stage的划分和并行优化

    宽窄依赖

    区别:
    父RDD数据是否进入不同的子RDD

    设计原因:
    窄依赖分区可以并行,一个分区丢失重新计算对应分区即可
    宽依赖(介于两个stage之间),需要等待上一个stage计算完才可以计算

    RDD缓存

    1. persist
    2. cache(调用了persist)

    存储级别

    image

    Checkpoint

    更可靠的数据持久化

    开发经验:
    对使用频繁且重要的数据,先持久化,再checkpoint

    持久化和checkpoint的区别:
    持久化保存在本地磁盘或内存,checkpoint保存在HDFS这类可靠的存储
    持久化会在程序结束时被清除,或是手动调用unpersist清除缓存,checkpoint在程序结束后依然存在

    累加器和广播变量

    RDD转化为dataframe的流程

    1. 定义Schema(表结构)
    2. 将RDD映射为Row对象(一行是什么)
    3. 转换为dataframe(使用createDataFrame方法)

    DAG

    有向无环图,记录RDD执行流程

    边界:
    开始—sparkcontext创建的RDD
    结束—触发action

    一个job一个DAG

    image

    构建流程

    1. 读取HDFS文件,创建RDD对象

    2. DAGScheduler计算RDD之间的依赖关系

    3. 从后往前划分stage

      1. 遇到宽依赖,断开
      2. 遇到窄依赖,加入stage
    4. 依赖关系形成了DAG

    DStream

    一连串的batch,操作单元是每个batch里的RDD

    spark-submit

    官方shell脚本
    用于提交任务,为spark作业申请资源

    1
    2
    3
    4
    5
    6
    7
    bin/spark-submit
    -- master 指定运行模式
    -- num-executors 50-100
    -- executor-cores 2-4
    -- executor-memory 4-8G
    -- driver-cores
    -- driver-memory 通常不设置,或1G,使用collect算子时注意保证够大

    executor-cores不是指物理核心和线程,只是一个虚拟概念
    就算只有300个物理核心,但分配的executor-core总数为400,操作系统会通过时间片轮转等调度算法模拟更多的核心执行,可能会性能下降

    application、job、stage、task

    初始化一个spark context就是生成一个application

    由于shuffle存在,不同stage是不能并行计算的,因为stage的计算需要依赖前面stage的shuffle结果

    stage是由一组完全独立的计算任务(task)组成,每个task运算逻辑相同,只不过每个task只会处理自己对应的partition

    task

    • 被送到某个executor上的工作单元
    • 每个task处理一个rdd分区的数据
    • 如果每个excutor有2个cpu core、4个task的话,会是先执行2个再执行2个
    • 一个线程

    为什么划分stage

    减少磁盘I/O:比如map操作的输出可以直接作为filter操作的输入,无需将中间结果存储到磁盘

    广播变量

    创建方式

    Broadcast<> 变量名 = sc.broadcast(需要传输的数据)

    作用

    map join

    序列化

    算子外的代码在driver运行
    算子内的代码在executor运行

    算子内经常会用到算子外的数据,需要序列化后传递

    方案

    一、Java序列化
    比较重(字节多)

    二、kryo序列化

    端口号

    当前任务运行情况:4040

    历史服务器:18080

    yarn任务运行情况:8088

    job:4040

    代码的组织结构

    入口:SparkSession

    SparkSession.builder()

    数据切分与任务划分

    输入分片 — task —组成— RDD

    内存管理

    spark1.6

    • 之前,静态
    • 之后,统一

    可以使用spark.memory.useLegacyMode​参数启用

    一、静态内存管理机制

    spark的内存被分为两个区域

    • 存储内存(storage memory)

      • 存储计算中间数据、RDD数据
    • 执行内存(execution memory)

      • 存储shuffle数据

    静态是指spark不会动态调整二者比例,而是让用户预先配置

    二、统一内存管理机制

    工作组件

    driver

    驱动器节点

    执行main方法
    不参与计算

    负责

    • 将用户程序(application)转化为作业(job)
    • 在executor之间调度任务(task)
    • 跟踪executor的执行情况
    • 通过UI展示运行情况

    executor

    参与具体计算

    集群中工作节点(worker)的一个JVM进程

    负责

    • 运行具体任务(task),将结果返回给driver
    • 通过自身的块管理器(block manager)为RDD提供内存缓存,缓存在executor进程中

    executor是一个独立的jvm进程
    使用多线程模型可以并发运行多个task,每个task为一个线程

    cluster manager

    blockmanager

    管理内存和磁盘上的数据块
    各个节点(executor)都有一个BlockManager实例

    工作流程

    1. spark-submit提交一个spark作业
    2. 启动一个driver进程,可能在本地,可能在集群上某个节点,这个要看具体的部署模式(deploy-mode),向集群管理器申请该作业需要的资源(executor进程)
    3. 在各个工作节点上,启动一定数量的executor进程
    4. driver将作业代码拆分为多个stage,每个stage负责一部分代码,每个stage包含一批并行的有相同的计算逻辑的task,只是处理的数据不同而已,这些task被放到executor中执行,被executor中的cpu core计算
    5. 一个stage所有task执行完后,结果写入各个节点的本地磁盘文件,作为下一个stage的输入,串行执行

    spark在运行一个任务时,yarn会分配对应executor数量的container跑任务,实际是一个jvm进程
    且以yarn-cluster模式运行时,会多分配一个container用来跑applicationMaster进程,用于调控spark任务,该进程内存大小以spark.driver.memory控制

    Shuffle

    一、Hash Shuffle

    二、Sort Shuffle

    spark调优

    基础调优

    最简单
    增加资源

    一、调节最优的资源配置(分配更多的资源)

    spark-submit用于给spark作业分配资源
    sparkconf设置spark作业具体参数

    1
    2
    3
    4
    5
    调节spark-submit这个shell脚本的参数
    - num-executors *executor的数量* 一般是50-100(80)
    - executor-cores *executor的cpu core数量* 一般是2-3(2)
    - executor-memory *每个executor的内存大小* 一般是6-10(6g)
    - driver-memory *driver的内存大小(影响不大)* 一般是1-5 (5g)

    举例:一次spark任务就要消耗160个cpu core、500g内存

    增加num-executor、增加executor-cores =

    • 增加并行处理的task个数
      如原20个executor,每个2cpu core,就是40个task并行

    增加executor-memory =
    15. 减少磁盘I/O
    - 需要对RDD进行cache时,大内存可以写入更多,也就不需要写入磁盘
    - shuffle的reduce端需要进行数据存放和聚合,大内存可以更少写入磁盘或不写入磁盘
    16. 减少垃圾回收次数
    - task执行可能会创建很多对象,内存较小可能会导致JVM堆频繁存满、频繁垃圾回收

    对着调好的资源配置,进行接下来的调优

    二、调节并行度

    1
    SparkConf conf = new SparkConf().set(""spark.default.parallelism", "500")

    调节并行度实际就是充分利用集群计算资源

    并行度,也就是task数量

    • 最少:等于总cpu core数量
    • 推荐:2~3倍总cpu core数量

    因为实际情况,有的task快,有的task慢,要是数量相等,还是会造成一些资源浪费
    设置成倍数后,一个task运行完,另一个task能立刻补上来,尽量让cpu core不空闲

    大部分情况,资源和并行度到位了,spark作业就很快了,几分钟

    三、RDD架构重构及RDD持久化

    RDD重复计算带来的问题:HDFS-RDD1-RDD2需要走两遍,从15分钟变30分钟

    RDD架构重构:尽量复用RDD,差不多的,可以抽取为一个公共RDD,供反复使用

    纯内存方式持久化公共RDD时,可以考虑序列化
    将RDD的每个partition数据,序列化成一个大的字节数组,就一个对象,大大减少了内存task占用
    缺点是需要反序列化
    序列化纯内存后还是OOM内存溢出,那就考虑磁盘+内存的方式,再往上加可以将其序列化
    内存资源极度充足时,可以使用持久化的双副本机制提高可靠性

    四、广播大的变量

    五、改用Kyro序列化

    六、使用fastutil优化数据格式

    七、调节数据本地化等待时长

    JVM调优

    minor gc / full gc都会导致[[JVM]]工作线程停止工作

    一、降低cache操作的内存占比

    二、调节 executor堆外内存、连接等待时长

    shuffle调优

    spark.sql.shuffle.partitions
    分区数
    默认200
    优化:
    小数据集:减少,比如50
    大数据集:增加,比如1000或更多,取决于数据量和集群规模

    spark.shuffle.file.buffer
    缓冲区大小
    默认32k
    优化:
    增大以减少磁盘I/O,如64k

    spark.shuffle.io.retryWait
    网络延迟等待时间
    默认5s
    优化:
    在高负载或网络抖动时增加,以便有足够的恢复时间

    一、合并map端输出文件

    开启命令

    1
    new SparkConf().set("spark.shuffle.consolidateFiles", "true")

    对比
    1000个task,分配给100个executor
    每个executor有2个cpu core(每次并行处理2个task),需要处理10个task
    每个task处理出2个文件
    每次处理产生4个文件,处理5次,共产生20个文件(1个executor)
    合并前

    二、调节map端内存缓冲大小、reduce端内存占比

    map端处理的数据量比较大时,可能出现缓冲数据频繁spill溢写到磁盘文件中(磁盘I/O)

    默认缓冲大小32K,每个task处理640KB的数据,就会发生640/32=20次溢写

    配置方法

    1
    val conf = new SparkConf().set("spark.shuffle.file.buffer", "64")

    三、HashShuffleManager、SortShuffleManager

    算子调优

    一、使用MapPartitions提升Map操作类性能

    由一个元素执行一次function改为一个分区执行一次function

    优点:当需要把数据通过JDBC写入时,map需要每个元素创建一个数据库连接,而mapPartitions只需在一个分区中创建一个数据库连接

    缺点:
    map处理数据时内存不足,可以垃圾回收掉
    mapPartitions无法回收内存,可能OOM(内存溢出)

    适用情况:
    数据量不是特别大(估算RDD数据量、每个partition数据量、分配给每个executor的内存量,资源允许,可以考虑代替)

    二、filter后使用coalesce减少分区数量

    三、使用foreachPartition优化写数据库性能

    同调优方案一

    将foreach改为foreachPartition
    一次处理一个分区
    一个分区的数据只需创建一个数据库连接
    只需向数据库发送一次sql语句和多组参数

    可能造成OOM(某个分区数据量非常大)

    四、使用repartition解决spark sql低并行度的性能问题

    五、使用reduceByKey的本地聚合特性优化性能

    数据倾斜

    本质

    数据分布不均匀
    某些key对应的数据远高于其他key
    一个key对应一个task任务,导致某些task任务非常重

    定位方法

    Spark Web UI查看当前运行到了第几个stage
    看一下当前这个stage各个task分配的数据量,从而确定是否因task分配的数据不均匀导致了数据倾斜
    根据stage划分原理,推算出来发生倾斜的stage对应代码中的哪一部分,这部分代码中肯定会有一个[[shuffle|shuffle]]类算子

    实例

    将session粒度的数据与用户信息数据join
    这一步可能会出现数据倾斜,适合用map join替换掉reduce join,也就是将user信息作为广播变量广播出去,然后再利用userId2SessionInfo在调用mapToPair算子的时候取出[[广播变量]]的值进行聚合user信息

    可能触发数据倾斜的算子

    1. ByKey类
    2. join

    解决方案

    一、Hive ETL预处理

    适用场景
    Hive表中的数据本身很不均匀,且业务场景需要频繁对Hive表执行分析操作

    本质
    将Spark作业的shuffle操作提前到了Hive ETL中

    实现思路
    在Hive ETL预先对数据按照key进行聚合,或者是预先和其他表进行join
    在Spark作业中针对的数据源就是预处理后的Hive表
    那么在Spark作业中也就不需要使用原先的shuffle类算子执行这类操作了

    优缺点
    简单、效果好
    完全规避了数据倾斜,Spark作业性能大幅提升
    治标不治本,Hive ETL中还是会发生数据倾斜

    二、过滤导致倾斜的key

    适用场景
    业务需求需要能接受某些数据舍弃掉
    比如100万个key,只有两个key数据量达到了10万,其他key对应的都是几十,那就抛弃这两个key
    实现思路
    从Hive表查数据的时候,直接用where过滤

    三、提高shuffle操作的reduce并行度

    方案一二不适合,就方案三

    调用shuffle类算子时传入并行度参数(提高并行度)
    shuffle时会创建指定数量的reduce task
    每个reduce task分配到更少的数据

    没有从根本上解决数据倾斜,只是尽量减轻shuffle reduce task的压力

    实际经验
    最理想
    减轻后可忽略不计
    不太理想
    稍微快一点
    以前某个task需要5小时,现在需要4小时
    以前某个task直接oom,现在不会了,但也很慢,需要5小时

    不太理想的话,就换后四种方案

    四、使用随机key实现双重聚合

    针对groupByKey、reduceByKey

    某个task的value太多,比如(1,1)(1,2)(1,3)(1,4)
    给key加上随机前缀(map),比如形成(1_1,1)(2_1,2)(1_1,3)(2_1,4)
    此时一个大task就被拆开为多个小的task(逻辑上)
    进行局部聚合
    再使用map去掉key前缀
    进行全局聚合

    五、将reduce join转换为map join

    针对join

    六、sample采样倾斜key进行两次join

    针对join

    七、使用随机数、扩容表进行join

    针对join

    常见问题

    yarn-cluster和yarn-client区别

    yarn模式:spark客户端直接连接yarn

    driver运行节点有区别

    • cluster在yarn,用于生产
    • client在本地,用于本地测试,有网络激增问题,本地可以看到log,方便调试

    函数和算子的区别✅

    函数/方法:本地对象API

    算子:分布式对象API

    对spark懒执行的理解

    如果每个转换算子都要执行,那会导致在没有执行算子的时候,中间结果不会被输出,就相当于是黑盒,可能导致计算资源的浪费(白算了)

    比如在饭店点菜,只有付款了厨师才会开始做,要是不付款就做,做完了不要了,那就是浪费

    流批一体原理

    1. 相同的编程模型,实际都是对rdd进行操作
    2. 相同的api,用的算子几乎一样
    3. 对流的操作实际为微批操作,不过在2.3后有个structured streaming,实现和flink一样真正的流处理

    面对mr的优势与劣势

    优势

    DAG计算模型(速度快的根本原因)

    • 将多个操作合并成一个阶段(stage),某些任务可以在一个物理节点的内存中完成,减少了大量的shuffle和数据落盘的次数
    • 比如map或是filter,都是可以直接基于内存计算

    可以将反复用到的数据cache到内存中,减少数据加载耗时

    支持实时流处理,为企业提供了统一的大数据流批处理平台

    劣势

    稳定性不如mapreduce

    • 大量数据被缓存在内存中,java回收垃圾缓慢的情况严重,导致spark性能不稳定
    • mapreduce就算运行慢,但是可以运行完
      对技术要求高一点
    • 因为基于内存计算,面对超大数据(一次操作10亿以上),没有调优的话,容易出问题,比如OOM内存溢出
      spark不能完全替代mapreduce
    • 实际生产中可能因为内存资源不够导致任务失败,使用mapreduce更好

    实战问题

    对一天的数据统计每十分钟用户的一个活跃度(活跃量)(可能用到Spark的一些算子)

    1. 按每十分钟的时间窗口对数据分组
    2. 计算每个时间窗口内的用户活跃量

    转载请注明来源,欢迎对文章中的引用来源进行考证,欢迎指出任何有错误或不够清晰的表达。可以在下面评论区评论,也可以邮件至 jaytp@qq.com

    💰

    ×

    Help us with donation