Spark Job Execution Flow

Posted by danner on November 5, 2019

This post continues the spark-shell script walkthrough. The previous article showed that a job is ultimately submitted through org.apache.spark.deploy.SparkSubmit, so this time we look at the concrete execution flow.
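Throughout this post, assume a yarn-cluster submission; the class, jar, and input names below are placeholders. By the time control reaches SparkSubmit.main, the arguments handed over by the spark-submit script look roughly like this (a minimal sketch, not an exact reproduction):

// Hypothetical submission, equivalent to:
//   spark-submit --master yarn --deploy-mode cluster \
//     --class com.example.WordCount wordcount.jar hdfs:///input
val exampleArgs = Array(
  "--master", "yarn",
  "--deploy-mode", "cluster",
  "--class", "com.example.WordCount",  // placeholder application class
  "wordcount.jar",                     // placeholder primary resource
  "hdfs:///input"                      // placeholder application argument
)
org.apache.spark.deploy.SparkSubmit.main(exampleArgs)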

Choosing the Cluster

// org.apache.spark.deploy.SparkSubmit
override def main(args: Array[String]): Unit = {
  val submit = new SparkSubmit() {
    self =>

    override protected def parseArguments(args: Array[String]): SparkSubmitArguments = {
      new SparkSubmitArguments(args) {
        ...
      }
    }
    ...
    override def doSubmit(args: Array[String]): Unit = {
      try {
        super.doSubmit(args)
      } catch {
        ...
      }
    }
  }
  submit.doSubmit(args)
}
def doSubmit(args: Array[String]): Unit = {
  ...
  // Create SparkSubmitArguments to parse the command-line arguments
  val appArgs = parseArguments(args)
  if (appArgs.verbose) {
    logInfo(appArgs.toString)
  }
  // Dispatch on the action; in this example it is SUBMIT
  appArgs.action match {
    case SparkSubmitAction.SUBMIT => submit(appArgs, uninitLog)
    case SparkSubmitAction.KILL => kill(appArgs)
    case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
    case SparkSubmitAction.PRINT_VERSION => printVersion()
  }
}
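For context, the SparkSubmitAction matched above is a plain enumeration defined in the same file; which value SparkSubmitArguments picks depends on the CLI flags. A simplified reading (validation logic omitted):

// Simplified sketch of the dispatch targets; see SparkSubmit.scala for the real definition.
// --kill <submissionId>   => KILL
// --status <submissionId> => REQUEST_STATUS
// --version               => PRINT_VERSION
// anything else           => SUBMIT (our case)
private[deploy] object SparkSubmitAction extends Enumeration {
  type SparkSubmitAction = Value
  val SUBMIT, KILL, REQUEST_STATUS, PRINT_VERSION = Value
}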
private def submit(args: SparkSubmitArguments, uninitLog: Boolean): Unit = {
  def doRunMain(): Unit = {
    ...
    runMain(args, uninitLog)
    ...
  }
  ...
  doRunMain()
  ...
}
private def runMain(args: SparkSubmitArguments, uninitLog: Boolean): Unit = {
  // Parse the arguments and prepare the runtime environment (childArgs, sparkConf, ...)
  val (childArgs, childClasspath, sparkConf, childMainClass) = prepareSubmitEnvironment(args)
  ...
  // classpath
  for (jar <- childClasspath) {
    addJarToClasspath(jar, loader)
  }
  // childMainClass differs per cluster manager (YARN, Kubernetes, Mesos): it is the
  // SparkApplication for that environment, loaded via reflection. In this example it is
  // org.apache.spark.deploy.yarn.YarnClusterApplication.
  // In client mode, childMainClass is simply the class passed with --class.
  mainClass = Utils.classForName(childMainClass)
  ...
  val app: SparkApplication = if (classOf[SparkApplication].isAssignableFrom(mainClass)) {
    mainClass.newInstance().asInstanceOf[SparkApplication]
  } else {
    // SPARK-4170
    if (classOf[scala.App].isAssignableFrom(mainClass)) {
      logWarning("Subclasses of scala.App may not work correctly. Use a main() method instead.")
    }
    new JavaMainApplication(mainClass)
  }
  ...
  // Start the application
  app.start(childArgs.toArray, sparkConf)
}

In one sentence: parse the arguments, prepare the runtime environment, and start the application for the target cluster, which in turn launches the ApplicationMaster.
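To make that concrete, for a yarn-cluster submission the tuple returned by prepareSubmitEnvironment comes out roughly as follows. The values are illustrative only; the real method performs far more resolution and validation:

// Illustrative values for a yarn-cluster submission (placeholders, not actual output):
// childMainClass = "org.apache.spark.deploy.yarn.YarnClusterApplication"
// childArgs      = Seq("--jar", "wordcount.jar", "--class", "com.example.WordCount", "--arg", "hdfs:///input")
// childClasspath = jars passed via --jars (plus the primary resource in client mode)
// sparkConf      = merged defaults and --conf entries, e.g. spark.submit.deployMode=cluster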

Cluster

// org.apache.spark.deploy.yarn.YarnClusterApplication
private[spark] class YarnClusterApplication extends SparkApplication {
  override def start(args: Array[String], conf: SparkConf): Unit = {
    // SparkSubmit would use yarn cache to distribute files & jars in yarn mode,
    // so remove them from sparkConf here for yarn mode.
    conf.remove("spark.jars")
    conf.remove("spark.files")
    new Client(new ClientArguments(args), conf).run()
  }
}
// org.apache.spark.deploy.yarn.Client
// First determines Executor/Driver memory resources and creates the job staging directory
def run(): Unit = {
  this.appId = submitApplication()
  // Then monitors the submission state of the application
  ...
}
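The elided monitoring step polls the ResourceManager for an application report until the job reaches a final state. A minimal sketch of that idea against the YARN client API (not the actual Client code, which also logs the report and reads the poll interval from configuration):

import org.apache.hadoop.yarn.api.records.{ApplicationId, YarnApplicationState}
import org.apache.hadoop.yarn.client.api.YarnClient

// Poll the application report until it reaches a terminal state.
def waitForCompletion(yarnClient: YarnClient, appId: ApplicationId): YarnApplicationState = {
  var state = yarnClient.getApplicationReport(appId).getYarnApplicationState
  while (state != YarnApplicationState.FINISHED &&
         state != YarnApplicationState.FAILED &&
         state != YarnApplicationState.KILLED) {
    Thread.sleep(1000)  // the real Client uses a configurable report interval
    state = yarnClient.getApplicationReport(appId).getYarnApplicationState
  }
  state
}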
def submitApplication(): ApplicationId = {
  // Preparation work before the job is submitted to YARN
  ...
  // Get a new application from our RM
  val newApp = yarnClient.createApplication()
  ...
  // Set up the appropriate contexts to launch our AM
  // Includes uploading the Spark jars to the job directory; see "Spark on YARN 加速启动" for details
  val containerContext = createContainerLaunchContext(newAppResponse)
  // Includes JVM options such as GC settings
  val appContext = createApplicationSubmissionContext(newApp, containerContext)

  // Finally, submit and monitor the application
  logInfo(s"Submitting application $appId to ResourceManager")
  // Under the hood this calls YarnClientImpl.submitApplication
  yarnClient.submitApplication(appContext)
  ...
}
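For a rough idea of what createApplicationSubmissionContext fills in, here is a much-simplified sketch against the YARN client API. The queue and resource numbers are placeholders; the real method derives them from SparkConf and also handles tags, retries, node labels and more:

import org.apache.hadoop.yarn.api.records.Resource
import org.apache.hadoop.yarn.util.Records

val appContext = newApp.getApplicationSubmissionContext
appContext.setApplicationName("spark-demo")        // placeholder name
appContext.setQueue("default")                     // from --queue / spark.yarn.queue
appContext.setAMContainerSpec(containerContext)    // AM launch command + localized jars
appContext.setApplicationType("SPARK")
val capability = Records.newRecord(classOf[Resource])
capability.setMemory(1024 + 384)                   // placeholder: AM memory + overhead, in MB
capability.setVirtualCores(1)                      // placeholder: AM cores
appContext.setResource(capability)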