Flink SQL 执行流程(二)

Posted by danner on December 13, 2020

Flink 1.12.1

Flink SQL 解析和执行流程如下,本节剖析 Operation -> Transformation 的具体流程。

在上一节Flink SQL 执行流程(一) 将 SQL 解析成 Operation 树,本文关注逻辑计划树 -> 物理计划树 -> Transformation。

接着上一节返回 CatalogSinkModifyOperation(内部包含PlannerQueryOperation) 继续往下走。

//org.apache.flink.table.sqlexec.SqlToOperationConverter#convertSqlInsert
  return new CatalogSinkModifyOperation(
          identifier,
          query,
          insert.getStaticPartitionKVs(),
          insert.isOverwrite(),
          Collections.emptyMap());
// org.apache.flink.table.api.internal.TableEnvironmentImpl#executeSql
return executeOperation(operations.get(0));
// org.apache.flink.table.api.internal.TableEnvironmentImpl#executeOperation
private TableResult executeOperation(Operation operation) {
  if (operation instanceof ModifyOperation) {
      // insert 操作,还需要优化
      return executeInternal(Collections.singletonList((ModifyOperation) operation));
    
public TableResult executeInternal(List<ModifyOperation> operations) {
  // 重点:Operation -> Transformation
  List<Transformation<?>> transformations = translate(operations);
  // insert 表名
  List<String> sinkIdentifierNames = extractSinkIdentifierNames(operations);
  String jobName = getJobName("insert-into_" + String.join(",", sinkIdentifierNames));
  // org.apache.flink.table.planner.delegation.StreamExecutor#createPipeline
  Pipeline pipeline = execEnv.createPipeline(transformations, tableConfig, jobName);
  try {
      JobClient jobClient = execEnv.executeAsync(pipeline);
      TableSchema.Builder builder = TableSchema.builder();
      Object[] affectedRowCounts = new Long[operations.size()];
      for (int i = 0; i < operations.size(); ++i) {
          // use sink identifier name as field name
          builder.field(sinkIdentifierNames.get(i), DataTypes.BIGINT());
          affectedRowCounts[i] = -1L;
      }

      return TableResultImpl.builder()
              .jobClient(jobClient)
              .resultKind(ResultKind.SUCCESS_WITH_CONTENT)
              .tableSchema(builder.build())
              .data(
                      new InsertResultIterator(
                              jobClient, Row.of(affectedRowCounts), userClassLoader))
              .build();
  } catch (Exception e) {
      throw new TableException("Failed to execute sql", e);
  }
}
  • Operation -> Transformation
  • sinkIdentifierNames:要插入数据的表全路径
  • jobName:insert-into_表全路径
  • Transformation -> StreamGraph
  • executeAsync:异步提交任务,内部流程与 DataStream 流程相同先转换为 JobGraph、上传jar 和配置文件、启动 Yarn AppMaster

显然这就是 Flink 任务的提交流程,本文关注的是第一步(客户端提交流程之前的文章已介绍)。

// org.apache.flink.table.api.internal.TableEnvironmentImpl#translate
private List<Transformation<?>> translate(List<ModifyOperation> modifyOperations) {
  return planner.translate(modifyOperations);
}
// org.apache.flink.table.planner.delegation.PlannerBase#translate
override def translate(
    modifyOperations: util.List[ModifyOperation]): util.List[Transformation[_]] = {
  if (modifyOperations.isEmpty) {
    return List.empty[Transformation[_]]
  }
  // 在转化前先准备 env,transformation 要添加到 env
  getExecEnv.configure(
    getTableConfig.getConfiguration,
    Thread.currentThread().getContextClassLoader)
  overrideEnvParallelism()
  // Operation -> Calcite RelNode 逻辑计划
  val relNodes = modifyOperations.map(translateToRel)
  // 优化 Calcite RelNode
  val optimizedRelNodes = optimize(relNodes)
  // 优化后的 RelNode -> execNodes
  val execNodes = translateToExecNodePlan(optimizedRelNodes)
  // execNodes 转化为 transformation
  translateToPlan(execNodes)
}

Operation -> Calcite RelNode

// org.apache.flink.table.planner.delegation.PlannerBase#translateToRel
private[flink] def translateToRel(modifyOperation: ModifyOperation): RelNode = {
  modifyOperation match {
    ...
    case catalogSink: CatalogSinkModifyOperation =>
      // 获取包含的 queryOperation
      val input = getRelBuilder.queryOperation(modifyOperation.getChild).build()
      val identifier = catalogSink.getTableIdentifier
      val dynamicOptions = catalogSink.getDynamicOptions
      // getTableSink 构建 Sink,返回(CatalogTable, TableSink实现类)
      getTableSink(identifier, dynamicOptions).map {
        case (table, sink: TableSink[_]) =>
          // check the logical field type and physical field type are compatible
          val queryLogicalType = FlinkTypeFactory.toLogicalRowType(input.getRowType)
          // validate logical schema and physical schema are compatible
          validateLogicalPhysicalTypesCompatible(table, sink, queryLogicalType)
          // validate TableSink
          validateTableSink(catalogSink, identifier, sink, table.getPartitionKeys)
          // validate query schema and sink schema, and apply cast if possible
          val query = validateSchemaAndApplyImplicitCast(
            input,
            TableSchemaUtils.getPhysicalSchema(table.getSchema),
            catalogSink.getTableIdentifier,
            getTypeFactory)
          LogicalLegacySink.create(
            query,
            sink,
            identifier.toString,
            table,
            catalogSink.getStaticPartitions.toMap)

        case (table, sink: DynamicTableSink) =>
          // 新版本 dynamic 
          DynamicSinkUtils.toRel(getRelBuilder, input, catalogSink, sink, table)
      } match {
        // 返回 RelNode
        case Some(sinkRel) => sinkRel
        case None =>
          throw new TableException(s"Sink ${catalogSink.getTableIdentifier} does not exists")
      }

QueryOperationConverter

// org.apache.flink.table.planner.calcite.FlinkRelBuilder
def queryOperation(queryOperation: QueryOperation): RelBuilder = {
  val relNode = queryOperation.accept(toRelNodeConverter)
  push(relNode)
  this
}
private val toRelNodeConverter = {
  new QueryOperationConverter(this)
}

QueryOperationConverter :将 Flink 中 QueryOperation 转换为 Calcite 中的关系表达 RelNode。

构建 Sink

// org.apache.flink.table.planner.delegation.PlannerBase#getTableSink
private def getTableSink(
    objectIdentifier: ObjectIdentifier,
    dynamicOptions: JMap[String, String])
  : Option[(CatalogTable, Any)] = {
  // 根据表全路径从 Catalog 获取到 CatalogTable  
  val lookupResult = JavaScalaConversionUtil.toScala(catalogManager.getTable(objectIdentifier))
  lookupResult
    .map(_.getTable) match {
    case Some(table: ConnectorCatalogTable[_, _]) =>
      JavaScalaConversionUtil.toScala(table.getTableSink) match {
        case Some(sink) => Some(table, sink)
        case None => None
      }
    // 二:获取 Catalog
    // 三:获取 TableSink
    case Some(table: CatalogTable) =>
      val catalog = catalogManager.getCatalog(objectIdentifier.getCatalogName)
      val tableToFind = if (dynamicOptions.nonEmpty) {
        table.copy(FlinkHints.mergeTableOptions(dynamicOptions, table.getProperties))
      } else {
        table
      }
      val isTemporary = lookupResult.get.isTemporary
      if (isLegacyConnectorOptions(objectIdentifier, table, isTemporary)) {
        val tableSink = TableFactoryUtil.findAndCreateTableSink(
          catalog.orElse(null),
          objectIdentifier,
          tableToFind,
          getTableConfig.getConfiguration,
          isStreamingMode,
          isTemporary)
        Option(table, tableSink)
      } else {
        val tableSink = FactoryUtil.createTableSink(
          catalog.orElse(null),
          objectIdentifier,
          tableToFind,
          getTableConfig.getConfiguration,
          Thread.currentThread().getContextClassLoader,
          isTemporary)
        Option(table, tableSink)
      }

    case _ => None
  }
}

获取 TableSink 时,先判断创建表的 connect 是老的还是新的写法,对应不同处理。新的写法是创建动态表

  • connect 是否包含 connector.type,包含代表是老的 Sink
  • 若不包含,先尝试用老的方法 findAndCreateTableSink 获取下;没有异常表示还是老的创建方式(很严谨),有异常捕捉到下一步
  • 新的,创建动态表

创建的过程先判断表所属的 Catalog 有没有表创建的工厂类,有则直接创建;没有再利用 Java SPI 去加载对应的表创建工厂类(Flink SQL 之 Create Table)。

DynamicSinkUtils

DynamicTableSink 转换为 RelNode

// org.apache.flink.table.planner.sinks.DynamicSinkUtils
public static RelNode toRel(
      FlinkRelBuilder relBuilder,
      RelNode input,
      CatalogSinkModifyOperation sinkOperation,
      DynamicTableSink sink,
      CatalogTable table) {
  final FlinkTypeFactory typeFactory = ShortcutUtils.unwrapTypeFactory(relBuilder);
  final TableSchema schema = table.getSchema();

  // 1. 校验 Sink 和 operation 的分区性/overwrite/元信息 特性是否一直
  prepareDynamicSink(sinkOperation, sink, table);

  // 2. 校验 operation 和 Sink schema 是否相同
  final RelNode query =
          validateSchemaAndApplyImplicitCast(
                  input, schema, sinkOperation.getTableIdentifier(), typeFactory);
  relBuilder.push(query);

  // 3. 获取表元数据列
  final List<Integer> metadataColumns = extractPersistedMetadataColumns(schema);
  if (!metadataColumns.isEmpty()) {
      pushMetadataProjection(relBuilder, typeFactory, schema, sink);
  }
  final RelNode finalQuery = relBuilder.build();
  return LogicalSink.create(
          finalQuery,
          sinkOperation.getTableIdentifier(),
          table,
          sink,
          sinkOperation.getStaticPartitions());
}

RelNode 优化

一个 Optimization Engine 包含三个组成部分:

  • rules:也就是匹配规则,Calcite 内置上百种 Rules 来优化 relational expression,当然也支持自定义 rules;
  • metadata providers:主要是向优化器提供信息,这些信息会有助于指导优化器向着目标(减少整体 cost)进行优化,信息可以包括行数、table 哪一列是唯一列等,也包括计算 RelNode 树中执行 subexpression cost 的函数;
  • planner engines:它的主要目标是进行触发 rules 来达到指定目标,比如像 cost-based optimizer(CBO)的目标是减少 cost(Cost 包括处理的数据行数、CPU cost、IO cost 等)。

优化器的作用是将解析器生成的关系代数表达式转换成执行计划,供执行引擎执行,在这个过程中,会应用一些规则优化,以帮助生成更高效的执行计划。优化器进行优化的地方如过滤条件的下压(push down),在进行 join 操作前,先进行 filter 操作,这样的话就不需要在 join 时进行全量 join,减少参与 join 的数据量等。

Calcite 中 RelOptPlanner 是 Calcite 中优化器的基类。Calcite 中关于优化器提供了两种实现:

  • HepPlanner:就是基于规则优化 RBO 的实现,它是一个启发式的优化器,按照规则进行匹配,直到达到次数限制(match 次数限制)或者遍历一遍后不再出现 rule match 的情况才算完成;
  • VolcanoPlanner:就是基于成本优化 CBO 的实现,它会一直迭代 rules,直到找到 cost 最小的 paln。
// org.apache.flink.table.planner.delegation.PlannerBase
val optimizedRelNodes = optimize(relNodes)
 -> val optimizedRelNodes = getOptimizer.optimize(relNodes)
// org.apache.flink.table.planner.delegation.StreamPlanner#getOptimizer
override protected def getOptimizer: Optimizer = new StreamCommonSubGraphBasedOptimizer(this)

StreamCommonSubGraphBasedOptimizer

流上的基于子图的优化器,用来优化 Blink 流的 SQL 和 Table。

基于 DAG 的公共子图优化将原始的 RelNode DAG 优化为语义等价的 RelNode DAG。什么是公共子图?子图就是一个逻辑计划树的子树,公共子图也叫公共子树。公共子树就是多颗逻辑计划树相同的子树。Calcite Planner 不支持 DAG (包含了多个 Sink)的优化,所以 RelNode DAG 需要被分解为多颗子树(每颗子树一个根,即只有一个 Sink),每颗子树使用 org.apache.calcite.plan.RelOptPlanner 单独优化。

优化算法的过程如下:

  • 首先将 RelNode DAG 分解为多颗子树(Flink 中对应的类为 RelNodeBlock),然后生成一个 RelNodeBlock DAG。每个 RelNodeBlock 只有一个 Sink,代表一颗子树。
  • 递归优化 RelNodeBlock,优化顺序是从叶子结点(Source)到根结点(Sink)。非根子树(RelNodeBlock)包装为一个 IntermediateRelTable
  • 优化完成之后,将 IntermediateRelTable 重新展开生成优化后的 RelNode DAG。

目前,选择这种优化策略主要基于以下考虑。

  • 一般来说,使用多个 Sink 的用户倾向于使用 View,View 天然就是一个公共子图
  • 经过优化之后,如 Project 下推、filter 下推,在最终的 DAG 中可能就没有公共子图了。

当前的策略可以改进的地方:

  • 如何找到公共子图的切分点,如一些 Physical RelNode 可能是从多个 Logical RelNode 转换而来,所以一个合法的切分点一定不在这几个 Logical RelNode 之间
  • 优化结果是局部最优(每个子图内最优),不是全局最优

逻辑优化使用的是 Calcite 的 Hep 优化器(基于规则),物理优化阶段使用了 Calcite 的 Hep 规则优化器和 Volcano 优化器(基于代价)。

Blink 为 StreamCommonSubGraphBasedOptimizer 添加很多规则(基本涵盖所有的操作)

// org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer#optimizeTree   
val programs = calciteConfig.getStreamProgram
      .getOrElse(FlinkStreamProgram.buildProgram(config.getConfiguration))

StreamCommonSubGraphBasedOptimizer 根据规则对每个操作生成对应的 PhysicalNode:以 join 为例,匹配到 StreamPhysicalJoinRule 就会生成 StreamPhysicalJoin PhysicalNode。

RelNode to ExecNode

在 RelNode -> ExecNode 之前,会先进行之前说的优化算法(公共子图)。

每个 FlinkPhysicalRelNode 都有 translateToExecNode 函数,会将 RelNode 转换为 ExecNode。以 StreamPhysicalJoin 为例,得到 StreamExecJoin

// org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalJoin#translateToExecNode
override def translateToExecNode(): ExecNode[_] = {
    new StreamExecJoin(
        joinSpec,
        getUniqueKeys(left),
        getUniqueKeys(right),
        InputProperty.DEFAULT,
        InputProperty.DEFAULT,
        FlinkTypeFactory.toLogicalRowType(getRowType),
        getRelDetailedDescription)
}

ExecNode to Transformation

// org.apache.flink.table.planner.delegation.StreamPlanner#translateToPlan
override protected def translateToPlan(execGraph: ExecNodeGraph): util.List[Transformation[_]] = {
    val planner = createDummyPlanner()
    // 遍历 execnode
    execGraph.getRootNodes.map {
        case node: StreamExecNode[_] => node.translateToPlan(planner)
        case _ =>
        throw new TableException("Cannot generate DataStream due to an invalid logical plan. " +
            "This is a bug and should not happen. Please file an issue.")
    }
}

// org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase#translateToPlan
public Transformation<T> translateToPlan(Planner planner) {
    if (transformation == null) {
        // 最终是调用 execnode.translateToPlanInternal
        transformation = translateToPlanInternal((PlannerBase) planner);
        if (this instanceof SingleTransformationTranslator) {
            if (inputsContainSingleton()) {
                transformation.setParallelism(1);
                transformation.setMaxParallelism(1);
            }
        }
    }
    return transformation;
}

StreamExecJoin 为例,translateToPlan 函数返回 TwoInputTransformation,Transformation 中包含 StreamingJoinOperator/StreamingSemiAntiJoinOperator。到这里应该很熟悉了,是 DataStream 的套路,至此 SQL 语句最终生成一连串的 Transformation

参考资料

Flink 内核原理与实现