Anatomy of Spark Operators

Operators

Posted by danner on June 3, 2018

Based on the Spark 2.4.4 source code.

Transformations

https://spark.apache.org/docs/latest/rdd-programming-guide.html#transformations

filter

def filter(f: T => Boolean): RDD[T] = withScope {
    val cleanF = sc.clean(f)
    new MapPartitionsRDD[T, T](
      this,
      (context, pid, iter) => iter.filter(cleanF),
      preservesPartitioning = true)
 }

map

def map[U: ClassTag](f: T => U): RDD[U] = withScope {
    val cleanF = sc.clean(f)
    new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
}

mapPartitions

def mapPartitions[U: ClassTag](
    f: Iterator[T] => Iterator[U],
    preservesPartitioning: Boolean = false): RDD[U] = withScope {
    val cleanedF = sc.clean(f)
    new MapPartitionsRDD(
      this,
      (context: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(iter),
      preservesPartitioning)
}

filter, map, and mapPartitions all return a MapPartitionsRDD. map applies f to each element, while mapPartitions applies f to each partition as a whole, as the sketch below shows.
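
A quick sketch of the difference, assuming a spark-shell session where sc is the predefined SparkContext:

val rdd = sc.parallelize(1 to 6, 2)
// map: f is invoked once per element
rdd.map(_ * 2).collect()
// Array(2, 4, 6, 8, 10, 12)
// mapPartitions: f is invoked once per partition, over that partition's Iterator
rdd.mapPartitions(iter => Iterator(iter.sum)).collect()
// Array(6, 15) -- one sum per partition

mapPartitions pays any per-invocation setup cost once per partition instead of once per element, which matters when the setup is expensive (e.g. opening a connection).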

mapPartitionsWithIndex

def mapPartitionsWithIndex[U: ClassTag](
    f: (Int, Iterator[T]) => Iterator[U],
    preservesPartitioning: Boolean = false): RDD[U] = withScope {
    val cleanedF = sc.clean(f)
    new MapPartitionsRDD(
      this,
      (context: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(index, iter),
      preservesPartitioning)
}

Identical to mapPartitions, except the function also receives the partition's index as input.
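
For example (spark-shell sketch), tagging each element with the partition it lives in:

val rdd = sc.parallelize(1 to 6, 2)
rdd.mapPartitionsWithIndex { (idx, iter) =>
    iter.map(x => s"partition $idx -> $x")
}.collect()
// Array(partition 0 -> 1, partition 0 -> 2, partition 0 -> 3,
//       partition 1 -> 4, partition 1 -> 5, partition 1 -> 6)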

distinct

def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
	map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)
}

Implemented via reduceByKey: each element is mapped to (x, null), duplicates collapse during the reduce, and the keys are extracted at the end.
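
Tracing that on a tiny input (spark-shell sketch); the hand-expanded pipeline and distinct itself return the same elements, though the order is not guaranteed:

val rdd = sc.parallelize(Seq(1, 2, 2, 3, 3, 3))
rdd.map(x => (x, null)).reduceByKey((x, y) => x).map(_._1).collect()
// e.g. Array(1, 2, 3)
rdd.distinct().collect()
// same elements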

union

def union[T: ClassTag](first: RDD[T], rest: RDD[T]*): RDD[T] = withScope {
	union(Seq(first) ++ rest)
}

Merges two RDDs without deduplication; the second RDD's partitions are appended after the first's.

intersection

def intersection(other: RDD[T]): RDD[T] = withScope {
    this.map(v => (v, null)).cogroup(other.map(v => (v, null)))
        .filter { case (_, (leftGroup, rightGroup)) => leftGroup.nonEmpty && rightGroup.nonEmpty }
        .keys
}

Returns the intersection of the two RDDs, deduplicated:

  • cogroup groups the values by key
  • the filter leftGroup.nonEmpty && rightGroup.nonEmpty keeps only keys present in both RDDs
  • keys deduplicates, since cogroup emits each key once
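
For example (spark-shell sketch):

val a = sc.parallelize(Seq(1, 2, 2, 3))
val b = sc.parallelize(Seq(2, 3, 3, 4))
a.intersection(b).collect()
// contains 2 and 3, each exactly once (order may vary)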

subtract

def subtract(
  other: RDD[T],
  p: Partitioner)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
    if (partitioner == Some(p)) {
      val p2 = new Partitioner() {
        override def numPartitions: Int = p.numPartitions
        override def getPartition(k: Any): Int = p.getPartition(k.asInstanceOf[(Any, _)]._1)
      }
      this.map(x => (x, null)).subtractByKey(other.map((_, null)), p2).keys
    } else {
      this.map(x => (x, null)).subtractByKey(other.map((_, null)), p).keys
    }
}

Returns the elements of the first RDD that do not appear in the other, without deduplication. From distinct onward the pattern is the same: pair each element with null so the RDD becomes a PairRDDFunctions, then group or subtract by key.
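
A small demonstration (spark-shell sketch) that duplicates in the first RDD survive:

val a = sc.parallelize(Seq(1, 1, 2, 3))
val b = sc.parallelize(Seq(3, 4))
a.subtract(b).collect()
// contains 1, 1, 2 (order may vary) -- 1 appears twice, unlike intersection's deduplicated output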

glom

def glom(): RDD[Array[T]] = withScope {
	new MapPartitionsRDD[Array[T], T](this, (context, pid, iter) => Iterator(iter.toArray))
}

Assembles the values within each partition into a single array.
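
This is handy for inspecting how data is laid out across partitions (spark-shell sketch):

val rdd = sc.parallelize(1 to 6, 2)
rdd.glom().collect()
// Array(Array(1, 2, 3), Array(4, 5, 6))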

groupBy

def groupBy[K](f: T => K, p: Partitioner)(implicit kt: ClassTag[K], ord: Ordering[K] = null)
  : RDD[(K, Iterable[T])] = withScope {
    val cleanF = sc.clean(f)
    this.map(t => (cleanF(t), t)).groupByKey(p)
}

groupBy is implemented on top of groupByKey.

groupByKey

def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = self.withScope {
    // groupByKey shouldn't use map side combine because map side combine does not
    // reduce the amount of data shuffled and requires all map side data be inserted
    // into a hash table, leading to more objects in the old gen.
    val createCombiner = (v: V) => CompactBuffer(v)
    // combines values that share a key within a single partition
    val mergeValue = (buf: CompactBuffer[V], v: V) => buf += v
    // combines buffers of the same key across partitions
    val mergeCombiners = (c1: CompactBuffer[V], c2: CompactBuffer[V]) => c1 ++= c2
    val bufs = combineByKeyWithClassTag[CompactBuffer[V]](
      createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine = false)
    bufs.asInstanceOf[RDD[(K, Iterable[V])]]
}

Note: groupByKey performs no map-side combine (mapSideCombine = false) and applies no aggregation function; it simply groups values through combineByKeyWithClassTag.

reduceByKey

def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = self.withScope {
    combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner)
}

It differs from groupByKey in a few ways:

  • groupByKey's combiner accumulates a CompactBuffer[V], while reduceByKey's combiner is just a V
  • reduceByKey performs a map-side combine, which makes it more efficient than groupByKey, but restricts the function: it must be associative, so e.g. an average cannot be expressed directly (see the sketch below)
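
For instance, pairs.reduceByKey((a, b) => (a + b) / 2) would not compute a per-key average, because that function is not associative. The usual workaround, sketched here for spark-shell, carries (sum, count) pairs, which do combine associatively:

val pairs = sc.parallelize(Seq(("a", 1.0), ("a", 2.0), ("a", 6.0), ("b", 4.0)))
val avgs = pairs
    .mapValues(v => (v, 1L))                            // value -> (sum, count)
    .reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2))  // associative merge
    .mapValues { case (sum, count) => sum / count }
avgs.collect()
// Array((a, 3.0), (b, 4.0)) -- order may vary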

sortBy

def sortBy[K](
  f: (T) => K,
  ascending: Boolean = true,
  numPartitions: Int = this.partitions.length)
  (implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T] = withScope {
    this.keyBy[K](f)
        .sortByKey(ascending, numPartitions)
        .values
}

Implemented via sortByKey: key each element by f, sort by key, then take the values.

sortByKey

def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length)
    : RDD[(K, V)] = self.withScope {
    val part = new RangePartitioner(numPartitions, self, ascending)
    new ShuffledRDD[K, V, V](self, part)
      .setKeyOrdering(if (ascending) ordering else ordering.reverse)
}

Partitioning uses a RangePartitioner rather than the usual HashPartitioner, so the partitions themselves are ordered relative to one another; together with the sort within each partition, this makes sortByKey's output globally sorted.
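
This is observable directly (spark-shell sketch): after sortBy, glom shows each partition holding a disjoint, ordered range.

val rdd = sc.parallelize(Seq(3, 1, 4, 1, 5, 9, 2, 6), 3)
rdd.sortBy(x => x).collect()
// Array(1, 1, 2, 3, 4, 5, 6, 9) -- globally sorted
rdd.sortBy(x => x).glom().collect()
// e.g. Array(Array(1, 1, 2), Array(3, 4, 5), Array(6, 9));
// exact boundaries depend on RangePartitioner's sampling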

join

def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = self.withScope {
    this.cogroup(other, partitioner).flatMapValues( pair =>
      for (v <- pair._1.iterator; w <- pair._2.iterator) yield (v, w)
    )
}

join is built on cogroup and keeps only the keys present in both RDDs (an inner join); for each such key it yields every (v, w) combination of the two value lists.
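
For example (spark-shell sketch):

val left  = sc.parallelize(Seq(("a", 1), ("a", 2), ("b", 3)))
val right = sc.parallelize(Seq(("a", "x"), ("c", "y")))
left.join(right).collect()
// Array((a,(1,x)), (a,(2,x))) -- "b" and "c" are dropped; matching keys yield all (v, w) combinations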

leftOuterJoin

def leftOuterJoin[W](
  other: RDD[(K, W)],
  partitioner: Partitioner): RDD[(K, (V, Option[W]))] = self.withScope {
    this.cogroup(other, partitioner).flatMapValues { pair =>
      if (pair._2.isEmpty) {
        pair._1.iterator.map(v => (v, None))
      } else {
        for (v <- pair._1.iterator; w <- pair._2.iterator) yield (v, Some(w))
      }
    }
}

Left outer join: the second element of the result tuple is an Option, since the right side may be missing; also built on cogroup.

rightOuterJoin

def rightOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner)
  : RDD[(K, (Option[V], W))] = self.withScope {
    this.cogroup(other, partitioner).flatMapValues { pair =>
      if (pair._1.isEmpty) {
        pair._2.iterator.map(w => (None, w))
      } else {
        for (v <- pair._1.iterator; w <- pair._2.iterator) yield (Some(v), w)
      }
    }
}

Right outer join: the first element of the result tuple is an Option, since the left side may be missing; also built on cogroup.

fullOuterJoin

def fullOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner)
  : RDD[(K, (Option[V], Option[W]))] = self.withScope {
    this.cogroup(other, partitioner).flatMapValues {
      case (vs, Seq()) => vs.iterator.map(v => (Some(v), None))
      case (Seq(), ws) => ws.iterator.map(w => (None, Some(w)))
      case (vs, ws) => for (v <- vs.iterator; w <- ws.iterator) yield (Some(v), Some(w))
    }
}

Full outer join: both elements of the result tuple are Options, since either side may be missing; also built on cogroup.

cogroup

def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner)
  : RDD[(K, (Iterable[V], Iterable[W]))] = self.withScope {
    if (partitioner.isInstanceOf[HashPartitioner] && keyClass.isArray) {
      throw new SparkException("HashPartitioner cannot partition array keys.")
    }
    val cg = new CoGroupedRDD[K](Seq(self, other), partitioner)
    cg.mapValues { case Array(vs, w1s) =>
      (vs.asInstanceOf[Iterable[V]], w1s.asInstanceOf[Iterable[W]])
    }
}

The result's keys are the union of both RDDs' keys; each value is a tuple of two Iterables (CompactBuffers at runtime) holding that key's values from the first and second RDD respectively.
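
For example (spark-shell sketch):

val left  = sc.parallelize(Seq(("a", 1), ("a", 2), ("b", 3)))
val right = sc.parallelize(Seq(("a", "x"), ("c", "y")))
left.cogroup(right).collect()
// Array((a,(CompactBuffer(1, 2),CompactBuffer(x))),
//       (b,(CompactBuffer(3),CompactBuffer())),
//       (c,(CompactBuffer(),CompactBuffer(y))))  -- order may vary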

zipWithIndex

Pairs each element of the RDD with its index, turning it into a tuple.
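
For example (spark-shell sketch):

sc.parallelize(Seq("a", "b", "c"), 2).zipWithIndex().collect()
// Array((a,0), (b,1), (c,2))

Indices follow partition order; with more than one partition, computing each partition's starting offset triggers a Spark job of its own.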

repartition

def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
	coalesce(numPartitions, shuffle = true)
}

Repartitions the RDD, always through a shuffle; implemented via coalesce with shuffle = true.

coalesce

def coalesce(numPartitions: Int, shuffle: Boolean = false,
             partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
(implicit ord: Ordering[T] = null)
...

By default coalesce performs no shuffle; to force one, specify rdd.coalesce(num, true).
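
A quick check of the two behaviors (spark-shell sketch):

val rdd = sc.parallelize(1 to 100, 8)
rdd.coalesce(4).getNumPartitions                   // 4: partitions merged locally, no shuffle
rdd.coalesce(16).getNumPartitions                  // still 8: cannot grow without a shuffle
rdd.coalesce(16, shuffle = true).getNumPartitions  // 16
rdd.repartition(16).getNumPartitions               // 16: same as coalesce(16, shuffle = true)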

Action

https://spark.apache.org/docs/latest/rdd-programming-guide.html#actions

The distinguishing feature of an action is that it calls runJob to execute a job.

runJob

Every action below ultimately bottoms out in SparkContext.runJob, which submits the RDD's partitions to the scheduler as a job and returns the per-partition results.

collect

def collect(): Array[T] = withScope {
    val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
    Array.concat(results: _*)
}

Assembles all elements of the RDD into an array and returns it to the driver, so the result must not be too large, or the driver will run out of memory.

foreach

def foreach(f: T => Unit): Unit = withScope {
    val cleanF = sc.clean(f)
    sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
}

Applies f to every element; note that f runs on the executors, not on the driver. It differs from map in that:

  • foreach has no return value
  • foreach is an action that triggers a job immediately, while map is a lazy transformation whose results stay in the partitions

foreachPartition

def foreachPartition(f: Iterator[T] => Unit): Unit = withScope {
    val cleanF = sc.clean(f)
    sc.runJob(this, (iter: Iterator[T]) => cleanF(iter))
}

Applies f to each partition, receiving its Iterator; the side effects happen inside each partition on the executors. It differs from mapPartitions in that:

  • foreachPartition has no return value
  • it is an action (see the sketch below)
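
The typical use is amortizing expensive setup over a partition, e.g. one database connection per partition. A sketch only, for some RDD rdd: createConnection and insert are hypothetical placeholders, not a real API.

rdd.foreachPartition { iter =>
    val conn = createConnection()  // hypothetical: opened once per partition, on the executor
    try {
      iter.foreach(record => conn.insert(record))  // hypothetical write
    } finally {
      conn.close()
    }
}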

count

def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum  

Returns the number of elements in the RDD.

reduce

def reduce(f: (T, T) => T): T = withScope {
    val cleanF = sc.clean(f)
    val reducePartition: Iterator[T] => Option[T] = iter => {
      if (iter.hasNext) {
        Some(iter.reduceLeft(cleanF))
      } else {
        None
      }
    }
    var jobResult: Option[T] = None
    val mergeResult = (index: Int, taskResult: Option[T]) => {
      if (taskResult.isDefined) {
        jobResult = jobResult match {
          case Some(value) => Some(f(value, taskResult.get))
          case None => taskResult
        }
      }
    }
    sc.runJob(this, reducePartition, mergeResult)
    // Get the final result out of our Option, or throw an exception if the RDD was empty
    jobResult.getOrElse(throw new UnsupportedOperationException("empty collection"))
}

f is applied within each partition first, then applied again to merge the per-partition results (by that point each partition has been reduced to a single value).

If f is something like an averaging function, the number of partitions affects the final result, because the grouping of elements changes, as demonstrated below.
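
A concrete demonstration (spark-shell sketch):

val avg = (a: Double, b: Double) => (a + b) / 2           // not associative
sc.parallelize(Seq(1.0, 2.0, 3.0, 4.0), 1).reduce(avg)    // 3.125
sc.parallelize(Seq(1.0, 2.0, 3.0, 4.0), 2).reduce(avg)    // 2.5 -- same data, different partitioning
sc.parallelize(Seq(1.0, 2.0, 3.0, 4.0), 2).reduce(_ + _)  // 10.0 -- associative f is partition-independent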

first

def first(): T = withScope {
    take(1) match {
      case Array(t) => t
      case _ => throw new UnsupportedOperationException("empty collection")
    }
}

Returns the first element of the RDD; implemented via take.

take

def take(num: Int): Array[T] = withScope {
    val scaleUpFactor = Math.max(conf.getInt("spark.rdd.limit.scaleUpFactor", 4), 2)
    if (num == 0) {
      new Array[T](0)
    } else {
      val buf = new ArrayBuffer[T]
      val totalParts = this.partitions.length
      var partsScanned = 0
      while (buf.size < num && partsScanned < totalParts) {
        // The number of partitions to try in this iteration. It is ok for this number to be
        // greater than totalParts because we actually cap it at totalParts in runJob.
        var numPartsToTry = 1L
        val left = num - buf.size
        if (partsScanned > 0) {
          // If we didn't find any rows after the previous iteration, quadruple and retry.
          // Otherwise, interpolate the number of partitions we need to try, but overestimate
          // it by 50%. We also cap the estimation in the end.
          if (buf.isEmpty) {
            numPartsToTry = partsScanned * scaleUpFactor
          } else {
            // As left > 0, numPartsToTry is always >= 1
            numPartsToTry = Math.ceil(1.5 * left * partsScanned / buf.size).toInt
            numPartsToTry = Math.min(numPartsToTry, partsScanned * scaleUpFactor)
          }
        }

        val p = partsScanned.until(math.min(partsScanned + numPartsToTry, totalParts).toInt)
        val res = sc.runJob(this, (it: Iterator[T]) => it.take(left).toArray, p)

        res.foreach(buf ++= _.take(num - buf.size))
        partsScanned += p.size
      }

      buf.toArray
    }
}
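
take fetches results incrementally: it first runs the job on a single partition; if that does not yield num elements, it estimates how many more partitions to scan (overestimating by 50%, capped by spark.rdd.limit.scaleUpFactor) and retries, until enough elements are collected or all partitions have been scanned. This avoids computing the whole RDD just to return a few rows.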

top

def top(num: Int)(implicit ord: Ordering[T]): Array[T] = withScope {
	takeOrdered(num)(ord.reverse)
}

Sorts in descending order and returns the first num elements; implemented via takeOrdered with the ordering reversed.

takeOrdered

def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] = withScope {
    if (num == 0) {
      Array.empty
    } else {
      val mapRDDs = mapPartitions { items =>
        // Priority keeps the largest elements, so let's reverse the ordering.
        val queue = new BoundedPriorityQueue[T](num)(ord.reverse)
        queue ++= collectionUtils.takeOrdered(items, num)(ord)
        Iterator.single(queue)
      }
      if (mapRDDs.partitions.length == 0) {
        Array.empty
      } else {
        mapRDDs.reduce { (queue1, queue2) =>
          queue1 ++= queue2
          queue1
        }.toArray.sorted(ord)
      }
    }
}

The key is BoundedPriorityQueue, used in two steps:

  • new BoundedPriorityQueue[T](num)(ord.reverse) is a fixed-capacity queue that keeps the top num elements within each partition
  • queue1 ++= queue2 merges the per-partition queues, yielding the top num of the whole RDD
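
For example (spark-shell sketch):

val rdd = sc.parallelize(Seq(3, 1, 4, 1, 5, 9, 2, 6))
rdd.takeOrdered(3)   // Array(1, 1, 2) -- smallest three
rdd.top(3)           // Array(9, 6, 5) -- largest three, i.e. takeOrdered under the reversed ordering

Because each partition contributes at most num elements, the driver never holds more than num elements per partition, unlike a full collect-then-sort.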

countByKey

def countByKey(): Map[K, Long] = self.withScope {
    self.mapValues(_ => 1L).reduceByKey(_ + _).collect().toMap
}

Counts the occurrences of each key; the result is returned to the driver as a Map.

collectAsMap

def collectAsMap(): Map[K, V] = self.withScope {
    val data = self.collect()
    val map = new mutable.HashMap[K, V]
    map.sizeHint(data.length)
    data.foreach { pair => map.put(pair._1, pair._2) }
    map
}

Collects a pair RDD and assembles it into a Map on the driver; if a key appears more than once, only one of its values is kept.