An In-Depth Look at the Flink Streaming WordCount Flow

Posted by danner on May 26, 2020

Flink version: 1.10


import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala._

object WordCount {
  def main(args: Array[String]): Unit = {
    // Checking input parameters
    val params = ParameterTool.fromArgs(args)

    // set up the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Source: read the text file passed as --input
    val text = env.readTextFile(params.get("input"))
    val counts: DataStream[(String, Int)] = text
      // split up the lines in pairs (2-tuples) containing: (word,1)
      .flatMap(_.toLowerCase.split("\\W+"))
      .filter(_.nonEmpty)
      .map((_, 1))
      // group by the tuple field "0" and sum up tuple field "1"
      .keyBy(0)
      .sum(1)

    // Sink: print the running counts to stdout
    counts.print()
    // execute program
    env.execute("Streaming WordCount")
  }
}

This is the simplest possible WordCount program; anyone who knows Spark will find it very familiar.

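  • StreamExecutionEnvironment.getExecutionEnvironment obtains the streaming execution environment
  • Source: reads the text file
  • Transformation: a chain of operator calls
  • Sink: print
  • Execute: env.execute() generates the DAG and submits the job so that it starts running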
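
To make the keyBy/sum step concrete, here is a minimal plain-Java sketch with no Flink dependency. It imitates the same operator chain over an in-memory list and shows that a streaming sum emits an updated count for every incoming word rather than one final total. The class name RunningWordCount and the "word=count" output format are illustrative assumptions, not part of Flink.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class RunningWordCount {

    // Mimics flatMap -> filter -> map -> keyBy -> sum over in-memory lines.
    // Emits one "word=count" entry per incoming word, in arrival order.
    static List<String> run(List<String> lines) {
        Map<String, Integer> sums = new HashMap<>();   // per-key state kept by sum
        List<String> emitted = new ArrayList<>();
        for (String line : lines) {
            for (String word : line.toLowerCase().split("\\W+")) {   // flatMap
                if (word.isEmpty()) {                                  // filter
                    continue;
                }
                // map to (word, 1), group by word, add to the running sum
                int updated = sums.merge(word, 1, Integer::sum);
                emitted.add(word + "=" + updated);
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        // prints [hello=1, hello=2, world=1, hello=3]
        System.out.println(run(Arrays.asList("Hello hello", "world hello")));
    }
}
```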

StreamExecutionEnvironment

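StreamExecutionEnvironment is an abstract class with several implementations. Usually only the following two matter:

  • LocalStreamEnvironment: runs in the current JVM, for example in IDEA / Standalone
  • RemoteStreamEnvironment: a remote execution environment

The getExecutionEnvironment API automatically picks and creates an implementation based on where the program is running: IDEA -> LocalStreamEnvironment; running on a cluster -> RemoteStreamEnvironment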
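
One way to picture this kind of selection is a factory that a launcher may install, with a local fallback when nothing is installed. The sketch below is plain Java and is not Flink's code; Env, installContextFactory, and the "local"/"remote" labels are hypothetical names chosen for illustration.

```java
import java.util.function.Supplier;

final class EnvSelectionSketch {

    // Hypothetical stand-in for an execution environment.
    interface Env {
        String kind();
    }

    // A launcher (for example a cluster client) may install a factory before
    // user code runs; when none is installed we fall back to a local environment.
    private static Supplier<Env> contextFactory = null;

    static void installContextFactory(Supplier<Env> factory) {
        contextFactory = factory;
    }

    static Env getExecutionEnvironment() {
        if (contextFactory != null) {
            return contextFactory.get();
        }
        return () -> "local";
    }

    public static void main(String[] args) {
        System.out.println(getExecutionEnvironment().kind());   // local
        installContextFactory(() -> () -> "remote");
        System.out.println(getExecutionEnvironment().kind());   // remote
    }
}
```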

Source

ContinuousFileReaderOperator<OUT> reader =
  new ContinuousFileReaderOperator<>(inputFormat);

SingleOutputStreamOperator<OUT> source = addSource(monitoringFunction, sourceName)
  .transform("Split Reader: " + sourceName, typeInfo, reader);

return new DataStreamSource<>(source);

In the streaming API everything is represented as a DataStream, and every DataStream wraps an operator (operator => Transformation); a source is represented as a DataStreamSource.
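
The relationship between a stream handle and its transformation can be sketched as a linked chain of nodes. This is a simplified model under stated assumptions, not Flink's classes: Transformation, Stream, addSource, and describe here are hypothetical and carry only a name and a link to the input node.

```java
final class TransformationChainSketch {

    // Hypothetical node in the operator graph: a name plus a link to its input.
    static final class Transformation {
        final String name;
        final Transformation input;   // null for a source

        Transformation(String name, Transformation input) {
            this.name = name;
            this.input = input;
        }
    }

    // Hypothetical stream handle: it only wraps the latest transformation.
    static final class Stream {
        final Transformation transformation;

        Stream(Transformation transformation) {
            this.transformation = transformation;
        }

        // Applying an operator creates a new node that points back at the current one.
        Stream transform(String operatorName) {
            return new Stream(new Transformation(operatorName, transformation));
        }
    }

    static Stream addSource(String sourceName) {
        return new Stream(new Transformation(sourceName, null));
    }

    // Walks the input links back to the source and renders "source -> ... -> last".
    static String describe(Stream stream) {
        StringBuilder sb = new StringBuilder();
        for (Transformation t = stream.transformation; t != null; t = t.input) {
            if (sb.length() > 0) {
                sb.insert(0, " -> ");
            }
            sb.insert(0, t.name);
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        Stream source = addSource("File Monitor").transform("Split Reader: input.txt");
        // prints File Monitor -> Split Reader: input.txt
        System.out.println(describe(source));
    }
}
```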

Transformation

A chain of operations applied to the stream; it can be thought of as a pipeline.

Sink

public DataStreamSink<T> print() {
  PrintSinkFunction<T> printFunction = new PrintSinkFunction<>();
  return addSink(printFunction).name("Print to Std. Out");
}
public DataStreamSink<T> addSink(SinkFunction<T> sinkFunction) {
  // read the output type of the input Transform to coax out errors about MissingTypeInfo
  transformation.getOutputType();

  // configure the type if needed
  if (sinkFunction instanceof InputTypeConfigurable) {
    ((InputTypeConfigurable) sinkFunction).setInputType(getType(), getExecutionConfig());
  }

  StreamSink<T> sinkOperator = new StreamSink<>(clean(sinkFunction));

  DataStreamSink<T> sink = new DataStreamSink<>(this, sinkOperator);

  getExecutionEnvironment().addOperator(sink.getTransformation());
  return sink;
}

As with the source, this produces a DataStreamSink.

Execute

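Only when execute is called does the Flink program actually start running. Like Spark, Flink evaluates lazily, which gives it the chance to optimize the DAG first.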

public JobExecutionResult execute(String jobName) throws Exception {
  Preconditions.checkNotNull(jobName, "Streaming Job name should not be null.");
  // generate the StreamGraph
  return execute(getStreamGraph(jobName));
}
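
The lazy behaviour described above can be illustrated with a tiny plain-Java pipeline that only records its steps until execute is called. This is a sketch of the idea, not of Flink's implementation; LazyPipelineSketch and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

final class LazyPipelineSketch {
    private final List<Function<String, String>> steps = new ArrayList<>();
    private final List<String> executed = new ArrayList<>();

    // Records the step; nothing runs yet.
    LazyPipelineSketch map(String name, Function<String, String> fn) {
        steps.add(value -> {
            executed.add(name);
            return fn.apply(value);
        });
        return this;
    }

    // Only here do the recorded steps run, in declaration order.
    String execute(String input) {
        String value = input;
        for (Function<String, String> step : steps) {
            value = step.apply(value);
        }
        return value;
    }

    List<String> executedSteps() {
        return executed;
    }

    public static void main(String[] args) {
        LazyPipelineSketch pipeline = new LazyPipelineSketch()
            .map("lower", String::toLowerCase)
            .map("trim", String::trim);
        System.out.println(pipeline.executedSteps());      // []
        System.out.println(pipeline.execute("  Flink "));  // flink
        System.out.println(pipeline.executedSteps());      // [lower, trim]
    }
}
```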

StreamGraphGenerator

public StreamGraph getStreamGraph(String jobName, boolean clearTransformations) {
  StreamGraph streamGraph = getStreamGraphGenerator().setJobName(jobName).generate();
  if (clearTransformations) {
    this.transformations.clear();
  }
  return streamGraph;
}

This post does not cover how the StreamGraph is generated; a later post will describe it in detail.

JobGraph

public JobExecutionResult execute(StreamGraph streamGraph) throws Exception {
		final JobClient jobClient = executeAsync(streamGraph);

		try {
			final JobExecutionResult jobExecutionResult;

			if (configuration.getBoolean(DeploymentOptions.ATTACHED)) {
				jobExecutionResult = jobClient.getJobExecutionResult(userClassloader).get();
			} else {
				jobExecutionResult = new DetachedJobExecutionResult(jobClient.getJobID());
			}

			jobListeners.forEach(jobListener -> jobListener.onJobExecuted(jobExecutionResult, null));

			return jobExecutionResult;
		} catch (Throwable t) {
			jobListeners.forEach(jobListener -> {
				jobListener.onJobExecuted(null, ExceptionUtils.stripExecutionException(t));
			});
			ExceptionUtils.rethrowException(t);

			// never reached, only make javac happy
			return null;
		}
	}
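
The attached/detached branch above boils down to "wait for the result" versus "return right away with only the job id". A minimal plain-Java sketch of that distinction, with a hypothetical class name and string results standing in for Flink's result types:

```java
import java.util.concurrent.CompletableFuture;

final class AttachedDetachedSketch {

    // attached: block until the job result is available and return it.
    // detached: return at once with a marker that carries only the job id.
    static String execute(String jobId, CompletableFuture<String> resultFuture, boolean attached) {
        if (attached) {
            return resultFuture.join();
        }
        return "detached:" + jobId;
    }

    public static void main(String[] args) {
        // Attached: the future is already complete, so join() returns its value.
        System.out.println(execute("job-1", CompletableFuture.completedFuture("FINISHED"), true));
        // Detached: the future never completes, yet the call still returns immediately.
        System.out.println(execute("job-1", new CompletableFuture<>(), false));
    }
}
```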

Executor

public JobClient executeAsync(StreamGraph streamGraph) throws Exception {
  checkNotNull(streamGraph, "StreamGraph cannot be null.");
  checkNotNull(configuration.get(DeploymentOptions.TARGET), "No execution.target specified in your configuration file.");
 
  // pick a PipelineExecutorFactory according to execution.target:
  // LocalExecutorFactory, RemoteExecutorFactory, YarnSessionClusterExecutorFactory,
  // YarnJobClusterExecutorFactory, KubernetesSessionClusterExecutorFactory
  final PipelineExecutorFactory executorFactory =
    executorServiceLoader.getExecutorFactory(configuration);

  checkNotNull(
    executorFactory,
    "Cannot find compatible factory for specified execution.target (=%s)",
    configuration.get(DeploymentOptions.TARGET));
  // locally this ends up as new LocalExecutor().execute
  // in IDEA the configuration is {execution.attached=true, execution.target=local}
  CompletableFuture<JobClient> jobClientFuture = executorFactory
    .getExecutor(configuration)
    .execute(streamGraph, configuration);

  try {
    JobClient jobClient = jobClientFuture.get();
    jobListeners.forEach(jobListener -> jobListener.onJobSubmitted(jobClient, null));
    return jobClient;
  } catch (Throwable t) {
    jobListeners.forEach(jobListener -> jobListener.onJobSubmitted(null, t));
    ExceptionUtils.rethrow(t);

    // make javac happy, this code path will not be reached
    return null;
  }
}
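
The factory lookup by execution.target can be sketched in plain Java as a list of candidate factories, each of which decides whether it matches the configuration. This is an illustration under assumptions rather than Flink's loader: the Factory interface, forTarget, and the use of a Map as configuration are hypothetical, and only two targets are modelled.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;

final class ExecutorFactorySketch {

    // Hypothetical factory contract: "do I handle this configuration?"
    interface Factory {
        boolean isCompatibleWith(Map<String, String> config);
        String executorName();
    }

    static Factory forTarget(String target, String executorName) {
        return new Factory() {
            @Override
            public boolean isCompatibleWith(Map<String, String> config) {
                return target.equalsIgnoreCase(config.get("execution.target"));
            }

            @Override
            public String executorName() {
                return executorName;
            }
        };
    }

    static final List<Factory> FACTORIES = Arrays.asList(
        forTarget("local", "LocalExecutor"),
        forTarget("remote", "RemoteExecutor"));

    // Returns the executor of the first factory that accepts the configuration.
    static String select(Map<String, String> config) {
        for (Factory factory : FACTORIES) {
            if (factory.isCompatibleWith(config)) {
                return factory.executorName();
            }
        }
        throw new IllegalStateException(
            "Cannot find compatible factory for specified execution.target (="
                + config.get("execution.target") + ")");
    }

    public static void main(String[] args) {
        System.out.println(select(java.util.Collections.singletonMap("execution.target", "local")));
    }
}
```
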
// org.apache.flink.client.deployment.executors.LocalExecutor
public CompletableFuture<JobClient> execute(Pipeline pipeline, Configuration configuration) throws Exception {
  checkNotNull(pipeline);
  checkNotNull(configuration);

  // we only support attached execution with the local executor.
  checkState(configuration.getBoolean(DeploymentOptions.ATTACHED));
  // translate the pipeline into a JobGraph, start a MiniCluster, and submit the job to it
  final JobGraph jobGraph = getJobGraph(pipeline, configuration);
  final MiniCluster miniCluster = startMiniCluster(jobGraph, configuration);
  final MiniClusterClient clusterClient = new MiniClusterClient(configuration, miniCluster);

  CompletableFuture<JobID> jobIdFuture = clusterClient.submitJob(jobGraph);

  jobIdFuture
    .thenCompose(clusterClient::requestJobResult)
    .thenAccept((jobResult) -> clusterClient.shutDownCluster());

  return jobIdFuture.thenApply(jobID ->
           new ClusterClientJobClientAdapter<>(() -> clusterClient, jobID));
}
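
The future chaining in LocalExecutor.execute has two independent branches hanging off the job id future: one hands a client back to the caller as soon as the job is submitted, the other waits for the job result and only then shuts the cluster down. A plain-Java sketch of that shape, with strings standing in for the Flink types and all names hypothetical:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

final class SubmitThenShutdownSketch {

    static List<String> run() {
        List<String> events = new ArrayList<>();
        CompletableFuture<String> jobIdFuture = new CompletableFuture<>();
        CompletableFuture<String> jobResultFuture = new CompletableFuture<>();

        // Branch 1: once the id is known, wait for the result, then shut down.
        CompletableFuture<Void> shutdown = jobIdFuture
            .thenCompose(id -> {
                events.add("submitted " + id);
                return jobResultFuture;
            })
            .thenAccept(result -> events.add("shutdown after " + result));

        // Branch 2: the caller gets a client as soon as the id is known.
        CompletableFuture<String> client = jobIdFuture.thenApply(id -> "client for " + id);

        jobIdFuture.complete("job-1");          // submission acknowledged
        events.add(client.join());              // client is usable before the job ends
        jobResultFuture.complete("FINISHED");   // job ends, which triggers the shutdown
        shutdown.join();
        return events;
    }

    public static void main(String[] args) {
        // prints [submitted job-1, client for job-1, shutdown after FINISHED]
        System.out.println(run());
    }
}
```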

// org.apache.flink.client.deployment.executors.LocalExecutor
private JobGraph getJobGraph(Pipeline pipeline, Configuration configuration) {
		// This is a quirk in how LocalEnvironment used to work. It sets the default parallelism
		// to <num taskmanagers> * <num task slots>. Might be questionable but we keep the behaviour
		// for now.
		if (pipeline instanceof Plan) {
			Plan plan = (Plan) pipeline;
			final int slotsPerTaskManager = configuration.getInteger(
					TaskManagerOptions.NUM_TASK_SLOTS, plan.getMaximumParallelism());
			final int numTaskManagers = configuration.getInteger(
					ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);

			plan.setDefaultParallelism(slotsPerTaskManager * numTaskManagers);
		}

		return FlinkPipelineTranslationUtil.getJobGraph(pipeline, configuration, 1);
	}
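
The default-parallelism rule in the comment above is simple arithmetic: slots per TaskManager times number of TaskManagers, where the slot count falls back to the maximum parallelism and the TaskManager count falls back to 1. A small sketch, assuming a Map as configuration; the key strings are my assumption of what the two constants resolve to, and intOption is a hypothetical helper.

```java
import java.util.HashMap;
import java.util.Map;

final class LocalParallelismSketch {

    // Hypothetical helper: read an int option or fall back to a default.
    static int intOption(Map<String, String> config, String key, int defaultValue) {
        String raw = config.get(key);
        return raw == null ? defaultValue : Integer.parseInt(raw);
    }

    static int defaultParallelism(Map<String, String> config, int maxParallelism) {
        // Assumed key for TaskManagerOptions.NUM_TASK_SLOTS
        int slotsPerTaskManager = intOption(config, "taskmanager.numberOfTaskSlots", maxParallelism);
        // Assumed key for ConfigConstants.LOCAL_NUMBER_TASK_MANAGER
        int numTaskManagers = intOption(config, "local.number-taskmanager", 1);
        return slotsPerTaskManager * numTaskManagers;
    }

    public static void main(String[] args) {
        Map<String, String> config = new HashMap<>();
        System.out.println(defaultParallelism(config, 4));   // 4 (4 slots * 1 TaskManager)
        config.put("taskmanager.numberOfTaskSlots", "2");
        config.put("local.number-taskmanager", "3");
        System.out.println(defaultParallelism(config, 4));   // 6 (2 slots * 3 TaskManagers)
    }
}
```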

This post does not cover how the JobGraph is generated; a later post will describe it in detail.

MiniCluster

// org.apache.flink.client.deployment.executors.LocalExecutor
private MiniCluster startMiniCluster(final JobGraph jobGraph, final Configuration configuration) throws Exception {
		...
		int numTaskManagers = configuration.getInteger(
				ConfigConstants.LOCAL_NUMBER_TASK_MANAGER,
				ConfigConstants.DEFAULT_LOCAL_NUMBER_TASK_MANAGER);
		// we have to use the maximum parallelism as a default here, otherwise streaming
		// pipelines would not run
		int numSlotsPerTaskManager = configuration.getInteger(
				TaskManagerOptions.NUM_TASK_SLOTS,
				jobGraph.getMaximumParallelism());
    // configure the MiniCluster: NumTaskManagers, NumSlotsPerTaskManager
		final MiniClusterConfiguration miniClusterConfiguration =
				new MiniClusterConfiguration.Builder()
						.setConfiguration(configuration)
						.setNumTaskManagers(numTaskManagers)
						.setRpcServiceSharing(RpcServiceSharing.SHARED)
						.setNumSlotsPerTaskManager(numSlotsPerTaskManager)
						.build();
		final MiniCluster miniCluster = new MiniCluster(miniClusterConfiguration);
    // start it
		miniCluster.start();
		configuration.setInteger(RestOptions.PORT, miniCluster.getRestAddress().get().getPort());
		return miniCluster;
	}
// MiniCluster to execute Flink jobs locally
// org.apache.flink.runtime.minicluster.MiniCluster
public void start() throws Exception {
    // prepare the configuration
    try {
      initializeIOFormatClasses(configuration);
      // start the Metrics Registry
      LOG.info("Starting Metrics Registry");
      metricRegistry = createMetricRegistry(configuration);

      // RPC services: bring up all the RPC services
      LOG.info("Starting RPC Service(s)");
      AkkaRpcServiceConfiguration akkaRpcServiceConfig = AkkaRpcServiceConfiguration.fromConfiguration(configuration);
      final RpcServiceFactory dispatcherResourceManagreComponentRpcServiceFactory;
			...
        // start a new service per component, possibly with custom bind addresses
        final String jobManagerBindAddress = miniClusterConfiguration.getJobManagerBindAddress();
        final String taskManagerBindAddress = miniClusterConfiguration.getTaskManagerBindAddress();
				// DedicatedRpc
        dispatcherResourceManagreComponentRpcServiceFactory = new DedicatedRpcServiceFactory(akkaRpcServiceConfig, jobManagerBindAddress);
        // taskManagerRpc
        taskManagerRpcServiceFactory = new DedicatedRpcServiceFactory(akkaRpcServiceConfig, taskManagerBindAddress);
      }
	     // MetricsRpc
      RpcService metricQueryServiceRpcService = MetricUtils.startMetricsRpcService(
        configuration,
        commonRpcService.getAddress());
      
      // Starting high-availability services
      haServices = createHighAvailabilityServices(configuration, ioExecutor);
      // Blob service: the service Flink uses to manage large binary files
      // http://chenyuzhao.me/2017/02/08/jobmanager%E5%9F%BA%E6%9C%AC%E7%BB%84%E4%BB%B6/
      blobServer = new BlobServer(configuration, haServices.createBlobStore());
      blobServer.start();
      // heartbeat services
      heartbeatServices = HeartbeatServices.fromConfiguration(configuration);
      // start the TaskManagers
      startTaskManagers();

      MetricQueryServiceRetriever metricQueryServiceRetriever = new RpcMetricQueryServiceRetriever(metricRegistry.getMetricQueryServiceRpcService());

      setupDispatcherResourceManagerComponents(configuration, dispatcherResourceManagreComponentRpcServiceFactory, metricQueryServiceRetriever);

      resourceManagerLeaderRetriever = haServices.getResourceManagerLeaderRetriever();
      dispatcherLeaderRetriever = haServices.getDispatcherLeaderRetriever();
      clusterRestEndpointLeaderRetrievalService = haServices.getClusterRestEndpointLeaderRetriever();
			// dispatcher
      dispatcherGatewayRetriever = new RpcGatewayRetriever<>(
        commonRpcService,
        DispatcherGateway.class,
        DispatcherId::fromUuid,
        20,
        Time.milliseconds(20L));
      // resourceManager
      resourceManagerGatewayRetriever = new RpcGatewayRetriever<>(
        commonRpcService,
        ResourceManagerGateway.class,
        ResourceManagerId::fromUuid,
        20,
        Time.milliseconds(20L));
      webMonitorLeaderRetriever = new LeaderRetriever();

      resourceManagerLeaderRetriever.start(resourceManagerGatewayRetriever);
      dispatcherLeaderRetriever.start(dispatcherGatewayRetriever);
      clusterRestEndpointLeaderRetrievalService.start(webMonitorLeaderRetriever);
    }
    ...
    }
    // create a new termination future
    terminationFuture = new CompletableFuture<>();
    // now officially mark this as running
    running = true;
    LOG.info("Flink Mini Cluster started successfully");
  }
}

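MiniCluster.start() brings up a whole set of services: the metrics registry, the RPC services, the high-availability services, the BlobServer, the heartbeat services, the TaskManagers, and the Dispatcher/ResourceManager components.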

Submit Job

// org.apache.flink.runtime.minicluster.MiniCluster
public CompletableFuture<JobSubmissionResult> submitJob(JobGraph jobGraph) {
	final CompletableFuture<DispatcherGateway> dispatcherGatewayFuture = getDispatcherGatewayFuture();
	final CompletableFuture<InetSocketAddress> blobServerAddressFuture = createBlobServerAddress(dispatcherGatewayFuture);
  // upload the files the job needs, along with the jobGraph
	final CompletableFuture<Void> jarUploadFuture = uploadAndSetJobFiles(blobServerAddressFuture, jobGraph);
  // submit the job to the dispatcher
    final CompletableFuture<Acknowledge> acknowledgeCompletableFuture = jarUploadFuture
      .thenCombine(
      dispatcherGatewayFuture,
      (Void ack, DispatcherGateway dispatcherGateway) -> 
      // the dispatcher submits the job
      dispatcherGateway.submitJob(jobGraph, rpcTimeout))
      .thenCompose(Function.identity());
  	return acknowledgeCompletableFuture.thenApply(
			(Acknowledge ignored) -> new JobSubmissionResult(jobGraph.getJobID()));
	}
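
The thenCombine followed by thenCompose(Function.identity()) in submitJob is a flattening idiom: the combining function itself returns a future, so thenCombine yields a future of a future, and composing with the identity function unwraps one level. A self-contained sketch with strings standing in for the gateway and the acknowledgement (names are hypothetical):

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

final class CombineThenFlattenSketch {

    static CompletableFuture<String> submit(
            CompletableFuture<Void> uploadFuture,
            CompletableFuture<String> gatewayFuture) {
        // Both inputs must complete; the combiner starts another async call,
        // so the result is nested: a future that yields a future.
        CompletableFuture<CompletableFuture<String>> nested = uploadFuture.thenCombine(
            gatewayFuture,
            (Void ack, String gateway) -> CompletableFuture.completedFuture("ack from " + gateway));
        // Flatten CompletableFuture<CompletableFuture<String>> into CompletableFuture<String>.
        return nested.thenCompose(Function.identity());
    }

    public static void main(String[] args) {
        CompletableFuture<Void> upload = CompletableFuture.completedFuture(null);
        CompletableFuture<String> gateway = CompletableFuture.completedFuture("dispatcher");
        System.out.println(submit(upload, gateway).join());   // ack from dispatcher
    }
}
```
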
// org.apache.flink.runtime.dispatcher.Dispatcher
// every step handles job failure (throwable != null)
	private CompletableFuture<Void> persistAndRunJob(JobGraph jobGraph) throws Exception {
    // persist the jobGraph, then run it
		jobGraphWriter.putJobGraph(jobGraph);
    final CompletableFuture<Void> runJobFuture = runJob(jobGraph);
    return runJobFuture.whenComplete(BiConsumerWithException.unchecked((Object ignored, Throwable throwable) -> {
      if (throwable != null) {
        jobGraphWriter.removeJobGraph(jobGraph.getJobID());
      }
    }));
  }
	private CompletableFuture<Void> runJob(JobGraph jobGraph) {
		Preconditions.checkState(!jobManagerRunnerFutures.containsKey(jobGraph.getJobID()));
		// jobManagerRunner
    final CompletableFuture<JobManagerRunner> jobManagerRunnerFuture = createJobManagerRunner(jobGraph);
		jobManagerRunnerFutures.put(jobGraph.getJobID(), jobManagerRunnerFuture);
    // submit
		return jobManagerRunnerFuture
			.thenApply(FunctionUtils.uncheckedFunction(this::startJobManagerRunner))
			.thenApply(FunctionUtils.nullFn())
			.whenCompleteAsync(
				(ignored, throwable) -> {
					if (throwable != null) {
						jobManagerRunnerFutures.remove(jobGraph.getJobID());
					}
				},
				getMainThreadExecutor());
	}
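
Both methods above follow the same "register first, undo on failure" pattern: state is recorded before the asynchronous work starts, and a completion callback removes it again if a throwable shows up. A minimal plain-Java sketch of the pattern, with a hypothetical registry map and strings in place of JobManagerRunner:

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

final class CleanupOnFailureSketch {

    // Hypothetical registry of runner futures, keyed by job id.
    static final Map<String, CompletableFuture<String>> RUNNERS = new ConcurrentHashMap<>();

    static CompletableFuture<Void> runJob(String jobId, CompletableFuture<String> runnerFuture) {
        if (RUNNERS.containsKey(jobId)) {
            throw new IllegalStateException("job already registered: " + jobId);
        }
        RUNNERS.put(jobId, runnerFuture);          // register before the work completes
        return runnerFuture
            .thenApply(runner -> (Void) null)      // callers only care about success or failure
            .whenComplete((ignored, throwable) -> {
                if (throwable != null) {
                    RUNNERS.remove(jobId);         // undo the registration on failure
                }
            });
    }

    public static void main(String[] args) {
        runJob("job-ok", CompletableFuture.completedFuture("runner")).join();
        System.out.println(RUNNERS.containsKey("job-ok"));    // true

        CompletableFuture<String> failing = new CompletableFuture<>();
        CompletableFuture<Void> result = runJob("job-bad", failing);
        failing.completeExceptionally(new RuntimeException("boom"));
        System.out.println(result.isCompletedExceptionally()); // true
        System.out.println(RUNNERS.containsKey("job-bad"));    // false
    }
}
```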

Flow

Startup log

org.apache.flink.runtime.minicluster.MiniCluster              - Starting Flink Mini Cluster
org.apache.flink.runtime.minicluster.MiniCluster              - Starting Metrics Registry
org.apache.flink.runtime.metrics.MetricRegistryImpl           - No metrics reporter configured, no metrics will be exposed/reported.
org.apache.flink.runtime.minicluster.MiniCluster              - Starting RPC Service(s)
akka.event.slf4j.Slf4jLogger                                  - Slf4jLogger started
org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils         - Trying to start actor system at :0
akka.event.slf4j.Slf4jLogger                                  - Slf4jLogger started
akka.remote.Remoting                                          - Starting remoting
akka.remote.Remoting                                          - Remoting started; listening on addresses :[akka.tcp://flink-metrics@127.0.0.1:64276]
org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils         - Actor system started at akka.tcp://flink-metrics@127.0.0.1:64276
org.apache.flink.runtime.rpc.akka.AkkaRpcService              - Starting RPC endpoint for org.apache.flink.runtime.metrics.dump.MetricQueryService at akka://flink-metrics/user/MetricQueryService .
org.apache.flink.runtime.minicluster.MiniCluster              - Starting high-availability services
org.apache.flink.runtime.blob.BlobServer                      - Created BLOB server storage directory /var/folders/rq/kmxm07hx54jbnhfvg93gv62c0000gn/T/blobStore-7dd1e563-2fa2-4da1-aee2-8b67927bd0d2
org.apache.flink.runtime.blob.BlobServer                      - Started BLOB server at 0.0.0.0:64292 - max concurrent requests: 50 - max backlog: 1000
org.apache.flink.runtime.blob.PermanentBlobCache              - Created BLOB cache storage directory /var/folders/rq/kmxm07hx54jbnhfvg93gv62c0000gn/T/blobStore-a753f5f4-a6d1-4d7f-a74a-306e3b1f13ea
org.apache.flink.runtime.blob.TransientBlobCache              - Created BLOB cache storage directory /var/folders/rq/kmxm07hx54jbnhfvg93gv62c0000gn/T/blobStore-ec1da097-ca2d-4465-85eb-c2bcfb73e22e
org.apache.flink.runtime.minicluster.MiniCluster              - Starting 1 TaskManger(s)
org.apache.flink.runtime.taskexecutor.TaskManagerRunner       - Starting TaskManager with ResourceID: fa24c9b8-8108-428b-bd5c-94d169dcdf9c
org.apache.flink.runtime.taskexecutor.TaskManagerServices     - Temporary file directory '/var/folders/rq/kmxm07hx54jbnhfvg93gv62c0000gn/T': total 233 GB, usable 80 GB (34.33% usable)
org.apache.flink.runtime.io.disk.FileChannelManagerImpl       - FileChannelManager uses directory /var/folders/rq/kmxm07hx54jbnhfvg93gv62c0000gn/T/flink-io-dcafdb6b-a36c-4bfa-9ec6-0e5e4f982631 for spill files.
org.apache.flink.runtime.io.disk.FileChannelManagerImpl       - FileChannelManager uses directory /var/folders/rq/kmxm07hx54jbnhfvg93gv62c0000gn/T/flink-netty-shuffle-0d753714-eb7c-4a29-815e-d8289ccdb1ec for spill files.
org.apache.flink.runtime.io.network.buffer.NetworkBufferPool  - Allocated 64 MB for network buffer pool (number of memory segments: 2048, bytes per segment: 32768).
org.apache.flink.runtime.io.network.NettyShuffleEnvironment   - Starting the network environment and its components.
org.apache.flink.runtime.taskexecutor.KvStateService          - Starting the kvState service and its components.
org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration  - Messages have a max timeout of 10000 ms
org.apache.flink.runtime.rpc.akka.AkkaRpcService              - Starting RPC endpoint for org.apache.flink.runtime.taskexecutor.TaskExecutor at akka://flink/user/taskmanager_0 .
org.apache.flink.runtime.taskexecutor.JobLeaderService        - Start job leader service.
org.apache.flink.runtime.filecache.FileCache                  - User file cache uses directory /var/folders/rq/kmxm07hx54jbnhfvg93gv62c0000gn/T/flink-dist-cache-cb29d64c-fc7d-4083-b8a7-f95651ab0dcd
org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint    - Starting rest endpoint.
org.apache.flink.runtime.webmonitor.WebMonitorUtils           - Log file environment variable 'log.file' is not set.
org.apache.flink.runtime.webmonitor.WebMonitorUtils           - JobManager log files are unavailable in the web dashboard. Log file location not found in environment variable 'log.file' or configuration key 'Key: 'web.log.path' , default: null (fallback keys: [{key=jobmanager.web.log.path, isDeprecated=true}])'.
org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint    - Failed to load web based job submission extension. Probable reason: flink-runtime-web is not in the classpath.
org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint    - Rest endpoint listening at localhost:50064
org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedLeaderService  - Proposing leadership to contender http://localhost:50064
org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint    - http://localhost:50064 was granted leadership with leaderSessionID=230a5f5d-ae28-4b29-9173-b832e3604619
org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedLeaderService  - Received confirmation of leadership for leader http://localhost:50064 , session=230a5f5d-ae28-4b29-9173-b832e3604619
org.apache.flink.runtime.rpc.akka.AkkaRpcService              - Starting RPC endpoint for org.apache.flink.runtime.resourcemanager.StandaloneResourceManager at akka://flink/user/resourcemanager .
org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedLeaderService  - Proposing leadership to contender LeaderContender: DefaultDispatcherRunner
org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedLeaderService  - Proposing leadership to contender LeaderContender: StandaloneResourceManager
org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  - ResourceManager akka://flink/user/resourcemanager was granted leadership with fencing token b3ae8d20f361fef5ef89578541eb4363
org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess  - Start SessionDispatcherLeaderProcess.
org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl  - Starting the SlotManager.
org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess  - Recover all persisted job graphs.
org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess  - Successfully recovered 0 persisted job graphs.
org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedLeaderService  - Received confirmation of leadership for leader akka://flink/user/resourcemanager , session=ef895785-41eb-4363-b3ae-8d20f361fef5
org.apache.flink.runtime.taskexecutor.TaskExecutor            - Connecting to ResourceManager akka://flink/user/resourcemanager(b3ae8d20f361fef5ef89578541eb4363).
org.apache.flink.runtime.rpc.akka.AkkaRpcService              - Starting RPC endpoint for org.apache.flink.runtime.dispatcher.StandaloneDispatcher at akka://flink/user/dispatcher .
org.apache.flink.runtime.taskexecutor.TaskExecutor            - Resolved ResourceManager address, beginning registration
org.apache.flink.runtime.taskexecutor.TaskExecutor            - Registration at ResourceManager attempt 1 (timeout=100ms)
org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  - Registering TaskManager with ResourceID 6babbfff-6da0-4d9f-96cc-2f00a8656ed7 (akka://flink/user/taskmanager_0) at ResourceManager
org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedLeaderService  - Received confirmation of leadership for leader akka://flink/user/dispatcher , session=1665e1f7-0cb0-40f1-8c1b-451b6b8d9de6
org.apache.flink.runtime.taskexecutor.TaskExecutor            - Successful registration at resource manager akka://flink