Flink Checkpoint Metadata Management

Posted by danner on February 13, 2021

Every time a checkpoint completes, Flink writes a _metadata file that records the checkpoint's metadata.

In the code, this file corresponds to the CheckpointMetadata class.

// org.apache.flink.runtime.checkpoint.metadata.CheckpointMetadata
/** The metadata of a snapshot (checkpoint or savepoint). */
public class CheckpointMetadata implements Disposable {

    /** The checkpoint ID. */
    private final long checkpointId;

    /** The operator states. */
    private final Collection<OperatorState> operatorStates;

    /** The states generated by the CheckpointCoordinator. */
    private final Collection<MasterState> masterStates;

    // ... (rest of the class omitted)
}

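It has only three fields:

  • checkpointId: the checkpoint ID

  • operatorStates: the operator states

  • masterStates: the states generated by the CheckpointCoordinator

operatorStates is the important one: it records every operator together with the files that hold its state.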

OperatorState

The state of a single operator.

// org.apache.flink.runtime.checkpoint.OperatorState
public class OperatorState implements CompositeStateHandle {

    private static final long serialVersionUID = -4845578005863201810L;

    /** The id of the operator. */
    private final OperatorID operatorID;

    /** The handles to states created by the parallel tasks: subtaskIndex -> subtaskstate. */
    private final Map<Integer, OperatorSubtaskState> operatorSubtaskStates;

    /** The state of the operator coordinator. Null, if no such state exists. */
    @Nullable private ByteStreamStateHandle coordinatorState;

    /** The parallelism of the operator when it was checkpointed. */
    private final int parallelism;

    /**
     * The maximum parallelism (for number of keygroups) of the operator when the job was first
     * created.
     */
    private final int maxParallelism;

    // ... (rest of the class omitted)
}
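
  • operatorSubtaskStates

    • maps the subtask index of this operator to its subtask state

    • OperatorState describes state at the operator level; OperatorSubtaskState describes it at the subtask level

    • an operator with a parallelism of 10 has 10 OperatorSubtaskState entries in its OperatorState

  • coordinatorState: the state of the operator coordinator, null if there is none

  • parallelism: the parallelism in effect when this checkpoint was taken (it may differ from the parallelism at restore time, for example when the job is restarted from the checkpoint with a changed parallelism)

  • maxParallelism: the operator's maximum parallelism. If the max parallelism configured after a restart is greater than the one stored in the checkpoint, the job cannot be restored from this state; this comes down to how key groups and state are assigned (see the references).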
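
Why maxParallelism is tied to the state can be seen from how keys are mapped to key groups and key groups to subtasks. The following is a minimal sketch, not Flink's code: the class and method names are hypothetical, the two range formulas follow what KeyGroupRangeAssignment does as far as I can tell, and the real implementation also applies a murmur hash to key.hashCode() before the modulo, which is left out here.

```java
public class KeyGroupSketch {

    /** Key group of a key. Simplification: Flink murmur-hashes the hash code first. */
    public static int keyGroupFor(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    /** Index of the subtask that owns the given key group. */
    public static int subtaskFor(int keyGroup, int parallelism, int maxParallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    /** Inclusive {start, end} key-group range owned by one subtask. */
    public static int[] rangeFor(int subtaskIndex, int parallelism, int maxParallelism) {
        int start = (subtaskIndex * maxParallelism + parallelism - 1) / parallelism;
        int end = ((subtaskIndex + 1) * maxParallelism - 1) / parallelism;
        return new int[] {start, end};
    }

    public static void main(String[] args) {
        int maxParallelism = 128;
        // Same max parallelism, two different parallelisms: only the ranges change.
        for (int parallelism : new int[] {4, 3}) {
            for (int i = 0; i < parallelism; i++) {
                int[] r = rangeFor(i, parallelism, maxParallelism);
                System.out.println("parallelism=" + parallelism
                        + " subtask=" + i
                        + " keyGroups=[" + r[0] + ", " + r[1] + "]");
            }
        }
    }
}
```

As long as maxParallelism stays the same, a key always lands in the same key group, so rescaling only moves whole key groups between subtasks. Changing maxParallelism changes the key-to-key-group mapping itself, so the key groups written in the checkpoint no longer line up.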

OperatorSubtaskState

OperatorSubtaskState is the basic unit in which state is actually stored.

// org.apache.flink.runtime.checkpoint.OperatorSubtaskState
public class OperatorSubtaskState implements CompositeStateHandle {
    private final StateObjectCollection<OperatorStateHandle> managedOperatorState;

    private final StateObjectCollection<OperatorStateHandle> rawOperatorState;

    private final StateObjectCollection<KeyedStateHandle> managedKeyedState;

    private final StateObjectCollection<KeyedStateHandle> rawKeyedState;

    // ... (rest of the class omitted)
}

Depending on who manages it, state comes in two kinds: managed and raw. I have not used raw state yet, so only managed state is analyzed here.

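  • managedOperatorState: operator state, for example Kafka offsets

    • OperatorStateHandle

      • stateNameToPartitionOffsets: state name -> (distribution mode, offsets of the state within the file)

      • StreamStateHandle: handle to the state file, i.e. the file location

  • managedKeyedState: keyed state, used by operators downstream of a keyBy

    • KeyedStateHandle

      • StreamStateHandle: handle to the state file, i.e. the file location

      • KeyGroupRangeOffsets

        • KeyGroupRange: the range of key groups the current subtask is responsible for

        • offsets: for each key group, the start offset of its state within the state file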
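
The relationship between a key-group range and its offsets can be sketched as follows. This is a simplified stand-in loosely modelled on KeyGroupRangeOffsets, not the Flink class itself: the class name, method names and the sample offsets are all made up for illustration.

```java
import java.util.Arrays;

public class KeyGroupOffsetsSketch {

    private final int startKeyGroup; // first key group of the range (inclusive)
    private final long[] offsets;    // offsets[i]: start of key group (startKeyGroup + i) in the file

    public KeyGroupOffsetsSketch(int startKeyGroup, long[] offsets) {
        this.startKeyGroup = startKeyGroup;
        this.offsets = offsets.clone();
    }

    /** Number of key groups covered by this handle. */
    public int size() {
        return offsets.length;
    }

    /** Last key group of the range (inclusive). */
    public int endKeyGroup() {
        return startKeyGroup + offsets.length - 1;
    }

    /** Start offset of one key group's state within the state file. */
    public long offsetOf(int keyGroup) {
        if (keyGroup < startKeyGroup || keyGroup > endKeyGroup()) {
            throw new IllegalArgumentException("key group " + keyGroup + " is not in range");
        }
        return offsets[keyGroup - startKeyGroup];
    }

    /** Keeps only the key groups in [start, end], e.g. for a subtask after rescaling. */
    public KeyGroupOffsetsSketch intersect(int start, int end) {
        int from = Math.max(start, startKeyGroup);
        int to = Math.min(end, endKeyGroup());
        if (from > to) {
            return new KeyGroupOffsetsSketch(from, new long[0]);
        }
        long[] kept = Arrays.copyOfRange(offsets, from - startKeyGroup, to - startKeyGroup + 1);
        return new KeyGroupOffsetsSketch(from, kept);
    }

    public static void main(String[] args) {
        // Hypothetical handle covering key groups 32..35.
        KeyGroupOffsetsSketch handle =
                new KeyGroupOffsetsSketch(32, new long[] {0L, 120L, 120L, 512L});
        System.out.println("offset of key group 33: " + handle.offsetOf(33));

        // A new subtask that owns key groups 34..63 only needs part of this file.
        KeyGroupOffsetsSketch part = handle.intersect(34, 63);
        System.out.println("key groups kept: " + part.size());
    }
}
```

Because the offsets are stored per key group, a subtask that takes over only part of a range after rescaling can seek straight to the key groups it owns instead of reading the whole file.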

Deserialization

Turning the _metadata file into a CheckpointMetadata object naturally involves deserialization.

// org.apache.flink.runtime.checkpoint.metadata.MetadataV2V3SerializerBase
 /* <p>Basic checkpoint metadata layout:
 *
 * <pre>
 *  +--------------+---------------+-----------------+
 *  | checkpointID | master states | operator states |
 *  +--------------+---------------+-----------------+
 *
 *  Master state:
 *  +--------------+---------------------+---------+------+---------------+
 *  | magic number | num remaining bytes | version | name | payload bytes |
 *  +--------------+---------------------+---------+------+---------------+
 * </pre>
 */
 public abstract class MetadataV2V3SerializerBase {
    protected CheckpointMetadata deserializeMetadata(
            DataInputStream dis, @Nullable String externalPointer) throws IOException {

        final DeserializationContext context =
                externalPointer == null ? null : new DeserializationContext(externalPointer);

        // first: checkpoint ID
        final long checkpointId = dis.readLong();
        if (checkpointId < 0) {
            throw new IOException("invalid checkpoint ID: " + checkpointId);
        }

        // second: master state
        final List<MasterState> masterStates;
        final int numMasterStates = dis.readInt();

        if (numMasterStates == 0) {
            masterStates = Collections.emptyList();
        } else if (numMasterStates > 0) {
            masterStates = new ArrayList<>(numMasterStates);
            for (int i = 0; i < numMasterStates; i++) {
                masterStates.add(deserializeMasterState(dis));
            }
        } else {
            throw new IOException("invalid number of master states: " + numMasterStates);
        }

        // third: operator states
        final int numTaskStates = dis.readInt();
        final List<OperatorState> operatorStates = new ArrayList<>(numTaskStates);

        for (int i = 0; i < numTaskStates; i++) {
            operatorStates.add(deserializeOperatorState(dis, context));
        }

        return new CheckpointMetadata(checkpointId, operatorStates, masterStates);
    }

    // ... (rest of the class omitted)
}
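
To make the "checkpoint ID | master states | operator states" layout concrete, here is a small round-trip sketch using only the JDK. It is not Flink's serializer: ToyMetadata, the class name and the encoding of each state as a single UTF string are assumptions made for illustration, and only the order of the three sections and the validity checks mirror the method above.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class MetadataLayoutSketch {

    /** Toy stand-in for CheckpointMetadata; each state is reduced to a name. */
    public static final class ToyMetadata {
        public final long checkpointId;
        public final List<String> masterStates;
        public final List<String> operatorStates;

        public ToyMetadata(long checkpointId, List<String> masterStates, List<String> operatorStates) {
            this.checkpointId = checkpointId;
            this.masterStates = masterStates;
            this.operatorStates = operatorStates;
        }
    }

    public static byte[] serialize(ToyMetadata metadata) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream dos = new DataOutputStream(bytes)) {
            dos.writeLong(metadata.checkpointId);   // first: checkpoint ID
            writeNames(dos, metadata.masterStates); // second: master states
            writeNames(dos, metadata.operatorStates); // third: operator states
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    public static ToyMetadata deserialize(byte[] data) {
        try (DataInputStream dis = new DataInputStream(new ByteArrayInputStream(data))) {
            long checkpointId = dis.readLong();
            if (checkpointId < 0) {
                throw new IOException("invalid checkpoint ID: " + checkpointId);
            }
            List<String> masterStates = readNames(dis);
            List<String> operatorStates = readNames(dis);
            return new ToyMetadata(checkpointId, masterStates, operatorStates);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Each section is a count followed by that many entries.
    private static void writeNames(DataOutputStream dos, List<String> names) throws IOException {
        dos.writeInt(names.size());
        for (String name : names) {
            dos.writeUTF(name);
        }
    }

    private static List<String> readNames(DataInputStream dis) throws IOException {
        int count = dis.readInt();
        if (count < 0) {
            throw new IOException("invalid number of states: " + count);
        }
        List<String> names = new ArrayList<>(count);
        for (int i = 0; i < count; i++) {
            names.add(dis.readUTF());
        }
        return names;
    }

    public static void main(String[] args) {
        ToyMetadata written = new ToyMetadata(
                42L, Arrays.asList("master-hook"), Arrays.asList("source", "window"));
        ToyMetadata read = deserialize(serialize(written));
        System.out.println(read.checkpointId + " " + read.masterStates + " " + read.operatorStates);
    }
}
```

The reader has to consume the sections in exactly the order the writer produced them, since the stream carries no field names, only counts and payloads.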

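References

Flink source code: checkpoint metadata in detail

The right way to clean up expired Flink checkpoint directories

Flink source code: restoring from a checkpoint on the JM side

Flink source code: restoring and creating OperatorState on the TM side

Flink source code: restoring and creating KeyedState on the TM side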