Flink 1.12
Timers are a widely used feature in Flink and show up in many places: Window, Interval Join, state cleanup, and so on. Below we use the Window operator as the running example to analyze the timer mechanism. The timer service used by Window is implemented by InternalTimerServiceImpl. A single operator can own multiple InternalTimerService instances, and they are managed by an InternalTimeServiceManagerImpl.
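Before looking at the internals, here is a minimal sketch of how timers surface in the user-facing API: a KeyedProcessFunction registers an event-time timer through its TimerService and is called back in onTimer once the watermark passes that time. The String types, the class name, and the fixed 60-second delay are arbitrary choices for illustration.

import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Minimal sketch of user-level timer usage; types and the 60 s delay are illustrative.
public class TimeoutSketch extends KeyedProcessFunction<String, String, String> {

    @Override
    public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
        // assumes event-time timestamps are assigned; this call eventually lands in
        // InternalTimerServiceImpl.registerEventTimeTimer via the operator's timer service
        ctx.timerService().registerEventTimeTimer(ctx.timestamp() + 60_000L);
        out.collect(value);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
        // called back through Triggerable.onEventTime once the watermark passes `timestamp`
        out.collect("timer fired for key " + ctx.getCurrentKey() + " at " + timestamp);
    }
}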
InternalTimeServiceManager
// WindowOperator creates its InternalTimerService like this:
// internalTimerService = getInternalTimerService("window-timers", windowSerializer, this);

// org.apache.flink.streaming.api.operators.AbstractStreamOperator
public <K, N> InternalTimerService<N> getInternalTimerService(
        String name, TypeSerializer<N> namespaceSerializer, Triggerable<K, N> triggerable) {
    // timeServiceManager is an InternalTimeServiceManagerImpl
    InternalTimeServiceManager<K> keyedTimeServiceHandler =
            (InternalTimeServiceManager<K>) timeServiceManager;
    // in production this is usually the RocksDB state backend
    KeyedStateBackend<K> keyedStateBackend = getKeyedStateBackend();
    checkState(keyedStateBackend != null, "Timers can only be used on keyed operators.");
    return keyedTimeServiceHandler.getInternalTimerService(
            name, keyedStateBackend.getKeySerializer(), namespaceSerializer, triggerable);
}
// org.apache.flink.streaming.api.operators.InternalTimeServiceManagerImpl
private final Map<String, InternalTimerServiceImpl<K, ?>> timerServices;

public <N> InternalTimerService<N> getInternalTimerService(
        String name,
        TypeSerializer<K> keySerializer,
        TypeSerializer<N> namespaceSerializer,
        Triggerable<K, N> triggerable) {
    checkNotNull(keySerializer, "Timers can only be used on keyed operators.");

    // the following casting is to overcome type restrictions.
    TimerSerializer<K, N> timerSerializer =
            new TimerSerializer<>(keySerializer, namespaceSerializer);

    InternalTimerServiceImpl<K, N> timerService =
            registerOrGetTimerService(name, timerSerializer);

    // start the timer service: bind the serializers and the trigger callback (Triggerable)
    timerService.startTimerService(
            timerSerializer.getKeySerializer(),
            timerSerializer.getNamespaceSerializer(),
            triggerable);

    return timerService;
}
<N> InternalTimerServiceImpl<K, N> registerOrGetTimerService(
        String name, TimerSerializer<K, N> timerSerializer) {
    // look up by name; create a new service if there is none yet
    InternalTimerServiceImpl<K, N> timerService =
            (InternalTimerServiceImpl<K, N>) timerServices.get(name);
    if (timerService == null) {
        // an InternalTimerService holds two priority queues: one for event time, one for processing time
        timerService =
                new InternalTimerServiceImpl<>(
                        localKeyGroupRange,
                        keyContext,
                        processingTimeService,
                        createTimerPriorityQueue(
                                PROCESSING_TIMER_PREFIX + name, timerSerializer),
                        createTimerPriorityQueue(EVENT_TIMER_PREFIX + name, timerSerializer));
        // remember it under its name
        timerServices.put(name, timerService);
    }
    return timerService;
}
// when a watermark arrives, the operator lets every TimerService evaluate it
public void advanceWatermark(Watermark watermark) throws Exception {
    for (InternalTimerServiceImpl<?, ?> service : timerServices.values()) {
        service.advanceWatermark(watermark.getTimestamp());
    }
}
InternalTimeServiceManagerImpl keeps all of its TimerService instances in a Map keyed by name.
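The name therefore acts as a per-operator handle: asking for the same name twice returns the same service. The look-up-or-create idiom can be boiled down to a toy registry like the one below (TimerServiceStub is a made-up stand-in for InternalTimerServiceImpl, not a Flink class).

import java.util.HashMap;
import java.util.Map;

// Toy sketch of the manager's register-or-get pattern: one timer service per name.
public class TimerServiceRegistry {

    static class TimerServiceStub {
        final String name;
        TimerServiceStub(String name) { this.name = name; }
    }

    private final Map<String, TimerServiceStub> timerServices = new HashMap<>();

    public TimerServiceStub registerOrGet(String name) {
        // repeated calls with the same name return the same instance
        return timerServices.computeIfAbsent(name, TimerServiceStub::new);
    }
}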
InternalTimerService
Quite a few parameters are passed in when an InternalTimerService is created; let's take a closer look at what each of them is used for.
// org.apache.flink.streaming.api.operators.InternalTimerServiceImpl
InternalTimerServiceImpl(
        KeyGroupRange localKeyGroupRange,
        KeyContext keyContext,
        ProcessingTimeService processingTimeService,
        KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<K, N>> processingTimeTimersQueue,
        KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<K, N>> eventTimeTimersQueue) {

    this.keyContext = checkNotNull(keyContext);
    this.processingTimeService = checkNotNull(processingTimeService);
    this.localKeyGroupRange = checkNotNull(localKeyGroupRange);

    // Flink's own priority-queue implementations; here we focus on the event-time queue
    this.processingTimeTimersQueue = checkNotNull(processingTimeTimersQueue);
    this.eventTimeTimersQueue = checkNotNull(eventTimeTimersQueue);

    // find the starting index of the local key-group range
    int startIdx = Integer.MAX_VALUE;
    for (Integer keyGroupIdx : localKeyGroupRange) {
        startIdx = Math.min(keyGroupIdx, startIdx);
    }
    this.localKeyGroupRangeStartIdx = startIdx;
}

// registering an event-time timer simply adds it to eventTimeTimersQueue
public void registerEventTimeTimer(N namespace, long time) {
    // the timer records its timestamp, the current key, and the namespace
    eventTimeTimersQueue.add(
            new TimerHeapInternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace));
}

// called whenever the watermark advances
public void advanceWatermark(long time) throws Exception {
    currentWatermark = time;

    InternalTimer<K, N> timer;

    // pop and fire every timer whose timestamp is covered by the new watermark
    while ((timer = eventTimeTimersQueue.peek()) != null && timer.getTimestamp() <= time) {
        eventTimeTimersQueue.poll();
        keyContext.setCurrentKey(timer.getKey());
        // call back into Triggerable.onEventTime (e.g. the WindowOperator)
        triggerTarget.onEventTime(timer);
    }
}
The timer service keeps registered timers in dedicated queues. Every timer carries the information that identifies it (timestamp, key, and namespace), so the callback can locate the exact element it belongs to. Whenever the watermark advances, advanceWatermark keeps taking timers from the head of the queue and fires each one whose timestamp is covered by the new watermark.
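As a mental model (not the Flink code), the event-time half of the service boils down to a timestamp-ordered priority queue plus a watermark-driven drain loop. All class and callback names in the sketch below are made up for illustration.

import java.util.Comparator;
import java.util.PriorityQueue;

// Simplified model of an event-time timer queue; names are illustrative, not Flink's.
public class ToyEventTimeTimers<K, N> {

    public interface OnEventTime<K, N> {
        void fire(long timestamp, K key, N namespace) throws Exception;
    }

    private static final class Timer<K, N> {
        final long timestamp; final K key; final N namespace;
        Timer(long timestamp, K key, N namespace) {
            this.timestamp = timestamp; this.key = key; this.namespace = namespace;
        }
    }

    // earliest timestamp at the head, mirroring eventTimeTimersQueue
    private final PriorityQueue<Timer<K, N>> queue =
            new PriorityQueue<>(Comparator.comparingLong((Timer<K, N> t) -> t.timestamp));

    public void registerEventTimeTimer(long timestamp, K key, N namespace) {
        queue.add(new Timer<>(timestamp, key, namespace));
    }

    // mirrors InternalTimerServiceImpl.advanceWatermark: fire everything <= watermark
    public void advanceWatermark(long watermark, OnEventTime<K, N> target) throws Exception {
        Timer<K, N> timer;
        while ((timer = queue.peek()) != null && timer.timestamp <= watermark) {
            queue.poll();
            target.fire(timer.timestamp, timer.key, timer.namespace);
        }
    }
}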
Window
Registration
// WindowOperator.processElement → triggerContext.onElement(element)
public TriggerResult onElement(StreamRecord<IN> element) throws Exception {
    return trigger.onElement(element.getValue(), element.getTimestamp(), window, this);
}

// an event-time trigger then calls triggerContext.registerEventTimeTimer(window.maxTimestamp());
public void registerEventTimeTimer(long time) {
    internalTimerService.registerEventTimeTimer(window, time);
}

public void registerEventTimeTimer(N namespace, long time) {
    eventTimeTimersQueue.add(
            new TimerHeapInternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace));
}

// the timer stores the timestamp, the key, and the window (as its namespace)
public TimerHeapInternalTimer(long timestamp, @Nonnull K key, @Nonnull N namespace) {
    this.timestamp = timestamp;
    this.key = key;
    this.namespace = namespace;
    this.timerHeapIndex = NOT_CONTAINED;
}
Callback
// triggerTarget.onEventTime(timer);
public void onEventTime(InternalTimer<K, W> timer) throws Exception {
    // recover the key and the window (namespace) from the timer
    triggerContext.key = timer.getKey();
    triggerContext.window = timer.getNamespace();
    ...
    triggerContext.onEventTime(timer.getTimestamp());
    ...
}

public TriggerResult onEventTime(long time) throws Exception {
    // pass the window along to the trigger
    return trigger.onEventTime(time, window, this);
}
In the Window case, a timer is identified by its window (used as the namespace), so the callback knows which window's data to process next.
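Putting registration and callback together, a trigger in the spirit of Flink's built-in EventTimeTrigger looks roughly like the sketch below (reproduced from memory and simplified, so treat it as an illustration rather than the exact source):

import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

// Sketch of an event-time trigger: register a timer at window.maxTimestamp(),
// fire when that timer comes back.
public class SketchEventTimeTrigger extends Trigger<Object, TimeWindow> {

    @Override
    public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx)
            throws Exception {
        if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
            // the watermark is already past the end of the window: fire immediately
            return TriggerResult.FIRE;
        }
        // ends up in InternalTimerServiceImpl.registerEventTimeTimer(window, time)
        ctx.registerEventTimeTimer(window.maxTimestamp());
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
        // only fire for the timer that marks the end of this window
        return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        ctx.deleteEventTimeTimer(window.maxTimestamp());
    }
}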
PriorityQueue
When the watermark advances, we need to find the timers in the queue that have become due. The most natural approach is to keep them ordered by timestamp with the earliest first, which is exactly what Flink does. Flink provides its own priority-queue implementations for this, and the one actually used depends on the state backend; in production it is usually RocksDBCachingPriorityQueueSet. RocksDBCachingPriorityQueueSet combines an in-memory cache with RocksDB: the first N elements are also kept in the cache to reduce round trips to RocksDB, while RocksDB always holds the complete data; a write to the cache is immediately followed by a write to RocksDB.
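The caching idea can be illustrated with a toy version (all names are made up; the real class works on serialized byte[] keys in a RocksDB column family and refills its cache from RocksDB when it runs empty):

import java.util.TreeSet;

// Toy cache-in-front-of-store priority set; illustrative only, not Flink's implementation.
public class CachedPrioritySet<E extends Comparable<E>> {
    private final TreeSet<E> store = new TreeSet<>();   // stands in for RocksDB: holds everything
    private final TreeSet<E> cache = new TreeSet<>();   // holds only the smallest elements
    private final int cacheCapacity;
    private boolean allElementsInCache = true;          // true while the cache covers the full set

    public CachedPrioritySet(int cacheCapacity) {
        this.cacheCapacity = cacheCapacity;
    }

    public void add(E element) {
        store.add(element);                             // every element goes to the "backend"
        if (allElementsInCache && cache.size() < cacheCapacity) {
            cache.add(element);                         // cache still covers everything
        } else if (!cache.isEmpty() && element.compareTo(cache.last()) < 0) {
            if (cache.size() >= cacheCapacity) {
                cache.pollLast();                       // evict the largest cached element
                allElementsInCache = false;
            }
            cache.add(element);
        } else {
            allElementsInCache = false;                 // larger than anything cached: store only
        }
    }

    public E peek() {
        if (!cache.isEmpty()) {
            return cache.first();                       // the global head is always cached if the cache is non-empty
        }
        return store.isEmpty() ? null : store.first();  // cache drained: fall back to the store
    }

    public E poll() {
        E head = peek();
        if (head != null) {
            cache.remove(head);
            store.remove(head);
        }
        return head;
    }
}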
// org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet
// A priority queue (set) built on top of RocksDB. The in-memory cache is backed by a TreeSet of
// serialized elements and holds the first N elements, which avoids RocksDB accesses on most reads.

// add an element to the queue
public boolean add(@Nonnull E toAdd) {
    // serialize the element: timestamp, key, window (namespace)
    final byte[] toAddBytes = serializeElement(toAdd);

    // is the in-memory cache already full?
    final boolean cacheFull = orderedCache.isFull();

    if ((!cacheFull && allElementsInCache)
            || LEXICOGRAPHIC_BYTE_COMPARATOR.compare(toAddBytes, orderedCache.peekLast()) < 0) {

        if (cacheFull) {
            // the cache is full: evict the largest element to make room;
            // from now on the cache no longer covers the full set
            orderedCache.pollLast();
            allElementsInCache = false;
        }

        // add the new element to the cache ...
        if (orderedCache.add(toAddBytes)) {
            // ... and write it through to RocksDB as well
            addToRocksDB(toAddBytes);
            if (toAddBytes == orderedCache.peekFirst()) {
                // the new element became the head: invalidate the cached peek and report a new head
                peekCache = null;
                return true;
            }
        }
    } else {
        // the element does not belong in the cache: write it to RocksDB only
        addToRocksDB(toAddBytes);
        allElementsInCache = false;
    }
    return false;
}
// org.apache.flink.streaming.api.operators.TimerSerializer
// Timer serialization: the timestamp is written first (with its sign bit flipped), followed by the
// key and the namespace (window), so that lexicographic byte order matches timestamp order; timers
// with identical (timestamp, key, window) serialize identically and collapse into a single entry.
public void serialize(TimerHeapInternalTimer<K, N> record, DataOutputView target)
        throws IOException {
    target.writeLong(MathUtils.flipSignBit(record.getTimestamp()));
    keySerializer.serialize(record.getKey(), target);
    namespaceSerializer.serialize(record.getNamespace(), target);
}
// look at the head of the queue without removing it
public E peek() {
    if (peekCache != null) {
        return peekCache;
    }

    // read the head from the cache and keep the deserialized element around so that
    // a subsequent poll() can return it without deserializing again
    byte[] firstBytes = orderedCache.peekFirst();
    if (firstBytes != null) {
        peekCache = deserializeElement(firstBytes);
        return peekCache;
    } else {
        return null;
    }
}

// remove the head of the queue
public E poll() {
    final byte[] firstBytes = orderedCache.pollFirst();

    if (firstBytes == null) {
        return null;
    }

    // remove the element from RocksDB as well
    removeFromRocksDB(firstBytes);

    if (orderedCache.isEmpty()) {
        // the cache just ran empty: remember where a refill from RocksDB should start
        seekHint = firstBytes;
    }

    if (peekCache != null) {
        E fromCache = peekCache;
        peekCache = null;
        return fromCache;
    } else {
        return deserializeElement(firstBytes);
    }
}