Flink Source Code: Building the ExecutionGraph

Posted by danner on November 5, 2020

Flink 1.11

Before a Flink job runs, it goes through the following stages:

Program -> StreamGraph -> JobGraph -> ExecutionGraph -> physical execution plan

Generating the ExecutionGraph from the JobGraph

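Once the job, a DAG of JobVertex nodes, has been submitted, the JobManager builds the ExecutionGraph from it. The JobVertices are sorted topologically starting from the Source nodes, an ExecutionJobVertex is created for each JobVertex, an IntermediateResult is built for each IntermediateDataSet of a JobVertex, and the IntermediateResults are then used to link upstream and downstream vertices. The resulting DAG at the ExecutionJobVertex level is the ExecutionGraph.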
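The conversion described above can be sketched in Java. This is a heavily simplified, hypothetical model, not Flink's code: the classes only borrow Flink's names (JobVertex, IntermediateDataSet, ExecutionJobVertex, IntermediateResult) and keep just the fields needed to show the ordering and the wiring. The sketch uses Kahn's algorithm for the topological sort from the sources; in Flink 1.11 the corresponding steps live roughly in JobGraph.getVerticesSortedTopologicallyFromSources() and ExecutionGraph.attachJobGraph(...), which may be implemented differently and have different signatures.

```java
import java.util.*;

// Hypothetical, heavily simplified model of the JobGraph -> ExecutionGraph step.
// Class names mirror Flink's, but every field and method here is illustrative only.
public class ExecutionGraphSketch {

    static final class JobVertex {
        final String name;
        final List<IntermediateDataSet> inputs = new ArrayList<>();   // datasets this vertex consumes
        final List<IntermediateDataSet> produced = new ArrayList<>(); // datasets this vertex produces
        JobVertex(String name) { this.name = name; }
    }

    static final class IntermediateDataSet {
        final String id;
        final List<JobVertex> consumers = new ArrayList<>();
        IntermediateDataSet(String id) { this.id = id; }
    }

    static final class IntermediateResult {
        final String id;
        final ExecutionJobVertex producer;
        IntermediateResult(String id, ExecutionJobVertex producer) { this.id = id; this.producer = producer; }
    }

    static final class ExecutionJobVertex {
        final String name;
        final List<IntermediateResult> produced = new ArrayList<>();
        final List<IntermediateResult> inputs = new ArrayList<>();
        ExecutionJobVertex(String name) { this.name = name; }
    }

    // Wires upstream -> dataset -> downstream, the way a JobGraph edge does.
    static void connect(JobVertex up, JobVertex down, String datasetId) {
        IntermediateDataSet ds = new IntermediateDataSet(datasetId);
        up.produced.add(ds);
        ds.consumers.add(down);
        down.inputs.add(ds);
    }

    // Kahn's algorithm: start from sources (no inputs); a vertex becomes ready
    // once every dataset it consumes has been produced.
    static List<JobVertex> sortFromSources(List<JobVertex> vertices) {
        Map<JobVertex, Integer> pending = new HashMap<>();
        Deque<JobVertex> ready = new ArrayDeque<>();
        for (JobVertex v : vertices) {
            pending.put(v, v.inputs.size());
            if (v.inputs.isEmpty()) ready.add(v);
        }
        List<JobVertex> order = new ArrayList<>();
        while (!ready.isEmpty()) {
            JobVertex v = ready.poll();
            order.add(v);
            for (IntermediateDataSet ds : v.produced) {
                for (JobVertex c : ds.consumers) {
                    if (pending.merge(c, -1, Integer::sum) == 0) ready.add(c);
                }
            }
        }
        if (order.size() != vertices.size()) throw new IllegalStateException("JobGraph contains a cycle");
        return order;
    }

    // One ExecutionJobVertex per JobVertex, one IntermediateResult per IntermediateDataSet,
    // and each vertex linked to the results produced by its upstream vertices.
    static Map<String, ExecutionJobVertex> buildExecutionGraph(List<JobVertex> vertices) {
        Map<IntermediateDataSet, IntermediateResult> results = new HashMap<>();
        Map<String, ExecutionJobVertex> graph = new LinkedHashMap<>();
        for (JobVertex jv : sortFromSources(vertices)) {
            ExecutionJobVertex ejv = new ExecutionJobVertex(jv.name);
            for (IntermediateDataSet ds : jv.inputs) {
                // Topological order guarantees the upstream result already exists.
                ejv.inputs.add(results.get(ds));
            }
            for (IntermediateDataSet ds : jv.produced) {
                IntermediateResult r = new IntermediateResult(ds.id, ejv);
                ejv.produced.add(r);
                results.put(ds, r);
            }
            graph.put(jv.name, ejv);
        }
        return graph;
    }

    public static void main(String[] args) {
        JobVertex source = new JobVertex("Source");
        JobVertex map = new JobVertex("Map");
        JobVertex sink = new JobVertex("Sink");
        connect(source, map, "ds-1");
        connect(map, sink, "ds-2");
        // The vertices are passed out of order on purpose; the sort restores source-first order.
        Map<String, ExecutionJobVertex> graph = buildExecutionGraph(List.of(sink, map, source));
        System.out.println(graph.keySet()); // [Source, Map, Sink]
        System.out.println(graph.get("Sink").inputs.get(0).producer.name); // Map
    }
}
```

Because vertices are visited in topological order, every upstream IntermediateResult already exists when a downstream ExecutionJobVertex looks it up, which is why the sort has to happen before the vertices are created and connected.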

On startup, the JobManager launches a series of services, one of which is the Dispatcher:

// org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory
dispatcherRunner = dispatcherRunnerFactory.createDispatcherRunner(...)
// org.apache.flink.runtime.dispatcher.runner.DefaultDispatcherRunnerFactory
public DispatcherRunner createDispatcherRunner(
    LeaderElectionService leaderElectionService,
    FatalErrorHandler fatalErrorHandler,
    JobGraphStoreFactory jobGraphStoreFactory,
    Executor ioExecutor,
    RpcService rpcService,
    PartialDispatcherServices partialDispatcherServices) throws Exception {

  final DispatcherLeaderProcessFactory dispatcherLeaderProcessFactory = dispatcherLeaderProcessFactoryFactory.createFactory(
    jobGraphStoreFactory,
    ioExecutor,
    rpcService,
    partialDispatcherServices,
    fatalErrorHandler);

  return DefaultDispatcherRunner.create(
    leaderElectionService,
    fatalErrorHandler,
    dispatcherLeaderProcessFactory);
}
// org.apache.flink.runtime.dispatcher.runner.JobDispatcherLeaderProcessFactoryFactory
public DispatcherLeaderProcessFactory createFactory(
    JobGraphStoreFactory jobGraphStoreFactory,
    Executor ioExecutor,
    RpcService rpcService,
    PartialDispatcherServices partialDispatcherServices,
    FatalErrorHandler fatalErrorHandler) {

  final JobGraph jobGraph;

  try {
    jobGraph = jobGraphRetriever.retrieveJobGraph(partialDispatcherServices.getConfiguration());
  } catch (FlinkException e) {
    throw new FlinkRuntimeException("Could not retrieve the JobGraph.", e);
  }

  final DefaultDispatcherGatewayServiceFactory defaultDispatcherServiceFactory = new DefaultDispatcherGatewayServiceFactory(
    JobDispatcherFactory.INSTANCE,
    rpcService,
    partialDispatcherServices);

  return new JobDispatcherLeaderProcessFactory(
    defaultDispatcherServiceFactory,
    jobGraph,
    fatalErrorHandler);
}
// org.apache.flink.runtime.dispatcher.runner.JobDispatcherLeaderProcessFactory
JobDispatcherLeaderProcessFactory(
    AbstractDispatcherLeaderProcess.DispatcherGatewayServiceFactory dispatcherGatewayServiceFactory,
    JobGraph jobGraph,
    FatalErrorHandler fatalErrorHandler) {
  this.dispatcherGatewayServiceFactory = dispatcherGatewayServiceFactory;
  this.jobGraph = jobGraph;
  this.fatalErrorHandler = fatalErrorHandler;
}
// At this point the JobGraph has been retrieved.
// Next, the conversion begins.
public DispatcherLeaderProcess create(UUID leaderSessionID) {
  return new JobDispatcherLeaderProcess(leaderSessionID, dispatcherGatewayServiceFactory, jobGraph, fatalErrorHandler);
}
// The JM is highly available; when it becomes the leader, grantLeadership is called, triggering:
// DefaultDispatcherRunner.grantLeadership 
// -> DefaultDispatcherRunner.startNewDispatcherLeaderProcess
// -> DefaultDispatcherRunner.createNewDispatcherLeaderProcess
// -> JobDispatcherLeaderProcessFactory.create
// -> new JobDispatcherLeaderProcess
// -> MiniDispatcher.submitJob
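The leader-election call chain in the comments above can be modeled as a small Java sketch. These are hypothetical stand-in classes that reuse Flink's names but not its real signatures: the runner plays the role of DefaultDispatcherRunner, every grantLeadership call closes the previous leader process and starts a new one built by the factory, and the factory holds the JobGraph that was retrieved before leadership was granted. It follows the chain as listed in this post, ending in MiniDispatcher.submitJob; the asynchronous futures, fencing tokens and extra services of the real classes are omitted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

// Hypothetical, simplified model of the call chain listed above. Class and method
// names echo Flink's, but the fields and signatures are illustrative only.
public class DispatcherRunnerSketch {

    static final class JobGraph {
        final String name;
        JobGraph(String name) { this.name = name; }
    }

    // Stand-in for MiniDispatcher: only records which jobs were submitted.
    static final class MiniDispatcher {
        final List<String> submittedJobs = new ArrayList<>();
        void submitJob(JobGraph jobGraph) { submittedJobs.add(jobGraph.name); }
    }

    // Stand-in for JobDispatcherLeaderProcess: submits the pre-retrieved JobGraph when started.
    static final class JobDispatcherLeaderProcess {
        final UUID leaderSessionId;
        private final JobGraph jobGraph;
        private final MiniDispatcher dispatcher;
        boolean closed;

        JobDispatcherLeaderProcess(UUID leaderSessionId, JobGraph jobGraph, MiniDispatcher dispatcher) {
            this.leaderSessionId = leaderSessionId;
            this.jobGraph = jobGraph;
            this.dispatcher = dispatcher;
        }

        void start() { dispatcher.submitJob(jobGraph); }
        void close() { closed = true; }
    }

    // Stand-in for JobDispatcherLeaderProcessFactory: the JobGraph is fixed when the factory is built.
    static final class JobDispatcherLeaderProcessFactory {
        private final JobGraph jobGraph;
        private final MiniDispatcher dispatcher;

        JobDispatcherLeaderProcessFactory(JobGraph jobGraph, MiniDispatcher dispatcher) {
            this.jobGraph = jobGraph;
            this.dispatcher = dispatcher;
        }

        JobDispatcherLeaderProcess create(UUID leaderSessionId) {
            return new JobDispatcherLeaderProcess(leaderSessionId, jobGraph, dispatcher);
        }
    }

    // Stand-in for DefaultDispatcherRunner: every leadership grant gets a fresh leader process.
    static final class DefaultDispatcherRunner {
        private final JobDispatcherLeaderProcessFactory factory;
        JobDispatcherLeaderProcess current;

        DefaultDispatcherRunner(JobDispatcherLeaderProcessFactory factory) { this.factory = factory; }

        void grantLeadership(UUID leaderSessionId) { startNewDispatcherLeaderProcess(leaderSessionId); }

        private void startNewDispatcherLeaderProcess(UUID leaderSessionId) {
            if (current != null) {
                current.close(); // stop the process that belonged to the previous leader session
            }
            current = createNewDispatcherLeaderProcess(leaderSessionId);
            current.start();
        }

        private JobDispatcherLeaderProcess createNewDispatcherLeaderProcess(UUID leaderSessionId) {
            return factory.create(leaderSessionId);
        }
    }

    public static void main(String[] args) {
        MiniDispatcher dispatcher = new MiniDispatcher();
        DefaultDispatcherRunner runner = new DefaultDispatcherRunner(
            new JobDispatcherLeaderProcessFactory(new JobGraph("wordcount"), dispatcher));
        runner.grantLeadership(UUID.randomUUID());
        System.out.println(dispatcher.submittedJobs); // [wordcount]
    }
}
```

Creating a new leader process per leadership grant keeps each leader session isolated: state from a previous session is closed off rather than reused, while the JobGraph itself, fixed in the factory, stays the same across sessions.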
