Flink 启动流程之 flink run

Posted by danner on July 2, 2020

Flink 1.10

`flink run` 将我们编译好的代码提交到集群运行。

  • 执行 start-cluster 会启动 Standalone 集群,任务在 Standalone 下运行
  • 执行 yarn-session 会在 Yarn 集群上启动长服务,任务在 Flink session cluster 下运行
  • 事先未执行任何脚本、直接执行 flink run 时,任务在 Flink per-job cluster 下执行
# flink (launcher script excerpt)
# Replaces the current shell process (exec) with the command in $JAVA_RUN,
# passing $JVM_ARGS, the log settings and a classpath built by manglePathList
# from $CC_CLASSPATH and $INTERNAL_HADOOP_CLASSPATHS.
# org.apache.flink.client.cli.CliFrontend is given as the class to run, and
# every argument of this script ("$@") is forwarded to it unchanged.
# NOTE(review): $JAVA_RUN is presumably the java executable - it is set outside this excerpt.
exec $JAVA_RUN $JVM_ARGS "${log_setting[@]}" -classpath "`manglePathList "$CC_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" org.apache.flink.client.cli.CliFrontend "$@"
// org.apache.flink.client.cli.CliFrontend
// Implementation of a simple command line frontend for executing programs.
/**
 * Submits the job based on the arguments.
 *
 * <p>Visible steps: log the environment, locate the configuration directory,
 * load the global configuration, load the custom command lines, build a
 * {@code CliFrontend}, install a {@code SecurityConfiguration} created from
 * {@code cli.configuration}, then run {@code cli.parseParameters(args)} inside
 * the installed security context and exit the JVM with the returned code.
 *
 * <p>This is an abridged excerpt: the handlers that follow the {@code try}
 * block are elided ("...").
 *
 * @param args raw command line arguments; handed unchanged to {@code parseParameters}
 */
public static void main(final String[] args) {
  EnvironmentInformation.logEnvironmentInfo(LOG, "Command Line Client", args);

  // 1. find the configuration directory
  final String configurationDirectory = getConfigurationDirectoryFromEnv();

  // 2. load the global configuration
  final Configuration configuration = GlobalConfiguration.loadConfiguration(configurationDirectory);

  // 3. load the custom command lines
  final List<CustomCommandLine> customCommandLines = loadCustomCommandLines(
    configuration,
    configurationDirectory);

  try {
    final CliFrontend cli = new CliFrontend(
      configuration,
      customCommandLines);
    // Install the security configuration before any parameter is parsed,
    // so that parseParameters runs inside the installed context below.
    SecurityUtils.install(new SecurityConfiguration(cli.configuration));
    int retCode = SecurityUtils.getInstalledContext()
      .runSecured(() -> cli.parseParameters(args));
    // The value returned by parseParameters becomes the process exit code.
    System.exit(retCode);
  }
  ...
}

都是老套路:加载配置,获取 Context。我们重点关注 `loadCustomCommandLines`。

/**
 * Builds the ordered list of custom command lines used by the CLI frontend.
 *
 * <p>Order of insertion as shown in this excerpt: the YARN session CLI
 * (loaded by class name through {@code loadCustomCommandLine}), then
 * {@code ExecutorCLI}, then {@code DefaultCLI}. The handling that follows the
 * {@code try} block is elided ("...").
 *
 * @param configuration configuration passed to every command line that is created
 * @param configurationDirectory configuration directory passed to the YARN session CLI loader
 * @return the list of command lines, with {@code DefaultCLI} as the last element
 */
public static List<CustomCommandLine> loadCustomCommandLines(Configuration configuration, String configurationDirectory) {
  List<CustomCommandLine> customCommandLines = new ArrayList<>();
  // FlinkYarnSessionCli should look familiar: it is the yarn-session CLI.
  final String flinkYarnSessionCLI = "org.apache.flink.yarn.cli.FlinkYarnSessionCli";
  try {
    // NOTE(review): "y" and "yarn" look like the short/long option prefixes - confirm against loadCustomCommandLine.
    customCommandLines.add(
      loadCustomCommandLine(flinkYarnSessionCLI,
                            configuration,
                            configurationDirectory,
                            "y",
                            "yarn"));
  } 
  ...
  customCommandLines.add(new ExecutorCLI(configuration));
  //Tips: DefaultCLI must be added at last, because getActiveCustomCommandLine(..) will get the
  // active CustomCommandLine in order and DefaultCLI isActive always return true.
  customCommandLines.add(new DefaultCLI(configuration));

  return customCommandLines;
}
/**
 * Dispatches a CLI invocation to the handler for its action.
 *
 * <p>The first argument is taken as the action and the remaining arguments are
 * passed to the matching handler. Every branch visible in this excerpt returns
 * 0 once its handler has completed. The rest of the switch and the handlers of
 * the enclosing {@code try} are elided ("...").
 *
 * <p>NOTE(review): as shown here, {@code args[0]} is read without a length
 * check, so an empty array would throw {@code ArrayIndexOutOfBoundsException}
 * unless a guard exists in code not included in this excerpt - confirm.
 *
 * @param args command line arguments; the first element names the action
 * @return 0 for each of the branches shown
 */
public int parseParameters(String[] args) {
  // get action; by convention the first argument must be the action
  String action = args[0];

  // remove action from parameters
  final String[] params = Arrays.copyOfRange(args, 1, args.length);
  try {
    // do action
    switch (action) {
      case ACTION_RUN:
        // flink run
        run(params);
        return 0;
      case ACTION_LIST:
        list(params);
        return 0;
      case ACTION_INFO:
        info(params);
        return 0;
      case ACTION_CANCEL:
        cancel(params);
        return 0;
      case ACTION_STOP:
        stop(params);
        return 0;
      case ACTION_SAVEPOINT:
        savepoint(params);
        return 0;
      case "-h":
      case "--help":
        CliFrontendParser.printHelp(customCommandLines);
        return 0;
      case "-v":
      case "--version":
        String version = EnvironmentInformation.getVersion();
        String commitID = EnvironmentInformation.getRevisionInformation().commitId;
        System.out.print("Version: " + version);
        // Append the commit id only when it is known; either way this terminates the line.
        System.out.println(commitID.equals(EnvironmentInformation.UNKNOWN) ? "" : ", Commit ID: " + commitID);
        return 0;
      ...
    
/**
 * Handles the "run" action: parses the options, builds the packaged program
 * and executes it.
 *
 * <p>Abridged excerpt: the code between option parsing and program building,
 * and the handlers of the first {@code try}, are elided ("...").
 *
 * @param args the arguments that follow the action on the command line
 * @throws Exception propagated from parsing, building or executing the program
 */
protected void run(String[] args) throws Exception {
    // Parse the arguments: merge the run options with the custom command line options.
    final Options commandOptions = CliFrontendParser.getRunCommandOptions();
    final Options commandLineOptions = CliFrontendParser.mergeOptions(commandOptions, customCommandLineOptions);
    final CommandLine commandLine = CliFrontendParser.parse(commandLineOptions, args, true);
    final ProgramOptions programOptions = new ProgramOptions(commandLine);
    ...
    // Package the job (jar, main class, user jars, ...); this object is used heavily later on.
    final PackagedProgram program;
    try {
        LOG.info("Building program from JAR file");
        program = buildProgram(programOptions);
    }
    ...
    // The effective configuration is derived from the command line, the program
    // options and the job jar together with its dependencies.
    final List<URL> jobJars = program.getJobJarAndDependencies();
    final Configuration effectiveConfiguration =
            getEffectiveConfiguration(commandLine, programOptions, jobJars);

    LOG.debug("Effective executor configuration: {}", effectiveConfiguration);

    try {
        executeProgram(effectiveConfiguration, program);
    } finally {
        // Runs whether or not execution threw.
        program.deleteExtractedLibraries();
    }
 }
// DefaultExecutorServiceLoader
/**
 * Executes the packaged program by delegating to
 * {@code ClientUtils.executeProgram}, using
 * {@code DefaultExecutorServiceLoader.INSTANCE} as the executor service loader.
 *
 * @param configuration configuration handed to {@code ClientUtils.executeProgram}
 * @param program the packaged program to execute
 * @throws ProgramInvocationException declared by this method; thrown by the delegate
 */
protected void executeProgram(final Configuration configuration, final PackagedProgram program) throws ProgramInvocationException {
    ClientUtils.executeProgram(DefaultExecutorServiceLoader.INSTANCE, configuration, program);
}
// NOTE(review): this definition repeats the executeProgram excerpt shown
// immediately before it (same signature, same body); it looks like a
// copy/paste duplicate in the article rather than a second method.
/**
 * Delegates to {@code ClientUtils.executeProgram} with
 * {@code DefaultExecutorServiceLoader.INSTANCE}, the given configuration and
 * the given program.
 */
protected void executeProgram(final Configuration configuration, final PackagedProgram program) throws ProgramInvocationException {
  ClientUtils.executeProgram(DefaultExecutorServiceLoader.INSTANCE, configuration, program);
}

// org.apache.flink.client.ClientUtils
/**
 * Runs the user program with the user-code class loader set as the thread
 * context class loader and a {@code ContextEnvironmentFactory} registered as
 * the execution context.
 *
 * <p>Abridged excerpt: the handlers of both {@code try} blocks are elided
 * ("..."), so the cleanup performed after the user code runs is not visible.
 *
 * @param executorServiceLoader loader passed to the context environment factory; must not be null
 * @param configuration configuration passed to the context environment factory
 * @param program packaged program whose entry point is invoked
 * @throws ProgramInvocationException declared by this method
 */
public static void executeProgram(
  PipelineExecutorServiceLoader executorServiceLoader,
  Configuration configuration,
  PackagedProgram program) throws ProgramInvocationException {
  checkNotNull(executorServiceLoader);
  final ClassLoader userCodeClassLoader = program.getUserCodeClassLoader();
  // Remember the current context class loader.
  // NOTE(review): presumably restored in the elided part of this method - confirm.
  final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
  try {
    Thread.currentThread().setContextClassLoader(userCodeClassLoader);

    // "detached" is logged as the negation of the ATTACHED deployment option.
    LOG.info("Starting program (detached: {})", !configuration.getBoolean(DeploymentOptions.ATTACHED));

    ContextEnvironmentFactory factory = new ContextEnvironmentFactory(
      executorServiceLoader,
      configuration,
      userCodeClassLoader);
    ContextEnvironment.setAsContext(factory);

    try {
      // Run the user code.
      program.invokeInteractiveModeForExecution();
    } 
    ...
}
// org.apache.flink.client.program.PackagedProgram
/**
 * Invokes the program's entry point by calling {@code callMainMethod} with
 * this program's {@code mainClass} and {@code args}.
 *
 * @throws ProgramInvocationException declared by this method; thrown by {@code callMainMethod}
 */
public void invokeInteractiveModeForExecution() throws ProgramInvocationException {
  callMainMethod(mainClass, args);
}
/**
 * Reflectively invokes {@code main(String[])} on the given entry class.
 *
 * <p>Abridged excerpt: the {@code catch} clauses of both {@code try} blocks
 * are elided ("...").
 *
 * @param entryClass class expected to declare the {@code main} method; must be public
 * @param args arguments passed to the {@code main} method
 * @throws ProgramInvocationException if {@code entryClass} is not public (other causes are in the elided handlers)
 */
private static void callMainMethod(Class<?> entryClass, String[] args) throws ProgramInvocationException {
    Method mainMethod;
    // Reject non-public entry classes up front with an explicit message.
    if (!Modifier.isPublic(entryClass.getModifiers())) {
        throw new ProgramInvocationException("The class " + entryClass.getName() + " must be public.");
    }

    try {
        // getMethod only finds public methods (including inherited ones).
        mainMethod = entryClass.getMethod("main", String[].class);
    } 
    ...
    try {
        // null receiver: main is invoked as a static method. The (Object) cast
        // passes the whole array as the single argument of the varargs invoke().
        mainMethod.invoke(null, (Object) args);
    } 
    ...
}

封装 Config,直接跑用户代码,相关介绍见《深入剖析 Flink Streaming WC 流程》;下节将把整个启动流程串起来。