WRY

Where Are You?
You are on the brave land,
To experience, to remember...

0%

Spark 初识

概览

Spark是分布式数据处理的通用框架,提供了用于大规模数据操作、内存数据缓存和跨计算机重用的API。它在分区数据上应用一组粗粒度转换函数并依赖于数据集在失败时重新计算。尤为值得一提的是,Spark支持主流的数据格式集成了多种数据存储系统,可以在YARN和Mesos上进行运行。

Spark is a generalized framework for distributed data processing providing functional API for manipulating data at scale, in-memory data caching and reuse across computations. It applies set of coarse-grained transformations over partitioned data and relies on dataset's lineage to recompute tasks in case of failures. Worth mentioning is that Spark supports majority of data formats, has integrations with various storage systems and can be executed on Mesos or YARN.

Spark 并行计算模型

基本概念

  • Cluster manager,负责在分布式系统中,启动和分发executors
  • Work node,计算节点,上面可以有一个或者多个executor
  • Executor,执行器,计算时使用的数据也是分布式存储在多台executors上面;一个executor可以执行一个或者多个tasks
  • Task,具体分为如下两种
    • ShuffleMapTask,将输入分区,来进行Shuffle
    • RestultTask,将其输出到driver上
  • RDD,它是一个不可变的分布式对象集合,存储在executor上
  • DAGScheduler 将DAG图切分成不同stages的tasks
  • Stage,将RDD图按照shuffle边界切成多个stage,和task类似,也有两种类型
    • ShuffleMapStage
    • ResultStage

RDD特点

  • 延迟计算,当RDD被使用到的时候才开始计算;通常会被写入磁盘或者被驱动器收集聚合起来。

  • 在Spark的应用程序的整个生命周期中,尽量存在于内存之中,实现快速访问的目的

  • RDD是不可修改的,计算会生成一个新的RDD,不会覆盖掉旧的RDD

RDD主要属性

RDD包含5个主要的属性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
//a list of partitions (e.g. splits in Hadoop)
def getPartitions: Array[Partition]

//a list of dependencies on other RDDs
def getDependencies: Seq[Dependency[_]]

//a function for computing each split
def compute(split: Partition, context: TaskContext): Iterator[T]

//(optional) a list of preferred locations to compute each split on
def getPreferredLocations(split: Partition): Seq[String] = Nil

//(optional) a partitioner for key-value RDDs
val partitioner: Option[Partitioner] = None

RDD操作

Spark的计算通常都是作用于RDD上,Spark在RDD上提供了很多操作,主要分为两大类

  • transformation

    • 将用户编写的函数应用到每一个元素上,或者整个分区上

    • 在整个数据集上应用聚合函数(aggregation function),例如group bysort by

    • 在RDD之间引入依赖关系,形成DAG图

    • 提供重新分区的功能

  • action

    • 触发一个job的执行
    • 用于将RDD数据转存成其他非RDD格式的数据

Transformation包含wide和narrow两种依赖方式。在下面将会详细介绍

Narrow(pipelineable)

  • parent RDD 的每个分区最多只会被一个child RDD的分区使用
  • 允许在cluster node上流水线的执行
  • 因为只需要重新计算丢失的父分区,所以故障恢复的效率更高

Wide(shuffle)

  • 多个child partition可能依赖于同一个parent partition
  • 需要所有的parent partition都被执行完成才能在节点之间进行shuffled操作
  • 如果某个分区执行失败了,其重新计算的过程是十分复杂的

Shuffle

在Shuffle阶段,ShuffleMapTask将block写入local driver上,然后在下一个stage的task通过网络访问这些block。

  • Shuffle Write
    • 在分区之间重新分配数据,并将他们写到disk上
    • 每个hash shuffle为每个reduce任务创建一个文件
    • sort shuffle,创建一个分配给reduce的文件
    • sort shuffle,使用内存排序,溢出到磁盘,得到最终的结果
  • Shuffle Read
    • 抓取文件并应用reduce逻辑
    • if data ordering is needed then it is sorted on “reducer” side for any type of shuffle 看不懂

Spark工作流程

  • 用户编写程序
  • Spark解析程序生成Spark Context,其中包含了RDD之间的生成关系,并根据RDD之间的相互关系翻译成DAG图,提交给Scheduler在worker节点上进行执行
  • cluster Manager分配任务到Executor
  • 在worker上的Executor执行task

Spark Streaming

spark接收流式数据,例如Kafka等将其变成批量的输入数据,再进行后续的处理

基本概念

  • DStream,是一个Spark Streaming的一个基本抽象,他是micro-batch为基础的,在每一个batch中DStream只会生成一个RDD。

在Dstream上面的函数可以分为

  • transformation:filter,map,transform etc.
  • output function:foreachRDD

DStream和RDD之间的关系

  • 都是延迟计算的
  • DStream必须有一个output function去触发数据收集
  • 在output function上的RDD,必须有一个action去触发执行

分布式共享变量

除了弹性分布式数据集RDD之外,Spark第二种低级API是“分布式共享变量”,主要包含两种类型:broadcast variable和accumulator。

broadcast variable

It can share readable and immutable variable effectively.

在驱动executor(待定)上使用变量的方法是function closure,但这会在executor上反复执行序列化和反序列化,他们将会被重复发送和不是只发送一次。所以通过引入broadcast variable的原因,他们缓存在集群中的每个节点上,而不是在每个任务中都反复序列化。

典型应用场景,把一个大型的查找表当作一个共享变量来传播。

accumulator

只适合满足交换律结合律的操作进行累加操作。

结构化API简介

基本概念

Spark支持两种结构化集合类型,分别是DataFrame和Dataset,二者之间的区别主要体现在类型化上。

  • DataFrame的类型完全由Spark负责维护的,暴露给用户而言是无类型的,仅在运行时检查这些类型和Schema中指定的类型是否一致。
  • Dataset会在编译时就检查类型是否符合规范,Dataset也仅适用于JVM的语言,例如Scala和Java,通过case类和Java beans指定类型。

在通常情况下,DataFrame就是一些Row类型的Dataset的集合。Row类型是Spark为支持内存计算而优化的数据格式,他会避免使用有昂贵的垃圾回收开销和对象实例化开销的JVM类型。

Schema 定义了DataFrame的列名和类型。

执行概述

步骤如下

  • 编写DataFrame / Dataset / SQL代码
  • Spark分析,转化成逻辑执行计划
  • 将Logical Plan 转化成 Physical Plan, 检查优化策略,在此过程检查优化
  • 在集群上执行物理计划

逻辑计划:

  • 首先将用户代码转换成未解析的逻辑计划,只保证语法是正确的
  • 未解析的逻辑计划会在catalog(所有表和DataFrame的信息存储库)的解析列和表格,经过解析之后转变成解析后的逻辑计划
  • 解析后的逻辑计划在Catalyst优化器的优化(主要通过谓词推理或选择器来实现)得到优化的逻辑计划

物理计划

  • 又被称为Spark计划,经过不同的物理执行策略,并通过代价模型的比较,从而指定如何在集群上执行逻辑计划
  • 物理执行计划产生一系列的RDD和转换操作

执行

  • Spark将所有代码运行在Spark的底层编程接口RDD上

参考资料