Flink 1.11
Before a Flink job runs, it passes through the following stages:
Program -> StreamGraph -> JobGraph -> ExecutionGraph -> physical execution plan
JobGraph
Generating the ExecutionGraph
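After the JobVertex DAG has been submitted as a job, the ExecutionGraph is generated in the JobManager. The JobVertex nodes are sorted topologically, starting from the Source nodes, and an ExecutionJobVertex is created for each JobVertex. An IntermediateResult is built from each of the JobVertex's IntermediateDataSets, and the IntermediateResults then establish the upstream/downstream dependencies. The result is a DAG at the ExecutionJobVertex level: the ExecutionGraph.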
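The construction just described can be sketched in plain Java. The classes below are hypothetical, heavily simplified stand-ins that only borrow Flink's names (the real classes also carry parallelism, execution vertices, partitions and much more). The sketch assumes each IntermediateDataSet is identified by a string id and uses Kahn's algorithm for the source-first topological sort, which is not necessarily how Flink itself sorts the vertices.

```java
import java.util.*;

// Hypothetical, heavily simplified stand-ins for Flink's runtime classes; NOT the real API.
public class ExecutionGraphSketch {
    static final class IntermediateDataSet {
        final String id;
        final List<JobVertex> consumers = new ArrayList<>();
        IntermediateDataSet(String id) { this.id = id; }
    }

    static final class JobVertex {
        final String name;
        final List<IntermediateDataSet> inputs = new ArrayList<>();   // data sets this vertex reads
        final List<IntermediateDataSet> produced = new ArrayList<>(); // data sets this vertex writes
        JobVertex(String name) { this.name = name; }
    }

    static final class IntermediateResult {
        final IntermediateDataSet dataSet;
        final ExecutionJobVertex producer;
        IntermediateResult(IntermediateDataSet d, ExecutionJobVertex p) { dataSet = d; producer = p; }
    }

    static final class ExecutionJobVertex {
        final JobVertex jobVertex;
        final List<IntermediateResult> produced = new ArrayList<>();
        final List<IntermediateResult> inputs = new ArrayList<>();
        ExecutionJobVertex(JobVertex jobVertex) {
            this.jobVertex = jobVertex;
            // One IntermediateResult for each IntermediateDataSet the JobVertex produces.
            for (IntermediateDataSet ds : jobVertex.produced) produced.add(new IntermediateResult(ds, this));
        }

        // Wire this vertex to the IntermediateResults of its upstream vertices.
        void connectToPredecessors(Map<String, IntermediateResult> results) {
            for (IntermediateDataSet ds : jobVertex.inputs) {
                inputs.add(Objects.requireNonNull(results.get(ds.id), "producer not built yet: " + ds.id));
            }
        }
    }

    static void connect(JobVertex from, JobVertex to, String dataSetId) {
        IntermediateDataSet ds = new IntermediateDataSet(dataSetId);
        ds.consumers.add(to);
        from.produced.add(ds);
        to.inputs.add(ds);
    }

    // Kahn's algorithm: begin with the sources, release a vertex once all its inputs are sorted.
    static List<JobVertex> sortFromSources(List<JobVertex> vertices) {
        Map<JobVertex, Integer> pending = new HashMap<>();
        Deque<JobVertex> ready = new ArrayDeque<>();
        for (JobVertex v : vertices) {
            pending.put(v, v.inputs.size());
            if (v.inputs.isEmpty()) ready.add(v);
        }
        List<JobVertex> sorted = new ArrayList<>();
        while (!ready.isEmpty()) {
            JobVertex v = ready.poll();
            sorted.add(v);
            for (IntermediateDataSet ds : v.produced)
                for (JobVertex c : ds.consumers)
                    if (pending.merge(c, -1, Integer::sum) == 0) ready.add(c);
        }
        if (sorted.size() != vertices.size()) throw new IllegalArgumentException("JobGraph has a cycle");
        return sorted;
    }

    // Build the ExecutionJobVertex-level DAG in topological order.
    static List<ExecutionJobVertex> buildExecutionGraph(List<JobVertex> vertices) {
        Map<String, IntermediateResult> results = new HashMap<>();
        List<ExecutionJobVertex> graph = new ArrayList<>();
        for (JobVertex v : sortFromSources(vertices)) {
            ExecutionJobVertex ejv = new ExecutionJobVertex(v);
            ejv.connectToPredecessors(results); // upstream results were registered in earlier iterations
            for (IntermediateResult r : ejv.produced) results.put(r.dataSet.id, r);
            graph.add(ejv);
        }
        return graph;
    }

    // source -> map -> sink, deliberately listed out of order; returns "vertex <- [upstream]".
    public static List<String> demo() {
        JobVertex source = new JobVertex("source"), map = new JobVertex("map"), sink = new JobVertex("sink");
        connect(source, map, "ds1");
        connect(map, sink, "ds2");
        List<String> lines = new ArrayList<>();
        for (ExecutionJobVertex ejv : buildExecutionGraph(Arrays.asList(sink, map, source))) {
            List<String> upstream = new ArrayList<>();
            for (IntermediateResult r : ejv.inputs) upstream.add(r.producer.jobVertex.name);
            lines.add(ejv.jobVertex.name + " <- " + upstream);
        }
        return lines;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

Running `main` prints `source <- []`, `map <- [source]` and `sink <- [map]`, even though the vertices were passed in as sink, map, source. Registering a vertex's produced results only after `connectToPredecessors` works because the topological order guarantees every producer is processed before its consumers.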
The JobManager starts a series of services, one of which is the Dispatcher:
// org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory
dispatcherRunner = dispatcherRunnerFactory.createDispatcherRunner(...)
// org.apache.flink.runtime.dispatcher.runner.DefaultDispatcherRunnerFactory
public DispatcherRunner createDispatcherRunner(
        LeaderElectionService leaderElectionService,
        FatalErrorHandler fatalErrorHandler,
        JobGraphStoreFactory jobGraphStoreFactory,
        Executor ioExecutor,
        RpcService rpcService,
        PartialDispatcherServices partialDispatcherServices) throws Exception {
    final DispatcherLeaderProcessFactory dispatcherLeaderProcessFactory = dispatcherLeaderProcessFactoryFactory.createFactory(
        jobGraphStoreFactory,
        ioExecutor,
        rpcService,
        partialDispatcherServices,
        fatalErrorHandler);
    return DefaultDispatcherRunner.create(
        leaderElectionService,
        fatalErrorHandler,
        dispatcherLeaderProcessFactory);
}
// org.apache.flink.runtime.dispatcher.runner.JobDispatcherLeaderProcessFactoryFactory
public DispatcherLeaderProcessFactory createFactory(
        JobGraphStoreFactory jobGraphStoreFactory,
        Executor ioExecutor,
        RpcService rpcService,
        PartialDispatcherServices partialDispatcherServices,
        FatalErrorHandler fatalErrorHandler) {
    // The JobGraph is retrieved from the configuration once, when the factory is created
    final JobGraph jobGraph;
    try {
        jobGraph = jobGraphRetriever.retrieveJobGraph(partialDispatcherServices.getConfiguration());
    } catch (FlinkException e) {
        throw new FlinkRuntimeException("Could not retrieve the JobGraph.", e);
    }
    final DefaultDispatcherGatewayServiceFactory defaultDispatcherServiceFactory = new DefaultDispatcherGatewayServiceFactory(
        JobDispatcherFactory.INSTANCE,
        rpcService,
        partialDispatcherServices);
    // The retrieved JobGraph is handed to the leader process factory
    return new JobDispatcherLeaderProcessFactory(
        defaultDispatcherServiceFactory,
        jobGraph,
        fatalErrorHandler);
}
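`retrieveJobGraph` receives only the configuration. To my understanding, in per-job mode Flink 1.11 uses `FileJobGraphRetriever` here, which deserializes a JobGraph file written by the client; treat that as an assumption. The sketch below imitates the idea with plain Java serialization. `FakeJobGraph`, the key `hypothetical.jobgraph-path`, and the `Map` standing in for Flink's `Configuration` are all invented for illustration.

```java
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class JobGraphFileSketch {
    // Hypothetical stand-in for Flink's JobGraph (which is also Serializable).
    static final class FakeJobGraph implements Serializable {
        private static final long serialVersionUID = 1L;
        final String jobName;
        final List<String> vertexNames;
        FakeJobGraph(String jobName, List<String> vertexNames) {
            this.jobName = jobName;
            this.vertexNames = new ArrayList<>(vertexNames);
        }
    }

    // Hypothetical configuration key (not a real Flink option name).
    static final String JOB_GRAPH_PATH_KEY = "hypothetical.jobgraph-path";

    // "Client" side: serialize the graph into a file.
    static void write(FakeJobGraph graph, Path file) throws IOException {
        try (ObjectOutputStream out = new ObjectOutputStream(Files.newOutputStream(file))) {
            out.writeObject(graph);
        }
    }

    // "JobManager" side: locate the file through the configuration and deserialize it.
    static FakeJobGraph retrieveJobGraph(Map<String, String> configuration)
            throws IOException, ClassNotFoundException {
        String path = configuration.get(JOB_GRAPH_PATH_KEY);
        if (path == null) {
            throw new IllegalStateException("No job graph path configured");
        }
        try (ObjectInputStream in = new ObjectInputStream(Files.newInputStream(Paths.get(path)))) {
            return (FakeJobGraph) in.readObject();
        }
    }

    // Writes a graph, reads it back through the configuration, and summarizes the result.
    public static String roundTrip() {
        try {
            Path file = Files.createTempFile("job", ".graph");
            try {
                write(new FakeJobGraph("demo", Arrays.asList("source", "map", "sink")), file);
                Map<String, String> configuration = new HashMap<>();
                configuration.put(JOB_GRAPH_PATH_KEY, file.toString());
                FakeJobGraph graph = retrieveJobGraph(configuration);
                return graph.jobName + ":" + graph.vertexNames;
            } finally {
                Files.deleteIfExists(file);
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip());
    }
}
```

`main` prints `demo:[source, map, sink]`. The point it illustrates is that the JobManager side needs nothing but a configuration entry to recover the whole graph object.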
// org.apache.flink.runtime.dispatcher.runner.JobDispatcherLeaderProcessFactory
JobDispatcherLeaderProcessFactory(
        AbstractDispatcherLeaderProcess.DispatcherGatewayServiceFactory dispatcherGatewayServiceFactory,
        JobGraph jobGraph,
        FatalErrorHandler fatalErrorHandler) {
    this.dispatcherGatewayServiceFactory = dispatcherGatewayServiceFactory;
    this.jobGraph = jobGraph;
    this.fatalErrorHandler = fatalErrorHandler;
}
// At this point the JobGraph has been retrieved
// Next, the conversion begins
public DispatcherLeaderProcess create(UUID leaderSessionID) {
    return new JobDispatcherLeaderProcess(leaderSessionID, dispatcherGatewayServiceFactory, jobGraph, fatalErrorHandler);
}
// The JM supports high availability: once this JM is elected leader, grantLeadership is called. The call chain is:
// DefaultDispatcherRunner.grantLeadership
// -> DefaultDispatcherRunner.startNewDispatcherLeaderProcess
// -> DefaultDispatcherRunner.createNewDispatcherLeaderProcess
// -> JobDispatcherLeaderProcessFactory.create
// -> new JobDispatcherLeaderProcess
// -> MiniDispatcher.submitJob
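The call chain above can be modeled with a few hypothetical classes. Their names mirror Flink's, but the fields and signatures are simplified assumptions: the JobGraph is just a `String`, and the Dispatcher only logs calls. The sketch shows the pattern visible in the code: the factory captures the JobGraph once, and each time leadership is granted the runner asks the factory for a new leader process bound to the new leader session ID and starts it, which ends in `submitJob`. Stopping any previous leader process first reflects my understanding of `DefaultDispatcherRunner` and is an assumption here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

// Hypothetical, simplified model of the leadership call chain.
// Class names echo Flink's, but these are NOT Flink's classes or signatures.
public class LeadershipChainSketch {

    // Stand-in for the Dispatcher (a MiniDispatcher in per-job mode); it only logs calls.
    static final class Dispatcher {
        final List<String> log = new ArrayList<>();
        void submitJob(String jobGraph) { log.add("submitJob(" + jobGraph + ")"); }
    }

    // Stand-in for JobDispatcherLeaderProcess: one instance per leader session.
    static final class LeaderProcess {
        final UUID leaderSessionId;
        final String jobGraph;
        final Dispatcher dispatcher;
        LeaderProcess(UUID leaderSessionId, String jobGraph, Dispatcher dispatcher) {
            this.leaderSessionId = leaderSessionId;
            this.jobGraph = jobGraph;
            this.dispatcher = dispatcher;
        }
        void start() {
            dispatcher.log.add("start process");
            dispatcher.submitJob(jobGraph); // the JobGraph captured by the factory is handed over here
        }
        void stop() { dispatcher.log.add("stop process"); }
    }

    // Stand-in for JobDispatcherLeaderProcessFactory: the JobGraph is captured once, at construction.
    static final class LeaderProcessFactory {
        final String jobGraph;
        final Dispatcher dispatcher;
        LeaderProcessFactory(String jobGraph, Dispatcher dispatcher) {
            this.jobGraph = jobGraph;
            this.dispatcher = dispatcher;
        }
        LeaderProcess create(UUID leaderSessionId) {
            return new LeaderProcess(leaderSessionId, jobGraph, dispatcher);
        }
    }

    // Stand-in for DefaultDispatcherRunner: the leader election service calls grantLeadership.
    static final class DispatcherRunner {
        private final LeaderProcessFactory factory;
        private LeaderProcess current;
        DispatcherRunner(LeaderProcessFactory factory) { this.factory = factory; }

        void grantLeadership(UUID leaderSessionId) { startNewDispatcherLeaderProcess(leaderSessionId); }

        void revokeLeadership() { stopCurrentProcess(); }

        private void startNewDispatcherLeaderProcess(UUID leaderSessionId) {
            stopCurrentProcess(); // assumed: a previous session's process is stopped first
            current = createNewDispatcherLeaderProcess(leaderSessionId);
            current.start();
        }

        private LeaderProcess createNewDispatcherLeaderProcess(UUID leaderSessionId) {
            return factory.create(leaderSessionId);
        }

        private void stopCurrentProcess() {
            if (current != null) {
                current.stop();
                current = null;
            }
        }
    }

    // Grants and then revokes leadership once; returns the Dispatcher's call log.
    public static List<String> demo() {
        Dispatcher dispatcher = new Dispatcher();
        DispatcherRunner runner = new DispatcherRunner(new LeaderProcessFactory("WordCountJobGraph", dispatcher));
        runner.grantLeadership(UUID.randomUUID());
        runner.revokeLeadership();
        return dispatcher.log;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

`main` prints `start process`, `submitJob(WordCountJobGraph)` and `stop process`. Because the JobGraph lives in the factory rather than in the process, a JobManager that loses and regains leadership can create a fresh leader process without retrieving the graph again.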
References
Flink 源码阅读笔记(3)- ExecutionGraph 的生成 (Flink source code reading notes (3): generating the ExecutionGraph)