Flink 启动流程之 yarn-session

Posted by danner on June 30, 2020

之前介绍的 start-cluster 启动的是 Flink Standalone 模式,接下来看 Flink On Yarn 是如何启动。

yarn-session

Flink On YarnSession 模式,运行一个长服务,适用于运行时间短的任务。

# yarn-session.sh
bin=`dirname "$0"`
bin=`cd "$bin"; pwd`

# get Flink config
. "$bin"/config.sh

if [ "$FLINK_IDENT_STRING" = "" ]; then
        FLINK_IDENT_STRING="$USER"
fi

JVM_ARGS="$JVM_ARGS -Xmx512m"

CC_CLASSPATH=`manglePathList $(constructFlinkClassPath):$INTERNAL_HADOOP_CLASSPATHS`

log=$FLINK_LOG_DIR/flink-$FLINK_IDENT_STRING-yarn-session-$HOSTNAME.log
log_setting="-Dlog.file="$log" -Dlog4j.configuration=file:"$FLINK_CONF_DIR"/log4j-yarn-session.properties -Dlogback.configurationFile=file:"$FLINK_CONF_DIR"/logback-yarn.xml"

$JAVA_RUN $JVM_ARGS -classpath "$CC_CLASSPATH" $log_setting org.apache.flink.yarn.cli.FlinkYarnSessionCli -j "$FLINK_LIB_DIR"/flink-dist*.jar "$@"

流程很简单,直接去执行 org.apache.flink.yarn.cli.FlinkYarnSessionCli

// org.apache.flink.yarn.cli.FlinkYarnSessionCli
public static void main(final String[] args) {
  // flink conf 目录载入配置:ip,port,jmHeap,tmHeap,solts,parllelism ...
  final String configurationDirectory = CliFrontend.getConfigurationDirectoryFromEnv();
  final Configuration flinkConfiguration = GlobalConfiguration.loadConfiguration();

  int retCode;

  try {
    final FlinkYarnSessionCli cli = new FlinkYarnSessionCli(
      flinkConfiguration,
      configurationDirectory,
      "",
      ""); // no prefix for the YARN session

    SecurityUtils.install(new SecurityConfiguration(flinkConfiguration));
    retCode = SecurityUtils.getInstalledContext().runSecured(() -> cli.run(args));
   ...
  System.exit(retCode);
}

看到上面的代码肯定很熟悉:获取 Hadoop Context,在其上执行真正操作。

// org.apache.flink.yarn.cli.FlinkYarnSessionCli
public int run(String[] args) throws CliArgsException, FlinkException {
  final CommandLine cmd = parseCommandLineOptions(args, true);
  ...
  // 设置资源参数
  final Configuration configuration = applyCommandLineOptionsToConfiguration(cmd);
  final ClusterClientFactory<ApplicationId> yarnClusterClientFactory = clusterClientServiceLoader.getClusterClientFactory(configuration);
  // YarnClusterDescriptor 很重要,Flink On Yarn 的信息
  final YarnClusterDescriptor yarnClusterDescriptor = (YarnClusterDescriptor) yarnClusterClientFactory.createClusterDescriptor(configuration);

  try {
    // 可以查询集群,输出Memory,vCores,HealthReport
    if (cmd.hasOption(query.getOpt())) {
      final String description = yarnClusterDescriptor.getClusterDescription();
      System.out.println(description);
      return 0;
    } else {
      final ClusterClientProvider<ApplicationId> clusterClientProvider;
      final ApplicationId yarnApplicationId;
      
      if (cmd.hasOption(applicationId.getOpt())) {
        // 设置固定 appid,并获取 clusterClient
        yarnApplicationId = ConverterUtils.toApplicationId(cmd.getOptionValue(applicationId.getOpt()));
        clusterClientProvider = yarnClusterDescriptor.retrieve(yarnApplicationId);
      } else {
        final ClusterSpecification clusterSpecification = yarnClusterClientFactory.getClusterSpecification(configuration);

        clusterClientProvider = yarnClusterDescriptor.deploySessionCluster(clusterSpecification);
        // ClusterClient:封装了将程序提交到远程集群所需的功能,很重要
        ClusterClient<ApplicationId> clusterClient = clusterClientProvider.getClusterClient();
        //------------------ ClusterClient deployed, handle connection details
        yarnApplicationId = clusterClient.getClusterId();
        // web UI
        try {
          System.out.println("JobManager Web Interface: " + clusterClient.getWebInterfaceURL());
         // yarnApplicationId 记录到本地文件,一般为 /tmp/.yarn-properties-user
         // contnt: applicationID=application_1593474472940_0001
          // flink run 启动时会根据此 id 来提交
          writeYarnPropertiesFile(
            yarnApplicationId,
            dynamicPropertiesEncoded);
        } 
      ...
        ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();

        final YarnApplicationStatusMonitor yarnApplicationStatusMonitor = new YarnApplicationStatusMonitor(
          yarnClusterDescriptor.getYarnClient(),
          yarnApplicationId,
          new ScheduledExecutorServiceAdapter(scheduledExecutorService));
        Thread shutdownHook = ShutdownHookUtil.addShutdownHook(
          () -> shutdownCluster(
            clusterClientProvider.getClusterClient(),
            scheduledExecutorService,
            yarnApplicationStatusMonitor),
          getClass().getSimpleName(),
          LOG);
        try {
          runInteractiveCli(
            yarnApplicationStatusMonitor,
            acceptInteractiveInput);
        } 
      ...
  return 0;
}

YarnClusterDescriptor

The descriptor with deployment information for deploying a Flink cluster on Yarn

clusterClientServiceLoader = DefaultClusterClientServiceLoader
  final ClusterClientFactory<ApplicationId> yarnClusterClientFactory = clusterClientServiceLoader.getClusterClientFactory(configuration);

final YarnClusterDescriptor yarnClusterDescriptor = (YarnClusterDescriptor) yarnClusterClientFactory.createClusterDescriptor(configuration);

// org.apache.flink.client.deployment.DefaultClusterClientServiceLoader
// 加载所有 ClusterClientFactory 类
private static final ServiceLoader<ClusterClientFactory> defaultLoader = ServiceLoader.load(ClusterClientFactory.class);
	@Override
	public <ClusterID> ClusterClientFactory<ClusterID> getClusterClientFactory(final Configuration configuration) {
		checkNotNull(configuration);

	final List<ClusterClientFactory> compatibleFactories = new ArrayList<>();
	final Iterator<ClusterClientFactory> factories = defaultLoader.iterator();
		while (factories.hasNext()) {
			try {
				final ClusterClientFactory factory = factories.next();
        // 比对当前的 ClusterClientFactory name 和configuration 中 target 是否相同
				if (factory != null && factory.isCompatibleWith(configuration)) {
					compatibleFactories.add(factory);
				}
			} catch (Throwable e) {
				if (e.getCause() instanceof NoClassDefFoundError) {
					LOG.info("Could not load factory due to missing dependencies.");
				} else {
					throw e;
				}
			}
		}

		if (compatibleFactories.size() > 1) {
			...
			throw new IllegalStateException("Multiple compatible client factories found for:\n" + String.join("\n", configStr) + ".");
		}
		return compatibleFactories.isEmpty() ? null : (ClusterClientFactory<ClusterID>) compatibleFactories.get(0);
	}
// org.apache.flink.yarn.YarnClusterClientFactory
@Override
public YarnClusterDescriptor createClusterDescriptor(Configuration configuration) {
  checkNotNull(configuration);
  return getClusterDescriptor(configuration);
}
private YarnClusterDescriptor getClusterDescriptor(Configuration configuration) {
  final YarnClient yarnClient = YarnClient.createYarnClient();
  final YarnConfiguration yarnConfiguration = new YarnConfiguration();
  yarnClient.init(yarnConfiguration);
  yarnClient.start();
  return new YarnClusterDescriptor(
    configuration,
    yarnConfiguration,
    yarnClient,
    YarnClientYarnClusterInformationRetriever.create(yarnClient),
    false);
}

根据 Job target 创建对应集群的 ClusterClientFactory,然后通过 configyarnconfigyarnclient 构建 YarnClusterDescriptor 。

ClusterClient

WebUI 服务的本地代理(IP:Port 与JM 相同),可以通过 REST API 提交 jar 运行任务

// 获取 flink config.masterMemoryMB,taskManagerMemoryMB,slotsPerTaskManager
final ClusterSpecification clusterSpecification = yarnClusterClientFactory.getClusterSpecification(configuration);
clusterClientProvider = yarnClusterDescriptor.deploySessionCluster(clusterSpecification);
ClusterClient<ApplicationId> clusterClient = clusterClientProvider.getClusterClient();

// org.apache.flink.yarn.YarnClusterDescriptor
public ClusterClientProvider<ApplicationId> deploySessionCluster(ClusterSpecification clusterSpecification) throws ClusterDeploymentException {
  try {
    return deployInternal(
      clusterSpecification,
      "Flink session cluster",
      getYarnSessionClusterEntrypoint(),
      null,
      false);
  } catch (Exception e) {
    throw new ClusterDeploymentException("Couldn't deploy Yarn session cluster", e);
  }
}
private ClusterClientProvider<ApplicationId> deployInternal(
			ClusterSpecification clusterSpecification,
			String applicationName,
			String yarnClusterEntrypoint,
			@Nullable JobGraph jobGraph,
			boolean detached) throws Exception {

		if (UserGroupInformation.isSecurityEnabled()) {
			// note: UGI::hasKerberosCredentials inaccurately reports false
			// for logins based on a keytab (fixed in Hadoop 2.6.1, see HADOOP-10786),
			// so we check only in ticket cache scenario.
			boolean useTicketCache = flinkConfiguration.getBoolean(SecurityOptions.KERBEROS_LOGIN_USETICKETCACHE);

			UserGroupInformation loginUser = UserGroupInformation.getCurrentUser();
			if (loginUser.getAuthenticationMethod() == UserGroupInformation.AuthenticationMethod.KERBEROS
					&& useTicketCache && !loginUser.hasKerberosCredentials()) {
				LOG.error("Hadoop security with Kerberos is enabled but the login user does not have Kerberos credentials");
				throw new RuntimeException("Hadoop security with Kerberos is enabled but the login user " +
						"does not have Kerberos credentials");
			}
		}
		isReadyForDeployment(clusterSpecification);
		// ------------------ Check if the specified queue exists --------------------
		checkYarnQueues(yarnClient);
		// ------------------ Check if the YARN ClusterClient has the requested resources --------------
		// Create application via yarnClient
		final YarnClientApplication yarnApplication = yarnClient.createApplication();
		final GetNewApplicationResponse appResponse = yarnApplication.getNewApplicationResponse();

		Resource maxRes = appResponse.getMaximumResourceCapability();

		final ClusterResourceDescription freeClusterMem;
		try {
      // 获取集群空闲资源
			freeClusterMem = getCurrentFreeClusterResources(yarnClient);
		} catch (YarnException | IOException e) {
			failSessionDuringDeployment(yarnClient, yarnApplication);
			throw new YarnDeploymentException("Could not retrieve information about free cluster resources.", e);
		}
		final int yarnMinAllocationMB = yarnConfiguration.getInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 0);

		final ClusterSpecification validClusterSpecification;
		try {
      // 检查资源是否满足
			validClusterSpecification = validateClusterResources(
					clusterSpecification,
					yarnMinAllocationMB,
					maxRes,
					freeClusterMem);
		} catch (YarnDeploymentException yde) {
			failSessionDuringDeployment(yarnClient, yarnApplication);
			throw yde;
		}
		LOG.info("Cluster specification: {}", validClusterSpecification);
		final ClusterEntrypoint.ExecutionMode executionMode = detached ?
				ClusterEntrypoint.ExecutionMode.DETACHED
				: ClusterEntrypoint.ExecutionMode.NORMAL;
		flinkConfiguration.setString(ClusterEntrypoint.EXECUTION_MODE, executionMode.toString());
   // 通过 YARN 将封装好的 ApplicationSubmissionContext 提交到 yarn,返回 report
   // yarnClusterEntrypoint 就是yarn 集群入口点,类似之前分析的 StandaloneSessionClusterEntrypoint
		ApplicationReport report = startAppMaster(
				flinkConfiguration,
				applicationName,
				yarnClusterEntrypoint,
				jobGraph,
				yarnClient,
				yarnApplication,
				validClusterSpecification);
		// print the application id for user to cancel themselves.
		if (detached) {
			final ApplicationId yarnApplicationId = report.getApplicationId();
			logDetachedClusterInformation(yarnApplicationId, LOG);
		}
		setClusterEntrypointInfoToConfig(report);
		return () -> {
			try {
				return new RestClusterClient<>(flinkConfiguration, report.getApplicationId());
			} catch (Exception e) {
				throw new RuntimeException("Error while creating RestClusterClient.", e);
			}
		};
	}

/**
 * A {@link ClusterClient} implementation that communicates via HTTP REST requests.
 * org.apache.flink.client.program.rest.RestClusterClient
 */

runInteractiveCli

命令行的一些交互,主要是在命令行输出任务的一些状态

启动日志

2020-06-30 07:48:10,167 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.rpc.address, localhost
2020-06-30 07:48:10,169 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.rpc.port, 6123
2020-06-30 07:48:10,169 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.heap.size, 1024m
2020-06-30 07:48:10,169 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: taskmanager.memory.process.size, 1568m
2020-06-30 07:48:10,169 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: taskmanager.numberOfTaskSlots, 1
2020-06-30 07:48:10,169 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: parallelism.default, 1
2020-06-30 07:48:10,170 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.execution.failover-strategy, region
2020-06-30 07:48:10,804 WARN  org.apache.hadoop.util.NativeCodeLoader                       - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2020-06-30 07:48:10,933 INFO  org.apache.flink.runtime.security.modules.HadoopModule        - Hadoop user set to danner (auth:SIMPLE)
2020-06-30 07:48:10,975 INFO  org.apache.flink.runtime.security.modules.JaasModule          - Jaas file will be created as /var/folders/rq/kmxm07hx54jbnhfvg93gv62c0000gn/T/jaas-8888642517590934443.conf.
2020-06-30 07:48:10,997 WARN  org.apache.flink.yarn.cli.FlinkYarnSessionCli                 - The configuration directory ('/Users/luyongtao/app/flink-1.10.0/conf') already contains a LOG4J config file.If you want to use logback, then please delete or rename the log configuration file.
2020-06-30 07:48:11,115 INFO  org.apache.hadoop.yarn.client.RMProxy                         - Connecting to ResourceManager at /0.0.0.0:8032
2020-06-30 07:48:11,354 INFO  org.apache.flink.runtime.clusterframework.TaskExecutorProcessUtils  - The derived from fraction jvm overhead memory (156.800mb (164416719 bytes)) is less than its min value 192.000mb (201326592 bytes), min value will be used instead
2020-06-30 07:48:11,610 INFO  org.apache.flink.yarn.YarnClusterDescriptor                   - Cluster specification: ClusterSpecification{masterMemoryMB=1024, taskManagerMemoryMB=1568, slotsPerTaskManager=1}
2020-06-30 07:48:14,570 INFO  org.apache.flink.yarn.YarnClusterDescriptor                   - Submitting application master application_1593474472940_0001
2020-06-30 07:48:14,740 INFO  org.apache.hadoop.yarn.client.api.impl.YarnClientImpl         - Submitted application application_1593474472940_0001
2020-06-30 07:48:14,740 INFO  org.apache.flink.yarn.YarnClusterDescriptor                   - Waiting for the cluster to be allocated
2020-06-30 07:48:14,743 INFO  org.apache.flink.yarn.YarnClusterDescriptor                   - Deploying cluster, current state ACCEPTED
2020-06-30 07:48:22,574 INFO  org.apache.flink.yarn.YarnClusterDescriptor                   - YARN application has been deployed successfully.
2020-06-30 07:48:22,575 INFO  org.apache.flink.yarn.YarnClusterDescriptor                   - Found Web Interface localhost:54279 of application 'application_1593474472940_0001'.
JobManager Web Interface: http://localhost:54279

总结

获取 yarnClusterDescriptorYARN 集群部署 JobManager