Flink SQL: Group By

Posted by danner on May 26, 2021

The earlier post Flink 之 Retract introduced why retraction matters (it corrects previously emitted, now-incorrect results). This post revisits how retraction is used in Flink, taking group by as the example.

CREATE TABLE table1 (
 name STRING,
 cnt int
) WITH (
 'connector' = 'kafka',
 'topic' = 'products_binlog',
 'properties.bootstrap.servers' = '127.0.0.1:9092',
 'properties.group.id' = 'testGroup',
 'scan.startup.mode' = 'earliest-offset',
 'format' = 'canal-json'
)
         
select name,
sum(cnt),
max(cnt)
from table1
group by name

Printing the execution plan with explainSql and following the Flink SQL translation process covered earlier, we find the ExecNode for group by: StreamExecGroupAggregate. Its Transformation has two implementations: GroupAggFunction and MiniBatchGroupAggFunction.
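
For reference, the plan can be printed with something like the following (a minimal sketch; tEnv is assumed to be a StreamTableEnvironment in which table1 has already been registered with the DDL above):

// Assumed setup: tEnv already has table1 registered via the DDL above
String plan = tEnv.explainSql(
        "select name, sum(cnt), max(cnt) from table1 group by name");
System.out.println(plan);  // the optimized plan contains the GroupAggregate node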

Flow

Take GroupAggFunction as the example.

With group by, data is grouped by key and kept in state. When a new record arrives, it is applied to the state and then:

  • the accumulator still holds records
    • if there was no previous value (first record for this key), emit INSERT
    • if there was a previous value
      • if state cleanup is disabled and the new result equals the previous one, return without emitting anything
      • otherwise (state cleanup is enabled, or the result changed), emit an UPDATE_BEFORE followed by an UPDATE_AFTER
  • the accumulator no longer holds any records: if there was a previous value, emit DELETE (retract the previously emitted row)

For example, for rows (a, 1), (a, 2) followed by a retraction of (a, 1), the query emits +I(a, 1, 1), then -U(a, 1, 1) / +U(a, 3, 2), then -U(a, 3, 2) / +U(a, 2, 2).

How is the state updated? There are two cases:

  • the event is a retraction message: it is retracted from the state
  • otherwise: it is accumulated into the state

The concrete implementation of this computation is analyzed below. The semantics of group by are very simple; the interesting part is how the state is computed. So what exactly does this state contain?
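
Before looking at the generated code, the two lists above can be tied together with a lightly simplified sketch of GroupAggFunction#processElement (member names follow the Flink implementation, but details such as index counting and error handling are omitted):

// Simplified sketch of org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction#processElement
public void processElement(RowData input, Context ctx, Collector<RowData> out) throws Exception {
    RowData currentKey = ctx.getCurrentKey();

    boolean firstRow;
    RowData accumulators = accState.value();              // per-key accumulator kept in Flink state
    if (accumulators == null) {
        if (isRetractMsg(input)) {
            return;                                        // nothing to retract for an unseen key
        }
        firstRow = true;
        accumulators = function.createAccumulators();      // GroupAggsHandler.createAccumulators
    } else {
        firstRow = false;
    }

    function.setAccumulators(accumulators);                // load the accumulator into the generated handler
    RowData prevAggValue = function.getValue();            // previous (sum, max)

    if (isAccumulateMsg(input)) {
        function.accumulate(input);                        // RowKind INSERT / UPDATE_AFTER
    } else {
        function.retract(input);                           // RowKind UPDATE_BEFORE / DELETE
    }
    RowData newAggValue = function.getValue();             // new (sum, max)
    accumulators = function.getAccumulators();

    if (!recordCounter.recordCountIsZero(accumulators)) {
        // at least one record still contributes to this key
        accState.update(accumulators);
        if (firstRow) {
            resultRow.replace(currentKey, newAggValue).setRowKind(RowKind.INSERT);
            out.collect(resultRow);
        } else if (stateCleaningEnabled || !equaliser.equals(prevAggValue, newAggValue)) {
            if (generateUpdateBefore) {
                resultRow.replace(currentKey, prevAggValue).setRowKind(RowKind.UPDATE_BEFORE);
                out.collect(resultRow);
            }
            resultRow.replace(currentKey, newAggValue).setRowKind(RowKind.UPDATE_AFTER);
            out.collect(resultRow);
        }
        // otherwise: the result is unchanged and state never expires, so nothing is emitted
    } else {
        // the last record for this key was retracted: delete the previous result and clear state
        if (!firstRow) {
            resultRow.replace(currentKey, prevAggValue).setRowKind(RowKind.DELETE);
            out.collect(resultRow);
        }
        accState.clear();
        function.cleanup();
    }
}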

Code Generation

In this example the state consists of sum(cnt) and max(cnt); a single aggregate handler performs all accumulate/retract operations. Before the job runs we cannot know whether the aggregation will be a sum, a max, a count, or some combination of them, so the handler is generated dynamically at runtime.

public final class GroupAggsHandler implements org.apache.flink.table.runtime.generated.AggsHandleFunction {
    int agg0_sum;
    boolean agg0_sumIsNull;
    long agg0_count;
    boolean agg0_countIsNull;
    private transient org.apache.flink.table.planner.functions.aggfunctions.MaxWithRetractAggFunction function_org$apache$flink$table$planner$functions$aggfunctions$MaxWithRetractAggFunction$d78f624eeff2a86742b5f64899608448;
    private transient org.apache.flink.table.runtime.typeutils.ExternalSerializer externalSerializer$2;
    private transient org.apache.flink.table.runtime.typeutils.ExternalSerializer externalSerializer$3;
    private org.apache.flink.table.runtime.dataview.StateMapView agg1$map_dataview;
    private org.apache.flink.table.data.binary.BinaryRawValueData agg1$map_dataview_raw_value;
    private org.apache.flink.table.runtime.dataview.StateMapView agg1$map_dataview_backup;
    private org.apache.flink.table.data.binary.BinaryRawValueData agg1$map_dataview_backup_raw_value;
    long agg2_count1;
    boolean agg2_count1IsNull;
    private transient org.apache.flink.table.data.conversion.StructuredObjectConverter converter$4;
    org.apache.flink.table.data.GenericRowData acc$6 = new org.apache.flink.table.data.GenericRowData(4);
    org.apache.flink.table.data.GenericRowData acc$7 = new org.apache.flink.table.data.GenericRowData(4);
    org.apache.flink.table.data.UpdatableRowData field$11;
    private org.apache.flink.table.data.RowData agg1_acc_internal;
    private org.apache.flink.table.planner.functions.aggfunctions.MaxWithRetractAggFunction.MaxWithRetractAccumulator agg1_acc_external;
    org.apache.flink.table.data.GenericRowData aggValue$41 = new org.apache.flink.table.data.GenericRowData(2);
    private org.apache.flink.table.runtime.dataview.StateDataViewStore store;

    public GroupAggsHandler(java.lang.Object[] references) throws Exception {
        function_org$apache$flink$table$planner$functions$aggfunctions$MaxWithRetractAggFunction$d78f624eeff2a86742b5f64899608448 = (((org.apache.flink.table.planner.functions.aggfunctions.MaxWithRetractAggFunction) references[0]));
        externalSerializer$2 = (((org.apache.flink.table.runtime.typeutils.ExternalSerializer) references[1]));
        externalSerializer$3 = (((org.apache.flink.table.runtime.typeutils.ExternalSerializer) references[2]));
        converter$4 = (((org.apache.flink.table.data.conversion.StructuredObjectConverter) references[3]));
    }

    private org.apache.flink.api.common.functions.RuntimeContext getRuntimeContext() {
        return store.getRuntimeContext();
    }

    @Override
    public void open(org.apache.flink.table.runtime.dataview.StateDataViewStore store) throws Exception {
        this.store = store;

        function_org$apache$flink$table$planner$functions$aggfunctions$MaxWithRetractAggFunction$d78f624eeff2a86742b5f64899608448.open(new org.apache.flink.table.functions.FunctionContext(store.getRuntimeContext()));

        agg1$map_dataview = (org.apache.flink.table.runtime.dataview.StateMapView) store.getStateMapView("agg1$map", false, externalSerializer$2, externalSerializer$3);
        agg1$map_dataview_raw_value = org.apache.flink.table.data.binary.BinaryRawValueData.fromObject(agg1$map_dataview);

        agg1$map_dataview_backup = (org.apache.flink.table.runtime.dataview.StateMapView) store.getStateMapView("agg1$map", false, externalSerializer$2, externalSerializer$3);
        agg1$map_dataview_backup_raw_value = org.apache.flink.table.data.binary.BinaryRawValueData.fromObject(agg1$map_dataview_backup);

        converter$4.open(getRuntimeContext().getUserCodeClassLoader());

    }

    /**
     * Accumulate: fold the input row into the aggregation state.
     * @param accInput
     * @throws Exception
     */
    @Override
    public void accumulate(org.apache.flink.table.data.RowData accInput) throws Exception {

        int field$13;
        boolean isNull$13;
        boolean isNull$14;
        int result$15;
        boolean isNull$18;
        long result$19;
        boolean isNull$21;
        long result$22;
        isNull$13 = accInput.isNullAt(1);
        field$13 = -1;
        if (!isNull$13) {
            field$13 = accInput.getInt(1);
        }

        int result$17 = -1;
        boolean isNull$17;
        if (isNull$13) {

            isNull$17 = agg0_sumIsNull;
            if (!isNull$17) {
                result$17 = agg0_sum;
            }
        } else {
            int result$16 = -1;
            boolean isNull$16;
            if (agg0_sumIsNull) {
                // previous sum was null, so the input value becomes the sum
                isNull$16 = isNull$13;
                if (!isNull$16) {
                    result$16 = field$13;
                }
            } else {
                // previous sum was not null: sum = agg0_sum + input
                isNull$14 = agg0_sumIsNull || isNull$13;
                result$15 = -1;
                if (!isNull$14) {
                    result$15 = (int) (agg0_sum + field$13);
                }

                isNull$16 = isNull$14;
                if (!isNull$16) {
                    result$16 = result$15;
                }
            }
            // result$17 holds the final sum
            isNull$17 = isNull$16;
            if (!isNull$17) {
                result$17 = result$16;
            }

        }
        // sum computation ends here
        agg0_sum = result$17;
        agg0_sumIsNull = isNull$17;

        long result$20 = -1L;
        boolean isNull$20;
        if (isNull$13) {

            isNull$20 = agg0_countIsNull;
            if (!isNull$20) {
                result$20 = agg0_count;
            }
        } else {
            isNull$18 = agg0_countIsNull || false;
            result$19 = -1L;
            if (!isNull$18) {
                result$19 = (long) (agg0_count + ((long) 1L));

            }
            isNull$20 = isNull$18;
            if (!isNull$20) {
                result$20 = result$19;
            }
        }
        // count of values accumulated into the sum
        agg0_count = result$20;
        agg0_countIsNull = isNull$20;
        // delegate to max's accumulate
        function_org$apache$flink$table$planner$functions$aggfunctions$MaxWithRetractAggFunction$d78f624eeff2a86742b5f64899608448.accumulate(agg1_acc_external, isNull$13 ? null : ((java.lang.Integer) field$13));

        isNull$21 = agg2_count1IsNull || false;
        result$22 = -1L;
        if (!isNull$21) {
            result$22 = (long) (agg2_count1 + ((long) 1L));
        }

        agg2_count1 = result$22;
        agg2_count1IsNull = isNull$21;

    }

    /**
     * Retract: remove the input row from the aggregation state.
     * @param retractInput
     * @throws Exception
     */
    @Override
    public void retract(org.apache.flink.table.data.RowData retractInput) throws Exception {

        int field$23;
        boolean isNull$23;
        boolean isNull$24;
        int result$25;
        boolean isNull$26;
        int result$27;
        boolean isNull$30;
        long result$31;
        boolean isNull$33;
        long result$34;
        isNull$23 = retractInput.isNullAt(1);
        field$23 = -1;
        if (!isNull$23) {
            field$23 = retractInput.getInt(1);
        }

        int result$29 = -1;
        boolean isNull$29;
        if (isNull$23) {
            isNull$29 = agg0_sumIsNull;
            if (!isNull$29) {
                result$29 = agg0_sum;
            }
        } else {
            int result$28 = -1;
            boolean isNull$28;
            if (agg0_sumIsNull) {
                isNull$24 = false || isNull$23;
                result$25 = -1;
                if (!isNull$24) {
                    result$25 = (int) (((int) 0) - field$23);
                }
                isNull$28 = isNull$24;
                if (!isNull$28) {
                    result$28 = result$25;
                }
            } else {
                isNull$26 = agg0_sumIsNull || isNull$23;
                result$27 = -1;
                if (!isNull$26) {
                    result$27 = (int) (agg0_sum - field$23);
                }
                isNull$28 = isNull$26;
                if (!isNull$28) {
                    result$28 = result$27;
                }
            }
            isNull$29 = isNull$28;
            if (!isNull$29) {
                result$29 = result$28;
            }
        }
        // agg0_sum = previous agg0_sum - input value
        agg0_sum = result$29;
        agg0_sumIsNull = isNull$29;

        long result$32 = -1L;
        boolean isNull$32;
        if (isNull$23) {
            isNull$32 = agg0_countIsNull;
            if (!isNull$32) {
                result$32 = agg0_count;
            }
        } else {
            isNull$30 = agg0_countIsNull || false;
            result$31 = -1L;
            if (!isNull$30) {
                result$31 = (long) (agg0_count - ((long) 1L));
            }
            isNull$32 = isNull$30;
            if (!isNull$32) {
                result$32 = result$31;
            }
        }
        // if the input is not null, agg0_count - 1
        agg0_count = result$32;
        agg0_countIsNull = isNull$32;
        // max retract
        function_org$apache$flink$table$planner$functions$aggfunctions$MaxWithRetractAggFunction$d78f624eeff2a86742b5f64899608448.retract(agg1_acc_external, isNull$23 ? null : ((java.lang.Integer) field$23));

        isNull$33 = agg2_count1IsNull || false;
        result$34 = -1L;
        if (!isNull$33) {
            result$34 = (long) (agg2_count1 - ((long) 1L));
        }

        agg2_count1 = result$34;
        agg2_count1IsNull = isNull$33;
    }

    @Override
    public void merge(org.apache.flink.table.data.RowData otherAcc) throws Exception {
        throw new java.lang.RuntimeException("This function not require merge method, but the merge method is called.");
    }

    /**
     * Initialize the accumulators (agg0_sum, agg0_count, agg1_acc_internal, agg2_count1) from acc.
     * @param acc
     * @throws Exception
     */
    @Override
    public void setAccumulators(org.apache.flink.table.data.RowData acc) throws Exception {

        int field$8;
        boolean isNull$8;
        long field$9;
        boolean isNull$9;
        org.apache.flink.table.data.RowData field$10;
        boolean isNull$10;
        long field$12;
        boolean isNull$12;
        isNull$8 = acc.isNullAt(0);
        field$8 = -1;
        if (!isNull$8) {
            field$8 = acc.getInt(0);
        }
        isNull$9 = acc.isNullAt(1);
        field$9 = -1L;
        if (!isNull$9) {
            field$9 = acc.getLong(1);
        }
        isNull$12 = acc.isNullAt(3);
        field$12 = -1L;
        if (!isNull$12) {
            field$12 = acc.getLong(3);
        }

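        // field 2 is the MaxWithRetractAggFunction accumulator; re-attach the state-backed
        // MapView (agg1$map_dataview) to it before converting it to the external accumulator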
        isNull$10 = acc.isNullAt(2);
        field$10 = null;
        if (!isNull$10) {
            field$10 = acc.getRow(2, 3);
        }
        field$11 = null;
        if (!isNull$10) {
            field$11 = new org.apache.flink.table.data.UpdatableRowData(
                    field$10,
                    3);

            agg1$map_dataview_raw_value.setJavaObject(agg1$map_dataview);
            field$11.setField(2, agg1$map_dataview_raw_value);
        }

        agg0_sum = field$8;
        agg0_sumIsNull = isNull$8;

        agg0_count = field$9;
        agg0_countIsNull = isNull$9;

        agg1_acc_internal = field$11;
        agg1_acc_external = (org.apache.flink.table.planner.functions.aggfunctions.MaxWithRetractAggFunction.MaxWithRetractAccumulator) converter$4.toExternal((org.apache.flink.table.data.RowData) agg1_acc_internal);

        agg2_count1 = field$12;
        agg2_count1IsNull = isNull$12;
    }

    @Override
    public void resetAccumulators() throws Exception {

        agg0_sum = ((int) -1);
        agg0_sumIsNull = true;

        agg0_count = ((long) 0L);
        agg0_countIsNull = false;

        agg1_acc_external = (org.apache.flink.table.planner.functions.aggfunctions.MaxWithRetractAggFunction.MaxWithRetractAccumulator) function_org$apache$flink$table$planner$functions$aggfunctions$MaxWithRetractAggFunction$d78f624eeff2a86742b5f64899608448.createAccumulator();
        agg1_acc_internal = (org.apache.flink.table.data.RowData) converter$4.toInternalOrNull((org.apache.flink.table.planner.functions.aggfunctions.MaxWithRetractAggFunction.MaxWithRetractAccumulator) agg1_acc_external);

        agg2_count1 = ((long) 0L);
        agg2_count1IsNull = false;
    }

    /**
     * Return the current accumulators as a RowData.
     * @return
     * @throws Exception
     */
    @Override
    public org.apache.flink.table.data.RowData getAccumulators() throws Exception {

        acc$7 = new org.apache.flink.table.data.GenericRowData(4);

        if (agg0_sumIsNull) {
            acc$7.setField(0, null);
        } else {
            acc$7.setField(0, agg0_sum);
        }

        if (agg0_countIsNull) {
            acc$7.setField(1, null);
        } else {
            acc$7.setField(1, agg0_count);
        }

        agg1_acc_internal = (org.apache.flink.table.data.RowData) converter$4.toInternalOrNull((org.apache.flink.table.planner.functions.aggfunctions.MaxWithRetractAggFunction.MaxWithRetractAccumulator) agg1_acc_external);
        if (false) {
            acc$7.setField(2, null);
        } else {
            acc$7.setField(2, agg1_acc_internal);
        }

        if (agg2_count1IsNull) {
            acc$7.setField(3, null);
        } else {
            acc$7.setField(3, agg2_count1);
        }

        return acc$7;
    }

    /**
     * Create empty accumulators.
     *
     * @return
     * @throws Exception
     */
    @Override
    public org.apache.flink.table.data.RowData createAccumulators() throws Exception {

        acc$6 = new org.apache.flink.table.data.GenericRowData(4);

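        // initial sum is NULL, both counters start at 0, and the max accumulator is freshly created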
        if (true) {
            acc$6.setField(0, null);
        } else {
            acc$6.setField(0, ((int) -1));
        }

        if (false) {
            acc$6.setField(1, null);
        } else {
            acc$6.setField(1, ((long) 0L));
        }

        org.apache.flink.table.data.RowData acc_internal$5 = (org.apache.flink.table.data.RowData) (org.apache.flink.table.data.RowData) converter$4.toInternalOrNull((org.apache.flink.table.planner.functions.aggfunctions.MaxWithRetractAggFunction.MaxWithRetractAccumulator) function_org$apache$flink$table$planner$functions$aggfunctions$MaxWithRetractAggFunction$d78f624eeff2a86742b5f64899608448.createAccumulator());
        if (false) {
            acc$6.setField(2, null);
        } else {
            acc$6.setField(2, acc_internal$5);
        }

        if (false) {
            acc$6.setField(3, null);
        } else {
            acc$6.setField(3, ((long) 0L));
        }

        return acc$6;
    }

    /**
     * Compute the aggregate result (sum, max) from the current accumulators.
     * @return
     * @throws Exception
     */
    @Override
    public org.apache.flink.table.data.RowData getValue() throws Exception {

        boolean isNull$35;
        boolean result$36;

        aggValue$41 = new org.apache.flink.table.data.GenericRowData(2);

        isNull$35 = agg0_countIsNull || false;
        result$36 = false;
        if (!isNull$35) {
            result$36 = agg0_count == ((long) 0L);
        }

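        // result$36 == true means agg0_count == 0, i.e. no non-null cnt was ever accumulated,
        // in which case SUM(cnt) is emitted as NULL below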
        int result$37 = -1;
        boolean isNull$37;
        if (result$36) {
            isNull$37 = true;
            if (!isNull$37) {
                result$37 = ((int) -1);
            }
        } else {
            isNull$37 = agg0_sumIsNull;
            if (!isNull$37) {
                result$37 = agg0_sum;
            }
        }
        if (isNull$37) {
            aggValue$41.setField(0, null);
        } else {
            aggValue$41.setField(0, result$37);
        }

        java.lang.Integer value_external$38 = (java.lang.Integer)
                function_org$apache$flink$table$planner$functions$aggfunctions$MaxWithRetractAggFunction$d78f624eeff2a86742b5f64899608448.getValue(agg1_acc_external);
        java.lang.Integer value_internal$39 =
                value_external$38;
        boolean valueIsNull$40 = value_internal$39 == null;

        if (valueIsNull$40) {
            aggValue$41.setField(1, null);
        } else {
            aggValue$41.setField(1, value_internal$39);
        }
        return aggValue$41;
    }

    @Override
    public void cleanup() throws Exception {
        agg1$map_dataview.clear();
    }

    @Override
    public void close() throws Exception {
        function_org$apache$flink$table$planner$functions$aggfunctions$MaxWithRetractAggFunction$d78f624eeff2a86742b5f64899608448.close();
    }
}
  • createAccumulators: creation
    • the group by maintains two aggregation states, max and sum, collectively the aggregate state (held in memory and re-initialized on every call)
    • it creates a RowData with four fields: each of the two aggregates keeps two values, the aggregate itself plus a counter (their roles are described below); MaxWithRetractAggFunction also calls its own createAccumulator
  • setAccumulators: initialization
    • the accumulator read from Flink state (or the freshly created one if the key is new) is set into GroupAggsHandler, initializing the handler's acc fields
    • four pieces of state: sum, agg0_count (the number of non-null values accumulated into the sum), the max accumulator, and count (the number of input records currently contributing to this key); they are used later by getValue
  • getValue: returns the aggregate values (right after setAccumulators this is the previous result)
    • the aggregates here are sum and max
  • accumulate: accumulation, called when the input RowData's RowKind is an accumulate message
    • sum: add the input value
    • count: if the input is not null, agg0_count + 1
    • max: MaxWithRetractAggFunction.accumulate
    • count: agg2_count1 + 1
  • retract: retraction, called when the input RowData's RowKind is a retract message
    • sum: subtract the input value
    • count: if the input is not null, agg0_count - 1
    • max: MaxWithRetractAggFunction.retract
    • count: agg2_count1 - 1
  • getAccumulators: returns the updated accumulators (to be written back to Flink state)
The call sequence is: create, initialize, read the previous result, accumulate/retract, read the new result, fetch the accumulators:
	createAccumulators (only for the first record of a key)
	-> setAccumulators
		-> getValue (previous result)
			-> accumulate
			-> retract
				-> getValue (new result)
				 -> getAccumulators

Note that MaxWithRetractAggFunction is a built-in Flink function. The generated code does not inline the max logic; it simply calls MaxWithRetractAggFunction. What the generated code does is assemble the individual parts of the state (sum, counters, max accumulator) into one complete accumulator row and drive the computation.
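
To see why max needs extra bookkeeping under retraction, here is a purely hypothetical, simplified sketch of the idea behind MaxWithRetractAggFunction (not Flink's actual implementation, which keeps the value counts in a state-backed MapView): besides the current max it keeps a count per value, so that retracting the current max can fall back to the next largest remaining value.

// Hypothetical illustration only; Flink's MaxWithRetractAggFunction uses a MapView-backed accumulator instead.
import java.util.TreeMap;

public class SimpleMaxWithRetract {
    // value -> number of times it is currently present
    private final TreeMap<Integer, Long> counts = new TreeMap<>();

    public void accumulate(Integer value) {
        if (value != null) {
            counts.merge(value, 1L, Long::sum);
        }
    }

    public void retract(Integer value) {
        if (value != null) {
            Long c = counts.get(value);
            if (c != null) {
                if (c > 1) {
                    counts.put(value, c - 1);
                } else {
                    counts.remove(value);   // last occurrence retracted
                }
            }
        }
    }

    /** Current max, or null if every value has been retracted. */
    public Integer getValue() {
        return counts.isEmpty() ? null : counts.lastKey();
    }
}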

MiniBatch

Flink is event-driven: by default it computes once per incoming event, so its throughput cannot match batch processing. Flink therefore provides mini-batch settings that buffer events and process them together, improving throughput at the cost of higher latency.

What does MiniBatchGroupAggFunction do on top of GroupAggFunction?

"table.exec.mini-batch.enabled" = "true"         # 启用
"table.exec.mini-batch.allow-latency" = "5s"    # 缓存超时时长
"table.exec.mini-batch.size" = "5000"            # 缓存大小

These are the relevant settings.
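
The same options can also be set programmatically (a sketch; tEnv is assumed to be an existing TableEnvironment):

// Assumed: tEnv is an existing TableEnvironment
org.apache.flink.configuration.Configuration conf = tEnv.getConfig().getConfiguration();
conf.setString("table.exec.mini-batch.enabled", "true");
conf.setString("table.exec.mini-batch.allow-latency", "5s");
conf.setString("table.exec.mini-batch.size", "5000");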

Buffer

// org.apache.flink.table.runtime.operators.aggregate.MiniBatchGroupAggFunction
@Override
public List<RowData> addInput(@Nullable List<RowData> value, RowData input) throws Exception {
  List<RowData> bufferedRows = value;
  if (value == null) {
    bufferedRows = new ArrayList<>();
  }
  // buffer the input into bufferedRows
  // input row maybe reused, we need deep copy here
  bufferedRows.add(inputRowSerializer.copy(input));
  return bufferedRows;
}

Mini-batch obviously needs a buffering phase: incoming events are first put into a buffer and then processed together. This reduces the number of state accesses, from once per event to once per batch.

// org.apache.flink.table.runtime.operators.aggregate.MiniBatchGroupAggFunction#finishBundle
for (Map.Entry<RowData, List<RowData>> entry : buffer.entrySet()) {
  RowData currentKey = entry.getKey();
  List<RowData> inputRows = entry.getValue();
  ...
  ctx.setCurrentKey(currentKey);
  RowData acc = accState.value();
  if (acc == null) {
    Iterator<RowData> inputIter = inputRows.iterator();
    while (inputIter.hasNext()) {
      RowData current = inputIter.next();
      if (isRetractMsg(current)) {
        inputIter.remove(); // remove all the beginning retraction messages
      } else {
        break;
      }
    }
	...
  for (RowData input : inputRows) {
    if (isAccumulateMsg(input)) {
      function.accumulate(input);
    } else {
      function.retract(input);
    }
  }

finishBundle processes the buffered events. So when is it triggered?

Size

Trigger: the buffer size threshold is reached, or the time interval elapses.

MiniBatchGroupAggFunction is the user function inside KeyedMapBundleOperator.

An operator wraps a function; in the same way, KeyedProcessOperator wraps GroupAggFunction.

event -> Operator -> Function

// org.apache.flink.table.runtime.operators.bundle.AbstractMapBundleOperator#processElement
public void processElement(StreamRecord<IN> element) throws Exception {
    // get the key and value for the map bundle
    final IN input = element.getValue();
    final K bundleKey = getKey(input);
    final V bundleValue = bundle.get(bundleKey);

    // get a new value after adding this element to bundle
    // add the new event to the buffer
    final V newBundleValue = function.addInput(bundleValue, input);

    // update to map bundle
    bundle.put(bundleKey, newBundleValue);
    // count the element
    numOfElements++;
    // has the counter reached the threshold? once the size is reached, finishBundle below is called
    // see CountBundleTrigger for the details (a sketch follows after this code)
    bundleTrigger.onElement(input);
}


// org.apache.flink.table.runtime.operators.bundle.AbstractMapBundleOperator#finishBundle
public void finishBundle() throws Exception {
    if (!bundle.isEmpty()) {
        // reset the element counter
        numOfElements = 0;
        // let the function process the buffered events
        function.finishBundle(bundle, collector);
        // clear the event buffer
        bundle.clear();
    }
    // reset the trigger
    bundleTrigger.reset();
}
// a watermark (the time interval) also triggers finishBundle
public void processWatermark(Watermark mark) throws Exception {
    finishBundle();
    super.processWatermark(mark);
}
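
The size-based trigger itself is tiny; an abridged sketch of the core of CountBundleTrigger (details such as its explanation method are omitted) looks roughly like this:

// Abridged sketch: count elements and flush the bundle once maxCount is reached
public class CountBundleTrigger<T> implements BundleTrigger<T> {
    private final long maxCount;                       // table.exec.mini-batch.size
    private transient BundleTriggerCallback callback;  // the bundle operator, which owns finishBundle()
    private transient long count = 0;

    public CountBundleTrigger(long maxCount) {
        this.maxCount = maxCount;
    }

    @Override
    public void registerCallback(BundleTriggerCallback callback) {
        this.callback = callback;
    }

    @Override
    public void onElement(T element) throws Exception {
        count++;
        if (count >= maxCount) {
            callback.finishBundle();   // calls the operator's finishBundle() shown above
            reset();
        }
    }

    @Override
    public void reset() {
        count = 0;
    }
}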

Interval

With the settings above, a computation is also triggered every 5 s.

Once mini-batch is enabled, an extra MiniBatchAssigner operator is added after the Source; the corresponding physical node is StreamExecMiniBatchAssigner.

{
    "id" : 2,
    "type" : "MiniBatchAssigner(interval=[5000ms], mode=[ProcTime])",
    "pact" : "Operator",
    "contents" : "MiniBatchAssigner(interval=[5000ms], mode=[ProcTime])",
    "parallelism" : 8,
    "predecessors" : [ {
      "id" : 1,
      "ship_strategy" : "FORWARD",
      "side" : "second"
    } ]
  }

Code:

// org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecMiniBatchAssigner#translateToPlanInternal
protected Transformation<RowData> translateToPlanInternal(PlannerBase planner) {
    final Transformation<RowData> inputTransform =
            (Transformation<RowData>) getInputEdges().get(0).translateToPlan(planner);

    final OneInputStreamOperator<RowData, RowData> operator;
    if (miniBatchInterval.getMode() == MiniBatchMode.ProcTime) {
        // processing-time semantics; this example uses ProcTime
        operator = new ProcTimeMiniBatchAssignerOperator(miniBatchInterval.getInterval());
    } else if (miniBatchInterval.getMode() == MiniBatchMode.RowTime) {
        // event-time semantics
        operator = new RowTimeMiniBatchAssginerOperator(miniBatchInterval.getInterval());
    } else {
        throw new TableException(
                String.format(
                        "MiniBatchAssigner shouldn't be in %s mode this is a bug, please file an issue.",
                        miniBatchInterval.getMode()));
    }
    return new OneInputTransformation<>(
            inputTransform,
            getDescription(),
            operator,
            InternalTypeInfo.of(getOutputType()),
            inputTransform.getParallelism());
}

// org.apache.flink.table.runtime.operators.wmassigners.ProcTimeMiniBatchAssignerOperator
@Override
public void processElement(StreamRecord<RowData> element) throws Exception {
    long now = getProcessingTimeService().getCurrentProcessingTime();
    long currentBatch = now - now % intervalMs;
    if (currentBatch > currentWatermark) {
        currentWatermark = currentBatch;
        // emit
        output.emitWatermark(new Watermark(currentBatch));
    }
    output.collect(element);
}

@Override
public void onProcessingTime(long timestamp) throws Exception {
    long now = getProcessingTimeService().getCurrentProcessingTime();
    long currentBatch = now - now % intervalMs;
    if (currentBatch > currentWatermark) {
        currentWatermark = currentBatch;
        // emit
        output.emitWatermark(new Watermark(currentBatch));
    }
    getProcessingTimeService().registerTimer(currentBatch + intervalMs, this);
}

Every intervalMs a watermark is sent downstream; when the downstream operator receives it, finishBundle is triggered and the buffered events are processed.

Watermarks themselves do not distinguish between processing time and event time; it is just that watermarks are usually used to handle out-of-order data, so in practice they are associated with event time.

// org.apache.flink.table.runtime.operators.wmassigners.RowTimeMiniBatchAssginerOperator
@Override
public void processWatermark(Watermark mark) throws Exception {
    // if we receive a Long.MAX_VALUE watermark we forward it since it is used
    // to signal the end of input and to not block watermark progress downstream
    if (mark.getTimestamp() == Long.MAX_VALUE && currentWatermark != Long.MAX_VALUE) {
        currentWatermark = Long.MAX_VALUE;
        output.emitWatermark(mark);
        return;
    }

    currentWatermark = Math.max(currentWatermark, mark.getTimestamp());
    if (currentWatermark >= nextWatermark) {
        advanceWatermark();
    }
}

private void advanceWatermark() {
    output.emitWatermark(new Watermark(currentWatermark));
    long start = getMiniBatchStart(currentWatermark, minibatchInterval);
    long end = start + minibatchInterval - 1;
    nextWatermark = end > currentWatermark ? end : end + minibatchInterval;
}

The logic is similar: a new watermark is forwarded downstream only once the upstream watermark has advanced to the end of the current mini-batch window (roughly the previously forwarded watermark plus the mini-batch interval), so at most one finishBundle is triggered per interval.