Flink 定时器

Posted by danner on December 20, 2020

Flink1.12

定时器在 Flink 中是很常用的功能,在很多地方都会用到:Window、Interval Join、状态清理 …。下面以 Window 案例来分析定时器机制。Window 中的定时器服务的实现是 InternalTimerServiceImpl。在一个算子中可以有多个 InternalTimerService,由 InternalTimeServiceManagerImpl 来管理这些 TimerService。

InternalTimeServiceManager

// WindowOperator creates its InternalTimerService like this:
// internalTimerService = getInternalTimerService("window-timers", windowSerializer, this);

// org.apache.flink.streaming.api.operators.AbstractStreamOperator
/**
 * Returns the internal timer service registered under {@code name}.
 *
 * <p>Timers are only available on keyed operators: the key serializer is taken from the
 * operator's keyed state backend and passed on to the time service manager.
 *
 * @param name name of the timer service
 * @param namespaceSerializer serializer for the timer namespace (for windows: the window)
 * @param triggerable callback target invoked when a timer fires
 * @throws RuntimeException if the time service manager has not been initialized
 */
public <K, N> InternalTimerService<N> getInternalTimerService(
      String name, TypeSerializer<N> namespaceSerializer, Triggerable<K, N> triggerable) {
  // Fail fast with a descriptive message instead of an NPE further down.
  if (timeServiceManager == null) {
    throw new RuntimeException("The timer service has not been initialized.");
  }
  // The manager is an InternalTimeServiceManagerImpl; this cast binds its key type to K.
  @SuppressWarnings("unchecked")
  InternalTimeServiceManager<K> keyedTimeServiceHandler =
          (InternalTimeServiceManager<K>) timeServiceManager;
  // In production this is usually the RocksDB state backend.
  KeyedStateBackend<K> keyedStateBackend = getKeyedStateBackend();
  checkState(keyedStateBackend != null, "Timers can only be used on keyed operators.");
  return keyedTimeServiceHandler.getInternalTimerService(
          name, keyedStateBackend.getKeySerializer(), namespaceSerializer, triggerable);
}

// org.apache.flink.streaming.api.operators.InternalTimeServiceManagerImpl
// All timer services of this operator, keyed by the name given to getInternalTimerService.
private final Map<String, InternalTimerServiceImpl<K, ?>> timerServices;

/**
 * Looks up (or lazily creates) the timer service called {@code name} and starts it.
 *
 * <p>{@code startTimerService} is invoked on every call, both for a freshly created service
 * and for one that already existed.
 */
public <N> InternalTimerService<N> getInternalTimerService(
      String name,
      TypeSerializer<K> keySerializer,
      TypeSerializer<N> namespaceSerializer,
      Triggerable<K, N> triggerable) {
  checkNotNull(keySerializer, "Timers can only be used on keyed operators.");

  // A single serializer for the whole timer (timestamp, key and namespace).
  final TimerSerializer<K, N> serializer =
          new TimerSerializer<>(keySerializer, namespaceSerializer);
  final InternalTimerServiceImpl<K, N> service = registerOrGetTimerService(name, serializer);

  // Start the service: bind its serializers and the Triggerable to call back.
  service.startTimerService(
          serializer.getKeySerializer(), serializer.getNamespaceSerializer(), triggerable);
  return service;
}
/**
 * Returns the timer service registered under {@code name}.
 *
 * <p>If there is none yet, this creates one backed by a processing-time queue and an
 * event-time queue, then registers it.
 */
<N> InternalTimerServiceImpl<K, N> registerOrGetTimerService(
        String name, TimerSerializer<K, N> timerSerializer) {
    @SuppressWarnings("unchecked")
    final InternalTimerServiceImpl<K, N> existing =
            (InternalTimerServiceImpl<K, N>) timerServices.get(name);
    if (existing != null) {
        return existing;
    }

    // Each service owns two priority queues, one per time domain. Each queue is named after
    // the service, with the processing-time or event-time prefix in front.
    final InternalTimerServiceImpl<K, N> created =
            new InternalTimerServiceImpl<>(
                    localKeyGroupRange,
                    keyContext,
                    processingTimeService,
                    createTimerPriorityQueue(PROCESSING_TIMER_PREFIX + name, timerSerializer),
                    createTimerPriorityQueue(EVENT_TIMER_PREFIX + name, timerSerializer));
    timerServices.put(name, created);
    return created;
}
/** Forwards the watermark's timestamp to every registered timer service. */
public void advanceWatermark(Watermark watermark) throws Exception {
  for (InternalTimerServiceImpl<?, ?> timerService : timerServices.values()) {
      // Each service fires its own event-time timers that are now due.
      timerService.advanceWatermark(watermark.getTimestamp());
  }
}

InternalTimeServiceManagerImpl 用一个 Map(以 TimerService 的 name 为 key)保存所有 TimerService。

InternalTimerService

新建 InternalTimerService 时会传入不少参数,下面逐一分析这些参数的用途。

// org.apache.flink.streaming.api.operators.InternalTimerServiceImpl
/**
 * Creates a timer service for the given local key-group range.
 *
 * @param localKeyGroupRange key groups handled locally
 * @param keyContext used to read the current key when registering timers and to set it when
 *     firing them
 * @param processingTimeService the processing-time service
 * @param processingTimeTimersQueue queue holding processing-time timers
 * @param eventTimeTimersQueue queue holding event-time timers (the focus of this article)
 */
InternalTimerServiceImpl(
      KeyGroupRange localKeyGroupRange,
      KeyContext keyContext,
      ProcessingTimeService processingTimeService,
      KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<K, N>> processingTimeTimersQueue,
      KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<K, N>> eventTimeTimersQueue) {

  this.keyContext = checkNotNull(keyContext);
  this.processingTimeService = checkNotNull(processingTimeService);
  this.localKeyGroupRange = checkNotNull(localKeyGroupRange);
  // Priority queues implemented by Flink itself; this article focuses on the event-time one.
  this.processingTimeTimersQueue = checkNotNull(processingTimeTimersQueue);
  this.eventTimeTimersQueue = checkNotNull(eventTimeTimersQueue);

  // Smallest key-group index in the local range (stays Integer.MAX_VALUE for an empty range).
  int minKeyGroup = Integer.MAX_VALUE;
  for (Integer keyGroup : localKeyGroupRange) {
      if (keyGroup < minKeyGroup) {
          minKeyGroup = keyGroup;
      }
  }
  this.localKeyGroupRangeStartIdx = minKeyGroup;
}
/**
 * Registers an event-time timer: it only needs to be added to eventTimeTimersQueue.
 *
 * <p>The timer records the timestamp, the current key and the namespace.
 */
public void registerEventTimeTimer(N namespace, long time) {
  @SuppressWarnings("unchecked")
  final K currentKey = (K) keyContext.getCurrentKey();
  final TimerHeapInternalTimer<K, N> timer =
          new TimerHeapInternalTimer<>(time, currentKey, namespace);
  eventTimeTimersQueue.add(timer);
}
/**
 * Callback for a watermark change.
 *
 * <p>Records the new watermark, then repeatedly takes the head of the event-time queue and
 * fires it while its timestamp is at or before {@code time}.
 */
public void advanceWatermark(long time) throws Exception {
  currentWatermark = time;
  while (true) {
      final InternalTimer<K, N> timer = eventTimeTimersQueue.peek();
      // Stop at an empty queue or at the first timer that is not yet due.
      if (timer == null || timer.getTimestamp() > time) {
          break;
      }
      eventTimeTimersQueue.poll();
      // Switch to the timer's key before invoking the onEventTime callback.
      keyContext.setCurrentKey(timer.getKey());
      triggerTarget.onEventTime(timer);
  }
}

TimerService 用专门的优先队列记录注册的定时器。每个定时器都带有标识自身的信息:时间戳、key 和 namespace,这样在回调时才能找到对应的 key 和 window。当 Watermark 发生变化时,advanceWatermark 会依次从队列中取出时间戳不大于 Watermark 的定时器并触发回调。

Window

注册

// Called as triggerContext.onElement(element)
/** Hands the element's value, its timestamp and the current window to the Trigger. */
public TriggerResult onElement(StreamRecord<IN> element) throws Exception {
  final IN value = element.getValue();
  final long timestamp = element.getTimestamp();
  return trigger.onElement(value, timestamp, window, this);
}
// Inside the trigger's onElement: register an event-time timer at window.maxTimestamp(), so
// that it fires once the watermark reaches that timestamp (see advanceWatermark).
// NOTE(review): registerEventTimeTimer(long) is an instance method (defined below), so real
// trigger code calls it on the TriggerContext object handed to the trigger (e.g. `ctx`), not
// statically on the TriggerContext type as written here.
TriggerContext.registerEventTimeTimer(window.maxTimestamp());

/** Registers an event-time timer whose namespace is the window currently being processed. */
public void registerEventTimeTimer(long time) {
  internalTimerService.registerEventTimeTimer(
          window, // the current window serves as the timer's namespace
          time);
}
/** Timer-service side: wraps (time, current key, namespace) into a timer and enqueues it. */
public void registerEventTimeTimer(N namespace, long time) {
  @SuppressWarnings("unchecked")
  K key = (K) keyContext.getCurrentKey();
  eventTimeTimersQueue.add(new TimerHeapInternalTimer<>(time, key, namespace));
}
/**
 * Creates a timer holding its timestamp, the key and the namespace (for windows: the window).
 *
 * <p>The heap index starts as NOT_CONTAINED (presumably: not yet placed in a heap).
 */
public TimerHeapInternalTimer(long timestamp, @Nonnull K key, @Nonnull N namespace) {
  this.timerHeapIndex = NOT_CONTAINED;
  this.namespace = namespace;
  this.key = key;
  this.timestamp = timestamp;
}

回调

// Invoked from advanceWatermark as: triggerTarget.onEventTime(timer);
// WindowOperator callback for a fired event-time timer (parts elided with "...").
public void onEventTime(InternalTimer<K, W> timer) throws Exception {
  // Recover the key and the window (stored as the timer's namespace) from the timer.
  triggerContext.key = timer.getKey();
  triggerContext.window = timer.getNamespace();
  ...
  // Let the trigger react to this timestamp for the recovered window.
  triggerContext.onEventTime(timer.getTimestamp());
  ...
}
/** Forwards the fired timestamp together with the current window to the Trigger. */
public TriggerResult onEventTime(long time) throws Exception {
  final TriggerResult result = trigger.onEventTime(time, window, this);
  return result;
}

在 Window 场景中,Timer 以 window 作为标识(namespace),回调时据此确定接下来要处理哪个 window 的数据。

PriorityQueue

Watermark 发生变化时,需要找出队列中满足条件的 Timer。最朴素的想法就是按时间戳排序,最早的排在前面(Flink 也正是这么做的)。Flink 为此重新设计了优先队列,其存储方式与 StateBackend 相关,生产上一般使用 RocksDBCachingPriorityQueueSet。RocksDBCachingPriorityQueueSet 采用 Cache + RocksDB 的方式:前 N 个元素会在内存 Cache 中缓存一份,以减少与 RocksDB 的交互;RocksDB 中存储全量数据,写入 Cache 后会紧接着写入 RocksDB(write-through)。

// org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet
// A priority queue built on top of RocksDB. The first N elements are also kept in an in-memory
// ordered cache, which reduces RocksDB access. (The original note says the cache is
// TreeSet-based; its implementation is not shown in this excerpt.)
/**
 * Adds an element to the queue.
 *
 * @return true only if the element was added to the cache and became its first element;
 *     false otherwise, including when it was written to RocksDB only.
 */
public boolean add(@Nonnull E toAdd) {
  // Serialize the element (for timers: timestamp, key, window).
  final byte[] toAddBytes = serializeElement(toAdd);
  // Whether the in-memory cache has no free slot left.
  final boolean cacheFull = orderedCache.isFull();

  // Goes into the cache if the cache still mirrors the whole queue and has room, or if the
  // new bytes sort before the cache's current last element.
  // NOTE(review): if the cache has been drained while allElementsInCache is false,
  // peekLast() is presumably null here. Upstream Flink refills the cache from RocksDB first
  // (checkRefillCacheFromStore()); that call appears to be omitted in this excerpt. Confirm.
  if ((!cacheFull && allElementsInCache)
          || LEXICOGRAPHIC_BYTE_COMPARATOR.compare(toAddBytes, orderedCache.peekLast()) < 0) {
      if (cacheFull) {
          // Cache is full: evict its last element to make room.
          // The evicted element now lives only in RocksDB.
          orderedCache.pollLast();
          allElementsInCache = false;
      }
      // Add the new element to the cache...
      if (orderedCache.add(toAddBytes)) {
          // ...and write it through to RocksDB as well.
          addToRocksDB(toAddBytes);
          // Identity check: the new bytes became the cache's first element, so a memoized
          // peek() result is stale.
          if (toAddBytes == orderedCache.peekFirst()) {
              peekCache = null;
              return true;
          }
      }
  } else {
      // Does not belong in the cache: write to RocksDB only; the cache no longer holds all.
      addToRocksDB(toAddBytes);
      allElementsInCache = false;
  }
  return false;
}
// org.apache.flink.streaming.api.operators.TimerSerializer
/**
 * Writes a timer as the timestamp (sign bit flipped), then the key, then the namespace.
 *
 * <p>Because the timestamp is written first, the serialized bytes are ordered by timestamp
 * first, then by key and namespace. Flipping the sign bit presumably keeps negative timestamps
 * ordered before positive ones under the unsigned byte-wise comparison used by the RocksDB
 * queue.
 *
 * <p>Timers with the same (timestamp, key, namespace) serialize to identical bytes, which is
 * presumably what lets the set-based queue de-duplicate them.
 */
public void serialize(TimerHeapInternalTimer<K, N> record, DataOutputView target)
      throws IOException {
  final long orderedTimestamp = MathUtils.flipSignBit(record.getTimestamp());
  target.writeLong(orderedTimestamp);
  keySerializer.serialize(record.getKey(), target);
  namespaceSerializer.serialize(record.getNamespace(), target);
}

/**
 * Returns the first element without removing it, or null if the cache is empty.
 *
 * <p>The deserialized element is memoized in {@code peekCache}, so a following
 * {@code poll()} can return it without deserializing again.
 *
 * <p>NOTE(review): as excerpted, only the in-memory cache is consulted. If the cache has
 * been drained while {@code allElementsInCache} is false (elements exist only in RocksDB),
 * this returns null. Upstream Flink refills the cache first
 * ({@code checkRefillCacheFromStore()}); that call appears to be omitted here. Confirm.
 */
public E peek() {
  if (peekCache == null) {
      final byte[] headBytes = orderedCache.peekFirst();
      if (headBytes == null) {
          return null;
      }
      peekCache = deserializeElement(headBytes);
  }
  return peekCache;
}
// Remove and return the first element.
// Returns null if the cache is empty.
// NOTE(review): like peek(), this excerpt consults only the cache. Upstream Flink first refills
// it from RocksDB (checkRefillCacheFromStore()) when it is empty but allElementsInCache is
// false; that call appears to be omitted here. Confirm.
public E poll() {
  final byte[] firstBytes = orderedCache.pollFirst();

  if (firstBytes == null) {
      return null;
  }
  // Remove the same bytes from RocksDB as well (write-through).
  removeFromRocksDB(firstBytes);

  // The cache just became empty: remember the removed bytes as seekHint
  // (presumably the position to start a later RocksDB scan from; confirm).
  if (orderedCache.isEmpty()) {
      seekHint = firstBytes;
  }

  // Reuse the element memoized by peek(), if any; otherwise deserialize it now.
  if (peekCache != null) {
      E fromCache = peekCache;
      peekCache = null;
      return fromCache;
  } else {
      return deserializeElement(firstBytes);
  }
}