Background
Every time a Spark on YARN application is launched, the local Spark jars and conf are uploaded to HDFS, which takes quite a long time:
[hadoop@danner000 jars]$ spark-submit --class org.apache.spark.examples.SparkPi --master yarn --deploy-mode cluster spark-examples_2.11-2.4.4.jar 3
...
`19/10/18 13:58:26 WARN yarn.Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.`
`19/10/18 13:58:30 INFO yarn.Client: Uploading resource file:/tmp/spark-294ab9b7-97ff-4ffa-8e4f-ae44a89dd5da/__spark_libs__1410305138065236635.zip -> hdfs://192.168.22.147:9000/user/hadoop/.sparkStaging/application_1571146456067_0024/__spark_libs__1410305138065236635.zip`
19/10/18 13:58:44 INFO yarn.Client: Uploading resource file:/home/hadoop/app/spark-2.4.4-bin-2.6.0-cdh5.15.1/examples/jars/spark-examples_2.11-2.4.4.jar -> hdfs://192.168.22.147:9000/user/hadoop/.sparkStaging/application_1571146456067_0024/spark-examples_2.11-2.4.4.jar
19/10/18 13:58:45 INFO yarn.Client: Uploading resource file:/tmp/spark-294ab9b7-97ff-4ffa-8e4f-ae44a89dd5da/__spark_conf__5888474803491307773.zip -> hdfs://192.168.22.147:9000/user/hadoop/.sparkStaging/application_1571146456067_0024/__spark_conf__.zip
...
As the log above shows, because neither spark.yarn.archive nor spark.yarn.jars is set, the libraries are uploaded on every launch.
Optimization
Now that we know which properties are responsible, let's look at the source code to see how they are used and how they should be set.
// org.apache.spark.deploy.yarn.Client
private def createContainerLaunchContext {
  ...
  val appStagingDirPath = new Path(appStagingBaseDir, getAppStagingDir(appId))
  ...
  val localResources = prepareLocalResources(appStagingDirPath, pySparkArchives)
  ...
}
// org.apache.spark.deploy.yarn.Client
def prepareLocalResources {
  ...
  def distribute {
    val trimmedPath = path.trim()
    val localURI = Utils.resolveURI(trimmedPath)
    if (localURI.getScheme != LOCAL_SCHEME) {
      if (addDistributedUri(localURI)) {
        val localPath = getQualifiedLocalPath(localURI, hadoopConf)
        val linkname = targetDir.map(_ + "/").getOrElse("") +
          destName.orElse(Option(localURI.getFragment())).getOrElse(localPath.getName())
        val destPath = copyFileToRemote(destDir, localPath, replication, symlinkCache)
        val destFs = FileSystem.get(destPath.toUri(), hadoopConf)
        distCacheMgr.addResource(
          destFs, hadoopConf, destPath, localResources, resType, linkname, statCache,
          appMasterOnly = appMasterOnly)
        (false, linkname)
      } else {
        (false, null)
      }
    } else {
      (true, trimmedPath)
    }
  }
  ...
  /**
   * Add Spark to the cache. There are two settings that control what files to add to the cache:
   * - if a Spark archive is defined, use the archive. The archive is expected to contain
   *   jar files at its root directory.
   * - if a list of jars is provided, filter the non-local ones, resolve globs, and
   *   add the found files to the cache.
   *
   * Note that the archive cannot be a "local" URI. If none of the above settings are found,
   * then upload all files found in $SPARK_HOME/jars.
   */
  val sparkArchive = sparkConf.get(SPARK_ARCHIVE)
  if (sparkArchive.isDefined) {
    val archive = sparkArchive.get
    require(!isLocalUri(archive), s"${SPARK_ARCHIVE.key} cannot be a local URI.")
    distribute(Utils.resolveURI(archive).toString,
      resType = LocalResourceType.ARCHIVE,
      destName = Some(LOCALIZED_LIB_DIR))
  } else {
    sparkConf.get(SPARK_JARS) match {
      case Some(jars) => {
        // Same idea as SPARK_ARCHIVE: the SPARK_JARS entries are uploaded; setting either one is enough
        ...
      }
      case None =>
        // No configuration, so fall back to uploading local jar files.
        logWarning(s"Neither ${SPARK_JARS.key} nor ${SPARK_ARCHIVE.key} is set, falling back " + "to uploading libraries under SPARK_HOME.")
        // Upload every jar under spark/jars
    }
    ...
  }
private[yarn] def copyFileToRemote {
  val destFs = destDir.getFileSystem(hadoopConf)
  val srcFs = srcPath.getFileSystem(hadoopConf)
  var destPath = srcPath
  if (force || !compareFs(srcFs, destFs) || "file".equals(srcFs.getScheme)) {
    destPath = new Path(destDir, destName.getOrElse(srcPath.getName()))
    logInfo(s"Uploading resource $srcPath -> $destPath")
    FileUtil.copy(srcFs, srcPath, destFs, destPath, false, hadoopConf)
    destFs.setReplication(destPath, replication)
    destFs.setPermission(destPath, new FsPermission(APP_FILE_PERMISSION))
  } else {
    logInfo(s"Source and destination file systems are the same. Not copying $srcPath")
  }
  ...
}
- After SPARK_ARCHIVE is set, the SPARK_ARCHIVE directory is distributed via distribute
- distribute checks whether the path is a local file and whether it has already been uploaded
- copyFileToRemote checks whether the source and destination files are on the same file system; if they are, the file is not uploaded
- destPath is derived from appStagingDirPath, which is assigned in the createContainerLaunchContext function; from the Hadoop job execution flow we know that the StagingDir is a temporary directory a YARN job creates to hold the files it needs to run, and in this case it is an HDFS directory (it can be listed as sketched below)
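A minimal sketch of inspecting that staging directory, assuming the cluster address and user from the log at the top; the per-application subdirectory is cleaned up once the job finishes:

```bash
# List the per-application staging directories that YARN jobs create on HDFS
hdfs dfs -ls hdfs://192.168.22.147:9000/user/hadoop/.sparkStaging/
```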
From the analysis above, we can avoid the upload on every launch simply by setting SPARK_ARCHIVE to an HDFS path.
- Package spark/jars/*.jar into a zip, upload it to HDFS (see the sketch below), and point spark.yarn.archive at it when submitting
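A minimal sketch of the packaging step, assuming $SPARK_HOME points at the local Spark installation and using the HDFS target path from the spark-submit command below:

```bash
# Zip every jar under $SPARK_HOME/jars; the jar files must sit at the root of the archive
cd $SPARK_HOME/jars
zip -q spark_jar.zip *.jar

# Upload the archive to HDFS
hdfs dfs -mkdir -p hdfs://192.168.22.147:9000/lib/dep/spark
hdfs dfs -put -f spark_jar.zip hdfs://192.168.22.147:9000/lib/dep/spark/
```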
spark-submit --class org.apache.spark.examples.SparkPi --master yarn --deploy-mode cluster `--conf spark.yarn.archive=hdfs://192.168.22.147:9000/lib/dep/spark/spark_jar.zip` spark-examples_2.11-2.4.4.jar 3
...
19/10/18 12:39:16 INFO yarn.Client: Preparing resources for our AM container
19/10/18 12:39:16 INFO yarn.Client: `Source and destination file systems are the same. Not copying hdfs://192.168.22.147:9000/lib/dep/spark/spark_jar.tar`
...
spark.yarn.archive can also be set in spark-defaults.conf; spark.yarn.jars works the same way, and the two are equivalent.
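For example, a spark-defaults.conf sketch using the same HDFS path as above; set only one of the two properties (the jars directory in the commented-out line is hypothetical and assumes the individual jars were uploaded there instead of the zip):

```
# $SPARK_HOME/conf/spark-defaults.conf
spark.yarn.archive   hdfs://192.168.22.147:9000/lib/dep/spark/spark_jar.zip

# Equivalent alternative: list the jars directly (globs are allowed)
# spark.yarn.jars    hdfs://192.168.22.147:9000/lib/dep/spark/jars/*.jar
```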
Conf
The log also shows that the conf is uploaded on every launch: the files under SPARK_CONF_DIR and HADOOP_CONF_DIR are packaged and uploaded. This part cannot be optimized, because even if the source and destination files are on the same file system, the copy is forced:
// This code forces the archive to be copied, so that unit tests pass (since in that case both
// file systems are the same and the archive wouldn't normally be copied). In most (all?)
// deployments, the archive would be copied anyway, since it's a temp file in the local file
// system.
val remoteConfArchivePath = new Path(destDir, LOCALIZED_CONF_ARCHIVE)
val remoteFs = FileSystem.get(remoteConfArchivePath.toUri(), hadoopConf)
sparkConf.set(CACHED_CONF_ARCHIVE, remoteConfArchivePath.toString())
val localConfArchive = new Path(createConfArchive().toURI())
copyFileToRemote(destDir, localConfArchive, replication, symlinkCache, force = true,
destName = Some(LOCALIZED_CONF_ARCHIVE))