Spark必会知识点

机器学习  收藏
0 / 284
  • Spark Core 知识点
    • 1)、Spark 作业提交流程?
    • 2)、Spark 的内存模型?
    • 3)、SparkContext 创建流程?源码级别?
    • 4)、简述 Spark 个版本区别?1.x 与 2.x?
    • 5)、使用 Spark 中遇到过哪些问题?如何解决的?
    • 6)、Spark 的 Shuffle 过程? 和 MR Shuffle 区别?
    • 7)、Spark 中的数据倾斜问题有啥好的解决方案?
    • 8)、Spark 有哪些聚合类的算子,我们应该怎么避免使用这些算子?ReduceByKey 和 GroupByKey 的区别?
    • 9)、Spark On Yarn 作业执行流程?yarn-client 和 yarn-cluster 的区别?
    • 10)、Spark 中 Job、Task、RDD、DAG、Stage 的理解?
    • 11)、Spark 中 RDD 如何通过记录更新的方式容错?
    • 12)、Spark 常用调优方法?
    • 13)、Spark 中宽依赖和窄依赖如何理解?
    • 14)、Spark 中 Job 和 Task 如何理解?
    • 15)、Spark 中 Transformation 和 Action 区别是什么?列举出常用的方法?
    • 16)、Spark 中 persist()和 cache()的区别?
    • 17)、Spark 中 map 和 mapPartitions 的区别?
    • 18)、Spark 中 Worker 和 Executor 的异同?
    • 19)、Spark 中提供的 2 中共享变量是啥?
    • 20)、菲波那切数列可以用 Spark 做出来么?
    • 21)、看过哪些 Spark 源码?
    • 22)、Spark 通信机制?
    • 23)、Spark 的存储级别有哪些?
    • 24)、Spark 序列化模式有哪些?
    • 25)、Spark 使用到的安全协议有哪些?
    • 26)、Spark 部署模式有哪些?
    • 27)、Spark 的 cache 后能不能接其它算子?是不是 action 操作?
    • 28)、Spark 中 reduceByKey 是 action 算子不?reduce 呢?
    • 29)、Spark 中数据本地性是哪个阶段确定的?
    • 30)、Spark 中 RDD 的弹性提现在哪里?
    • 31)、Spark 中容错机制?
    • 32)、Spark 中 RDD 的缺陷?
    • 33)、Spark 中有哪些聚合类的算子?应该避免什么类型的算子?
    • 34)、Spark 中并行度怎么设置比较合理一些?
    • 35)、Spark 中数据的位置由谁来管理?
    • 36)、Spark 中数据本地性有哪几种?
    • 37)、Spark 如何处理不被序列化的数据?
    • 38)、Spark 中 collect 功能是啥?其底层是如何实现的?
    • 39)、Spark 作业在没有获得足够资源就开始启动了,可能会导致什么问题?
    • 40)、Spark 中 map 和 flatMap 有啥区别?
    • 41)、介绍一下 join 操作优化经验?
    • 42)、Spark 有哪些组件?
    • 43)、Spark 的工作机制?
    • 44)、Spark 中的宽窄依赖?
    • 45)、Spark 如何划分 stage?
    • 46)、spark-submit 时候如何引用外部的 jar 包?
    • 47)、Spark 中 RDD 有哪些特性?
    • 48)、Spark 的一个工作流程?
    • 49)、Spark on yarn 与 standalone 区别?
    • 50)、Spark 优化之内存管理?
    • 51)、Spark 优化之广播变量?
    • 52)、Spark 优化之数据本地性?
    • 53)、Spark 中 task 有几种类型?
    • 54)、Spark 中基本概念?
    • 55)、Spark 中的 ShuffleManager?
    • 56)、Spark 中 Shuffle 时候数据一定会落磁盘么?
    • 57)、Spark 和 MR 中 Shuffle 不同?Spark 的优势?
    • 58)、Spark 如何做 checkpoint?
    • 59)、Spark 比 MR 速度快的原因?
    • 60)、Spark distinct 去重原理?
    • 61)、Spark cache 和 checkpoint 区别?
1、Spark 作业提交流程?

Spark 作业提交流程:

  1. 将我们编写的 Spark 程序打包成 jar 包。
  2. 使用 spark-submit 脚本将任务提交到集群中运行。
  3. 运行 SparkSubmit 的 main 方法,通过反射的方式创建我们编写主类的实例对象,然后调用主类的 main 方法开始执行我们编写的代码。
  4. 当代码运行到 SparkContext 时,就开始初始化 SparkContext。
  5. 在初始化 SparkContext 时候会创建 DAGScheduler 和 TaskScheduler。
  6. spark-submit 提交代码,执行 new SparkContext(),在 SparkContext 里构造 DAGScheduler 和 TaskScheduler。
  7. TaskScheduler 会通过后台的一个进程,连接 Master,向 Master 注册 Application。
  8. Master 接收到 Application 请求后,会使用相应的资源调度算法,在 Worker 上为这个 Application 启动多个 Executer。
  9. Executor 启动后,会自己反向注册到 TaskScheduler 中。 所有 Executor 都注册到 Driver 上之后,SparkContext 结束初始化,接下来往下执行我们自己的代码。
  10. 每执行到一个 Action,就会创建一个 Job。Job 会提交给 DAGScheduler。
  11. DAGScheduler 会将 Job 划分为多个 stage,然后每个 stage 创建一个 TaskSet。
  12. TaskScheduler 会把每一个 TaskSet 里的 Task,提交到 Executor 上执行。
  13. Executor 上有线程池,每接收到一个 Task,就用 TaskRunner 封装,然后从线程池里取出一个线程执行这个 task。
    (TaskRunner 将我们编写的代码,拷贝,反序列化,执行 Task,每个 Task 执行 RDD 里的一个 partition)
2、Spark 的内存模型?

Spark 集群在提交应用程序时候会创建 Driver 和 Execotor 两种 JVM 进程。

Driver 内存:Driver 作为主控进程,负责创建 Spark 作业的上下文,将提交的作业 Job 转化为计算任务 Task,分发到 Executor 进程中进行执行。

Execotor 内存:在工作节点上执行具体的计算任务,并将结果返回给 Driver,同时提供 RDD 的持久化机制。

堆内内存划分:

指的是 JVM 堆内存大小,在 Spark 应用程序启动时通过 spark.executor.memory 参数配置。Executor 内运行的并发任务共享 JVM 堆内内存。堆内内存大致可以分为如图 2 所示以下 4 个部分

2.1.1、执行 Execution 内存

用于存储 Spark task 执行过程中需要的对象,如 Shuffle、Join、Sort、Aggregation 等计算过程中的临时数据。

2.1.2、存储 Storage 内存

主要用于存储 Spark 的 cache 数据,例如 RDD 的 cache,Broadcast 变量,Unroll 数据等。需要注意的是,unrolled 的数据如果内存不够,会存储在 driver 端。

RDD 在缓存到存储内存之后,Partition 被转换成 Block,Record 在堆内或堆外存储内存中占用一块连续的空间。将 Partition 由不连续的存储空间转换为连续存储空间的过程,Spark 称之为"展开"(Unroll)。

2.1.3、用户内存 User Memory

分配 Spark Memory 剩余的内存,用户可以根据需要使用。例如 RDD 依赖等信息。

2.1.4、预留内存 Reserved Memory

这部分内存是预留给系统使用,是固定不变的。在 1.6.0 默认为 300MB(RESERVED_SYSTEM_MEMORY_BYTES = 300 * 1024 * 1024),不过在 2.4.4 版本中已经看不到这个参数了。

系统预留内存,会用来存储 Spark 内部对象。

堆外内存划分

这里 Off-heap Memory 从概念上可以分为两个:

1).Executor JVM 内的 off-heap memory(*),主要用于 JVM 自身,字符串, NIO Buffer 等开销,可以通过 spark.executor.memoryOverhead 参数进行配置,

大小一般设为 executorMemory * 0.10, with minimum of 384。

2).为了进一步优化内存的使用以及提高 Shuffle 时的排序的效率,Spark 引入了堆外(Off-heap)内存,直接在工作节点的系统内存中开辟的空间,存储经过序列化的二进制数据。

Spark 可以直接操作系统堆外内存,减少了不必要的内存开销,以及频繁的 GC 扫描和回收,提升了处理性能。堆外内存可以被精确地申请和释放,而且序列化的数据占用的空间可以被精确计算,

所以相比堆内内存来说降低了管理的难度,也降低了误差。

Spark 为存储内存和执行内存的管理提供了统一的接口——MemoryManager,同一个 Executor 内的任务都调用这个接口的方法来申请或释放内存。

MemoryManager 的具体实现上,Spark 1.6 之后默认为统一管理(Unified Memory Manager)方式,

1.6 之前采用的静态管理(Static Memory Manager)方式仍被保留,可通过配置 spark.memory.useLegacyMode 参数启用。

两种方式的区别在于对空间分配的方式。

Spark 1.6 之后引入的统一内存管理机制,与静态内存管理的区别在于存储内存和执行内存共享同一块空间,可以动态占用对方的空闲区域

其中最重要的优化在于动态占用机制,其规则如下:

1.设定基本的 Storage 内存和 Execution 内区域(spark.storage.storageFraction 参数),该设定确定了双方各自拥有的空间的范围

2.双方的空间都不足时,则存储到硬盘;若己方空间不足而对方空余时,可借用对方的空间;(存储空间不足是指不足以放下一个完整的 Block)

3.Execution 内存的空间被对方占用后,可让对方将占用的部分转存到硬盘

4.Storage 内存的空间被对方占用后,无法让对方"归还",多余的 Storage 内存被转存到硬盘

3、SparkContext 创建流程?源码级别?

SparkContext 初始化,包括事件总线(LiveListenerBus),UI 界面,心跳,JobProgressListener,资源动态分配(ExecutorAllocationManager)等等

初始化的核心包括:

DAGScheduler:负责创建 Job,RDD 的 Stage 划分以及提交。

TaskScheduler:负责 Task 的调度,调度的 Task 是由 DAGScheduler 创建的。

*SchedulerBackend:负责连接 Master 并注册当前的程序;申请资源(Executor)和 task 具体到 Executor 的执行和管理,具体类型和 deploy Mode 有关比如 standalone 或者 on yarn

SparkEnv:Spark 运行时环境,Executor 依赖 SparkEnv,它内部包含有很多组件,例如 serializerManager,RpcEnv,BlockManager。(Driver 中也有 SparkEnv,这个是为了 Local[*]模式下能运行)

4、简述 Spark 个版本区别?1.x 与 2.x?

spark2.x 增加 Tungsten 执行引擎,比 spark1.x 快 10 倍。

spark2.x 增加了 SparkSession,把 spark1.x 的 SQLContext 和 HiveContext 整合了。

spark2.x 统一 DataFrames 和 Datasets 的 API

spark2.x 的 Spark Streaming 基于 Spark SQL 构建了 high-level API.使得 Spark Streaming 更好的受益于 Spark SQL 的易用性和性能提升。

5、使用 Spark 中遇到过哪些问题?如何解决的?

数据倾斜

内存溢出

6、Spark 的 Shuffle 过程?和 MR 的 Shuffle 区别?

Shuffle 是数据的重新分发过程,将各个节点的同一类数据汇聚到某一节点上进行计算。

Spark 和 Mr 的 Shuffle 区别:

1)本质上相同,都是把 Map 端数据分类处理后交由 Reduce 的过程。

2)数据流有所区别,MR 按 map, spill, merge, shuffle, sort, reduce 等各阶段逐一实现。Spark 基于 DAG 数据流,可实现更复杂数据流操作(根据宽/窄依赖实现)

3)实现功能上有所区别,MR 在 map 中做了排序操作,而 Spark 假定大多数应用场景 Shuffle 数据的排序操作不是必须的,而是采用 Aggregator 机制(Hashmap 每个元素 <K,V> 形式)实现。

(为了减少内存使用,Aggregator 是在磁盘进行,也就是说,尽管 Spark 是“基于内存的计算框架”,但是 Shuffle 过程需要把数据写入磁盘)

7、Spark 中的数据倾斜问题有啥好的解决方案?

导致数据倾斜的问题基本是使用 shuffle 算子引起的,所以我们先去找到代码中的 shuffle 的算子,比如 distinct、groupBYkey、reduceBykey、aggergateBykey、join、cogroup、repartition 等

1).数据做预处理(hive etl ,spark sql...)

2).采样倾斜 key 并分拆 join 操作

3).采用随机前缀和扩容 rdd 进行 join

4).提高 shuffle 操作的并行度

5).将 reduce join 变为 map join 实现(比如广播...)

6).两阶段聚合(局部聚合 + 全局聚合)

7).过滤少数导致倾斜的 key

预、拆、随、并、join、两、过

8、Spark 有哪些聚合类的算子,我们应该怎么避免使用这些算子?ReduceByKey 和 GroupByKey 的区别?

groupByKey,reduceByKey,aggregateByKey,sortByKey,join 等

ReduceByKey 更适合使用在大数据集上,在每个分区移动数据之前将输出数据进行 combine 操作。

foldByKey,aggregateByKey 都是由 combineByKey 实现,并且 mapSideCombine=true,因此可以使用这些函数替代 goupByKey。

9、SparkOnYarn 作业执行流程?yarn-client 和 yarn-cluster 的区别?

Spark On Yarn 的优势

1.Spark 支持资源动态共享,运行于 Yarn 的框架都共享一个集中配置好的资源池

2.可以很方便的利用 Yarn 的资源调度特性来做分类,隔离以及优先级控制负载,拥有更灵活的调度策略

3.Yarn 可以自由地选择 executor 数量

4.Yarn 是唯一支持 Spark 安全的集群管理器,使用 Yarn,Spark 可以运行于 Kerberized Hadoop 之上,在它们进程之间进行安全认证

在 yarn-cluster 模式下,driver 运行在 AM 中,负责向 Yarn(RM)申请资源,并监督 Application 的运行情况,

当 Client(这里的 Client 指的是 Master 节点)提交作业后,就会关掉 Client,作业会继续在 yarn 上运行,这也是 Cluster 模式不适合交互类型作业的原因。

在 Yarn-client 模式下,Driver 运行在 Client 上,通过 AM 向 RM 获取资源。本地 Driver 负责与所有的 executor container 进行交互,并将最后的结果汇总。结束掉终端,相当于 kill 掉这个 spark 应用。

10、Spark 中 Job、Task、RDD、DAG、Stage 的理解?

Job-Stage-Task 之间的关系:

一个 Spark 程序可以被划分为一个或多个 Job,划分的依据是 RDD 的 Action 算子,每遇到一个 RDD 的 Action 操作就生成一个新的 Job。

每个 spark Job 在具体执行过程中因为 shuffle 的存在,需要将其划分为一个或多个可以并行计算的 stage,划分的依据是 RDD 间的 Dependency 关系,当遇到 Wide Dependency 时划分不同的 Stage。

Stage 是由 Task 组组成的并行计算,因此每个 stage 中可能存在多个 Task,这些 Task 执行相同的程序逻辑,只是它们操作的数据不同。一般 RDD 的一个 Partition 对应一个 Task,Task 可以分为 ResultTask 和 ShuffleMapTask。

Application: 用户编写的 Spark 应用程序,包括一个 Driver 和多个 executors

Application jar: 包含用户程序的 Jar 包

Driver Program: 运行 main()函数并创建 SparkContext 进程

Cluster Manager: 在集群上获取资源的外部服务,如 standalone manager,yarn,Mesos

Deploy Mode: 部署模式,区别在于 driver process 运行的位置

Worker Node: 集群中可以运行程序代码的节点(机器)

Executor: 运行在 worker node 上执行具体的计算任务,存储数据的进程

Task: 被分配到一个 Executor 上的计算单元

Job: 由多个任务组成的并行计算阶段,因 RDD 的 Action 产生

Stage: 每个 Job 被分为小的计算任务组,每组称为一个 stage

DAGScheduler: 根据 Job 构建基于 Stage 的 DAG,并提交 Stage 给 TaskScheduler

TaskScheduler: 将 TaskSet 提交给 worker 运行,每个 executor 运行什么 task 在此分配

11、Spark 中 RDD 如何通过记录更新的方式容错?

一般而言,分布式数据集的容错性具备两种方式:数据检查点和记录数据的更新

checkpoint 机制——数据检查点

记录更新机制(在 Saprk 中对应 Lineage 机制)

RDD 只支持粗粒度转换,即只记录单个块上执行的单个操作,然后将创建 RDD 的一系列变换序列记录下来,以便恢复丢失的分区。

Lineage 本质上很类似于数据库中的重做日志(Redo Log),只不过这个重做日志粒度很大,是对全局数据做同样的重做进而恢复数据。

Rdd 在 Lineage 依赖方面划分成两种依赖:窄依赖(Narrow Dependencies)与宽依赖,根据父 RDD 分区是对应 1 个还是多个子 RDD 分区来区分窄依赖(父分区对应一个子分区)和宽依赖(父分区对应多个子分区)

容错原理:

在容错机制中,如果一个节点死机了,而且运算窄依赖,则只要把丢失的父 RDD 分区重算即可,不依赖于其他节点。而宽依赖需要父 RDD 的所有分区都存在,重算就很昂贵了。

可以这样理解开销的经济与否:在窄依赖中,在子 RDD 的分区丢失、重算父 RDD 分区时,父 RDD 相应分区的所有数据都是子 RDD 分区的数据,并不存在冗余计算。

在宽依赖情况下,丢失一个子 RDD 分区重算的每个父 RDD 的每个分区的所有数据并不是都给丢失的子 RDD 分区用的,会有一部分数据相当于对应的是未丢失的子 RDD 分区中需要的数据,这样就会产生冗余计算开销,这也是宽依赖开销更大的原因。

因此如果使用 Checkpoint 算子来做检查点,不仅要考虑 Lineage 是否足够长,也要考虑是否有宽依赖,对宽依赖加 Checkpoint 是最物有所值的。

12、Spark 常用调优方法?

原则一:避免创建重复的 RDD

原则二:尽可能复用同一个 RDD

原则三:对多次使用的 RDD 进行持久化

原则四:尽量避免使用 shuffle 类算子

原则五:使用 map-side 预聚合的 shuffle 操作

原则六:使用高性能的算子

原则七:广播大变量

原则八:使用 Kryo 优化序列化性能

原则九:优化数据结构

13、Spark 中宽依赖和窄依赖如何理解?

Spark 中 RDD 的高效与 DAG(有向无环图)有很大的关系,在 DAG 调度中需要对计算的过程划分 Stage,划分的依据就是 RDD 之间的依赖关系。

RDD 之间的依赖关系分为两种,宽依赖(wide dependency/shuffle dependency)和窄依赖(narrow dependency)。

1)窄依赖:就是指父 RDD 的每个分区只被一个子 RDD 分区使用,子 RDD 分区通常只对应常数个父 RDD 分区

窄依赖有分为两种:

OneToOneDependency: 一种是一对一的依赖

RangeDependency:范围的依赖,它仅仅被 org.apache.spark.rdd.UnionRDD 使用。

UnionRDD 是把多个 RDD 合成一个 RDD,这些 RDD 是被拼接而成,即每个 parent RDD 的 Partition 的相对顺序不会变,只不过每个 parent RDD 在 UnionRDD 中的 Partition 的起始位置不同

2)宽依赖:就是指父 RDD 的每个分区都有可能被多个子 RDD 分区使用,子 RDD 分区通常对应父 RDD 所有分区。

14、Spark 中 Job 和 Task 如何理解?

Task: 被分配到一个 Executor 上的计算单元

Job: 由多个任务组成的并行计算阶段,因 RDD 的 Action 产生

15、Spark 中 Transformation 和 Action 区别是什么?列举出常用的方法?

spark 中的数据都是抽象为 RDD 的,它支持两种类型的算子操作:Transformation 和 Action。

Transformation 算子的代码不会真正被执行。只有当我们的程序里面遇到一个 action 算子的时候,代码才会真正的被执行。

Transformation 算子主要包括:map、mapPartitions、flatMap、filter、union、groupByKey、repartition、cache 等。

Action 算子主要包括:reduce、collect、show、count、foreach、saveAsTextFile 等。

16、Spark 中 persist()和 cache()的区别?

RDD 可以使用 persist() 方法或 cache() 方法进行持久化。数据将会在第一次 action 操作时进行计算,并缓存在节点的内存中。

Spark 的缓存具有容错机制,如果一个缓存的 RDD 的某个分区丢失了,Spark 将按照原来的计算过程,自动重新计算并进行缓存。

// Persist this RDD with the default storage level (`MEMORY_ONLY`).

def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)

// Persist this RDD with the default storage level (`MEMORY_ONLY`).

def cache(): this.type = persist()

17、Spark 中 map 和 mapPartitions 的区别?

map 是对 rdd 中的每一个元素进行操作。比如一个 partition 中有 1 万条数据。ok,那么你的 function 要执行和计算 1 万次。

mapPartitions 则是对 rdd 中的每个分区的迭代器进行操作。function 一次接收所有的 partition 数据。

18、Spark 中 Worker 和 Executor 的异同?

Master 和 Worker 是 Spark 的守护进程,即 Spark 在特定模式下正常运行所必须的进程。Driver 和 Executor 是临时程序,当有具体任务提交到 Spark 集群才会开启的程序。

Master:

Spark 特有资源调度系统的 leader,掌握整个系统的资源信息。例似于 Yarn 中的 RM

1)监听 worker,查看 worker 工作是否正常

2)对 worker 和 application 进行管理(接收 worker 注册信息并管理所有 worker 节点,接收 client 提交的 application 信息,调度等待的 application 并向 worker 提交)

Worker:

Spark 特有资源调度系统的 slave,每个节点掌握着节点的所有资源信息,例似于 Yarn 中的 NM

1)通过 RegisterWorker 注册到 Master

2)定时发送心跳给 Master

3)根据 master 发送的 application 配置进程环境,并启动 ExecutorBackend(执行 task 需要的临时进程)

19、Spark 中提供的两种共享变量是啥?

Spark 为此提供了两种共享变量,一种是 Broadcast Variable(广播变量),另一种是 Accumulator(累加变量)。

Broadcast Variable 会将使用到的变量,仅仅为每个节点拷贝一份,更大的用处是优化性能,减少网络传输以及内存消耗。

Accumulator 主要用于多个节点对一个变量进行共享性的操作。Accumulator 只提供了累加的功能。但是确给我们提供了多个 task 对一个变量并行操作的功能。

但是 task 只能对 Accumulator 进行累加操作,不能读取它的值。只有 Driver 程序可以读取 Accumulator 的值。

20、菲波那切数列可以用 Spark 做出来么?

可以

21、看过哪些 Spark 源码?

1).作业提交流程,deploy 模块

2).作业初始化,sparkcontext 流程

3).内存模块

4).存储模块

5).执行模块

6).数据集 RDD 模块

7).数据 shuffle 模块

22、Spark 通信机制?

1).通信方式:

1.6 版本之前 Spark 的通信机制只采用 Akka 通信框架;

1.6 版本之后加入 Netty 通信框架,并通过配置的方式允许用户自定义使用哪种通信方式;

2.0 版本之后把 Akka 去掉,只保留了 Netty。
2).RPC 通信协议:

网络通信需要遵守相同的通信协议,Spark 通信采用的 RPC 通信协议。
RPC 即远程过程调用协议,一种通过网络从远程计算机上请求服务而不需要了解底层网络传输技术的协议。

Akka:

Akka 是一个开发库和运行环境,可以用于构建高并发、分布式、可容错、事件驱动的基于 JVM 的应用。使构建高并发的分布式应用更加容易。Akka 最重要的是它的 Actor 模型。

Netty:

Netty 是一款异步的事件驱动的网络应用程序框架,支持快速地开发可维护的高性能的面向协议的服务器和客户端。

23、Spark 的存储级别有哪些?

MEMORY_ONLY : 将 RDD 以反序列化 Java 对象的形式存储在 JVM 中。如果内存空间不够,部分数据分区将不再缓存,在每次需要用到这些数据时重新进行计算。这是默认的级别。

MEMORY_AND_DISK : 将 RDD 以反序列化 Java 对象的形式存储在 JVM 中。如果内存空间不够,将未缓存的数据分区存储到磁盘,在需要使用这些分区时从磁盘读取。

MEMORY_ONLY_SER : 将 RDD 以序列化的 Java 对象的形式进行存储(每个分区为一个 byte 数组)。

这种方式会比反序列化对象的方式节省很多空间,尤其是在使用 fast serializer 时会节省更多的空间,但是在读取时会增加 CPU 的计算负担。

MEMORY_AND_DISK_SER : 类似于 MEMORY_ONLY_SER,但是溢出的分区会存储到磁盘,而不是在用到它们时重新计算。

DISK_ONLY : 只在磁盘上缓存 RDD。

MEMORY_ONLY_2,MEMORY_AND_DISK_2,等等 : 与上面的级别功能相同,只不过每个分区在集群中两个节点上建立副本。

OFF_HEAP(实验中): 类似于 MEMORY_ONLY_SER ,但是将数据存储在 off-heap memory,这需要启动 off-heap 内存。

24、Spark 序列化模式有哪些?

spark 是分布式的计算框架,其中涉及到了 rpc 的通信和中间数据的缓存。spark 为了高效率的通信和减少数据存储空间,会把数据先序列化,然后处理。

Java 序列化: 在默认情况下,Spark 采用 Java 的 ObjectOutputStream 序列化一个对象。

该方式适用于所有实现了 java.io.Serializable 的类。

Java 序列化非常灵活,但是速度较慢,在某些情况下序列化的结果也比较大。

Kryo 序列化: Kryo 是 Spark 引入的一个外部的序列化工具, 可以增快 RDD 的运行速度,
因为 Kryo 序列化后的对象更小, 序列化和反序列化的速度非常快

Kryo 不但速度极快, 而且产生的结果更为紧凑(通常能提高 10 倍)。

Kryo 的缺点是不支持所有类型, 为了更好的性能, 需要提前注册程序中所使用的类(class)。

数据压缩: 当指定了 spark.io.compression.codec 配置的值后,spark 会选择对应的压缩方式。

目前压缩方式支持三种方式, lz4, lzf, snappy。

25、Spark 使用到的安全协议有哪些?

SecurityManager 主要对账号、权限及身份认证进行设置和管理。

26、Spark 部署模式有哪些?

Spark 支持多种分布式部署模式,主要支持三种部署模式,分别是:Standalone、Spark on YARN 和 Spark on Mesos 模式。

Standalone 模式为 Spark 自带的一种集群管理模式,即独立模式,自带完整的服务,可单独部署到一个集群中,无需依赖任何其他资源管理系统。

它是 Spark 实现的资源调度框架,其主要的节点有 Driver 节点、Master 节点和 Worker 节点。Standalone 模式也是最简单最容易部署的一种模式。

Spark on YARN 模式,即 Spark 运行在 Hadoop YARN 框架之上的一种模式。

Hadoop YARN(Yet Another Resource Negotiator,另一种资源协调者)是一种新的 Hadoop 资源管理器,它是一个通用资源管理系统,可为上层应用提供统一的资源管理和调度。

Spark on Mesos 模式,即 Spark 运行在 Apache Mesos 框架之上的一种模式。

Apache Mesos 是一个更强大的分布式资源管理框架,负责集群资源的分配,它允许多种不同的框架部署在其上,包括 YARN。它被称为是分布式系统的内核。

27、Spark 的 cache 后能不能接其它算子?是不是 action 操作?

cache 可以接其他算子,但是接了算子之后,起不到缓存应有的效果,因为会重新触发 cache。

cache 类算子的返回值必须复制给一个变量,在接下来的 job 中,直接使用这个变量就能读取到内存中缓存的数据。

cache 不是 action 操作。

28、Spark 中 reduceByKey 是 action 算子不?reduce 呢?

不是,很多人都会以为是 action,reduce rdd 是 action

29、Spark 中数据本地性是哪个阶段确定的?

dag 划分 stage 的时候,确定的具体的 task 运行在哪台机器上

30、Spark 中 RDD 的弹性提现在哪里?
  1. 自动的进行内存和磁盘的存储切换;
  2. 基于 Lingage 的高效容错;
  3. task 如果失败会自动进行特定次数的重试;
  4. stage 如果失败会自动进行特定次数的重试,而且只会计算失败的分片;
  5. checkpoint 和 persist,数据计算之后持久化缓存
  6. 数据调度弹性,DAG TASK 调度和资源无关
  7. 数据分片的高度弹性
31、Spark 中容错机制?
  1. .数据检查点,会发生拷贝,浪费资源
  2. .记录数据的更新,每次更新都会记录下来,比较复杂且比较消耗性能
32、Spark 中 RDD 的缺陷?
  1. 不支持细粒度的写和更新操作(如网络爬虫),spark 写数据是粗粒度的

所谓粗粒度,就是批量写入数据,为了提高效率。但是读数据是细粒度的也就是

说可以一条条的读

  1. 不支持增量迭代计算,Flink 支持
33、Spark 中有哪些聚合类的算子?应该避免什么类型的算子?

避免使用 reduceByKey、join、distinct、repartition 等会进行 shuffle 的算子

34、Spark 中并行度怎么设置比较合理一些?

并行度和数据规模无关,和内存与 CPU 有关。

每个 core 承载 2-4 个 partition

35、Spark 中数据的位置由谁来管理?

每个数据分片都对应具体物理位置,数据的位置被 blockManager 管理

36、Spark 中数据本地性有哪几种?

Spark 中的数据本地性有三种:
1).PROCESS_LOCAL 是指读取缓存在本地节点的数据

2).NODE_LOCAL 是指读取本地节点硬盘数据

3).ANY 是指读取非本地节点数据

通常读取数据 PROCESS_LOCAL>NODE_LOCAL>ANY,尽量使数据以 PROCESS_LOCAL 或 NODE_LOCAL 方式读取。

其中 PROCESS_LOCAL 还和 cache 有关,如果 RDD 经常用的话将该 RDD cache 到内存中,注意,由于 cache 是 lazy 的,所以必须通过一个 action 的触发,才能真正的将该 RDD cache 到内存中。

37、Spark 如何处理不被序列化的数据?

将不能序列化的内容封装成 object

38、Spark 中 collect 功能是啥?其底层是如何实现的?

driver 通过 collect 把集群中各个节点的内容收集过来汇总成结果,collect 返回结果是 Array 类型的,

collect 把各个节点上的数据抓过来,抓过来数据是 Array 型,collect 对 Array 抓过来的结果进行合并

39、Spark 作业在没有获得足够资源就开始启动了,可能会导致什么问题?

task 的调度线程和 Executor 资源申请是异步的。

会导致执行该 job 时候集群资源不足,导致执行 job 结束也没有分配足够的资源,分配了部分 Executor,该 job 就开始执行 task。

如果想等待申请完所有的资源再执行 job 的:

需要将 spark.scheduler.maxRegisteredResourcesWaitingTime 设置的很大;

spark.scheduler.minRegisteredResourcesRatio 设置为 1,

但是应该结合实际考虑否则很容易出现长时间分配不到资源,job 一直不能运行的情况。

40、Spark 中 map 和 flatmap 有啥区别?

map:是将函数用于 RDD 中的每个元素,将返回值构成新的 RDD。

flatMap:是将函数应用于 RDD 中的每个元素,将返回的迭代器的所有内容构成新的 RDD,这样就得到了一个由各列表中的元素组成的 RDD,而不是一个列表组成的 RDD。

41、介绍一下 join 操作优化经验?

join 其实常见的就分为两类: map-side join 和 reduce-side join。

当大表和小表 join 时,用 map-side join 能显著提高效率。将多份数据进行关联是数据处理过程中非常普遍的用法,不过在分布式计算系统中,这个问题往往会变的非常麻烦,

因为框架提供的 join 操作一般会将所有数据根据 key 发送到所有的 reduce 分区中去,也就是 shuffle 的过程。造成大量的网络以及磁盘 IO 消耗,运行效率极其低下,这个过程一般被称为 reduce-side-join。

如果其中有张表较小的话,我们则可以自己实现在 map 端实现数据关联,跳过大量数据进行 shuffle 的过程,运行时间得到大量缩短,根据不同数据可能会有几倍到数十倍的性能提升。

热掉数据再分区

key 先聚合再 join

42、Spark 有哪些组件?

Application:基于 Spark 的用户程序,即由用户编写的调用 Spark API 的应用程序,它由集群上的一个驱动(Driver)程序和多个执行器(Executor)程序组成。

其中应用程序的入口为用户所定义的 main 方法。

SparkContext:是 Spark 所有功能的主要入口点,它是用户逻辑与 Spark 集群主要的交互接口。

通过 SparkContext,可以连接到集群管理器(Cluster Manager),能够直接与集群 Master 节点进行交互,并能够向 Master 节点申请计算资源,

也能够将应用程序用到的 JAR 包或 Python 文件发送到多个执行器(Executor)节点上。

Cluster Manager:即集群管理器,它存在于 Master 进程中,主要用来对应用程序申请的资源进行管理。

Worker Node:任何能够在集群中能够运行 Spark 应用程序的节点。

Task:由 SparkContext 发送到 Executor 节点上执行的一个工作单元。

Driver:也即驱动器节点,它是一个运行 Application 中 main()函数并创建 SparkContext 的进程。

Driver 节点也负责提交 Job,并将 Job 转化为 Task,在各个 Executor 进程间协调 Task 的调度。Driver 节点可以不运行于集群节点机器上。

Executor:也即执行器节点,它是在一个在工作节点(Worker Node)上为 Application 启动的进程,它能够运行 Task 并将数据保存在内存或磁盘存储中,也能够将结果数据返回给 Driver。

43、Spark 的工作机制?

一、应用执行机制

Driver 进程运行在客户端(Client 模式):

即用户在客户端直接运行程序。

程序的提交过程大致会经过以下阶段:

  1. 用户运行程序。
  2. 启动 Driver 进行(包括 DriverRunner 和 SchedulerBackend),并向集群的 Master 注册。
  3. Driver 在客户端初始化 DAGScheduler 等组件。
  4. Woker 节点向 Master 节点注册并启动 Executor(包括 ExecutorRunner 和 ExecutorBackend)。
  5. ExecutorBackend 启动后,向 Driver 内部的 SchedulerBackend 注册,使得 Driver 可以找到计算节点。
  6. Driver 中的 DAGScheduler 解析 RDD 生成 Stage 等操作。
  7. Driver 将 Task 分配到各个 Executor 中并行执行。
  8. Driver 进程运行在集群中(某个 Worker 节点,Cluster 模式):

即用户将 Spark 程序提交给 Master 分配执行。

大致会经过一下流程:

  1. 用户启动客户端,提交 Spark 程序给 Master。
  2. Master 针对每个应用分发给指定的 Worker 启动 Driver 进行。
  3. Worker 收到命令之后启动 Driver 进程(即 DriverRunner 和其中的 SchedulerBackend),并向 Master 注册。
  4. Master 指定其他 Worker 启动 Executor(即 ExecutorRunner 和其内部的 ExecutorBackend)。
  5. ExecutorBackend 向 Driver 中的 SchedulerBackend 注册。
  6. Driver 中的 DAGScheduler 解析 RDD 生产 Stage 等。
  7. Executor 内部启动线程池并行化执行 Task。
44、Spark 中的宽窄依赖?

参考: 13、Spark 中宽依赖和窄依赖如何理解?

45、Spark 如何划分 stage?

Shuffle 是产生宽依赖 RDD 的算子,例如 reduceByKey、reparttition、sortByKey 等算子。同一个 Stage 内的所有 Transformation 算子所操作的 RDD 都是具有相同的 Partition 数量的。

Stage 划分基于数据依赖关系的,一般分为两类:宽依赖(Shuffle Dependency)与窄依赖(Narrow Dependency)。

宽依赖,父 RDD 的一个分区会被子 RDD 的多个分区使用。

窄依赖,父 RDD 的分区最多只会被子 RDD 的一个分区使用。

区分宽窄依赖,我们主要从父 RDD 的 Partition 流向来看:流向单个 RDD 就是窄依赖,流向多个 RDD 就是宽依赖。

Spark Stage 划分,就是从最后一个 RDD 往前推算,遇到窄依赖(NarrowDependency)就将其加入该 Stage,当遇到宽依赖(ShuffleDependency)则断开。

每个 Stage 里 task 的数量由 Stage 最后一个 RDD 中的分区数决定。如果 Stage 要生成 Result,则该 Stage 里的 Task 都是 ResultTask,否则是 ShuffleMapTask。

ShuffleMapTask 的计算结果需要 shuffle 到下一个 Stage,其本质上相当于 MapReduce 中的 mapper。Result Task 则相当于 MapReduce 中的 reducer。

因此整个计算过程会根据数据依赖关系自后向前建立,遇到宽依赖则形成新的 Stage。

Stage 的调度是由 DAG Scheduler 完成的。由 RDD 的有向无环图 DAG 切分出了 Stage 的有向无环图 DAG。

Stage 以最后执行的 Stage 为根进行广度优先遍历,遍历到最开始执行的 Stage 执行,如果提交的 Stage 仍有未完成的父 Stage,则 Stage 需要等待其父 Stage 执行完才能执行。

46、spark-submit 时候如何引用外部的 jar 包?

方法一:spark-submit –jars

根据 spark 官网,在提交任务的时候指定–jars,用逗号分开。

命令:spark-submit --master yarn-client --jars .jar,.jar

方法二:extraClassPath

提交时在 spark-default 中设定参数,将所有需要的 jar 包考到一个文件里,然后在参数中指定该目录就可以

spark.executor.extraClassPath=/home/hadoop/xxx_workspace/lib/*

spark.driver.extraClassPath=/home/hadoop/xxx_workspace/lib/*

47、Spark 中 RDD 有哪些特性?

RDD(Resilient Distributed Dataset)叫做分布式数据集,是 Spark 中最基本的数据抽象,它代表一个不可变、可分区、里面的元素可并行计算的集合。

1,分区列表( a list of partitions)

Spark RDD 是被分区的,每一个分区都会被一个计算任务(Task)处理,分区数决定了并行计千算的数量,RDD 的并行度默认从父 RDD 传给子 RDD。

2,每一个分区都有一个计算函数( a function for computing each split)

3,依赖于其他 RDD 的列表( a list of dependencies on other RDDS)

4,key- value 数据类型的 RDD 分区器( a Partitioner for key- alue RDDS)、控制分区策略和分区数

5,每个分区都有一个优先位置列表( a list of preferred locations to compute each split on)

48、Spark 的一个工作流程?

(1)任何 spark 的应用程序都包含 Driver 代码和 Executor 代码。Spark 应用程序首先在 Driver 初始化 SparkContext。

因为 SparkCotext 是 Spark 应用程序通往集群的唯一路径,在 SparkContext 里面包含了 DAGScheduler 和 TaskScheduler 两个调度器类。在创建 SparkContext 对象的同时也自动创建了这两个类。

(2)SparkContext 初始化完成后,首先根据 Spark 的相关配置,向 Cluster Master 申请所需要的资源,然后在各个 Worker 结点初始化相应的 Executor。

Executor 初始化完成后,Driver 将通过对 Spark 应用程序中的 RDD 代码进行解析,生成相应的 RDD graph(RDD 图),该图描述了 RDD 的相关信息及彼此之间的依赖关系。

(3)RDD 图构建完毕后,Driver 将提交给 DAGScheduler 进行解析。DAGScheduler 在解析 RDD 图的过程中,当遇到 Action 算子后将进行逆向解析,根据 RDD 之间的依赖关系以及是否存在 shuffle 等,

将 RDD 图解析成一系列具有先后依赖关系的 Stage。Stage 以 shuffle 进行划分,即如果两个 RDD 之间存在宽依赖的关系,DAGScheduler 将会在这 RDD 之间拆分为两个 Stage 进行执行,

且只有在前一个 Stage(父 Stage)执行完毕后,才执行后一个 Stage。

(4)DAGScheduler 将划分的一系列的 Stage(TaskSet),按照 Stage 的先后顺序依次提交给底层的调度器 TaskScheduler 去执行。

(5)TaskScheduler 接收到 DAGScheduler 的 Stage 任务后,将会在集群环境中构建一个 TaskSetManager 实例来管理 Stage(TaskSet) 的生命周期。

(6)TaskSetManager 将会把相关的计算代码、数据资源文件等发送到相应的 Executor 上,并在相应的 Executor 上启动线程池执行。TaskSetManager 在执行过程中,使用了一些优化的算法,

用于提高执行的效率,譬如根据数据本地性决定每个 Task***位置、推测执行碰到 Straggle 任务需要放到别的结点上重试、出现 shuffle 输出数据丢失时要报告 fetch failed 错误等机制。

(7)在 Task 执行的过程中,可能有部分应用程序涉及到 I/O 的输入输出,在每个 Executor 由相应的 BlockManager 进行管理,相关 BlockManager 的信息将会与 Driver 中的 Block tracker 进行交互和同步。

(8)在 TaskThreads 执行的过程中,如果存在运行错误、或其他影响的问题导致失败,TaskSetManager 将会默认尝试 3 次,尝试均失败后将上报 TaskScheduler,TaskScheduler 如果解决不了,

再上报 DAGScheduler,DAGScheduler 将根据各个 Worker 结点的运行情况重新提交到别的 Executor 中执行。

(9)TaskThreads 执行完毕后,将把执行的结果反馈给 TaskSetManager,TaskSetManager 反馈给 TaskScheduler,TaskScheduler 再上报 DAGScheduler,

DAGScheduler 将根据是否还存在待执行的 Stage,将继续循环迭代提交给 TaskScheduler 去执行。

(10)待所有的 Stage 都执行完毕后,将会最终达到应用程序的目标,或者输出到文件、或者在屏幕显示等,Driver 的本次运行过程结束,等待用户的其他指令或者关闭。

(11)在用户显式关闭 SparkContext 后,整个运行过程结束,相关的资源被释放或回收。

49、SparkOnYarn 与 standalone 区别?
  1. Yarn 支持动态资源配置。
  2. Standalone 模式只支持简单的固定资源分配策略,每个任务固定数量的 core,各 Job 按顺序依次分配在资源,资源不够的时候就排队。

这种模式比较适合单用户的情况,多用户的情境下,会有可能有些用户的任务得不到资源。

Yarn 作为通用的种子资源调度平台,除了 Spark 提供调度服务之外,还可以为其他系统提供调度,如 Hadoop MapReduce, Hive 等。

50、Spark 优化之内存管理?

1.设置 cache

2.设置 jvm 堆外内存

51、Spark 优化之广播变量?

broadcast 就是将数据从一个节点发送到其他各个节点上去。

比如 driver 上有一张表,其他节点上运行的 task 需要 lookup 这张表,那么 driver 可以先把这张表 copy 到这些节点,这样 task 就可以在本地查表了。

52、Spark 优化之数据本地性?

数据本地化目的,只移动计算,不移动数据,避免网络资源浪费。

1).首先了解 Application 任务执行流程:

在 Spark Application 提交后,Driver 会根据 action 算子划分成一个个的 job,然后对每一 个 job 划分成一个个的 stage,stage 内部实际上是由一系列并行计算的 task 组成的

然后 以 TaskSet 的形式提交给你 TaskScheduler,TaskScheduler 在进行分配之前都会计算出每一个 task 最优计算位置。

Spark 的 task 的分配算法优先将 task 发布到数据所在的节点上,从而达到数据最优计算位置。

2).数据本地化五种级别

TaskSetManager 的 Locality Levels 分为以下五个级别:

• 1).PROCESS_LOCAL 进程本地化 ,task 要计算的数据在同一个 Executor 中

• 2).NODE_LOCA 节点本地化 ,速度比 PROCESS_LOCAL 稍慢,因为数据需要在不同进程之间传递或从文件中读取

情况一:task 要计算的数据是在同一个 Worker 的不同 Executor 进程中

情况二:task 要计算的数据是在同一个 Worker 的 磁盘上,或在 HDFS 上,恰好有 block 在同一个节点上。

Spark 计算数据来源于 HDFS,那么最好的数据本地化级别就是 NODE_LOCAL

• 3).NO_PREF 没有本地化 , 没有最佳 位置这一说,数据从哪里访问都一样快,不需要位置优先。比如说 SparkSQL 读取 MySQL 中的数据

• 4).RACK_LOCAL 机架本地化(集群内), 数据在同一机架的不同节点上。需要通过网络传输数据及文件 IO,比 NODE_LOCAL 慢

情况一:task 计算的数据在 Worker2 的 Executor 中

情况二:task 计算的数据在 Worker2 的磁盘上

• 5).ANY 跨机架本地化 ,数据在非同一机架的网络上,速度最慢

Spark 中的数据本地化由 DAGScheduler 和 TaskScheduler 共同负责。

DAGScheduler 切割 Job,划分 Stage, 通过调用 submitStage 来提交一个 Stage 对应的 tasks,submitStage 会调用 submitMissingTasks

submitMissingTasks 确定每个需要计算的 task 的 preferredLocations,通过调用 getPreferrdeLocations()得到 partition 的优先位置

就是这个 partition 对应的 task 的优先位置,对于要提交到 TaskScheduler 的 TaskSet 中的每一个 task,该 task 优先位置与其对应的 partition 对应的优先位置一致。

TaskScheduler 接收到了 TaskSet 后,TaskSchedulerImpl 会为每个 TaskSet 创建一个 TaskSetManager 对象,该对象包含 taskSet 所有 tasks,

并管理这些 tasks 的执行,其中就包括计算 TaskSetManager 中的 tasks 都有哪些 locality levels,以便在调度和延迟调度 tasks 时发挥作用。

53、Spark 中 task 有几种类型?

有 2 种:

resultTask 类型,最后一个 task

shuffleMapTask 类型,除了最后一个 task 都是

54、Spark 中 repartition 和 coalesce 区别?

1)spark 分区 partition 的理解:

spark 中是以 vcore 级别调度 task 的。

如果读取的是 hdfs,那么有多少个 block,就有多少个 partition

举例来说:sparksql 要读表 T, 如果表 T 有 1w 个小文件,那么就有 1w 个 partition

这时候读取效率会较低。假设设置资源为 --executor-memory 2g --executor-cores 2 --num-executors 5。

步骤是拿出 1-10 号 10 个小文件(也就是 10 个 partition) 分别给 5 个 executor 读取(spark 调度会以 vcore 为单位,实际就是 5 个 executor,10 个 task 读 10 个 partition)

如果 5 个 executor 执行速度相同,再拿 11-20 号文件 依次给这 5 个 executor 读取,而实际执行速度不会完全相同,那就是哪个 task 先执行完,哪个 task 领取下一个 partition 读取执行,

以此类推。这样往往读取文件的调度时间大于读取文件本身,而且会频繁打开关闭文件句柄,浪费较为宝贵的 io 资源,执行效率也大大降低。

2)coalesce 与 repartition 的区别:

repartition(numPartitions:Int):RDD[T]和 coalesce(numPartitions:Int,shuffle:Boolean=false):RDD[T]

repartition 只是 coalesce 接口中 shuffle 为 true 的实现

3)例子:

有 1w 的小文件,资源也为--executor-memory 2g --executor-cores 2 --num-executors 5。

repartition(4):产生 shuffle。这时会启动 5 个 executor 像之前介绍的那样依次读取 1w 个分区的文件,然后按照某个规则 %4,写到 4 个文件中,这样分区的 4 个文件基本毫无规律,比较均匀。

coalesce(4):这个 coalesce 不会产生 shuffle。那启动 5 个 executor 在不发生 shuffle 的时候是如何生成 4 个文件呢,其实会有 1 个或 2 个或 3 个甚至更多的 executor 在空跑

(具体几个 executor 空跑与 spark 调度有关,与数据本地性有关,与 spark 集群负载有关),他并没有读取任何数据!

我们常认为 coalesce 不产生 shuffle 会比 repartition 产生 shuffle 效率高,而实际情况往往要根据具体问题具体分析,coalesce 效率不一定高:

coalesce 与 repartition 他们两个都是 RDD 的分区进行重新划分,repartition 只是 coalesce 接口中 shuffle 为 true 的实现(假设源 RDD 有 N 个分区,需要重新划分成 M 个分区)

  1. 如果 N<M。一般情况下 N 个分区有数据分布不均匀的状况,利用 HashPartitioner 函数将数据重新分区为 M 个,这时需要将 shuffle 设置为 true(repartition 实现,coalesce 也实现不了)。
  2. 如果 N>M 并且 N 和 M 相差不多,(假如 N 是 1000,M 是 100)那么就可以将 N 个分区中的若干个分区合并成一个新的分区,最终合并为 M 个分区,这时可以将 shuff 设置为 false(coalesce 实现)
  3. 如果 N>M 并且两者相差悬殊,这时你要看 executor 数与要生成的 partition 关系,如果 executor 数小于等于要生成 partition 数,coalesce 效率高,

反之如果用 coalesce 会导致(executor 数-要生成 partiton 数)个 excutor 空跑从而降低效率。

55、Spark 中基本概念?

Application:用户编写的 Spark 应用程序。

Driver:Spark 中的 Driver 即运行上述 Application 的 main 函数并创建 SparkContext,创建 SparkContext 的目的是为了准备 Spark 应用程序的运行环境,

在 Spark 中有 SparkContext 负责与 ClusterManager 通信,进行资源申请、任务的分配和监控等,当 Executor 部分运行完毕后,Driver 同时负责将 SparkContext 关闭。

Executor:是运行在工作节点(WorkerNode)的一个进程,负责运行 Task。

RDD:弹性分布式数据集,是分布式内存的一个抽象概念,提供了一种高度受限的共享内存模型。

DAG:有向无环图,反映 RDD 之间的依赖关系。

Task:运行在 Executor 上的工作单元。

Job:一个 Job 包含多个 RDD 及作用于相应 RDD 上的各种操作。

Stage:是 Job 的基本调度单位,一个 Job 会分为多组 Task,每组 Task 被称为 Stage,或者也被称为 TaskSet,代表一组关联的,相互之间没有 Shuffle 依赖关系的任务组成的任务集。

Cluter Manager:指的是在集群上获取资源的外部服务。目前有三种类型

  1. Standalon : spark 原生的资源管理,由 Master 负责资源的分配
  2. Apache Mesos:与 hadoop MR 兼容性良好的一种资源调度框架
  3. Hadoop Yarn: 主要是指 Yarn 中的 ResourceManager
56、IV,WOE 计算?
  1. IV (information value) 衡量的是某一个变量的信息量,N 为分组的组数,IV 可用来表示一个变量的预测能力。

IV 预测能力

<0.03 无预测能力

0.03-0.09 低

0.1-0.29 中

0.3-0.49 高

>=0.5 极高且可疑

IV 值的取值范围是[0,+∞),且当前分组中只包含响应客户或者未响应客户时,IV = +∞。

  1. WOE 的全称是“Weight of Evidence”,即证据权重。WOE 是对原始自变量的一种编码形式。

要对一个变量进行 WOE 编码,需要首先把这个变量进行分组处理(也叫离散化、分箱等等,说的都是一个意思)。

feature 在 label 中分布不足,或导致计算结果不准确,部分 IV 值会很大

加入填充值后,整体 IV 值降低到合理区间

WOE 其实描述了变量当前这个分组,对判断个体是否会响应(或者说属于哪个类)所起到影响方向和大小,

当 WOE 为正时,变量当前取值对判断个体是否会响应起到的正向的影响,

当 WOE 为负时,起到了负向影响。而 WOE 值的大小,则是这个影响的大小的体现。

从上面 IV 的计算结果我们可以看出 IV 的以下特点:

对于变量的一个分组,这个分组的 0 标签比例和 1 标签比例与样本整体 0 和 1 的比例相差越大,IV 值越大,否则,IV 值越小;

极端情况下,当前分组的 0 标签比例和 1 标签比例和样本整体的 0 和 1 的比例相等时,IV 值为 0;

IV 值的取值范围是[0,+∞),且,当当前分组中只包含 0 标签数量或者 1 标签数量时,IV = +∞。

  1. AUC(Area Under Curve)被定义为 ROC 曲线下与坐标轴围成的面积,显然这个面积的数值不会大于 1。

又由于 ROC 曲线一般都处于 y=x 这条直线的上方,所以 AUC 的取值范围在 0.5 和 1 之间。

AUC 越接近 1.0,检测方法真实性越高;

等于 0.5 时,则真实性最低,无应用价值。

  1. KS(Kolmogorov-Smirnov)统计量由两位苏联数学家 A.N. Kolmogorov 和 N.V. Smirnov 提出。在风控中,KS 常用于评估模型区分度。区分度越大,说明模型的风险排序能力(ranking ability)越强。

KS 值的取值范围是[0,1],一般习惯乘以 100%。通常来说,KS 越大,表明正负样本区分程度越好。

20 以下:不建议采用

20-40:较好

41-50:良好

51-60:很强

61-75:非常强

75 以上:能力高但疑似有误

57、Spark 中的 ShuffleManager?
  1. 未经优化的 HashShuffleManager

这是 spark1.2 版本之前,最早使用的 shuffle 方法,这种 shuffle 方法不要使用,只是用来对比改进后的 shuffle 方法。

上游每个 task 都输出下游 task 个数的结果文件,下游每个 task 去上游 task 输出的结果文件中获取对应自己的文件。

缺点:生成文件个数过多,生成和传输文件数量等于 上游 task 数量 * 下游 task 数量 个文件。

  1. 经过优化以后的 HashShufferManager

上游 1 个 Executor 所有 task 顺序输出下游 task 个数的结果文件,下游每个 task 去上游 task 输出的结果文件中获取对应自己的。

  1. SortShuffleManager-普通运行机制
  2. 、数据会先写入一个内存数据结构中,此时根据不同的 shuffle 算子,可能选用不同的数据结构(Map or Array )。
  3. 、如果是 reduceByKey 这种聚合类的 shuffle 算子,那么会选用 Map 数据结构,一边通过 Map 进行聚合,一边写入内存
  4. 、如果是普通的 shuffle 算子如 join,count 等,那么会选用 Array 数据结构,直接写入内存。
  5. 、然后,每写一条数据进入内存数据结构之后如果达到了某个临界阈值,那么就会尝试将内存数据结构中的数据溢写到磁盘,然后清空内存数据结构。
  6. 、在溢写到磁盘文件之前,会先根据 key 对内存数据结构中已有的数据进行排序。
  7. 、排序过后,会分批将数据写入磁盘文件。默认的 batch 数量是 10000 条,也就是说,排序好的数据,会以每批 1 万条数据的形式通过 Java 的 BufferedOutputStream 写入磁盘文件。
  8. 、一个 task 将所有数据写入内存数据结构的过程中,会发生多次磁盘溢写操作,产生多个临时文件。最后会将之前所有的临时磁盘文件都进行合并,

这就是 merge 过程,此时会将之前所有临时磁盘文件中的数据读取出来,生成 1 个文件 + 索引文件(标识了下游各个 task 的数据在文件中的 start offset 与 end offset)。

  1. SortShuffleManager-bypass 运行机制
  2. 、上游 task 会为每个下游 task 都创建一个内存缓冲,并根据 key 的 hash 值写入对应的缓冲区。
  3. 、缓冲区满之后溢写到磁盘文件的。
  4. 、最后,将所有临时磁盘文件都合并成一个磁盘文件,并创建一个单独的索引文件。

对比未经优化的 HashShuffleManager:

其实前面的步骤和未经优化的 HashShuffleManager 是一摸一样额,只是最后多了一了 merge 的操作,产生的文件包括一个盘文件和一个索引文件。

最终磁盘文件的数量等于上游 task 的数量

58、Spark 中 Shuffle 时候数据一定会落磁盘么?

在目前的 Spark 实现中,shuffle block 一定是落地到磁盘的。

在 Spark 0.6 和 0.7 时,Shuffle 的结果都需要先存储到内存中(有可能要写入磁盘),因此对于大数据量的情况下,发生 GC 和 OOM 的概率非常大。

因此在 Spark 0.8 的时候,Shuffle 的每个 record 都会直接写入磁盘,并且为下游的每个 Task 都生成一个单独的文件。

59、Spark 和 MR 中 Shuffle 不同?Spark 的优势?

功能上,MR 的 shuffle 和 Spark 的 shuffle 是没啥区别的,都是对 Map 端的数据进行分区,要么聚合排序,要么不聚合排序,然后 Reduce 端或者下一个调度阶段进行拉取数据,完成 map 端到 reduce 端的数据传输功能。

方案上,有很大的区别,MR 的 shuffle 是基于合并排序的思想,在数据进入 reduce 端之前,都会进行 sort,为了方便后续的 reduce 端的全局排序,而 Spark 的 shuffle 是可选择的聚合,特别是 1.2 之后,需要通过调用特定的算子才会触发排序聚合的功能。

数据拉取,MR 的 reduce 是直接拉去 Map 端的分区数据,而 Spark 是根据索引读取,而且是在 action 触发的时候才会拉去数据。

60、Spark 如何做 checkpoint?

用 sparkContext 设置 hdfs 的 checkpoint 的目录。

sc.setCheckpointDir("hdfs://localhost:9001/checkpoint_xxx")

checkpoint 也是个 transformation 的算子,获取数据可以使用 collect。

61、Spark 比 MR 速度快的原因?

1).shuffle 机制: mr 的每次 mapreduce 都会有一次 shuffle,而 spark 只有碰到宽依赖时候才会发生 shuffle

2).jvm 优化: mr 是以进程的方式运行在 yarn 集群上,一个 job 有 1024 个 MapTask,这个时候就需要开启 1024 个进程取处理这 1024 个 task,每启动一个 task 就会启动一次 jvm。

Spark 的任务是以线程方式运行在进程中的,只在启动 Executor 进程时启动一 次 jvm,每次执行一个 task 都是复用 Executor 进程中的线程。

(Executor 中维护着一个线程池)。Spark 和 MR 相比节省了大量启动 jvm 的时间

3).IO 操作: mr 是基于磁盘的,spark 是基于内存的,mr 的每次 shuffle 必须要写入磁盘

62、Spark distinct 去重原理?

map(x => (x, null)).reduceByKey((x, y) => x).map(_._1)

使用 map 算子把元素转为带有 null 的元组,使用 reduceByKey 对具有相同 key 的元素统计,之后再使用 map 算子,取得元组中的单词元素,实现去重效果

63、Spark cache 和 checkpoint 区别?

cache 是 persist 默认存储级别调用,如果内存放不下会丢掉,下次使用重新计算。

// Persist this RDD with the default storage level (`MEMORY_ONLY`).

def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)

// Persist this RDD with the default storage level (`MEMORY_ONLY`).

def cache(): this.type = persist()

如果内存空间不够,部分数据分区将不再缓存,在每次需要用到这些数据时重新进行计算。这是默认的级别。

checkpoint 不会丢弃数据,会对数据做持久化存储。