Flink Startup Process

Posted by danner on July 8, 2020

Flink 1.10

The previous posts explained how a Flink job starts up from the perspective of each individual component; this post ties all of those pieces together using the Flink on YARN Per-Job mode.

The walkthrough below covers two sides: the local (client) side and the cluster side.

Local

flink run 

Running the command above kicks off the job submission.
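For concreteness, a typical Flink 1.10 per-job submission looks like the following (the main class and jar are hypothetical placeholders); -m yarn-cluster is what routes the run through the yarn-per-job executor:

# Hypothetical per-job submission on YARN (Flink 1.10)
flink run -m yarn-cluster -c com.example.WordCount ./wordcount.jar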

Now let's look at how the JobGraph is submitted to a YARN cluster. The Executor in this case is YarnJobClusterExecutor. Since the StreamGraph-generation code is the same as in other modes, we jump straight to YarnJobClusterExecutor (a plain `flink run` as above launches a per-job application on the YARN cluster).

// org.apache.flink.yarn.executors.YarnJobClusterExecutor
public class YarnJobClusterExecutor extends AbstractJobClusterExecutor<ApplicationId, YarnClusterClientFactory> {

	public static final String NAME = "yarn-per-job";

	public YarnJobClusterExecutor() {
		super(new YarnClusterClientFactory());
	}
}
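The NAME constant above is what the client matches against the execution.target configuration value when it looks up a PipelineExecutor. A minimal sketch of selecting this executor explicitly (Configuration and DeploymentOptions are from org.apache.flink.configuration; the Configuration instance here is a placeholder):

// Sketch: the client's executor lookup matches NAME against the
// execution.target option (DeploymentOptions.TARGET);
// "yarn-per-job" selects the YarnJobClusterExecutor shown above.
Configuration configuration = new Configuration();
configuration.setString(DeploymentOptions.TARGET, YarnJobClusterExecutor.NAME); // "yarn-per-job"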
// org.apache.flink.client.deployment.executors.AbstractJobClusterExecutor
public class AbstractJobClusterExecutor<ClusterID, ClientFactory extends ClusterClientFactory<ClusterID>> implements PipelineExecutor {

	private static final Logger LOG = LoggerFactory.getLogger(AbstractJobClusterExecutor.class);
	private final ClientFactory clusterClientFactory;
	public AbstractJobClusterExecutor(@Nonnull final ClientFactory clusterClientFactory) {
		this.clusterClientFactory = checkNotNull(clusterClientFactory);
	}
	@Override
	public CompletableFuture<JobClient> execute(@Nonnull final Pipeline pipeline, @Nonnull final Configuration configuration) throws Exception {
		// Generate the JobGraph; this process is covered later
		final JobGraph jobGraph = ExecutorUtils.getJobGraph(pipeline, configuration);
		// Obtain a YarnClusterDescriptor; deployJobCluster then submits the jobGraph to YARN.
		// See also the yarn-session startup flow; here clusterClientFactory = YarnClusterClientFactory
		try (final ClusterDescriptor<ClusterID> clusterDescriptor = clusterClientFactory.createClusterDescriptor(configuration)) {
			final ExecutionConfigAccessor configAccessor = ExecutionConfigAccessor.fromConfiguration(configuration);
			// Compute the memory and slot settings
			final ClusterSpecification clusterSpecification = clusterClientFactory.getClusterSpecification(configuration);
			final ClusterClientProvider<ClusterID> clusterClientProvider = clusterDescriptor
					.deployJobCluster(clusterSpecification, jobGraph, configAccessor.getDetachedMode());
			LOG.info("Job has been submitted with JobID " + jobGraph.getJobID());
			return CompletableFuture.completedFuture(
					new ClusterClientJobClientAdapter<>(clusterClientProvider, jobGraph.getJobID()));
		}
	}
}
// org.apache.flink.yarn.YarnClusterDescriptor#deployJobCluster
public ClusterClientProvider<ApplicationId> deployJobCluster(
  ClusterSpecification clusterSpecification,
  JobGraph jobGraph,
  boolean detached) throws ClusterDeploymentException {
  try {
    return deployInternal(
      clusterSpecification,
      "Flink per-job cluster",
      getYarnJobClusterEntrypoint(),
      jobGraph,
      detached);
  } catch (Exception e) {
    throw new ClusterDeploymentException("Could not deploy Yarn job cluster.", e);
  }
}
-> startAppMaster
// org.apache.flink.yarn.YarnClusterDescriptor#startAppMaster
// The per-job flow inside startAppMaster is as follows:

  • Initialize HDFS and get the HDFS directory (the current user's home directory on HDFS)
  • Collect the files to ship to HDFS (systemShipFiles): the Flink jars, config files, etc.
  • Upload the user's cached files to HDFS and record their paths in the jobGraph
  • Upload the files collected above to HDFS
  • Upload the flink-conf configuration file
  • In per-job mode, write the jobGraph to a file and upload it to HDFS
  • Prepare the AppMaster Container
  • Build the appMasterEnv
  • Submit the application to YARN and block until YARN reports the application state:
    • RUNNING: submitted successfully
    • FINISHED: the job has finished
    • FAILED/KILLED: submission failed
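The "write the jobGraph to a file" step is plain Java serialization. A simplified sketch of what startAppMaster does (the temp-file name is illustrative; the real code also registers the file for upload):

import java.io.File;
import java.io.FileOutputStream;
import java.io.ObjectOutputStream;
import org.apache.flink.runtime.jobgraph.JobGraph;

// Simplified sketch: per-job mode serializes the JobGraph to a local temp
// file, which is then shipped to HDFS together with the other resources.
static File writeJobGraphToFile(JobGraph jobGraph) throws Exception {
	File jobGraphFile = File.createTempFile("flink-jobgraph", ".bin"); // illustrative name
	try (ObjectOutputStream out = new ObjectOutputStream(new FileOutputStream(jobGraphFile))) {
		out.writeObject(jobGraph);
	}
	return jobGraphFile;
}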

This concludes the local submission flow; using the source code as a reference, you can wrap up your own YARN submission logic.
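As a minimal sketch of such a wrapper (assuming `configuration` already carries the YARN and Flink settings and `jobGraph` has been generated beforehand), the factory calls mirror AbstractJobClusterExecutor#execute above:

import org.apache.flink.client.deployment.ClusterDescriptor;
import org.apache.flink.client.deployment.ClusterSpecification;
import org.apache.flink.client.program.ClusterClientProvider;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.yarn.YarnClusterClientFactory;
import org.apache.hadoop.yarn.api.records.ApplicationId;

// Minimal sketch of a hand-rolled per-job submitter, mirroring
// AbstractJobClusterExecutor#execute.
static void submitToYarn(Configuration configuration, JobGraph jobGraph) throws Exception {
	final YarnClusterClientFactory clientFactory = new YarnClusterClientFactory();
	try (ClusterDescriptor<ApplicationId> clusterDescriptor =
			clientFactory.createClusterDescriptor(configuration)) {
		// Derive JobManager/TaskManager memory and slot counts from the configuration
		ClusterSpecification clusterSpecification = clientFactory.getClusterSpecification(configuration);
		// Deploy in detached mode; the provider can later create a ClusterClient
		ClusterClientProvider<ApplicationId> clusterClientProvider =
			clusterDescriptor.deployJobCluster(clusterSpecification, jobGraph, true);
		System.out.println("Submitted job " + jobGraph.getJobID());
	}
}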

Cluster

When clusterDescriptor.deployJobCluster executes, it ultimately runs a ClusterEntrypoint. The earlier post on the JobManager startup flow ("Flink 启动流程之 JobManager") covered the Standalone mode; for comparison, let's see what YarnJobClusterEntrypoint does differently.

// org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint
// ------------------------------------------------------------------------
//  The executable entry point for the Yarn Application Master Process
//  for a single Flink job.
// ------------------------------------------------------------------------
public class YarnJobClusterEntrypoint extends JobClusterEntrypoint {

	@Override
	protected DefaultDispatcherResourceManagerComponentFactory createDispatcherResourceManagerComponentFactory(Configuration configuration) throws IOException {
		return DefaultDispatcherResourceManagerComponentFactory.createJobComponentFactory(
			YarnResourceManagerFactory.getInstance(),
			// Retrieves the JobGraph from the file uploaded to HDFS earlier
			FileJobGraphRetriever.createFrom(configuration, getUsrLibDir(configuration)));
	}
	...
	// The remaining steps are the same as in Standalone mode
}
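FileJobGraphRetriever is the mirror image of the client-side upload: it deserializes the job.graph file from the container's working directory ("job.graph" is the default of the internal.jobgraph-path option in 1.10). A simplified sketch, ignoring the user-lib handling:

import java.io.File;
import java.io.FileInputStream;
import java.io.ObjectInputStream;
import org.apache.flink.runtime.jobgraph.JobGraph;

// Simplified sketch: read back the JobGraph that the client serialized.
static JobGraph readJobGraphFile() throws Exception {
	try (ObjectInputStream in =
			new ObjectInputStream(new FileInputStream(new File("job.graph")))) {
		return (JobGraph) in.readObject();
	}
}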

On the cluster side, the main work is starting the WebMonitorEndpoint, ResourceManager, and DispatcherRunner components. The DispatcherRunner starts a JobMaster for the retrieved JobGraph; the JobMaster translates the JobGraph into an ExecutionGraph, requests slots from the ResourceManager, and begins deploying tasks.

The latest version, 1.11, adds an Application mode that moves JobGraph generation onto the cluster as well, reducing the load on the client.
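For comparison, an Application-mode submission in 1.11 looks like this (jar name hypothetical); the user's main() then runs inside the cluster:

# Flink 1.11+ Application mode: main() executes on the cluster,
# so the JobGraph is also generated there
flink run-application -t yarn-application ./wordcount.jar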

Summary

The following is adapted from reference 1.

Local flow

  • As in Session-Cluster mode, the entry point is CliFrontend#main
  • Parse and process the command-line arguments
  • Build a PackagedProgram from the user jar, main class, program arguments, and savepoint settings
  • Create the JobGraph from the PackagedProgram (in non-detached mode this works the same way as in Session-Cluster mode)
  • Gather cluster resource information (createClusterDescriptor, getClusterSpecification)
  • Deploy the cluster: YarnClusterDescriptor#deployJobCluster -> AbstractYarnClusterDescriptor#deployInternal; the rest of the flow is similar to Session-Cluster. Notably, unlike Session-Cluster, AbstractYarnClusterDescriptor#startAppMaster uploads the job's JobGraph to HDFS for the server side to use later
    • Initialize the file system (HDFS)
    • Upload log4j, logback, flink-conf.yaml, and the jars to HDFS
    • Build the AppMaster's Container (choosing YarnJobClusterEntrypoint as the entry class of the Container process) and build the corresponding Env
    • YarnClient submits the Container request to YARN
    • Track the ApplicationReport state (to confirm the startup succeeded; it may keep waiting if resources are insufficient)
  • After a successful start, write the resulting ip and port into the flinkConfiguration
  • Create the ClusterClient used to interact with the cluster

Remote flow

  • On the remote side, the cluster entry point hosted in the Container is YarnJobClusterEntrypoint#main
  • ClusterEntrypoint#runClusterEntrypoint -> ClusterEntrypoint#startCluster starts the cluster
  • Create the JobDispatcherResourceManagerComponentFactory (used to create the JobDispatcherResourceManagerComponent)
  • Create the ResourceManager (YarnResourceManager) and the Dispatcher (MiniDispatcher); when the MiniDispatcher is created, the JobGraph is read back from the JobGraph file written earlier, and ZK leader election starts
  • Once elected leader, Dispatcher#grantLeadership is called
    • Dispatcher#recoverJobs recovers the jobs and obtains the JobGraph
    • Dispatcher#tryAcceptLeadershipAndRunJobs confirms leadership and starts running the jobs
      • Dispatcher#runJob starts a job ==> creates and starts a JobManagerRunner
      • Create and start the JobMaster
      • Generate the ExecutionGraph, request slots from the ResourceManager, and deploy

References

1. 【Flink】深入理解Flink-On-Yarn模式

2. 一张图轻松掌握 Flink on YARN 基础架构与启动流程