Spark SQL实现原理-逻辑计划优化的实现

2021/7/6 19:34:54

本文主要是介绍Spark SQL实现原理-逻辑计划优化的实现,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

逻辑计划优化(Logical Optimization)阶段把标准的基于规则(Rule-based)的优化策略应用于已经分析的逻辑计划(Resolved Logical Plan)。

说明:为了对总体架构有一个更加宏观的掌握,所以逻辑计划分析规则的实现会在后续逐渐补上,先继续分析总体框架的实现。

优化规则的分类

逻辑计划的默认优化规则集Optimizer#defaultBatches变量中定义。和逻辑计划的分析规则一样,逻辑计划的优化规则也通过规则集(Batch对象)的方式进行组织,每个规则集包括多个优化规则。

规则集的定义实现代码如下(有删减):

def defaultBatches: Seq[Batch] = {
	...
    Batch("Union", Once, CombineUnions) ::
    Batch("LocalRelation early", fixedPoint, ...) ::
    Batch("Pullup Correlated Expressions", Once, ...) ::
    Batch("Subquery", Once, OptimizeSubqueries) ::
    Batch("Replace Operators", fixedPoint,...) ::
    Batch("Aggregate", fixedPoint, ...) :: Nil ++) :+
    Batch("Join Reorder", Once, ...) :+
    Batch("Remove Redundant Sorts", Once, ...) :+
    Batch("Decimal Optimizations", fixedPoint, ...) :+
  ...

由于优化规则集比较多,有些场合下并非所有的规则集都需要用到,为了让用户可以排除掉一些不需要的规则集,Spark SQL添加了配置项:spark.sql.optimizer.excludedRules,默认是空。可以通过该配置项来配置需要排除掉的优化规则名列表,通过逗号分隔。这些规则保存在excludedRules变量中。

通过排除优化规则的选项给了用户一定的控制权,但对于Spark SQL来说,有些优化规则是必须的,是不能去掉的。所以,Spark SQL在优化器中又定义了一个变量:nonExcludableRules,用来保存必须保留的优化规则。其代码实现(有删减)如下:

  def nonExcludableRules: Seq[String] =
    EliminateDistinct.ruleName ::
      EliminateSubqueryAliases.ruleName ::
      EliminateView.ruleName ::
      ReplaceExpressions.ruleName ::
      ComputeCurrentTime.ruleName ::
			...

这样,最后被使用的规则集就是:

(defaultBatches - (excludedRules - nonExcludableRules))
// 也就是
默认规则集 -(排除规则集 -  保留规则集)

可以这么理解,原则上就是默认规则集减去用户配置的排除规则集,但系统保留的规则集不能排除,所以要从用户配置的列表中减去。换句话说,即使用户配置了要排除的规则集列表,但要是这些规则集在nonExcludableRules(系统保留规则集)中,它们也不会被排除。

操作优化规则集

在优化规则集中,有一大类就是:操作优化规则集。操作优化规则集定义了对操作的各种优化,是非常重要的优化规则,我们在查看逻辑计划时也经常可以看到。操作优化规则集包括:

  • 常量折叠(constant folding)
  • 谓词下推(predicate pushdown)
  • 操作合并(operator combine)
  • 投影修剪(projection pruning)
  • 空值传播(null propagation)
  • 布尔表达式简化(Boolean expression simplification)
  • 其他规则

优化器:Optimizer

实现逻辑计划规则优化的是Optimizer类对象,Optimizer是一个抽象父类,其实现类只有一个,就是:SparkOptimizer。而Optimizer类继承了RuleExecutor,这几者的关系如下:

 RuleExecutor
      |
 Optimizer
      |
 SparkOptimizer

各种逻辑计划的优化规则集是在抽象父类Optimizer中定义的,这样其实现类就可以直接使用这些规则。

逻辑计划优化的执行

在前面的文章分析过,逻辑计划优化的执行的函数调用如下:

  // 2.对分析后的逻辑计划进行优化,得到优化后的逻辑计划
  lazy val optimizedPlan: LogicalPlan = sparkSession.sessionState.optimizer.execute(withCachedData)

该函数最终调用的是父类的execute函数,也就是RuleExecutor中定义的execute函数。该函数的实现逻辑在《Spark SQL实现原理-逻辑计划分析的实现》一文中已经介绍过了。其大体思路就是:后续遍历逻辑计划树的每个节点,根据优化规则集和执行策略来处理逻辑计划树的每个节点,直到逻辑计划树不在变化或到达执行阈值就结束执行。具体的实现逻辑如下:
1.按顺序遍历规则集列表:Optimizer#batches中的每一个规则集(Batch);

2.顺序遍历规则集中的每一个规则(Rule),并使用每一个规则来处理逻辑计划,每个处理结果传给下一个处理规则。

3.当一个规则集中的规则都遍历(使用)完成后,进行以下判断:

​ 1)查看是否达到执行策略的阈值(迭代次数),若已经大于等于阈值,则不再遍历和执行当前规则集。

​ 2)查看使用本次规则集前后的逻辑计划是否相等,若相等说明不需要再执行当前规则集了。

满足1)和2)中的任意一个,直接跳到4执行,否则继续使用当前规则集。

4.遍历和使用下一个规则集的每个规则,并按第3步的逻辑进行处理。

如何编写优化规则

除了Spark SQL自带的各种逻辑计划优化规则集,还可以自己编写优化规则,在《Spark SQL: Relational Data Processing in Spark》一文中介绍了一个自定义优化规则。

该规则的目的是:当将固定精度的 DECIMAL 类型添加到 Spark SQL 时,想优化小精 DECIMAL 上的求和和或平均值等聚合操作; 用 12 行代码编写了一个规则,在 SUM 和 AVG 表达式中找到这样的小数,并将它们转换为未缩放的 64 位 LONG,对其进行聚合,然后将结果转换回DECIMAL类型。

object DecimalAggregates extends Rule[LogicalPlan] {
  /** Maximum number of decimal digits in a Long */
  val MAX_LONG_DIGITS = 18
  
  def apply(plan: LogicalPlan): LogicalPlan = {
    plan transformAllExpressions {
      case Sum(e @ DecimalType.Expression(prec , scale))
            if prec + 10 <= MAX_LONG_DIGITS =>
        MakeDecimal(Sum(LongValue(e)), prec + 10, scale)
    }
}

可以在规则中使用任意 Scala代码使得这些类型的优化变得容易表达,这些优化超越了子树结构的模式匹配。可见编写逻辑计划优化规则其实并不难,只要遵循以下接口的编写规范就可以。

object YourName extends Rule[LogicalPlan] {
	// ...
  def apply(plan: LogicalPlan): LogicalPlan = {
    plan transformAllExpressions {
      case xx1(...) if ... =>    // xx1是你想优化的逻辑计划节点对象
        // ...
        xxx2(...)           // 优化后的目标逻辑计划节点对象
    }
}

但要想写好逻辑计划优化规则,首先需要熟悉现有的优化规则和各个逻辑计划节点,然后根据需求来进行抽象出需要优化的逻辑。

逻辑计划优化的查看

可以通过几种方式来查看优化后的逻辑计划,通过scala终端来举例介绍一下。

(1)通过explain(true)来查看

通过explain(true)可以看到整个逻辑计划到物理计划的全过程:

scala> var ds1 = spark.range(100)
ds1: org.apache.spark.sql.Dataset[Long] = [id: bigint]

scala> var ds2 = spark.range(200)
ds2: org.apache.spark.sql.Dataset[Long] = [id: bigint]

scala> ds1.filter("id>10").union(ds2).filter("id>20").select("id").explain(true)
== Parsed Logical Plan ==
'Project [unresolvedalias('id, None)]
+- Filter (id#0L > cast(20 as bigint))
   +- Union
      :- Filter (id#0L > cast(10 as bigint))
      :  +- Range (0, 100, step=1, splits=Some(1))
      +- Range (0, 200, step=1, splits=Some(1))

== Analyzed Logical Plan ==
id: bigint
Project [id#0L]
+- Filter (id#0L > cast(20 as bigint))
   +- Union
      :- Filter (id#0L > cast(10 as bigint))
      :  +- Range (0, 100, step=1, splits=Some(1))
      +- Range (0, 200, step=1, splits=Some(1))

== Optimized Logical Plan ==
Union
:- Filter ((id#0L > 10) && (id#0L > 20))
:  +- Range (0, 100, step=1, splits=Some(1))
+- Filter (id#2L > 20)
   +- Range (0, 200, step=1, splits=Some(1))

//...

另外,通过下面的命令可以看到逻辑计划节点和参数情况:

scala> ds1.filter("id>10").union(ds2).filter("id>20").select("id").queryExecution.optimizedPlan.prettyJson

(2)通过queryExecution对象来查看

通过queryExecution可以只单独查看优化后的逻辑计划。

scala> ds1.filter("id>10").union(ds2).filter("id>20").select("id").queryExecution.optimizedPlanres9: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =Union:- Filter ((id#0L > 10) && (id#0L > 20)):  +- Range (0, 100, step=1, splits=Some(1))+- Filter (id#2L > 20)   +- Range (0, 200, step=1, splits=Some(1))

小结

本文分析了逻辑计划优化的总体实现流程,并对实现自己的优化规则进行了简单的介绍,最后介绍如何查看逻辑计划优化的结果。逻辑计划的优化可以说是Catalyst项目的核心,接下来会通过一系列文章介绍各种逻辑计划优化规则的使用和实现原理。



这篇关于Spark SQL实现原理-逻辑计划优化的实现的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!


扫一扫关注最新编程教程