This post follows up on the spark-shell script analysis, which showed that a job is ultimately submitted through org.apache.spark.deploy.SparkSubmit. This time we walk through the concrete execution flow.
Selecting the cluster
// org.apache.spark.deploy.SparkSubmit
override def main(args: Array[String]): Unit = {
  val submit = new SparkSubmit() {
    self =>
    override protected def parseArguments(args: Array[String]): SparkSubmitArguments = {
      new SparkSubmitArguments(args) {
        ...
      }
    }
    ...
    override def doSubmit(args: Array[String]): Unit = {
      try {
        super.doSubmit(args)
      } catch {
        ...
      }
    }
  }
  submit.doSubmit(args)
}
def doSubmit(args: Array[String]): Unit = {
  ...
  // create a SparkSubmitArguments instance to parse the command-line arguments
  val appArgs = parseArguments(args)
  if (appArgs.verbose) {
    logInfo(appArgs.toString)
  }
  // dispatch on the parsed action; in this walkthrough it is SUBMIT
  appArgs.action match {
    case SparkSubmitAction.SUBMIT => submit(appArgs, uninitLog)
    case SparkSubmitAction.KILL => kill(appArgs)
    case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
    case SparkSubmitAction.PRINT_VERSION => printVersion()
  }
}
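How does appArgs.action get its value? Roughly: SparkSubmitArguments defaults the action to SUBMIT and only switches it for flags such as --kill, --status, or --version. A minimal sketch of that resolution (Action and resolveAction are made-up names, not the actual SparkSubmitArguments code):

// hypothetical, self-contained simplification of the action resolution
object Action extends Enumeration {
  val Submit, Kill, RequestStatus, PrintVersion = Value
}

def resolveAction(kill: Option[String], status: Option[String], version: Boolean): Action.Value =
  if (version) Action.PrintVersion
  else if (kill.isDefined) Action.Kill            // --kill <submission id>
  else if (status.isDefined) Action.RequestStatus // --status <submission id>
  else Action.Submit                              // the default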
private def submit(args: SparkSubmitArguments, uninitLog: Boolean): Unit = {
  def doRunMain(): Unit = {
    ...
    runMain(args, uninitLog)
    ...
  }
  ...
  doRunMain()
  ...
}
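The elided parts of submit mainly handle impersonation: when --proxy-user is given, doRunMain is wrapped in a Hadoop doAs call. A rough sketch of that wrapping (simplified; runPossiblyAsProxy is a made-up helper, and error handling is omitted):

import java.security.PrivilegedExceptionAction
import org.apache.hadoop.security.UserGroupInformation

// run `body` as the proxy user when one is configured, otherwise directly;
// `proxyUserName` stands in for args.proxyUser
def runPossiblyAsProxy(proxyUserName: Option[String])(body: => Unit): Unit =
  proxyUserName match {
    case Some(name) =>
      val proxyUser = UserGroupInformation.createProxyUser(
        name, UserGroupInformation.getCurrentUser)
      proxyUser.doAs(new PrivilegedExceptionAction[Unit] {
        override def run(): Unit = body
      })
    case None => body
  }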
private def runMain(args: SparkSubmitArguments, uninitLog: Boolean): Unit = {
  // resolve the arguments and prepare the runtime environment
  // (child args, classpath, sparkConf, main class)
  val (childArgs, childClasspath, sparkConf, childMainClass) = prepareSubmitEnvironment(args)
  ...
  // add the child jars to the classpath
  for (jar <- childClasspath) {
    addJarToClasspath(jar, loader)
  }
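  // `loader` (built earlier in runMain, not shown in this excerpt) is a mutable URL
  // class loader; which variant is used depends on whether user classes take
  // precedence (spark.driver.userClassPathFirst)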
  // childMainClass differs per cluster manager (YARN, Kubernetes, Mesos): it names the
  // SparkApplication for that environment, loaded via reflection; in this example it is
  // org.apache.spark.deploy.yarn.YarnClusterApplication
  // in client mode, childMainClass is simply the user class passed with --class
  mainClass = Utils.classForName(childMainClass)
  ...
  val app: SparkApplication = if (classOf[SparkApplication].isAssignableFrom(mainClass)) {
    mainClass.newInstance().asInstanceOf[SparkApplication]
  } else {
    // SPARK-4170
    if (classOf[scala.App].isAssignableFrom(mainClass)) {
      logWarning("Subclasses of scala.App may not work correctly. Use a main() method instead.")
    }
    new JavaMainApplication(mainClass)
  }
  ...
  // start the application
  app.start(childArgs.toArray, sparkConf)
}
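For the client-mode branch, JavaMainApplication just invokes the user class's static main() via reflection. A minimal standalone sketch of that pattern (ReflectiveLaunch and com.example.MyApp are hypothetical names, not Spark classes):

object ReflectiveLaunch {
  def main(args: Array[String]): Unit = {
    // load the class by name, as Utils.classForName does for childMainClass
    val mainClass = Class.forName("com.example.MyApp")
    val mainMethod = mainClass.getMethod("main", classOf[Array[String]])
    require(java.lang.reflect.Modifier.isStatic(mainMethod.getModifiers),
      s"main() of ${mainClass.getName} must be static")
    // static method, so the receiver is null
    mainMethod.invoke(null, args)
  }
}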
To sum it up in one sentence: parse the arguments, prepare the runtime environment, and start the SparkApplication for the target cluster manager, which in cluster mode goes on to launch the ApplicationMaster.
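For reference, a rough mapping of deploy target to childMainClass (the YARN entry is confirmed by this walkthrough; treat the others as assumptions from my reading of the upstream source):

// illustrative only; the real decision logic lives in prepareSubmitEnvironment
val childMainClassByTarget = Map(
  "yarn, cluster mode" -> "org.apache.spark.deploy.yarn.YarnClusterApplication",
  "k8s, cluster mode"  -> "org.apache.spark.deploy.k8s.submit.KubernetesClientApplication", // assumed
  "any, client mode"   -> "<the user class passed with --class>"
)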
Submitting to the cluster
// org.apache.spark.deploy.yarn.YarnClusterApplication
private[spark] class YarnClusterApplication extends SparkApplication {
  override def start(args: Array[String], conf: SparkConf): Unit = {
    // SparkSubmit would use yarn cache to distribute files & jars in yarn mode,
    // so remove them from sparkConf here for yarn mode.
    conf.remove("spark.jars")
    conf.remove("spark.files")
    new Client(new ClientArguments(args), conf).run()
  }
}
// org.apache.spark.deploy.yarn.Client
// first validates the executor/driver memory settings and creates the job (staging) directory
def run(): Unit = {
  this.appId = submitApplication()
  // then monitor the submission status
  ...
}
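The elided monitoring step boils down to polling the ResourceManager for the application state. A simplified sketch against the Hadoop YARN client API (waitForCompletion is a made-up helper; the real Client.monitorApplication also handles logging, report intervals, and cleanup):

import org.apache.hadoop.yarn.api.records.{ApplicationId, YarnApplicationState}
import org.apache.hadoop.yarn.client.api.YarnClient

// poll the RM until the application reaches a terminal state
def waitForCompletion(yarnClient: YarnClient, appId: ApplicationId): YarnApplicationState = {
  val terminal = Set(YarnApplicationState.FINISHED,
                     YarnApplicationState.FAILED,
                     YarnApplicationState.KILLED)
  var state = yarnClient.getApplicationReport(appId).getYarnApplicationState
  while (!terminal(state)) {
    Thread.sleep(1000) // Spark reads the interval from spark.yarn.report.interval
    state = yarnClient.getApplicationReport(appId).getYarnApplicationState
  }
  state
}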
def submitApplication(): ApplicationId = {
  // preparation before the job is handed to YARN
  ...
  // Get a new application from our RM
  val newApp = yarnClient.createApplication()
  ...
  // Set up the appropriate contexts to launch our AM
  // includes uploading the Spark jars to the job directory; see the separate post on
  // speeding up Spark on YARN startup for details
  val containerContext = createContainerLaunchContext(newAppResponse)
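  // the container launch context also carries the AM start command: in cluster mode it
  // launches org.apache.spark.deploy.yarn.ApplicationMaster, in client mode
  // org.apache.spark.deploy.yarn.ExecutorLauncher (per the upstream source, not shown here)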
  // includes JVM options such as GC settings
  val appContext = createApplicationSubmissionContext(newApp, containerContext)
  // Finally, submit and monitor the application
  logInfo(s"Submitting application $appId to ResourceManager")
  // actually delegates to YarnClientImpl.submitApplication
  yarnClient.submitApplication(appContext)
  ...
}
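To make the YARN side concrete, here is a minimal, self-contained sketch of the same YarnClient flow outside of Spark (MiniSubmit is a toy example; the real Client additionally sets up local resources, the environment, and security tokens):

import org.apache.hadoop.yarn.api.records.{ContainerLaunchContext, Resource}
import org.apache.hadoop.yarn.client.api.YarnClient
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.util.Records

object MiniSubmit {
  def main(args: Array[String]): Unit = {
    val yarnClient = YarnClient.createYarnClient()
    yarnClient.init(new YarnConfiguration())
    yarnClient.start()

    // ask the RM for a new ApplicationId, as Client.submitApplication does
    val newApp = yarnClient.createApplication()
    val appContext = newApp.getApplicationSubmissionContext
    appContext.setApplicationName("mini-submit-demo")

    // the AM container spec: Spark puts the JVM command that starts the
    // ApplicationMaster here; this sketch uses a placeholder shell command
    val amContainer = Records.newRecord(classOf[ContainerLaunchContext])
    amContainer.setCommands(java.util.Collections.singletonList("sleep 60"))
    appContext.setAMContainerSpec(amContainer)

    // resources for the AM container: 512 MB, 1 vcore
    appContext.setResource(Resource.newInstance(512, 1))

    // the same call Client makes (via YarnClientImpl)
    val appId = yarnClient.submitApplication(appContext)
    println(s"Submitted $appId")
  }
}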