Spark 2.4.4 Source Code
Transformations
https://spark.apache.org/docs/latest/rdd-programming-guide.html#transformations
filter
def filter(f: T => Boolean): RDD[T] = withScope {
  val cleanF = sc.clean(f)
  new MapPartitionsRDD[T, T](
    this,
    (context, pid, iter) => iter.filter(cleanF),
    preservesPartitioning = true)
}
map
def map[U: ClassTag](f: T => U): RDD[U] = withScope {
  val cleanF = sc.clean(f)
  new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
}
mapPartitions
def mapPartitions[U: ClassTag](
    f: Iterator[T] => Iterator[U],
    preservesPartitioning: Boolean = false): RDD[U] = withScope {
  val cleanedF = sc.clean(f)
  new MapPartitionsRDD(
    this,
    (context: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(iter),
    preservesPartitioning)
}
Both return a MapPartitionsRDD; map applies the function to every element, while mapPartitions applies it once per partition, handing over the partition's Iterator.
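A minimal sketch of the difference, assuming a SparkContext named sc as in spark-shell (the results in the comments are what this data and partitioning should produce):
val rdd = sc.parallelize(1 to 6, numSlices = 2)
// map: the function is invoked once per element
val doubled = rdd.map(_ * 2)
// mapPartitions: the function is invoked once per partition and receives the whole Iterator
val partialSums = rdd.mapPartitions(iter => Iterator(iter.sum))
doubled.collect()      // Array(2, 4, 6, 8, 10, 12)
partialSums.collect()  // Array(6, 15), one partial sum per partition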
mapPartitionsWithIndex
def mapPartitionsWithIndex[U: ClassTag](
    f: (Int, Iterator[T]) => Iterator[U],
    preservesPartitioning: Boolean = false): RDD[U] = withScope {
  val cleanedF = sc.clean(f)
  new MapPartitionsRDD(
    this,
    (context: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(index, iter),
    preservesPartitioning)
}
Same as mapPartitions, except the function additionally receives the partition index as input.
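For example (same sc assumption as above), the index can be used to see which partition each element lives in:
sc.parallelize(1 to 6, 2).mapPartitionsWithIndex { (idx, iter) =>
  iter.map(x => s"partition $idx: $x")
}.collect()
// Array("partition 0: 1", ..., "partition 1: 6")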
distinct
def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
  map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)
}
Implemented on top of reduceByKey: each element is wrapped as an (x, null) pair, duplicates collapse under reduceByKey, and the keys are then taken back out.
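A sketch of the equivalence (assuming sc from spark-shell); the order of the collected elements is not guaranteed:
val rdd = sc.parallelize(Seq(1, 1, 2, 3, 3))
rdd.distinct().collect()   // Array(1, 2, 3) in some order
// hand-written version of the implementation above
rdd.map(x => (x, null)).reduceByKey((x, y) => x).map(_._1).collect()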
union
def union[T: ClassTag](first: RDD[T], rest: RDD[T]*): RDD[T] = withScope {
  union(Seq(first) ++ rest)
}
Merges two RDDs without deduplication; the second RDD's partitions are appended after the first's.
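For example (assuming sc):
val a = sc.parallelize(Seq(1, 2))
val b = sc.parallelize(Seq(2, 3))
a.union(b).collect()   // Array(1, 2, 2, 3), duplicates are kept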
intersection
def intersection(other: RDD[T]): RDD[T] = withScope {
  this.map(v => (v, null)).cogroup(other.map(v => (v, null)))
      .filter { case (_, (leftGroup, rightGroup)) => leftGroup.nonEmpty && rightGroup.nonEmpty }
      .keys
}
Takes the intersection of the two RDDs, with deduplication:
- cogroup groups the (v, null) pairs by key
- leftGroup.nonEmpty && rightGroup.nonEmpty keeps only the keys present in both RDDs, i.e. the intersection
- keys returns each key once, which guarantees deduplication
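For example (assuming sc; output order not guaranteed):
val a = sc.parallelize(Seq(1, 1, 2, 3))
val b = sc.parallelize(Seq(1, 3, 3, 4))
a.intersection(b).collect()   // Array(1, 3) in some order, duplicates removed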
subtract
def subtract(
    other: RDD[T],
    p: Partitioner)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
  if (partitioner == Some(p)) {
    val p2 = new Partitioner() {
      override def numPartitions: Int = p.numPartitions
      override def getPartition(k: Any): Int = p.getPartition(k.asInstanceOf[(Any, _)]._1)
    }
    this.map(x => (x, null)).subtractByKey(other.map((_, null)), p2).keys
  } else {
    this.map(x => (x, null)).subtractByKey(other.map((_, null)), p).keys
  }
}
Returns the elements of the first RDD that are not in the other, without deduplication. From distinct onwards the pattern is the same: pair each element with null to get a PairRDDFunctions, then group or subtract by key.
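For example (assuming sc; output order not guaranteed):
val a = sc.parallelize(Seq(1, 1, 2, 3))
val b = sc.parallelize(Seq(3))
a.subtract(b).collect()   // Array(1, 1, 2) in some order, duplicates are kept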
glom
def glom(): RDD[Array[T]] = withScope {
  new MapPartitionsRDD[Array[T], T](this, (context, pid, iter) => Iterator(iter.toArray))
}
Assembles the values of each partition into a single array.
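For example (assuming sc):
sc.parallelize(1 to 6, 2).glom().collect()   // Array(Array(1, 2, 3), Array(4, 5, 6))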
groupBy
def groupBy[K](f: T => K, p: Partitioner)(implicit kt: ClassTag[K], ord: Ordering[K] = null)
    : RDD[(K, Iterable[T])] = withScope {
  val cleanF = sc.clean(f)
  this.map(t => (cleanF(t), t)).groupByKey(p)
}
groupBy is implemented by calling groupByKey: each element t becomes the pair (f(t), t) and is then grouped by key.
groupByKey
def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = self.withScope {
  // groupByKey shouldn't use map side combine because map side combine does not
  // reduce the amount of data shuffled and requires all map side data be inserted
  // into a hash table, leading to more objects in the old gen.
  val createCombiner = (v: V) => CompactBuffer(v)
  // combines values of the same key within a partition
  val mergeValue = (buf: CompactBuffer[V], v: V) => buf += v
  // combines values of the same key across partitions
  val mergeCombiners = (c1: CompactBuffer[V], c2: CompactBuffer[V]) => c1 ++= c2
  val bufs = combineByKeyWithClassTag[CompactBuffer[V]](
    createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine = false)
  bufs.asInstanceOf[RDD[(K, Iterable[V])]]
}
Note: groupByKey does not combine on the map side and applies no aggregation; it only groups values via combineByKeyWithClassTag (see the sketch after reduceByKey below).
reduceByKey
def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = self.withScope {
  combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner)
}
It differs from groupByKey in a few ways:
- groupByKey returns a CompactBuffer[V] per key, while reduceByKey returns a single V
- reduceByKey performs mapSideCombine, which makes it more efficient than groupByKey, but the combine function is restricted (it must be associative), so something like an average cannot be expressed as a single reduce function
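A sketch of the difference (assuming sc; output order not guaranteed). The average case is handled by reducing a (sum, count) pair instead of averaging directly:
val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))
// groupByKey: every value is shuffled and buffered per key
pairs.groupByKey().mapValues(_.sum).collect()   // Array(("a", 4), ("b", 2)) in some order
// reduceByKey: partial sums are combined on the map side before the shuffle
pairs.reduceByKey(_ + _).collect()              // same result, less data shuffled
// an average rebuilt from a (sum, count) pair, since it cannot be a single reduce function
pairs.mapValues(v => (v, 1))
  .reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2))
  .mapValues { case (sum, cnt) => sum.toDouble / cnt }
  .collect()                                    // Array(("a", 2.0), ("b", 2.0))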
sortBy
def sortBy[K](
    f: (T) => K,
    ascending: Boolean = true,
    numPartitions: Int = this.partitions.length)
    (implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T] = withScope {
  this.keyBy[K](f)
      .sortByKey(ascending, numPartitions)
      .values
}
Implemented by calling sortByKey: keyBy(f) turns each element into (f(t), t), the pairs are sorted by key, and values drops the keys again (see the sketch after sortByKey below).
sortByKey
def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length)
    : RDD[(K, V)] = self.withScope
{
  val part = new RangePartitioner(numPartitions, self, ascending)
  new ShuffledRDD[K, V, V](self, part)
    .setKeyOrdering(if (ascending) ordering else ordering.reverse)
}
Partitioning uses a RangePartitioner rather than the usual HashPartitioner, so the partitions themselves are ordered relative to each other; this is why the output of sortByKey is globally sorted.
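For example (assuming sc), both produce a result that is sorted across all partitions:
val rdd = sc.parallelize(Seq(("b", 2), ("c", 3), ("a", 1)), 2)
rdd.sortByKey().collect()                       // Array(("a", 1), ("b", 2), ("c", 3))
rdd.sortBy(_._2, ascending = false).collect()   // Array(("c", 3), ("b", 2), ("a", 1))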
join
def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = self.withScope {
  this.cogroup(other, partitioner).flatMapValues( pair =>
    for (v <- pair._1.iterator; w <- pair._2.iterator) yield (v, w)
  )
}
join is built on cogroup and returns only the keys that exist in both RDDs (see the sketch after fullOuterJoin below, which compares the four joins).
leftOuterJoin
def leftOuterJoin[W](
    other: RDD[(K, W)],
    partitioner: Partitioner): RDD[(K, (V, Option[W]))] = self.withScope {
  this.cogroup(other, partitioner).flatMapValues { pair =>
    if (pair._2.isEmpty) {
      pair._1.iterator.map(v => (v, None))
    } else {
      for (v <- pair._1.iterator; w <- pair._2.iterator) yield (v, Some(w))
    }
  }
}
Left outer join: the second element of the result tuple is an Option because it may be missing; also built on cogroup.
rightOuterJoin
def rightOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner)
    : RDD[(K, (Option[V], W))] = self.withScope {
  this.cogroup(other, partitioner).flatMapValues { pair =>
    if (pair._1.isEmpty) {
      pair._2.iterator.map(w => (None, w))
    } else {
      for (v <- pair._1.iterator; w <- pair._2.iterator) yield (Some(v), w)
    }
  }
}
Right outer join: the first element of the result tuple is an Option because it may be missing; also built on cogroup.
fullOuterJoin
def fullOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner)
    : RDD[(K, (Option[V], Option[W]))] = self.withScope {
  this.cogroup(other, partitioner).flatMapValues {
    case (vs, Seq()) => vs.iterator.map(v => (Some(v), None))
    case (Seq(), ws) => ws.iterator.map(w => (None, Some(w)))
    case (vs, ws) => for (v <- vs.iterator; w <- ws.iterator) yield (Some(v), Some(w))
  }
}
Full outer join: both elements of the result tuple are Options because either side may be missing; also built on cogroup.
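A sketch comparing the four joins (assuming sc; the order of the collected pairs is not guaranteed):
val left  = sc.parallelize(Seq((1, "a"), (2, "b")))
val right = sc.parallelize(Seq((2, "x"), (3, "y")))
left.join(right).collect()            // Array((2, ("b", "x")))
left.leftOuterJoin(right).collect()   // (1, ("a", None)), (2, ("b", Some("x")))
left.rightOuterJoin(right).collect()  // (2, (Some("b"), "x")), (3, (None, "y"))
left.fullOuterJoin(right).collect()   // keys 1, 2 and 3, with an Option on each side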
cogroup
def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner)
    : RDD[(K, (Iterable[V], Iterable[W]))] = self.withScope {
  if (partitioner.isInstanceOf[HashPartitioner] && keyClass.isArray) {
    throw new SparkException("HashPartitioner cannot partition array keys.")
  }
  val cg = new CoGroupedRDD[K](Seq(self, other), partitioner)
  cg.mapValues { case Array(vs, w1s) =>
    (vs.asInstanceOf[Iterable[V]], w1s.asInstanceOf[Iterable[W]])
  }
}
The keys of the result are the union of the keys of both RDDs; each value is a Tuple of two Iterables (CompactBuffers), one per input RDD, holding that RDD's values for the key.
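For example (assuming sc; key order not guaranteed):
val left  = sc.parallelize(Seq((1, "a"), (1, "b"), (2, "c")))
val right = sc.parallelize(Seq((1, "x"), (3, "y")))
left.cogroup(right).collect()
// (1, (CompactBuffer(a, b), CompactBuffer(x)))
// (2, (CompactBuffer(c), CompactBuffer()))
// (3, (CompactBuffer(), CompactBuffer(y)))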
zipWithIndex
Adds an index to every element of the RDD, producing (element, index) tuples.
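For example (assuming sc); the index is a Long, and computing the per-partition offsets triggers a Spark job when the RDD has more than one partition:
sc.parallelize(Seq("a", "b", "c"), 2).zipWithIndex().collect()
// Array(("a", 0), ("b", 1), ("c", 2))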
repartition
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
  coalesce(numPartitions, shuffle = true)
}
Repartitions the RDD and always shuffles; implemented on top of coalesce with shuffle = true.
coalesce
def coalesce(numPartitions: Int, shuffle: Boolean = false,
             partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
            (implicit ord: Ordering[T] = null)
  ...
Does not shuffle by default; to increase the number of partitions you have to pass rdd.coalesce(num, true).
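A sketch of the behaviour (assuming sc):
val rdd = sc.parallelize(1 to 100, 8)
rdd.coalesce(2).getNumPartitions                    // 2, partitions merged without a shuffle
rdd.coalesce(16).getNumPartitions                   // still 8: cannot grow without a shuffle
rdd.coalesce(16, shuffle = true).getNumPartitions   // 16
rdd.repartition(16).getNumPartitions                // 16, same as coalesce(16, shuffle = true)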
Action
https://spark.apache.org/docs/latest/rdd-programming-guide.html#actions
The defining feature of an Action is that it calls runJob to execute a Job.
runJob
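As a sketch (assuming sc from spark-shell), runJob can also be called directly: it runs a function over every partition and returns one result per partition, which is exactly how the actions below are built:
val rdd = sc.parallelize(1 to 6, 3)
// one result per partition: here, each partition's partial sum
val perPartition: Array[Int] = sc.runJob(rdd, (iter: Iterator[Int]) => iter.sum)
perPartition   // Array(3, 7, 11) with this data and partitioning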
collect
def collect(): Array[T] = withScope {
  val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
  Array.concat(results: _*)
}
Assembles all elements of the RDD into an array and returns it to the driver, so the result must not be too large or the driver will run out of memory.
foreach
def foreach(f: T => Unit): Unit = withScope {
  val cleanF = sc.clean(f)
  sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
}
Applies f to every element; the function runs on the executors that hold the data. It differs from map in two ways:
- foreach is an action and returns nothing, while map is a transformation that returns a new RDD
- foreach is executed purely for its side effects where the partitions live, while map's results stay distributed across the partitions of the new RDD
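A sketch (assuming sc); note that in local mode the println output appears on the driver console, while on a cluster it ends up in the executor logs:
val rdd = sc.parallelize(1 to 3)
rdd.foreach(x => println(x))   // action: returns Unit and runs where the data is
val mapped = rdd.map(_ * 2)    // transformation: nothing runs yet
mapped.collect()               // Array(2, 4, 6)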
foreachPartition
def foreachPartition(f: Iterator[T] => Unit): Unit = withScope {
  val cleanF = sc.clean(f)
  sc.runJob(this, (iter: Iterator[T]) => cleanF(iter))
}
Applies f once per partition, receiving the partition's Iterator; like foreach, the work happens on the executors. It differs from mapPartitions in that:
- foreachPartition has no return value
- foreachPartition is an action, while mapPartitions is a transformation
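A sketch using an accumulator to show that the function runs once per partition rather than once per element (assuming sc):
val rdd = sc.parallelize(1 to 6, 3)
val calls = sc.longAccumulator("partition calls")
rdd.foreachPartition { iter =>
  calls.add(1)            // incremented once per partition
  iter.foreach(_ => ())   // consume the iterator, e.g. write it out through one shared connection
}
calls.value               // 3: one call per partition, not per element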
count
def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum
Returns the number of elements in the RDD.
reduce
def reduce(f: (T, T) => T): T = withScope {
  val cleanF = sc.clean(f)
  val reducePartition: Iterator[T] => Option[T] = iter => {
    if (iter.hasNext) {
      Some(iter.reduceLeft(cleanF))
    } else {
      None
    }
  }
  var jobResult: Option[T] = None
  val mergeResult = (index: Int, taskResult: Option[T]) => {
    if (taskResult.isDefined) {
      jobResult = jobResult match {
        case Some(value) => Some(f(value, taskResult.get))
        case None => taskResult
      }
    }
  }
  sc.runJob(this, reducePartition, mergeResult)
  // Get the final result out of our Option, or throw an exception if the RDD was empty
  jobResult.getOrElse(throw new UnsupportedOperationException("empty collection"))
}
The function f is first applied within each partition, and then across partitions (at that point each partition has already been reduced to a single value).
When f is not associative, for example something like an average, the number of partitions affects the final result, because the distribution of elements across partitions changes (see the sketch below).
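A sketch of that caveat (assuming sc): averaging inside reduce is not associative, so the answer depends on how elements are split into partitions, whereas reducing a (sum, count) pair is safe:
val rdd = sc.parallelize(Seq(1.0, 2.0, 3.0), 2)
rdd.reduce(_ + _)                   // 6.0, addition is associative and safe
rdd.reduce((a, b) => (a + b) / 2)   // 1.75 with this partitioning, not the true mean 2.0
// the mean rebuilt from a (sum, count) pair
val (sum, cnt) = rdd.map(x => (x, 1L)).reduce((a, b) => (a._1 + b._1, a._2 + b._2))
sum / cnt                           // 2.0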
first
def first(): T = withScope {
  take(1) match {
    case Array(t) => t
    case _ => throw new UnsupportedOperationException("empty collection")
  }
}
Returns the first element of the RDD; implemented on top of take.
take
def take(num: Int): Array[T] = withScope {
  val scaleUpFactor = Math.max(conf.getInt("spark.rdd.limit.scaleUpFactor", 4), 2)
  if (num == 0) {
    new Array[T](0)
  } else {
    val buf = new ArrayBuffer[T]
    val totalParts = this.partitions.length
    var partsScanned = 0
    while (buf.size < num && partsScanned < totalParts) {
      // The number of partitions to try in this iteration. It is ok for this number to be
      // greater than totalParts because we actually cap it at totalParts in runJob.
      var numPartsToTry = 1L
      val left = num - buf.size
      if (partsScanned > 0) {
        // If we didn't find any rows after the previous iteration, quadruple and retry.
        // Otherwise, interpolate the number of partitions we need to try, but overestimate
        // it by 50%. We also cap the estimation in the end.
        if (buf.isEmpty) {
          numPartsToTry = partsScanned * scaleUpFactor
        } else {
          // As left > 0, numPartsToTry is always >= 1
          numPartsToTry = Math.ceil(1.5 * left * partsScanned / buf.size).toInt
          numPartsToTry = Math.min(numPartsToTry, partsScanned * scaleUpFactor)
        }
      }
      val p = partsScanned.until(math.min(partsScanned + numPartsToTry, totalParts).toInt)
      val res = sc.runJob(this, (it: Iterator[T]) => it.take(left).toArray, p)
      res.foreach(buf ++= _.take(num - buf.size))
      partsScanned += p.size
    }
    buf.toArray
  }
}
top
def top(num: Int)(implicit ord: Ordering[T]): Array[T] = withScope {
  takeOrdered(num)(ord.reverse)
}
Returns the first num elements after sorting in descending order; implemented on top of takeOrdered with the ordering reversed.
takeOrdered
def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] = withScope {
  if (num == 0) {
    Array.empty
  } else {
    val mapRDDs = mapPartitions { items =>
      // Priority keeps the largest elements, so let's reverse the ordering.
      val queue = new BoundedPriorityQueue[T](num)(ord.reverse)
      queue ++= collectionUtils.takeOrdered(items, num)(ord)
      Iterator.single(queue)
    }
    if (mapRDDs.partitions.length == 0) {
      Array.empty
    } else {
      mapRDDs.reduce { (queue1, queue2) =>
        queue1 ++= queue2
        queue1
      }.toArray.sorted(ord)
    }
  }
}
The key piece is the BoundedPriorityQueue, used in two steps:
- new BoundedPriorityQueue[T](num)(ord.reverse) is a bounded queue that keeps the top num elements within each partition
- queue1 ++= queue2 merges the per-partition queues in reduce, yielding the top num elements of the whole RDD
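For example (assuming sc):
val rdd = sc.parallelize(Seq(5, 1, 4, 2, 3), 2)
rdd.takeOrdered(2)   // Array(1, 2), the two smallest, ascending
rdd.top(2)           // Array(5, 4), the two largest, descending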
countByKey
def countByKey(): Map[K, Long] = self.withScope {
  self.mapValues(_ => 1L).reduceByKey(_ + _).collect().toMap
}
Counts the number of occurrences of each key; the result is a Map collected to the driver.
collectAsMap
def collectAsMap(): Map[K, V] = self.withScope {
  val data = self.collect()
  val map = new mutable.HashMap[K, V]
  map.sizeHint(data.length)
  data.foreach { pair => map.put(pair._1, pair._2) }
  map
}
Collects a pair RDD and assembles it into a Map on the driver.
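For example (assuming sc); note that with duplicate keys only one value per key survives, and which one is not guaranteed:
sc.parallelize(Seq((1, "a"), (1, "b"), (2, "c"))).collectAsMap()
// Map(2 -> "c", 1 -> ...), a single value for key 1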