Spark 持久化

Posted by danner on March 6, 2019

Spark 中的 RDD 若没有做持久化处理,那么每用一次该 RDD 都要从 source 开始重新计算

val inputFile = "input/user.txt"
val rdd = sc.textFile(inputFile)

val seriRdd = rdd.flatMap(_.split(" ")).map(x => {
    (TestSerialization(x), 1)
})
seriRdd.reduceByKey(_+_).printInfo()
seriRdd.filter(_._1.equals(TestSerialization("1000001"))).printInfo()

上面这段代码,seriRdd 的生成过程 textfile->flatmap->map 会被执行两次,是因为 seriRdd 没有被持久化。很显然这很浪费,如果 seriRdd 持久化,就节省了一次计算时间。在 Spark 中, RDDSpark SQLSpark Streaming 默认的持久化机制是不同的。以下都是针对 cache API 而言。

RDD

RDD 中,持久化是 lazy,只有在第一次计算时才会触发持久化操作,但删除持久化是 eagerRDD cache 默认的存储级别是 StorageLevel.MEMORY_ONLY:只保存在内存

Spark SQL

Spark SQL 中可以使用 dataFrame.cache() 来持久化 dataFrame。 同理在 Spark SQL 中,持久化也是 lazy 删除是 eager,但存储级别是 MEMORY_AND_DISK:显然对于 SQL 来说重新计算 DF 的代价比从硬盘读数据还大。

Spark Streaming

Spark Streamingcache 操作也是 lazy,删除是 eager;区别在于存储级别是 MEMORY_ONLY_SERStreaming 中的窗口操作和带状态算子默认都是对 RDD 持久化。

总结

持久化是 lazy,删除持久化是 eager

默认持久化存储级别:

  • RDDmemory_only
  • Spark SQLmemory_and_disk
  • Spark Streamingmemory_only_ser