Flink 有四类元素在流转 StreamRecord
、Watermark
、StreamStatus
、LatencyMarker
。本文分析 LatencyMarker
,延迟测量功能(event 从Source流到当前算子经过的时间,不包含event 处理的时间)。
先看LatencyMarker
结构体
public final class LatencyMarker extends StreamElement {
/** The time the latency mark is denoting. */
private final long markedTime;
private final OperatorID operatorId;
private final int subtaskIndex;
只有三个属性:
-
markedTime:创建时携带的时间戳
-
operatorId:算子ID
-
subtaskIndex:算子并发实例(sub-task) 的Index
下面从LatencyMarker
流转过程分析
Source
LatencyMarker
从Source 节点周期性发出, metrics.latency.interval
设置间隔时间。
// org.apache.flink.streaming.api.operators.StreamSource#run(java.lang.Object, org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer, org.apache.flink.streaming.api.operators.Output<org.apache.flink.streaming.runtime.streamrecord.StreamRecord<OUT>>, org.apache.flink.streaming.runtime.tasks.OperatorChain<?,?>)
...
final long latencyTrackingInterval =
getExecutionConfig().isLatencyTrackingConfigured()
? getExecutionConfig().getLatencyTrackingInterval()
: configuration.getLong(MetricOptions.LATENCY_INTERVAL);
LatencyMarksEmitter<OUT> latencyEmitter = null;
if (latencyTrackingInterval > 0) {
latencyEmitter =
new LatencyMarksEmitter<>(
getProcessingTimeService(),
collector,
latencyTrackingInterval,
this.getOperatorID(),
getRuntimeContext().getIndexOfThisSubtask());
}
...
// org.apache.flink.streaming.api.operators.StreamSource.LatencyMarksEmitter
public LatencyMarksEmitter(
final ProcessingTimeService processingTimeService,
final Output<StreamRecord<OUT>> output,
long latencyTrackingInterval,
final OperatorID operatorId,
final int subtaskIndex) {
latencyMarkTimer =
processingTimeService.scheduleWithFixedDelay(
new ProcessingTimeCallback() {
@Override
public void onProcessingTime(long timestamp) throws Exception {
try {
// ProcessingTimeService callbacks are executed under the
// checkpointing lock
output.emitLatencyMarker(
new LatencyMarker(
processingTimeService
.getCurrentProcessingTime(),
operatorId,
subtaskIndex));
} catch (Throwable t) {
// we catch the Throwables here so that we don't trigger the
// processing
// timer services async exception handler
LOG.warn("Error while emitting latency marker.", t);
}
}
},
0L,
latencyTrackingInterval);
}
调用 processingTimeService 线程调度每隔 latencyTrackingInterval 时间,往下游发送 LatencyMarker
。注意LatencyMarker
时间是当前机器的时间 -> processingTimeService
.getCurrentProcessingTime()。
Operator
每个 Operator 会收到上游的LatencyMarker
。
// org.apache.flink.streaming.api.operators.AbstractStreamOperator
protected void reportOrForwardLatencyMarker(LatencyMarker marker) {
// all operators are tracking latencies
this.latencyStats.reportLatency(marker);
// everything except sinks forwards latency markers
this.output.emitLatencyMarker(marker);
}
做了两件事:
-
延迟指标上报
-
LatencyMarker
继续往下游发
指标上报
看看延迟指标是如何计算的
// org.apache.flink.streaming.api.operators.AbstractStreamOperator
final String configuredGranularity =
taskManagerConfig.getString(MetricOptions.LATENCY_SOURCE_GRANULARITY);
LatencyStats.Granularity granularity;
try {
granularity =
LatencyStats.Granularity.valueOf(
configuredGranularity.toUpperCase(Locale.ROOT));
} catch (IllegalArgumentException iae) {
granularity = LatencyStats.Granularity.OPERATOR;
LOG.warn(
"Configured value {} option for {} is invalid. Defaulting to {}.",
configuredGranularity,
MetricOptions.LATENCY_SOURCE_GRANULARITY.key(),
granularity);
}
TaskManagerJobMetricGroup jobMetricGroup = this.metrics.parent().parent();
this.latencyStats =
new LatencyStats(
jobMetricGroup.addGroup("latency"),
historySize,
container.getIndexInSubtaskGroup(),
getOperatorID(),
granularity);
// org.apache.flink.streaming.util.LatencyStats
public void reportLatency(LatencyMarker marker) {
// 不同粒度返回的 uniqueName不同
final String uniqueName =
granularity.createUniqueHistogramName(marker, operatorId, subtaskIndex);
DescriptiveStatisticsHistogram latencyHistogram = this.latencyStats.get(uniqueName);
if (latencyHistogram == null) {
latencyHistogram = new DescriptiveStatisticsHistogram(this.historySize);
this.latencyStats.put(uniqueName, latencyHistogram);
granularity
.createSourceMetricGroups(metricGroup, marker, operatorId, subtaskIndex)
.addGroup("operator_id", String.valueOf(operatorId))
.addGroup("operator_subtask_index", String.valueOf(subtaskIndex))
.histogram("latency", latencyHistogram);
}
long now = System.currentTimeMillis();
latencyHistogram.update(now - marker.getMarkedTime());
}
根据 metrics.latency.granularity
分配延迟标记的粒度
-
single:每个算子单独统计延迟;
-
operator(默认值):每个下游算子都统计自己与Source算子之间的延迟;
-
subtask:每个下游算子的sub-task都统计自己与Source算子的sub-task之间的延迟。
一般情况下采用默认的operator粒度即可,这样在Sink端观察到的latency metric就是我们最想要的全链路(端到端)延迟,以下也是以该粒度讲解。subtask粒度太细,会增大所有并行度的负担,不建议使用。
上报延迟指标也很简单,处理 LatencyMarker
时间 - 此LatencyMarker
生成的时间 -> LatencyMarker
从Source 流到此算子经过的时间。
下发
当有多个下游(并行度)时,只会随机选择一个下游发送,减少数据传输和性能损耗。
// everything except sinks forwards latency markers
this.output.emitLatencyMarker(marker);
// org.apache.flink.streaming.runtime.io.RecordWriterOutput
public void emitLatencyMarker(LatencyMarker latencyMarker) {
serializationDelegate.setInstance(latencyMarker);
try {
recordWriter.randomEmit(serializationDelegate);
} catch (Exception e) {
throw new RuntimeException(e.getMessage(), e);
}
}
// org.apache.flink.runtime.io.network.api.writer.RecordWriter
/** This is used to send LatencyMarks to a random target channel. */
public void randomEmit(T record) throws IOException {
checkErroneous();
int targetSubpartition = rng.nextInt(numberOfChannels);
emit(record, targetSubpartition);
}
Sink
Sink 收到 LatencyMarker
,只会上报延迟指标。
// org.apache.flink.streaming.api.operators.StreamSink
protected void reportOrForwardLatencyMarker(LatencyMarker marker) {
// all operators are tracking latencies
this.latencyStats.reportLatency(marker);
// sinks don't forward latency markers
}
总结
-
LatencyMarker不参与window、MiniBatch的缓存计时以及计算,直接被中间Operator下发。
-
每个中间Operator、以及Sink都会统计自己与Source节点的链路延迟,我们在监控页面,一般展示Source至Sink链路延迟
-
延迟粒度细分到Task,可以用来排查哪台机器的Task时延偏高,进行对比和运维排查
-
从实现原理来看,发送时延标记间隔配置大一些(例如30秒一次),一般不会影响系统处理业务数据的性能
- 所有的StreamSource Task都按间隔发送时延标记,中间节点有多个输出通道的,随机选择一个通道下发,不会复制多份数据出来