Flink 内存管理

Posted by danner on August 25, 2020

Flink 1.11

资源调度

当前 Flink 所采用的是自顶向下的资源管理,我们所配置的是 Job 整体的资源,而 Flink 通过 Slot Sharing 机制控制 Slot 的数量和负载均衡,通过调整 Task Manager / Slot 的资源,以适应一个 Slot Sharing Group 的资源需求。Flink 的资源管理配置简单,易用性强,适合拓扑结构简单或规模较小的作业(参考资料一中探讨自下而上的细粒度资源控制)。

上图是从 Cluster 到 Job 的资源调度过程,包含两个环节:

  • Slot Allocation(红色箭头)

Scheduler 向 Slot Pool 发送请求,如果 Slot 资源足够则直接分配。如果 Slot 资源不够,则由 Slot Pool 再向 Slot Manager 发送请求(此时即为 Job 向 Cluster 请求资源),如果 Slot Manager 判断集群当中有足够的资源可以满足需求,那么就会向 Task Manager 发送 Assign 指令,Task Manager 就会提供 Slot 给 Slot Pool,Slot Pool 再去满足 Scheduler 的资源请求。

  • Starting TaskManagers(蓝色箭头)

在 Active Resource Manager 资源部署模式下,当 Resource Manager 判定 Flink Cluster 中没有足够的资源去满足需求时,它会进一步向底层的资源调度系统请求资源,由调度系统把新的 Task Manager 启动起来;TaskManager 启动后向 Resource Manager 注册,即完成了新 Slot 的补充。

内存分配

以上是在 job 层面的资源申请和调度。内存作为重要资源,在 TaskManager 中是如何分配的呢?

上图是 TaskManager 中的内存分配总览,可分为以下几个部分(详细内容见参考资料二):

  • Total Process Memory - TaskManager 进程总内存

    • Total Flink Memory - Flink 总内存

      • 堆内

        • Framework 内存

        Flink Runtime 底层占用的内存,taskmanager.memory.framework.heap.size 控制,默认 128MB

        • Task 内存

        算子逻辑、用户代码和自定义数据结构真正占用的内存。由 taskmanager.memory.task.heap.size 控制,无默认值,不建议设置,默认由 Flink 总内存减去框架(堆内、堆外)、Task 堆外、托管、网络等部分推算得出。

      • 堆外

        • Managed memory - 托管内存

        纯堆外内存,由 MemoryManager 管理,用于中间结果缓存、排序、哈希表等,以及 RocksDB 状态后端。taskmanager.memory.managed.fraction 占 Flink 内存的比例,默认 0.4

        • 直接内存

          • Framework 堆外内存

          taskmanager.memory.framework.off-heap.size,默认 128MB

          • Task 堆外内存

          taskmanager.memory.task.off-heap.size,默认为 0,即默认不使用

          • Network 内存

          纯堆外内存,用于 TaskManager 之间(shuffle、广播等)及与外部组件的数据传输,以直接内存形式分配。taskmanager.memory.network.fraction 控制其占 Flink 总内存的比例,默认 0.1;taskmanager.memory.network.min 和 taskmanager.memory.network.max 控制其最小值和最大值,默认分别为 64MB 和 1GB。

    • Metaspace - JVM 元空间

    默认 96MB,taskmanager.memory.jvm-metaspace.size 控制

    • Overhead - JVM 额外开销

    为 JVM 预留的其他本地内存,用于线程栈、代码缓存等。taskmanager.memory.jvm-overhead.fraction 控制其占进程总内存的比例,默认 0.1;taskmanager.memory.jvm-overhead.min 和 taskmanager.memory.jvm-overhead.max 控制其最小值和最大值,默认分别为 192MB 和 1GB。
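
上面这些选项一般写在 flink-conf.yaml 中;下面给出一个最小示意(基于 Flink 1.11 的 Configuration API,仅用于对照各选项与 TaskManagerOptions 常量的对应关系,类名与数值均为假设):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.TaskManagerOptions;

// 示意代码:演示内存相关选项如何通过 Configuration 设置
public class TmMemoryConfigExample {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // taskmanager.memory.process.size:TM 进程总内存
    conf.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.parse("3g"));
    // taskmanager.memory.managed.fraction:托管内存占 Flink 总内存的比例
    conf.set(TaskManagerOptions.MANAGED_MEMORY_FRACTION, 0.4f);
    // taskmanager.memory.network.fraction / min / max:网络内存比例及上下限
    conf.set(TaskManagerOptions.NETWORK_MEMORY_FRACTION, 0.1f);
    conf.set(TaskManagerOptions.NETWORK_MEMORY_MIN, MemorySize.parse("64m"));
    conf.set(TaskManagerOptions.NETWORK_MEMORY_MAX, MemorySize.parse("1g"));
    System.out.println(conf);
  }
}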

一大堆参数该如何调整?一般情况下只需设置 TaskManager 总内存,其余按默认规则分配即可;如果确实需要调整,建议只修改网络和托管内存。当然你也可以不设置 TM 总内存,而只设置某些部件的内存(如任务堆内存、托管内存),那这时其余部分该如何推算?FLIP-49 中已经阐述得非常详细(列表之后给出一个细粒度配置的示意):

  • If both Task Heap Memory and Managed Memory are configured, we use these to derive Total Flink Memory

    • If Network Memory is configured explicitly, we use that value
    • Otherwise, we compute it such that it makes up the configured fraction of the final Total Flink Memory (see getAbsoluteOrInverseFraction())
  • If Total Flink Memory is configured, but not Task Heap Memory and Managed Memory, then we derive Network Memory and Managed Memory, and leave the rest (excluding Framework Heap Memory, Framework Off-Heap Memory and Task Off-Heap Memory) as Task Heap Memory.

    • If Network Memory is configured explicitly, we use that value
    • Otherwise we compute it such that it makes up the configured fraction of the Total Flink Memory (see getAbsoluteOrFraction())
    • If Managed Memory is configured explicitly, we use that value
    • Otherwise we compute it such that it makes up the configured fraction of the Total Flink Memory (see getAbsoluteOrFraction())
  • If only the Total Process Memory is configured, we derive the Total Flink Memory in the following way

    • We get (or compute relative) and subtract the JVM Overhead from Total Process Memory (see getAbsoluteOrFraction())
    • We subtract JVM Metaspace from the remaining
    • We leave the rest as Total Flink Memory
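
对应上面第一种情况(显式配置 Task Heap 和 Managed Memory,由它们反推 Total Flink Memory),一个最小示意(类名与数值均为假设,假设 classpath 中有 Flink 1.11 运行时依赖):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec;
import org.apache.flink.runtime.clusterframework.TaskExecutorProcessUtils;

public class FineGrainedMemoryExample {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // 只显式配置 task 堆内内存与托管内存,不设置 Flink / 进程总内存
    conf.set(TaskManagerOptions.TASK_HEAP_MEMORY, MemorySize.parse("1g"));
    conf.set(TaskManagerOptions.MANAGED_MEMORY_SIZE, MemorySize.parse("1g"));
    // network 未显式配置时,将按 fraction 基于推导出的 Total Flink Memory 计算
    TaskExecutorProcessSpec spec = TaskExecutorProcessUtils.processSpecFromConfig(conf);
    System.out.println(spec);
  }
}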

源码分析

在详细了解内存分配之后,我们进入源码来看一下。Job 提交到 Yarn 之前,Client 会校验资源是否充足。

// org.apache.flink.yarn.YarnClusterDescriptor
// 提交任务时在 clusterDescriptor.deployJobCluster 中被调用
private ClusterSpecification validateClusterResources(
  ClusterSpecification clusterSpecification,
  int yarnMinAllocationMB,
  Resource maximumResourceCapability,
  ClusterResourceDescription freeClusterResources) throws YarnDeploymentException {
  // JM 内存默认 768M,TM 内存默认 1G
  int jobManagerMemoryMb = clusterSpecification.getMasterMemoryMB();
  final int taskManagerMemoryMb = clusterSpecification.getTaskManagerMemoryMB();
  logIfComponentMemNotIntegerMultipleOfYarnMinAllocation("JobManager", jobManagerMemoryMb, yarnMinAllocationMB);
  logIfComponentMemNotIntegerMultipleOfYarnMinAllocation("TaskManager", taskManagerMemoryMb, yarnMinAllocationMB);
  // set the memory to minAllocationMB to do the next checks correctly
  if (jobManagerMemoryMb < yarnMinAllocationMB) {
    jobManagerMemoryMb =  yarnMinAllocationMB;
  }
  final String note = "Please check the 'yarn.scheduler.maximum-allocation-mb' and the 'yarn.nodemanager.resource.memory-mb' configuration values\n";
  if (jobManagerMemoryMb > maximumResourceCapability.getMemory()) {
    throw new YarnDeploymentException("The cluster does not have the requested resources for the JobManager available!\n"
        + "Maximum Memory: " + maximumResourceCapability.getMemory() + "MB Requested: " + jobManagerMemoryMb + "MB. " + note);
  }
  if (taskManagerMemoryMb > maximumResourceCapability.getMemory()) {
    throw new YarnDeploymentException("The cluster does not have the requested resources for the TaskManagers available!\n"
        + "Maximum Memory: " + maximumResourceCapability.getMemory() + " Requested: " + taskManagerMemoryMb + "MB. " + note);
  }
  // 集群内存资源是否充足
  final String noteRsc = "\nThe Flink YARN client will try to allocate the YARN session, but maybe not all TaskManagers are " +
      "connecting from the beginning because the resources are currently not available in the cluster. " +
      "The allocation might take more time than usual because the Flink YARN client needs to wait until " +
      "the resources become available.";
  if (taskManagerMemoryMb > freeClusterResources.containerLimit) {
    LOG.warn("The requested amount of memory for the TaskManagers (" + taskManagerMemoryMb + "MB) is more than "
        + "the largest possible YARN container: " + freeClusterResources.containerLimit + noteRsc);
  }
  if (jobManagerMemoryMb > freeClusterResources.containerLimit) {
    LOG.warn("The requested amount of memory for the JobManager (" + jobManagerMemoryMb + "MB) is more than "
        + "the largest possible YARN container: " + freeClusterResources.containerLimit + noteRsc);
  }
  return new ClusterSpecification.ClusterSpecificationBuilder()
      .setMasterMemoryMB(jobManagerMemoryMb)
      .setTaskManagerMemoryMB(taskManagerMemoryMb)
      .setSlotsPerTaskManager(clusterSpecification.getSlotsPerTaskManager())
      .createClusterSpecification();

}
// TM 内存分配,提交任务时生成 ClusterSpecification 的过程中调用
// org.apache.flink.runtime.clusterframework.TaskExecutorProcessUtils
public static TaskExecutorProcessSpec processSpecFromConfig(final Configuration config) {
	return createMemoryProcessSpec(config, PROCESS_MEMORY_UTILS.memoryProcessSpecFromConfig(config));
}
private static TaskExecutorProcessSpec createMemoryProcessSpec(
		final Configuration config,
		final CommonProcessMemorySpec<TaskExecutorFlinkMemory> processMemory) {
  // CommonProcessMemorySpec 与 TaskExecutorProcessSpec 的类注释中描述了内存分布
	TaskExecutorFlinkMemory flinkMemory = processMemory.getFlinkMemory();
	JvmMetaspaceAndOverhead jvmMetaspaceAndOverhead = processMemory.getJvmMetaspaceAndOverhead();
	// Process = Flink + jvm meta + jvm overhead
  return new TaskExecutorProcessSpec(getCpuCores(config), flinkMemory, jvmMetaspaceAndOverhead);
}

// org.apache.flink.runtime.util.config.memory.ProcessMemoryUtils
public CommonProcessMemorySpec<FM> memoryProcessSpecFromConfig(Configuration config) {
	if (options.getRequiredFineGrainedOptions().stream().allMatch(config::contains)) {
		// all internal memory options are configured, use these to derive total Flink and process memory
		return deriveProcessSpecWithExplicitInternalMemory(config);
	} else if (config.contains(options.getTotalFlinkMemoryOption())) {
		// internal memory options are not configured, total Flink memory is configured,
		// derive from total flink memory
		return deriveProcessSpecWithTotalFlinkMemory(config);
	} else if (config.contains(options.getTotalProcessMemoryOption())) {
		// total Flink memory is not configured, total process memory is configured,
		// derive from total process memory
    // 以 TM 总内存为基点,开始计算每个部件占用的内存
		return deriveProcessSpecWithTotalProcessMemory(config);
	}
	return failBecauseRequiredOptionsNotConfigured();
}
private CommonProcessMemorySpec<FM> deriveProcessSpecWithTotalProcessMemory(Configuration config) {
	MemorySize totalProcessMemorySize = getMemorySizeFromConfig(config, options.getTotalProcessMemoryOption());
	// 根据比例得到 JvmMetaspaceAndOverhead
  JvmMetaspaceAndOverhead jvmMetaspaceAndOverhead =
		deriveJvmMetaspaceAndOverheadWithTotalProcessMemory(config, totalProcessMemorySize);
	// totalFlinkMemorySize = totalProcessMemorySize - jvmMetaspaceAndOverhead
  MemorySize totalFlinkMemorySize = totalProcessMemorySize.subtract(
		jvmMetaspaceAndOverhead.getTotalJvmMetaspaceAndOverheadSize());
	FM flinkInternalMemory = flinkMemoryUtils.deriveFromTotalFlinkMemory(config, totalFlinkMemorySize);
	return new CommonProcessMemorySpec<>(flinkInternalMemory, jvmMetaspaceAndOverhead);
}

// org.apache.flink.runtime.util.config.memory.taskmanager.TaskExecutorFlinkMemoryUtils
// 计算 Flink 内存分布
public TaskExecutorFlinkMemory deriveFromTotalFlinkMemory(
	final Configuration config,
	final MemorySize totalFlinkMemorySize) {
  // 获取 framework 堆内、堆外内存以及 task 堆外内存大小
	final MemorySize frameworkHeapMemorySize = getFrameworkHeapMemorySize(config);
	final MemorySize frameworkOffHeapMemorySize = getFrameworkOffHeapMemorySize(config);
	final MemorySize taskOffHeapMemorySize = getTaskOffHeapMemorySize(config);

	final MemorySize taskHeapMemorySize;
	final MemorySize networkMemorySize;
	final MemorySize managedMemorySize;
  // Flink 根据 task 堆内内存是否显式配置,分为两种推导方式:
	if (isTaskHeapMemorySizeExplicitlyConfigured(config)) {
    // 显式配置 task 堆内内存时,network = Flink 总内存 - framework(堆内/堆外) - task(堆内/堆外) - managed
		taskHeapMemorySize = getTaskHeapMemorySize(config);
		managedMemorySize = deriveManagedMemoryAbsoluteOrWithFraction(config, totalFlinkMemorySize);
		final MemorySize totalFlinkExcludeNetworkMemorySize =
			frameworkHeapMemorySize.add(frameworkOffHeapMemorySize).add(taskHeapMemorySize).add(taskOffHeapMemorySize).add(managedMemorySize);
		if (totalFlinkExcludeNetworkMemorySize.getBytes() > totalFlinkMemorySize.getBytes()) {
			...
		}
		networkMemorySize = totalFlinkMemorySize.subtract(totalFlinkExcludeNetworkMemorySize);
		sanityCheckNetworkMemoryWithExplicitlySetTotalFlinkAndHeapMemory(config, networkMemorySize, totalFlinkMemorySize);
	} else {
    // 未显式配置 task 堆内内存时,task heap = Flink 总内存 - framework(堆内/堆外) - task 堆外 - managed - network
		managedMemorySize = deriveManagedMemoryAbsoluteOrWithFraction(config, totalFlinkMemorySize);

		networkMemorySize = isUsingLegacyNetworkConfigs(config) ? getNetworkMemorySizeWithLegacyConfig(config) :
			deriveNetworkMemoryWithFraction(config, totalFlinkMemorySize);
		final MemorySize totalFlinkExcludeTaskHeapMemorySize =
			frameworkHeapMemorySize.add(frameworkOffHeapMemorySize).add(taskOffHeapMemorySize).add(managedMemorySize).add(networkMemorySize);
		if (totalFlinkExcludeTaskHeapMemorySize.getBytes() > totalFlinkMemorySize.getBytes()) {
			...
		}
		taskHeapMemorySize = totalFlinkMemorySize.subtract(totalFlinkExcludeTaskHeapMemorySize);
	}
	final TaskExecutorFlinkMemory flinkInternalMemory = new TaskExecutorFlinkMemory(
		frameworkHeapMemorySize,
		frameworkOffHeapMemorySize,
		taskHeapMemorySize,
		taskOffHeapMemorySize,
		networkMemorySize,
		managedMemorySize);
	sanityCheckTotalFlinkMemory(config, flinkInternalMemory);

	return flinkInternalMemory;
}

当显式设置 task 堆内内存时,network 内存由剩余内存推算得出;当未设置 task 堆内内存时,则由剩余内存推算出 task 堆内内存。

一般情况下(只配置总内存),走的都是第二种方式。
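
把第二种方式的推导用 MemorySize 写成一个最小示意(类名为假设,数值取下文 3G 案例的近似值,托管、网络按默认比例 0.4 / 0.1 粗略取整,真实实现还有 min/max 等约束):

import org.apache.flink.configuration.MemorySize;

public class TaskHeapDerivationSketch {
  public static void main(String[] args) {
    MemorySize totalFlink = MemorySize.parse("2669m");        // Total Flink Memory
    MemorySize frameworkHeap = MemorySize.parse("128m");      // framework 堆内
    MemorySize frameworkOffHeap = MemorySize.parse("128m");   // framework 堆外
    MemorySize taskOffHeap = MemorySize.ZERO;                 // task 堆外,默认 0
    MemorySize managed = MemorySize.parse("1067m");           // ≈ 2669 * 0.4
    MemorySize network = MemorySize.parse("266m");            // ≈ 2669 * 0.1

    // task heap = Flink 总内存 - framework(堆内/堆外) - task 堆外 - managed - network
    MemorySize taskHeap = totalFlink
      .subtract(frameworkHeap)
      .subtract(frameworkOffHeap)
      .subtract(taskOffHeap)
      .subtract(managed)
      .subtract(network);
    System.out.println(taskHeap.getMebiBytes() + " MB");      // 1080 MB
  }
}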

Network

// org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils
// org.apache.flink.configuration.TaskManagerOptions
public static final ConfigOption<MemorySize> NETWORK_MEMORY_MIN =
  key("taskmanager.memory.network.min")
    .memoryType()
    .defaultValue(MemorySize.parse("64m"));
public static final ConfigOption<MemorySize> NETWORK_MEMORY_MAX =
  key("taskmanager.memory.network.max")
    .memoryType()
    .defaultValue(MemorySize.parse("1g"));
public static final ConfigOption<Float> NETWORK_MEMORY_FRACTION =
  key("taskmanager.memory.network.fraction")
    .floatType()
    .defaultValue(0.1f);
private static void adjustNetworkMemoryForLocalExecution(Configuration config) {
  if (!config.contains(TaskManagerOptions.NETWORK_MEMORY_MIN) &&
    config.contains(TaskManagerOptions.NETWORK_MEMORY_MAX)) {
    config.set(TaskManagerOptions.NETWORK_MEMORY_MIN, config.get(TaskManagerOptions.NETWORK_MEMORY_MAX));
  }
  if (!config.contains(TaskManagerOptions.NETWORK_MEMORY_MAX) &&
    config.contains(TaskManagerOptions.NETWORK_MEMORY_MIN)) {
    config.set(TaskManagerOptions.NETWORK_MEMORY_MAX, config.get(TaskManagerOptions.NETWORK_MEMORY_MIN));
  }
  setConfigOptionToDefaultIfNotSet(config, TaskManagerOptions.NETWORK_MEMORY_MIN, DEFAULT_SHUFFLE_MEMORY_SIZE);
  setConfigOptionToDefaultIfNotSet(config, TaskManagerOptions.NETWORK_MEMORY_MAX, DEFAULT_SHUFFLE_MEMORY_SIZE);
}
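
按比例推导网络内存时还会被 min/max 夹住,等价于下面这个简化示意(非 Flink 源码,仅表达 clamp 逻辑,类名与方法名均为假设):

import org.apache.flink.configuration.MemorySize;

public class NetworkMemorySketch {
  // network = clamp(totalFlink * fraction, min, max)
  static MemorySize deriveNetwork(MemorySize totalFlink, float fraction, MemorySize min, MemorySize max) {
    long byFraction = (long) (totalFlink.getBytes() * fraction);
    long clamped = Math.max(min.getBytes(), Math.min(max.getBytes(), byFraction));
    return new MemorySize(clamped);
  }

  public static void main(String[] args) {
    // 2669MB * 0.1 ≈ 266MB,落在 [64MB, 1GB] 区间内,直接采用
    MemorySize network = deriveNetwork(
      MemorySize.parse("2669m"), 0.1f, MemorySize.parse("64m"), MemorySize.parse("1g"));
    System.out.println(network.getMebiBytes() + " MB");
  }
}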

案例

当给 TM 分配 3G 内存时,Flink 是如何分配的?先看 Flink 启动时的 JVM 参数,再回推这些参数是如何得到的:

  • -Xmx1265002064 -Xms1265002064 (1.17G)
  • -XX:MaxDirectMemorySize=414061694 (394M)
  • -XX:MaxMetaspaceSize=100663296 (96M)

内存分配:

  • Total Process Memory 3G
    • Total Flink Memory 3G - 96M - 307M = 2669M
      • 堆内
        • Framework 内存 128MB
        • Task 内存 (2669 - 128 - 1067 - 128 - 266) = 1080M
      • 堆外
        • Managed memory - 托管内存 2669 * 0.4 ≈ 1067M ≈ 1.04G
        • 直接内存
          • Framework 堆外内存 128M
          • Network 内存 2669 * 0.1 ≈ 266M
    • Metaspace 96M
    • Overhead 3G * 0.1 = 0.3G ≈ 307M

JVM heap = 128 + 1080 = 1208M ≈ 1.17G

直接内存 = 128 + 266 = 394M
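
也可以直接调用 TaskExecutorProcessUtils 验证这套推导(一个示意,类名为假设,假设 classpath 中有 Flink 1.11 运行时依赖,其余参数全部取默认值):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec;
import org.apache.flink.runtime.clusterframework.TaskExecutorProcessUtils;

public class SpecVerifyExample {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // 只设置 TM 进程总内存 3g,其余部件按默认规则推导
    conf.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.parse("3g"));
    TaskExecutorProcessSpec spec = TaskExecutorProcessUtils.processSpecFromConfig(conf);
    // 打印出的各部件大小应与上面的手工计算基本一致(存在字节级舍入差异)
    System.out.println(spec);
  }
}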

参考资料

1. 数仓系列 深入解读 Flink 资源管理机制
2. Flink 1.10 之改进的 TaskManager 内存模型与配置
3. FLIP-49: Unified Memory Configuration for TaskExecutors
4. Flink 原理与实现:内存管理
5. 详解 Flink 容器化环境下的 OOM Killed