Flink 1.10
The previous posts examined the startup of a Flink job from the perspective of each individual component. This section ties all of those pieces together using the per-job mode of Flink on YARN, looking at the flow from two sides: local (the client) and the cluster.
Local
flink run
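In full, the command for 1.10 would typically look something like the line below, where the main class and jar path are placeholders (-m yarn-cluster selects the YARN per-job deployment):

flink run -m yarn-cluster -c com.example.WordCount ./wordcount.jar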
Once the above command is executed, job submission begins:
- "Flink Startup Flow: flink run" ended with the code that actually executes the job
- "Deep Dive into the Flink Streaming WordCount Flow" covered generating the JobGraph and submitting it to the cluster; the difference here is that the target is a YARN cluster
Now let's look at how the JobGraph is submitted on a YARN cluster. The Executor in this case is YarnJobClusterExecutor. Since the StreamGraph generation code is identical, we start directly from YarnJobClusterExecutor (running flink run as configured here launches a per-job application on the YARN cluster).
// org.apache.flink.yarn.executors.YarnJobClusterExecutor
public class YarnJobClusterExecutor extends AbstractJobClusterExecutor<ApplicationId, YarnClusterClientFactory> {

    public static final String NAME = "yarn-per-job";

    public YarnJobClusterExecutor() {
        super(new YarnClusterClientFactory());
    }
}

// org.apache.flink.client.deployment.executors.AbstractJobClusterExecutor
public class AbstractJobClusterExecutor<ClusterID, ClientFactory extends ClusterClientFactory<ClusterID>> implements PipelineExecutor {

    private static final Logger LOG = LoggerFactory.getLogger(AbstractJobClusterExecutor.class);

    private final ClientFactory clusterClientFactory;

    public AbstractJobClusterExecutor(@Nonnull final ClientFactory clusterClientFactory) {
        this.clusterClientFactory = checkNotNull(clusterClientFactory);
    }

    @Override
    public CompletableFuture<JobClient> execute(@Nonnull final Pipeline pipeline, @Nonnull final Configuration configuration) throws Exception {
        // Translate the pipeline into a JobGraph (the generation process is described separately)
        final JobGraph jobGraph = ExecutorUtils.getJobGraph(pipeline, configuration);

        // Obtain the YarnClusterDescriptor; deployJobCluster submits the JobGraph to YARN.
        // See the yarn-session startup flow for comparison; here clusterClientFactory = YarnClusterClientFactory
        try (final ClusterDescriptor<ClusterID> clusterDescriptor = clusterClientFactory.createClusterDescriptor(configuration)) {
            final ExecutionConfigAccessor configAccessor = ExecutionConfigAccessor.fromConfiguration(configuration);

            // Compute the memory and slot sizes for the cluster
            final ClusterSpecification clusterSpecification = clusterClientFactory.getClusterSpecification(configuration);

            final ClusterClientProvider<ClusterID> clusterClientProvider = clusterDescriptor
                    .deployJobCluster(clusterSpecification, jobGraph, configAccessor.getDetachedMode());
            LOG.info("Job has been submitted with JobID " + jobGraph.getJobID());

            return CompletableFuture.completedFuture(
                    new ClusterClientJobClientAdapter<>(clusterClientProvider, jobGraph.getJobID()));
        }
    }
}
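As an aside, ExecutorUtils.getJobGraph performs the Pipeline -> JobGraph translation described in the WordCount post. A minimal hand-rolled sketch of the same translation via the DataStream API (the class name and job name below are made up for illustration):

import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.graph.StreamGraph;

public final class JobGraphDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromElements("to", "be", "or", "not").print();

        // Program -> StreamGraph -> JobGraph: the same artifact that
        // AbstractJobClusterExecutor#execute ships to the cluster.
        StreamGraph streamGraph = env.getStreamGraph("demo");
        JobGraph jobGraph = streamGraph.getJobGraph();
        System.out.println("JobID: " + jobGraph.getJobID());
    }
}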
// org.apache.flink.yarn.YarnClusterDescriptor#deployJobCluster
public ClusterClientProvider<ApplicationId> deployJobCluster(
        ClusterSpecification clusterSpecification,
        JobGraph jobGraph,
        boolean detached) throws ClusterDeploymentException {
    try {
        return deployInternal(
                clusterSpecification,
                "Flink per-job cluster",
                getYarnJobClusterEntrypoint(),
                jobGraph,
                detached);
    } catch (Exception e) {
        throw new ClusterDeploymentException("Could not deploy Yarn job cluster.", e);
    }
}
deployInternal in turn calls startAppMaster
(org.apache.flink.yarn.YarnClusterDescriptor#startAppMaster). The per-job flow inside it is:
1. Initialize the HDFS file system and resolve the staging directory (the current user's home directory on HDFS)
2. Collect the files to ship to HDFS (systemShipFiles): the Flink jar, configuration files, and so on
3. Upload the user's cached files to HDFS and record their paths in the jobGraph
4. Upload the ship files collected in step 2 to HDFS
5. Upload the flink-conf configuration file
6. In per-job mode: write the jobGraph to a file and upload it to HDFS (see the sketch after this list)
7. Prepare the AppMasterContainer
8. Build the appMasterEnv
9. Submit the application to YARN and block until YARN reports its state:
   RUNNING: submitted successfully
   FINISHED: the job has already run to completion
   FAILED/KILLED: submission failed
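Step 6 boils down to plain Java serialization. Roughly, simplified from YarnClusterDescriptor#startAppMaster (appId and jobGraph are in scope there; exception handling omitted):

// Per-job only: serialize the JobGraph to a temp file; the file is then
// registered as a YARN local resource and shipped to HDFS with the rest.
final File fp = File.createTempFile(appId.toString(), null);
fp.deleteOnExit();
try (FileOutputStream output = new FileOutputStream(fp);
        ObjectOutputStream obOutput = new ObjectOutputStream(output)) {
    obOutput.writeObject(jobGraph);
}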
This concludes the local submission flow. With the source above as a guide, you can wrap up your own programmatic YARN submission, as sketched below.
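A hedged sketch of what that could look like, wired together from the classes above (the config path and detached flag are assumptions, jobGraph is a pre-built JobGraph; class names are from Flink 1.10):

import org.apache.flink.client.deployment.ClusterDescriptor;
import org.apache.flink.client.deployment.ClusterSpecification;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.ClusterClientProvider;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.yarn.YarnClusterClientFactory;
import org.apache.hadoop.yarn.api.records.ApplicationId;

public final class MyYarnSubmitter {

    // Deploys a pre-built JobGraph as a detached per-job YARN application.
    public static void submit(JobGraph jobGraph) throws Exception {
        Configuration flinkConfig = GlobalConfiguration.loadConfiguration("/path/to/flink/conf");
        YarnClusterClientFactory factory = new YarnClusterClientFactory();
        ClusterSpecification spec = factory.getClusterSpecification(flinkConfig);

        try (ClusterDescriptor<ApplicationId> descriptor = factory.createClusterDescriptor(flinkConfig)) {
            ClusterClientProvider<ApplicationId> provider =
                    descriptor.deployJobCluster(spec, jobGraph, /* detached */ true);
            try (ClusterClient<ApplicationId> client = provider.getClusterClient()) {
                System.out.println("Submitted " + jobGraph.getJobID()
                        + " as application " + client.getClusterId());
            }
        }
    }
}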
Cluster
When clusterDescriptor.deployJobCluster runs, a ClusterEntrypoint is launched on the cluster. "Flink Startup Flow: JobManager" covered the Standalone mode; as a comparison, let's see how YarnJobClusterEntrypoint differs.
// org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint
// ------------------------------------------------------------------------
//  The executable entry point for the Yarn Application Master Process
//  for a single Flink job.
// ------------------------------------------------------------------------
public class YarnJobClusterEntrypoint extends JobClusterEntrypoint {

    protected DefaultDispatcherResourceManagerComponentFactory createDispatcherResourceManagerComponentFactory(Configuration configuration) throws IOException {
        return DefaultDispatcherResourceManagerComponentFactory.createJobComponentFactory(
                YarnResourceManagerFactory.getInstance(),
                // Retrieve the JobGraph from its HDFS location
                FileJobGraphRetriever.createFrom(configuration, getUsrLibDir(configuration)));
    }

    ...
    // The remaining steps are the same as in Standalone mode
}
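On the JobGraph side, FileJobGraphRetriever essentially deserializes the file the client uploaded in step 6 above. Stripped to its core (a simplification, not the retriever's full code; jobGraphFile is the path taken from the configuration):

// Read the JobGraph back from the file shipped with the container.
try (ObjectInputStream in = new ObjectInputStream(new FileInputStream(jobGraphFile))) {
    return (JobGraph) in.readObject();
}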
On the cluster side, the main work is starting the WebMonitorEndpoint, ResourceManager, and DispatcherRunner components. The DispatcherRunner starts a JobMaster for the retrieved JobGraph; the JobMaster turns the JobGraph into an ExecutionGraph, requests slot resources from the ResourceManager, and begins deploying tasks.
The latest release, 1.11, adds an Application mode, which moves JobGraph generation onto the cluster as well, reducing the load on the client.
Summary
The following is adapted from reference [1].
Local flow
- As in Session-Cluster mode, the entry point is CliFrontend#main
- Parse and process the command-line arguments
- Build a PackagedProgram from the user jar, main class, program arguments, and savepoint settings
- Create the JobGraph from the PackagedProgram (in non-detached mode this works the same way as in Session-Cluster mode)
- Obtain the cluster resource information (createClusterDescriptor, getClusterSpecification)
- Deploy the cluster: YarnClusterDescriptor#deployJobCluster -> AbstractYarnClusterDescriptor#deployInternal; the rest of the flow resembles Session-Cluster. One notable difference is that AbstractYarnClusterDescriptor#startAppMaster uploads the job's JobGraph to HDFS for the server side to use later
- Initialize the file system (HDFS)
- Upload log4j, logback, flink-conf.yaml, and the jar files to HDFS
- Build the AppMaster's Container (setting YarnJobClusterEntrypoint as the container process entry class) and the corresponding Env
- YarnClient submits the Container request to YARN
- Track the ApplicationReport state (to confirm startup succeeded; this may wait indefinitely if resources are insufficient; a polling sketch follows this list)
- After a successful start, write the resulting ip and port into flinkConfiguration
- Create the ClusterClient used to interact with the cluster
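The "track the ApplicationReport state" step can be reproduced with the plain YARN client API. A hedged sketch (illustrative, not Flink's exact loop; the class and method names used are standard Hadoop YARN API):

import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.api.YarnClient;

public final class YarnStatusProbe {

    // Polls YARN until the application leaves the submission phase.
    static ApplicationReport waitUntilRunning(YarnClient yarnClient, ApplicationId appId)
            throws Exception {
        while (true) {
            ApplicationReport report = yarnClient.getApplicationReport(appId);
            YarnApplicationState state = report.getYarnApplicationState();
            switch (state) {
                case RUNNING:   // AppMaster is up: submission succeeded
                case FINISHED:  // the job already ran to completion
                    return report;
                case FAILED:
                case KILLED:
                    throw new RuntimeException("Application entered state " + state
                            + ": " + report.getDiagnostics());
                default:
                    Thread.sleep(250); // NEW / SUBMITTED / ACCEPTED: keep waiting
            }
        }
    }
}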
Remote flow
- The remote entry point, hosted in the Container, is YarnJobClusterEntrypoint#main
- ClusterEntrypoint#runClusterEntrypoint -> ClusterEntrypoint#startCluster starts the cluster
- Create the JobDispatcherResourceManagerComponentFactory (used to create the JobDispatcherResourceManagerComponent)
- Create the ResourceManager (YarnResourceManager) and Dispatcher (MiniDispatcher); while creating the MiniDispatcher, the JobGraph is read back from the file written earlier, and ZK leader election starts
- When elected leader, Dispatcher#grantLeadership is invoked
- Dispatcher#recoverJobs recovers the jobs, obtaining the JobGraph
- Dispatcher#tryAcceptLeadershipAndRunJobs confirms leadership and starts running the jobs
- Dispatcher#runJob starts the job ==> creates and starts a JobManagerRunner
- Create and start the JobMaster
- Generate the ExecutionGraph and request slots from the ResourceManager to deploy tasks