Spark简单笔记

2018-10-17 3162 0

spark生态系统

Installation

  1. brew install scala
  2. brew install apache-spark

SparkContext是Spark的上下文对象,是Spark程序的主入口点,负责连接到spark cluster。可用于创建RDD,在集群上创建累加器和广播变量。

每个jvm只能激活一个SparkContext对象,创建新的SparkContext对象时,必须先stop掉原来的对象。

Setup spark 1.6 on hadoop 2.6.0

在Spark官网 下载对应版本的spark bin文件并解压。

拷贝本机的配置文件/usr/lib/spark/conf到覆盖解压后的conf目录。

修改conf/spark-env.sh 下面的SPARK_HOME变量。将当前的bin目录添加到环境变量PATH中。

这样修改后发现spark-submit提交总是出错,发现自己设置了LD_LIBRARY_PATH,而spark-env.sh中也会设置这个变量。由于自己安装了zlib包,和spark使用的zlib包不兼容,导致提交后会出现解压错误。

Log collection

Spark日志存放路径与部署模式有关:

  1. 如果是Spark Standalone模式,可以直接在Master UI界面查看日志。
  2. 如果是YARN模式,可以使用yarn logs -applicationId来查看,前提是必须开启日志聚合功能yarn.log-aggregation-enable,在默认情况下,这个参数是false。
  3. 同事需要注意,使用yarn logs时需要使用提交spark任务的用户,yarn logs命令默认去找的是当前用户的路径。

Spark Concepts

先明确一下一些概念:
Driver: 驱动器,一个 job 只有一个,主要负责 job 的解析,与 task 的调度等。
Executor:执行器,实际运行 task 的地方,一个 job 有多个。
Stage: 一个Job被拆分成若干个Stage,每个Stage执行一些计算,产生一些中间结果。它们的目的是最终生成这个Job的计算结果。而每个Stage是一个task set,包含若干个task。Task是Spark中最小的工作单元,在一个executor上完成一个特定的事情。如果Stage之间没有依赖关系则可以并行执行。

每个Spark应用由Driver构成,Driver通过SparkContext来访问Spark,由它启动各种操作。Driver含有main函数和分布式数据集,对它们应用各种操作。spark-shell就是Driver(包含main函数)。

集群模式:spark基于hadoop安装时,默认读取hdfs上的文件,
本地模式:读取本地文件使用,默认使用local模式运行spark程序。

HDFS的block和Spark的partition

hdfs中的block是分布式存储的最小单元,类似于盛放文件的盒子,一个文件可能要占多个盒子,但一个盒子里的内容只可能来自同一份文件。假设block设置为128M,你的文件是250M,那么这份文件占3个block(128+128+2)。这样的设计虽然会有一部分磁盘空间的浪费,但是整齐的block大小,便于快速找到、读取对应的内容。(p.s. 考虑到hdfs冗余设计,默认三份拷贝,实际上3*3=9个block的物理空间。)

spark中的partition 是弹性分布式数据集RDD的最小单元,RDD是由分布在各个节点上的partition 组成的。partition 是指的spark在计算过程中,生成的数据在计算空间内最小单元,同一份数据(RDD)的partition 大小不一,数量不定,是根据application里的算子和最初读入的数据分块数量决定的,这也是为什么叫“弹性分布式”数据集的原因之一。

总结:
block位于存储空间、partition 位于计算空间
block的大小是固定的、partition 大小是不固定的
block是有冗余的、不会轻易丢失,partition(RDD)没有冗余设计、丢失之后重新计算得到

当Spark读取HDFS上的文件作为输入时,会根据具体数据格式对应的InputFormat进行解析,一般是将若干个Block合并成一个输入分片,称为InputSplit,注意InputSplit不能跨越文件。

随后将为这些输入分片生成具体的Task。InputSplit与Task是一一对应的关系。这些具体的Task每个都会被分配到集群上的某个节点的某个Executor去执行。

Task被执行的并发度 = Executor数目 * 每个Executor核数。

至于partition的数目:

  1. 对于数据读入阶段,例如sc.textFile,输入文件被划分为多少InputSplit就会需要多少初始Task。
  2. 在Map阶段partition数目保持不变。
  3. 在Reduce阶段,RDD的聚合会触发shuffle操作,聚合后的RDD的partition数目跟具体操作有关,例如repartition操作会聚合成指定分区数,还有一些算子是可配置的。

RDD:Resilient Distributed Dataset

类名: org.apache.spark.rdd.RDD

spark的核心是RDD(弹性分布式数据集),一种通用的数据抽象,封装了基础的数据操作,如map,filter,reduce等。可进行parallel计算,spark集群中所有调度和执行都是基于RDD。在RDD内部,每个RDD都有5个特征:

  1. 有一个分片列表,就是能被切分,和Hadoop一样,能够切分的数据才能并行计算。一组分片(partition),即数据集的基本组成单位,对于RDD来说,每个分片都会被一个计算任务处理,并决定并行计算的粒度。用户可以在创建RDD时指定RDD的分片个数,如果没有指定,那么就会采用默认值。默认值就是程序所分配到的CPU Core的数目。每个分配的存储是由BlockManager实现的,每个分区都会被逻辑映射成BlockManager的一个Block,而这个Block会被一个Task负责计算。
  2. 由一个函数计算每一个分片,这里指的是下面会提到的compute函数。
  3. 对其他RDD的依赖列表,依赖还具体分为宽依赖和窄依赖,但并不是所有的RDD都有依赖。RDD的每次转换都会生成一个新的RDD,所以RDD之间就会形成类似于流水线一样的前后依赖关系。在部分分区数据丢失时,Spark可以通过这个依赖关系重新计算丢失的分区数据,而不是对RDD的所有分区进行重新计算。
  4. (可选)key-value型的RDD是根据哈希来分区的,类似于mapreduce当中的paritioner接口,控制Key分到哪个reduce。一个partitioner,即RDD的分片函数。当前Spark中实现了两种类型的分片函数,一个是基于哈希的HashPartitioner,另外一个基于范围的RangePartitioner。只有对于key-value的RDD,才会有Partitioner,非key-value的RDD的Partitioner的值是None。Partitioner函数不但决定了RDD本身的分片数量,也决定了parent RDD Shuffle输出时的分片数量。
  5. (可选)每一分片的优先计算位置,比如HDFS的block的所在位置应该是优先计算的位置。一个列表,存储存取每个Partition的优先位置(preferred location)。对于一个HDFS文件来说,这个列表保存的就是每个Partition所在的块的位置。按照“移动数据不如移动计算”的理念,Spark在进行任务调度的时候,会尽可能地将计算任务分配到其所要处理数据块的存储位置。

RDD的特点

  1. 它是在集群节点上的不可变的、已分区的集合对象。
  2. 通过并行转换的方式来创建如(map, filter, join, etc)。
  3. 失败自动重建。
  4. 可以控制存储级别(内存、磁盘等)来进行重用。
  5. 必须是可序列化的,内存不足时降级到磁盘存储。
  6. 是静态类型的。

RDD的操作

主要分两类:转换(transformation)和动作(action)。两类函数的主要区别是,转换接受RDD并返回RDD,而动作接受RDD但是返回非RDD。转换采用惰性调用机制,每个RDD记录父RDD转换的方法,这种调用链表称之为血缘(lineage);而动作调用会直接计算。

采用惰性调用,通过血缘连接的RDD操作可以管道化(pipeline),管道化的操作可以直接在单节点完成,避免多次转换操作之间数据同步的等待。

RDD的使用

RDD的使用一般可以抽象为下面的几步:

  1. 加载外部数据,创建RDD对象
  2. 使用转换(如filter),创建新的RDD对象
  3. 缓存需要重用的RDD
  4. 使用动作(如count),启动并行计算

RDD的容错机制

RDD的容错机制实现分布式数据集容错方法有两种:数据检查点记录更新

RDD采用记录更新的方式:记录所有更新点的成本很高。所以,RDD只支持粗颗粒变换,即只记录单个块(分区)上执行的单个操作,然后创建某个RDD的变换序列(血统 lineage)存储下来;

变换序列指,每个RDD都包含了它是如何由其他RDD变换过来的以及如何重建某一块数据的信息。因此RDD的容错机制又称“血统”容错。 要实现这种“血统”容错机制,最大的难题就是如何表达父RDD和子RDD之间的依赖关系。实际上依赖关系可以分两种,窄依赖和宽依赖。

窄依赖:子RDD中的每个数据块只依赖于父RDD中对应的有限个固定的数据块;
宽依赖:子RDD中的一个数据块可以依赖于父RDD中的所有数据块。

例如:map变换,子RDD中的数据块只依赖于父RDD中对应的一个数据块;groupByKey变换,子RDD中的数据块会依赖于多块父RDD中的数据块,因为一个key可能分布于父RDD的任何一个数据块中。

将依赖关系分类的两个特性:
第一,窄依赖可以在某个计算节点上直接通过计算父RDD的某块数据计算得到子RDD对应的某块数据;宽依赖则要等到父RDD所有数据都计算完成之后,并且父RDD的计算结果进行hash并传到对应节点上之后才能计算子RDD。

第二,数据丢失时,对于窄依赖只需要重新计算丢失的那一块数据来恢复;对于宽依赖则要将祖先RDD中的所有数据块全部重新计算来恢复。

所以在“血统”链特别是有宽依赖的时候,需要在适当的时机设置数据检查点。也是这两个特性要求对于不同依赖关系要采取不同的任务调度机制和容错恢复机制。

Stage划分

Stage划分的依据就是宽依赖,什么时候产生宽依赖呢?例如reduceByKey,groupByKey等Action。

  1. 从后往前推理,遇到宽依赖就断开,遇到窄依赖就把当前的RDD加入到Stage中;
  2. 每个Stage里面的Task的数量是由该Stage中最后一个RDD的Partition数量决定的;
  3. 最后一个Stage里面的任务的类型是ResultTask,前面所有其他Stage里面的任务类型都是ShuffleMapTask;
  4. 代表当前Stage的算子一定是该Stage的最后一个计算步骤;

理解闭包(closures)

考虑下面一段代码

  1. var counter = 0
  2. var rdd = sc.parallelize(1 to 10)
  3. rdd.foreach(x => counter += x)
  4. println("Counter value: " + counter)

本地模式
以本地模式运行在单个JVM上,上面的代码会将RDD中的值进行累加,并且将它存储到counter中。这是因为RDD和变量counter在driver节点的相同内存空间中。

集群模式
以集群模式运行时,会更加复杂,上面的代码的结果也许不会如我们预期的那样。当执行一个作业(job)时,Spark会将RDD分成多个任务(task)—每一个任务都会由一个executor来执行。在执行之前,Spark会计算闭包(closure)。

闭包是对executors可见的那部分变量和方法,executors会用闭包来执行RDD上的计算(在这个例子中,闭包是foreach())。这个闭包是被序列化的,并且发送给每个executor。在本地模式中,只有一个executor,所以共享相同的闭包。然而,在集群模式中,就不是这样了。executors会运行在各自的worker节点中,每个executor都有闭包的一个复本。

发送给每个executor的闭包中的变量其实也是复本。每个foreach函数中引用的counter不再是driver节点上的counter。当然,在driver节点的内存中仍然存在这一个counter,但是这个counter对于executors来说是不可见的。executors只能看到自己的闭包中的复本。这样,counter最后的值仍旧是0,因为所有在counter的操作只引用了序列化闭包中的值。

为了在这样的场景中,确保这些行为正确,应该使用累加变量(Accumulator)。在集群中跨节点工作时,Spark中的累加变量提供了一种安全的机制来更新变量。所以可变的全局状态应该使用累加变量来定义。

所以上面的例子可以这样写:

  1. var counter = sc.accumulator(0)
  2. var rdd = sc.parallelize(1 to 10)
  3. rdd.foreach(x => counter += x)
  4. println("Counter value: " + counter)

RDD数据同步

RDD目前提供两个数据同步的方法:广播和累计器。

广播 broadcast
前面提到过,广播可以将变量发送到闭包中,被闭包使用。但是,广播还有一个作用是同步较大数据。比如你有一个IP库,可能有几G,在map操作中,依赖这个ip库。那么,可以通过广播将这个ip库传到闭包中,被并行的任务应用。广播通过两个方面提高数据共享效率:1,集群中每个节点(物理机器)只有一个副本,默认的闭包是每个任务一个副本;2,广播传输是通过BT下载模式实现的,也就是P2P下载,在集群多的情况下,可以极大的提高数据传输速率。广播变量修改后,不会反馈到其他节点。

累加器 Accumulator
累加器是一个write-only的变量,用于累加各个任务中的状态,只有在驱动程序中,才能访问累加器。而且,截止到1.2版本,累加器有一个已知的缺陷,在action操作中,n个元素的RDD可以确保累加器只累加n次,但是在transformation时,spark不确保,也就是累加器可能出现n+1次累加。

目前RDD提供的同步机制粒度太粗,尤其是转换操作中变量状态不能同步,所以RDD无法做复杂的具有状态的事务操作。不过,RDD的使命是提供一个通用的并行计算框架,估计永远也不会提供细粒度的数据同步机制,因为这与其设计的初衷是违背的。

RDD优化技巧

主要包括一些RDD lineage设计、算子的合理使用、特殊操作的优化等。如避免创建重复的RDD,对多次使用的RDD进行持久化等。

RDD缓存

需要使用多次的数据需要cache,否则会进行不必要的重复操作。举个例子

  1. var data=sc.textFile("file:///home/data/test.txt");
  2. println(data.filter(_.contains("error")).count)
  3. println(data.filter(_.contains("warning")).count)

上面的代码会导致data变量被加载两次,高校的做法是data加载完后,立刻持久化到内存中。

  1. var data=sc.textFile("file:///home/data/test.txt");
  2. data.cache
  3. println(data.filter(_.contains("error")).count)
  4. println(data.filter(_.contains("warning")).count)
转换并行化

RDD的转换操作时并行化计算的,但是多个RDD的转换同样是可以并行的,参考如下

  1. val dataList:Array[RDD[Int]] =
  2. val result = dataList.map(_.map(_.sum));

上面的例子中,第一个map是便利Array变量,串行的计算每个RDD中的每行的sum。由于每个RDD之间计算是没有逻辑联系的,所以理论上是可以将RDD的计算并行化的,在scala中可以轻松试下,如下

  1. val dataList:Array[RDD[Int]] =
  2. val sumList = dataList.par.map(_.map(_.sum))
减少shuffle网络传输

一般而言,网络I/O开销是很大的,减少网络开销,可以显著加快计算效率。任意两个RDD的shuffle操作(join等)的大致过程如下,

用户数据userData和事件events数据通过用户id连接,那么会在网络中传到另外一个节点,这个过程中,有两个网络传输过程。Spark的默认是完成这两个过程。但是,如果你多告诉spark一些信息,spark可以优化,只执行一个网络传输。可以通过使用、HashPartition,在userData”本地”先分区,然后要求events直接shuffle到userData的节点上,那么就减少了一部分网络传输,减少后的效果如下,

虚线部分都是在本地完成的,没有网络传输。在数据加载时,就按照key进行partition,这样可以经一部的减少本地的HashPartition的过程,示例代码如下

  1. val userData = sc.sequenceFile[UserID, UserInfo]("hdfs://…")
  2. .partitionBy(new HashPartitioner(100))
  3. .persist()

注意,上面一定要persist,否则会重复计算多次。100用来指定并行数量。

使用高性能的算子
  1. 使用reduceByKey/aggregateByKey替代groupByKey
    reduceByKey和aggregateByKey算子都会使用用户自定义的函数对每个节点本地的相同key进行预聚合。而groupByKey算子是不会进行预聚合的,全量的数据会在集群的各个节点之间分发和传输,性能相对来说比较差。
    使用map-side预聚合的shuffle操作

  2. 使用mapPartitions替代普通map
    mapPartitions类的算子,一次函数调用会处理一个partition所有的数据,而不是一次函数调用处理一条,性能相对来说会高一些。但是有的时候,使用mapPartitions会出现OOM(内存溢出)的问题。因为单次函数调用就要处理掉一个partition所有的数据,如果内存不够,垃圾回收时是无法回收掉太多对象的,很可能出现OOM异常。所以使用这类操作时要慎重!

  3. 使用foreachPartitions替代foreach
    原理类似于“使用mapPartitions替代map”,也是一次函数调用处理一个partition的所有数据,而不是一次函数调用处理一条数据。在实践中发现,foreachPartitions类的算子,对性能的提升还是很有帮助的。比如在foreach函数中,将RDD中所有数据写MySQL,那么如果是普通的foreach算子,就会一条数据一条数据地写,每次函数调用可能就会创建一个数据库连接,此时就势必会频繁地创建和销毁数据库连接,性能是非常低下;但是如果用foreachPartitions算子一次性处理一个partition的数据,那么对于每个partition,只要创建一个数据库连接即可,然后执行批量插入操作,此时性能是比较高的。实践中发现,对于1万条左右的数据量写mysql,性能可以提升30%以上。

  4. 使用filter之后进行coalesce操作
    通常对一个RDD执行filter算子过滤掉RDD中较多数据后(比如30%以上的数据),建议使用coalesce算子,手动减少RDD的partition数量,将RDD中的数据压缩到更少的partition中去。因为filter之后,RDD的每个partition中都会有很多数据被过滤掉,此时如果照常进行后续的计算,其实每个task处理的partition中的数据量并不是很多,有一点资源浪费,而且此时处理的task越多,可能速度反而越慢。因此用coalesce减少partition数量,将RDD中的数据压缩到更少的partition之后,只要使用更少的task即可处理完所有的partition。在某些场景下,对于性能的提升会有一定的帮助。

  5. 使用repartitionAndSortWithinPartitions替代repartition与sort类操作
    repartitionAndSortWithinPartitions是Spark官网推荐的一个算子,官方建议,如果需要在repartition重分区之后,还要进行排序,建议直接使用repartitionAndSortWithinPartitions算子。因为该算子可以一边进行重分区的shuffle操作,一边进行排序。shuffle与sort两个操作同时进行,比先shuffle再sort来说,性能可能是要高的。

广播大变量

有时在开发过程中,会遇到需要在算子函数中使用外部变量的场景(尤其是大变量,比如100M以上的大集合),那么此时就应该使用Spark的广播(Broadcast)功能来提升性能。

在算子函数中使用到外部变量时,默认情况下,Spark会将该变量复制多个副本,通过网络传输到task中,此时每个task都有一个变量副本。如果变量本身比较大的话(比如100M,甚至1G),那么大量的变量副本在网络中传输的性能开销,以及在各个节点的Executor中占用过多内存导致的频繁GC,都会极大地影响性能。

因此对于上述情况,如果使用的外部变量比较大,建议使用Spark的广播功能,对该变量进行广播。广播后的变量,会保证每个Executor的内存中,只驻留一份变量副本,而Executor中的task执行时共享该Executor中的那份变量副本。这样的话,可以大大减少变量副本的数量,从而减少网络传输的性能开销,并减少对Executor内存的占用开销,降低GC的频率。

使用Kryo优化序列化性能

在Spark中,主要有三个地方涉及到了序列化:

  1. 在算子函数中使用到外部变量时,该变量会被序列化后进行网络传输。
  2. 将自定义的类型作为RDD的泛型类型时(比如JavaRDD,Student是自定义类型),所有自定义类型对象,都会进行序列化。因此这种情况下,也要求自定义的类必须实现Serializable接口。
  3. 使用可序列化的持久化策略时(比如MEMORY_ONLY_SER),Spark会将RDD中的每个partition都序列化成一个大的字节数组。

对于这三种出现序列化的地方,我们都可以通过使用Kryo序列化类库,来优化序列化和反序列化的性能。

  1. // 创建SparkConf对象。
  2. val conf = new SparkConf().setMaster(...).setAppName(...)
  3. // 设置序列化器为KryoSerializer。
  4. conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  5. // 注册要序列化的自定义类型。
  6. conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))