Flink 延迟标记

Posted by danner on August 5, 2021

Flink 有四类元素在流转 StreamRecordWatermarkStreamStatusLatencyMarker。本文分析 LatencyMarker,延迟测量功能(event 从Source流到当前算子经过的时间,不包含event 处理的时间)。

先看LatencyMarker结构体

public final class LatencyMarker extends StreamElement {

    /** The time the latency mark is denoting. */
    private final long markedTime;

    private final OperatorID operatorId;

    private final int subtaskIndex;

只有三个属性:

  • markedTime:创建时携带的时间戳

  • operatorId:算子ID

  • subtaskIndex:算子并发实例(sub-task) 的Index

下面从LatencyMarker 流转过程分析

Source

LatencyMarker 从Source 节点周期性发出, metrics.latency.interval设置间隔时间。

// org.apache.flink.streaming.api.operators.StreamSource#run(java.lang.Object, org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer, org.apache.flink.streaming.api.operators.Output<org.apache.flink.streaming.runtime.streamrecord.StreamRecord<OUT>>, org.apache.flink.streaming.runtime.tasks.OperatorChain<?,?>)
...
    final long latencyTrackingInterval =
            getExecutionConfig().isLatencyTrackingConfigured()
                    ? getExecutionConfig().getLatencyTrackingInterval()
                    : configuration.getLong(MetricOptions.LATENCY_INTERVAL);

    LatencyMarksEmitter<OUT> latencyEmitter = null;
    if (latencyTrackingInterval > 0) {
        latencyEmitter =
                new LatencyMarksEmitter<>(
                        getProcessingTimeService(),
                        collector,
                        latencyTrackingInterval,
                        this.getOperatorID(),
                        getRuntimeContext().getIndexOfThisSubtask());
    }
...


// org.apache.flink.streaming.api.operators.StreamSource.LatencyMarksEmitter
public LatencyMarksEmitter(
        final ProcessingTimeService processingTimeService,
        final Output<StreamRecord<OUT>> output,
        long latencyTrackingInterval,
        final OperatorID operatorId,
        final int subtaskIndex) {

    latencyMarkTimer =
            processingTimeService.scheduleWithFixedDelay(
                    new ProcessingTimeCallback() {
                        @Override
                        public void onProcessingTime(long timestamp) throws Exception {
                            try {
                                // ProcessingTimeService callbacks are executed under the
                                // checkpointing lock
                                output.emitLatencyMarker(
                                        new LatencyMarker(
                                                processingTimeService
                                                        .getCurrentProcessingTime(),
                                                operatorId,
                                                subtaskIndex));
                            } catch (Throwable t) {
                                // we catch the Throwables here so that we don't trigger the
                                // processing
                                // timer services async exception handler
                                LOG.warn("Error while emitting latency marker.", t);
                            }
                        }
                    },
                    0L,
                    latencyTrackingInterval);
}

调用 processingTimeService 线程调度每隔 latencyTrackingInterval 时间,往下游发送 LatencyMarker。注意LatencyMarker 时间是当前机器的时间 -> processingTimeService .getCurrentProcessingTime()。

Operator

每个 Operator 会收到上游的LatencyMarker

// org.apache.flink.streaming.api.operators.AbstractStreamOperator
protected void reportOrForwardLatencyMarker(LatencyMarker marker) {
    // all operators are tracking latencies
    this.latencyStats.reportLatency(marker);

    // everything except sinks forwards latency markers
    this.output.emitLatencyMarker(marker);
}

做了两件事:

  • 延迟指标上报

  • LatencyMarker 继续往下游发

指标上报

看看延迟指标是如何计算的

// org.apache.flink.streaming.api.operators.AbstractStreamOperator
    final String configuredGranularity =
            taskManagerConfig.getString(MetricOptions.LATENCY_SOURCE_GRANULARITY);
    LatencyStats.Granularity granularity;
    try {
        granularity =
                LatencyStats.Granularity.valueOf(
                        configuredGranularity.toUpperCase(Locale.ROOT));
    } catch (IllegalArgumentException iae) {
        granularity = LatencyStats.Granularity.OPERATOR;
        LOG.warn(
                "Configured value {} option for {} is invalid. Defaulting to {}.",
                configuredGranularity,
                MetricOptions.LATENCY_SOURCE_GRANULARITY.key(),
                granularity);
    }
    TaskManagerJobMetricGroup jobMetricGroup = this.metrics.parent().parent();
    this.latencyStats =
            new LatencyStats(
                    jobMetricGroup.addGroup("latency"),
                    historySize,
                    container.getIndexInSubtaskGroup(),
                    getOperatorID(),
                    granularity);


// org.apache.flink.streaming.util.LatencyStats
public void reportLatency(LatencyMarker marker) {
    // 不同粒度返回的 uniqueName不同
    final String uniqueName =
            granularity.createUniqueHistogramName(marker, operatorId, subtaskIndex);

    DescriptiveStatisticsHistogram latencyHistogram = this.latencyStats.get(uniqueName);
    if (latencyHistogram == null) {
        latencyHistogram = new DescriptiveStatisticsHistogram(this.historySize);
        this.latencyStats.put(uniqueName, latencyHistogram);
        granularity
                .createSourceMetricGroups(metricGroup, marker, operatorId, subtaskIndex)
                .addGroup("operator_id", String.valueOf(operatorId))
                .addGroup("operator_subtask_index", String.valueOf(subtaskIndex))
                .histogram("latency", latencyHistogram);
    }

    long now = System.currentTimeMillis();
    latencyHistogram.update(now - marker.getMarkedTime());
}

根据 metrics.latency.granularity 分配延迟标记的粒度

  • single:每个算子单独统计延迟;

  • operator(默认值):每个下游算子都统计自己与Source算子之间的延迟;

  • subtask:每个下游算子的sub-task都统计自己与Source算子的sub-task之间的延迟。

一般情况下采用默认的operator粒度即可,这样在Sink端观察到的latency metric就是我们最想要的全链路(端到端)延迟,以下也是以该粒度讲解。subtask粒度太细,会增大所有并行度的负担,不建议使用。

上报延迟指标也很简单,处理 LatencyMarker 时间 - 此LatencyMarker生成的时间 -> LatencyMarker 从Source 流到此算子经过的时间。

下发

当有多个下游(并行度)时,只会随机选择一个下游发送,减少数据传输和性能损耗。

// everything except sinks forwards latency markers
this.output.emitLatencyMarker(marker);

// org.apache.flink.streaming.runtime.io.RecordWriterOutput
public void emitLatencyMarker(LatencyMarker latencyMarker) {
    serializationDelegate.setInstance(latencyMarker);

    try {
        recordWriter.randomEmit(serializationDelegate);
    } catch (Exception e) {
        throw new RuntimeException(e.getMessage(), e);
    }
}

// org.apache.flink.runtime.io.network.api.writer.RecordWriter
/** This is used to send LatencyMarks to a random target channel. */
public void randomEmit(T record) throws IOException {
    checkErroneous();

    int targetSubpartition = rng.nextInt(numberOfChannels);
    emit(record, targetSubpartition);
}

Sink

Sink 收到 LatencyMarker,只会上报延迟指标。

// org.apache.flink.streaming.api.operators.StreamSink
protected void reportOrForwardLatencyMarker(LatencyMarker marker) {
    // all operators are tracking latencies
    this.latencyStats.reportLatency(marker);

    // sinks don't forward latency markers
}

总结

  • LatencyMarker不参与window、MiniBatch的缓存计时以及计算,直接被中间Operator下发。

  • 每个中间Operator、以及Sink都会统计自己与Source节点的链路延迟,我们在监控页面,一般展示Source至Sink链路延迟

  • 延迟粒度细分到Task,可以用来排查哪台机器的Task时延偏高,进行对比和运维排查

  • 从实现原理来看,发送时延标记间隔配置大一些(例如30秒一次),一般不会影响系统处理业务数据的性能

    • 所有的StreamSource Task都按间隔发送时延标记,中间节点有多个输出通道的,随机选择一个通道下发,不会复制多份数据出来

参考资料

FLink全链路时延—测量方式 - it610.com

Flink链路延迟测量的LatencyMarker机制实现 - 简书