Each time a checkpoint completes, Flink writes a _metadata file that records the checkpoint's metadata.
In the code, this file corresponds to the CheckpointMetadata class.
// org.apache.flink.runtime.checkpoint.metadata.CheckpointMetadata
/** The metadata of a snapshot (checkpoint or savepoint). */
public class CheckpointMetadata implements Disposable {
    /** The checkpoint ID. */
    private final long checkpointId;
    /** The operator states. */
    private final Collection<OperatorState> operatorStates;
    /** The states generated by the CheckpointCoordinator. */
    private final Collection<MasterState> masterStates;
It has only three fields:
- checkpointId: the ID of the checkpoint
- operatorStates: the operator states
- masterStates: the states generated by the CheckpointCoordinator
operatorStates is the important one: it records every operator together with the files that hold its state.
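To make the nesting concrete, here is a minimal sketch of the three levels (checkpoint, operator, subtask). The nested classes and the file path format are hypothetical stand-ins I made up for illustration; they are not the Flink classes, which carry far more information.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of the nesting; not the real Flink API.
class MetadataHierarchySketch {

    /** Subtask level, reduced here to the path of a single state file. */
    static final class SubtaskState {
        final String stateFile;

        SubtaskState(String stateFile) {
            this.stateFile = stateFile;
        }
    }

    /** Operator level: one entry per parallel subtask, keyed by subtask index. */
    static final class OperatorState {
        final String operatorId;
        final int parallelism;
        final Map<Integer, SubtaskState> subtaskStates = new LinkedHashMap<>();

        OperatorState(String operatorId, int parallelism) {
            this.operatorId = operatorId;
            this.parallelism = parallelism;
        }
    }

    /** Checkpoint level: the ID plus one entry per operator. */
    static final class Metadata {
        final long checkpointId;
        final List<OperatorState> operatorStates = new ArrayList<>();

        Metadata(long checkpointId) {
            this.checkpointId = checkpointId;
        }
    }

    /** Builds metadata for a single operator with the given parallelism. */
    static Metadata build(long checkpointId, String operatorId, int parallelism) {
        Metadata metadata = new Metadata(checkpointId);
        OperatorState operator = new OperatorState(operatorId, parallelism);
        for (int i = 0; i < parallelism; i++) {
            // The path format is invented for this example.
            String file = "chk-" + checkpointId + "/" + operatorId + "-" + i;
            operator.subtaskStates.put(i, new SubtaskState(file));
        }
        metadata.operatorStates.add(operator);
        return metadata;
    }

    public static void main(String[] args) {
        Metadata metadata = build(42L, "source", 3);
        for (OperatorState operator : metadata.operatorStates) {
            for (Map.Entry<Integer, SubtaskState> e : operator.subtaskStates.entrySet()) {
                System.out.println(operator.operatorId + "[" + e.getKey() + "] -> "
                        + e.getValue().stateFile);
            }
        }
    }
}
```

Walking the structure from the top is all that is needed to find every state file a checkpoint refers to, which is why operatorStates is the part worth studying.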
OperatorState
Operator-level state
// org.apache.flink.runtime.checkpoint.OperatorState
public class OperatorState implements CompositeStateHandle {
    private static final long serialVersionUID = -4845578005863201810L;
    /** The id of the operator. */
    private final OperatorID operatorID;
    /** The handles to states created by the parallel tasks: subtaskIndex -> subtaskstate. */
    private final Map<Integer, OperatorSubtaskState> operatorSubtaskStates;
    /** The state of the operator coordinator. Null, if no such state exists. */
    @Nullable private ByteStreamStateHandle coordinatorState;
    /** The parallelism of the operator when it was checkpointed. */
    private final int parallelism;
    /**
     * The maximum parallelism (for number of keygroups) of the operator when the job was first
     * created.
     */
    private final int maxParallelism;
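- operatorSubtaskStates: maps each parallel subtask index of the operator to its subtask state
  - OperatorState represents state at the operator level; OperatorSubtaskState represents state at the subtask level
  - If an operator runs with a parallelism of 10, its OperatorState holds 10 OperatorSubtaskState entries
- coordinatorState: the state of the operator coordinator; null if no such state exists
- parallelism: the parallelism of the operator when this checkpoint was taken (it may differ from the parallelism at restore time, for example when the job is restarted from the checkpoint with a changed parallelism)
- maxParallelism: the maximum parallelism of the operator, which fixes the number of key groups. If the maximum parallelism configured after a restart does not match the value stored in the checkpoint (for example, it is larger), the state generally cannot be restored, because key groups and the assignment of state to subtasks depend on it (see the reference).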
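The relationship between parallelism, maxParallelism, and key groups can be sketched with a little integer arithmetic. The code below is a standalone illustration; the formulas follow my understanding of Flink's KeyGroupRangeAssignment, and the class and method names are my own. Hashing a key into a key group is left out.

```java
// Standalone sketch; the class and method names are hypothetical.
class KeyGroupSketch {

    /** Returns {start, end} (both inclusive) of the key groups owned by one subtask. */
    static int[] rangeForSubtask(int maxParallelism, int parallelism, int subtaskIndex) {
        // Integer ceiling of subtaskIndex * maxParallelism / parallelism.
        int start = (subtaskIndex * maxParallelism + parallelism - 1) / parallelism;
        int end = ((subtaskIndex + 1) * maxParallelism - 1) / parallelism;
        return new int[] {start, end};
    }

    /** Returns the subtask index that owns the given key group. */
    static int subtaskForKeyGroup(int maxParallelism, int parallelism, int keyGroup) {
        return keyGroup * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 128; // the number of key groups stays fixed
        // Rescaling from 2 to 4 subtasks only changes how key groups are split.
        for (int parallelism : new int[] {2, 4}) {
            System.out.println("parallelism = " + parallelism);
            for (int i = 0; i < parallelism; i++) {
                int[] range = rangeForSubtask(maxParallelism, parallelism, i);
                System.out.println("  subtask " + i + " owns key groups ["
                        + range[0] + ", " + range[1] + "]");
            }
        }
    }
}
```

Because the number of key groups equals maxParallelism, changing parallelism only redistributes whole key groups across subtasks, whereas changing maxParallelism would change which key group a key belongs to. That is the intuition behind the restore restriction described above.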
OperatorSubtaskState
OperatorSubtaskState is the basic unit in which state is actually stored.
// org.apache.flink.runtime.checkpoint.OperatorSubtaskState
public class OperatorSubtaskState implements CompositeStateHandle {
    private final StateObjectCollection<OperatorStateHandle> managedOperatorState;
    private final StateObjectCollection<OperatorStateHandle> rawOperatorState;
    private final StateObjectCollection<KeyedStateHandle> managedKeyedState;
    private final StateObjectCollection<KeyedStateHandle> rawKeyedState;
State falls into two kinds depending on who manages it: managed and raw. I have not used raw state yet, so only managed state is analyzed here.
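- managedOperatorState: operator state, for example Kafka offsets
  - OperatorStateHandle
    - stateNameToPartitionOffsets: state name -> (distribution mode, offsets of the state within the file)
    - StreamStateHandle: handle to the state file, that is, the file location
- managedKeyedState: keyed state, used by operators that run after a keyBy
  - KeyedStateHandle
    - StreamStateHandle: handle to the state file, that is, the file location
    - KeyGroupRangeOffsets
      - KeyGroupRange: the range of key groups that the current subtask is responsible for
      - offsets: for each key group, the start offset of its state in the state file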
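The pairing of a key group range with per-key-group offsets can be sketched as follows. This is a simplified stand-in, assuming one offset per key group stored in range order; the class name, constructor, and sample offsets are invented for illustration and do not reproduce the Flink class.

```java
// Hypothetical, simplified counterpart of a key group range plus its offsets.
class KeyGroupOffsetsSketch {
    private final int startKeyGroup;
    private final int endKeyGroup; // inclusive
    private final long[] offsets;  // offsets[i] belongs to key group startKeyGroup + i

    KeyGroupOffsetsSketch(int startKeyGroup, int endKeyGroup, long[] offsets) {
        if (offsets.length != endKeyGroup - startKeyGroup + 1) {
            throw new IllegalArgumentException("need exactly one offset per key group");
        }
        this.startKeyGroup = startKeyGroup;
        this.endKeyGroup = endKeyGroup;
        this.offsets = offsets.clone();
    }

    /** Returns the position in the state file where the given key group starts. */
    long offsetOf(int keyGroup) {
        if (keyGroup < startKeyGroup || keyGroup > endKeyGroup) {
            throw new IllegalArgumentException(
                    "key group " + keyGroup + " is outside ["
                            + startKeyGroup + ", " + endKeyGroup + "]");
        }
        return offsets[keyGroup - startKeyGroup];
    }

    public static void main(String[] args) {
        // A subtask that owns key groups 32..35; the offsets are made-up sample values.
        KeyGroupOffsetsSketch handle =
                new KeyGroupOffsetsSketch(32, 35, new long[] {0L, 120L, 310L, 480L});
        for (int keyGroup = 32; keyGroup <= 35; keyGroup++) {
            System.out.println("key group " + keyGroup
                    + " starts at byte " + handle.offsetOf(keyGroup));
        }
    }
}
```

With this layout, a restoring subtask can seek straight to the key groups it owns instead of reading the whole state file.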
Deserialization
Turning the _metadata file into a CheckpointMetadata object necessarily involves deserialization.
// org.apache.flink.runtime.checkpoint.metadata.MetadataV2V3SerializerBase
/* <p>Basic checkpoint metadata layout:
*
* <pre>
* +--------------+---------------+-----------------+
* | checkpointID | master states | operator states |
* +--------------+---------------+-----------------+
*
* Master state:
* +--------------+---------------------+---------+------+---------------+
* | magic number | num remaining bytes | version | name | payload bytes |
* +--------------+---------------------+---------+------+---------------+
* </pre>
*/
public abstract class MetadataV2V3SerializerBase {
    protected CheckpointMetadata deserializeMetadata(
            DataInputStream dis, @Nullable String externalPointer) throws IOException {
        final DeserializationContext context =
                externalPointer == null ? null : new DeserializationContext(externalPointer);
        // first: checkpoint ID
        final long checkpointId = dis.readLong();
        if (checkpointId < 0) {
            throw new IOException("invalid checkpoint ID: " + checkpointId);
        }
        // second: master state
        final List<MasterState> masterStates;
        final int numMasterStates = dis.readInt();
        if (numMasterStates == 0) {
            masterStates = Collections.emptyList();
        } else if (numMasterStates > 0) {
            masterStates = new ArrayList<>(numMasterStates);
            for (int i = 0; i < numMasterStates; i++) {
                masterStates.add(deserializeMasterState(dis));
            }
        } else {
            throw new IOException("invalid number of master states: " + numMasterStates);
        }
        // third: operator states
        final int numTaskStates = dis.readInt();
        final List<OperatorState> operatorStates = new ArrayList<>(numTaskStates);
        for (int i = 0; i < numTaskStates; i++) {
            operatorStates.add(deserializeOperatorState(dis, context));
        }
        return new CheckpointMetadata(checkpointId, operatorStates, masterStates);
    }
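The read order above (checkpoint ID, then a count-prefixed list of master states, then a count-prefixed list of operator states) can be tried out with a small round trip over java.io streams. This is only a sketch: each state is reduced to a single UTF string, which is an assumption made for brevity and not the real per-state encoding, and the class and method names are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical toy format that only mirrors the field order of the excerpt.
class MetadataLayoutSketch {

    static byte[] write(long checkpointId, List<String> masterStates, List<String> operatorStates) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream dos = new DataOutputStream(bytes)) {
            dos.writeLong(checkpointId);           // first: checkpoint ID
            dos.writeInt(masterStates.size());     // second: master states
            for (String state : masterStates) {
                dos.writeUTF(state);
            }
            dos.writeInt(operatorStates.size());   // third: operator states
            for (String state : operatorStates) {
                dos.writeUTF(state);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    static String read(byte[] data) {
        try (DataInputStream dis = new DataInputStream(new ByteArrayInputStream(data))) {
            long checkpointId = dis.readLong();
            if (checkpointId < 0) {
                throw new IOException("invalid checkpoint ID: " + checkpointId);
            }
            List<String> masterStates = readList(dis, "master");
            List<String> operatorStates = readList(dis, "operator");
            return "checkpoint " + checkpointId
                    + " masters=" + masterStates + " operators=" + operatorStates;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Reads an int count followed by that many UTF strings. */
    private static List<String> readList(DataInputStream dis, String kind) throws IOException {
        int count = dis.readInt();
        if (count < 0) {
            throw new IOException("invalid number of " + kind + " states: " + count);
        }
        List<String> result = new ArrayList<>(count);
        for (int i = 0; i < count; i++) {
            result.add(dis.readUTF());
        }
        return result;
    }

    public static void main(String[] args) {
        byte[] data = write(7L, Arrays.asList("m"), Arrays.asList("a", "b"));
        System.out.println(read(data));
    }
}
```

Since every list is prefixed with its length, the reader never needs a delimiter or a look-ahead; it simply consumes the stream in the same order the writer produced it.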
References
Flink source code: how the JM restores from a checkpoint