目录

Background

对Spark的CBO(cost based optimization) 进行源码分析

Spark CBO 源码分析

CBO是基于Cost来优化plan。

要计算cost就需要统计一些参与计算的表的相关信息,因此spark添加了Statistics和ColumnStat类来统计相关信息。

CBO主要是针对join来计算cost,目前spark-2.3 版本中与CBO相关的参数如下:

参数 默认值 说明
spark.sql.cbo.enabled false Enables CBO for estimation of plan statistics when set true.
spark.sql.cbo.joinReorder.enabled false Enables join reorder in CBO.
spark.sql.cbo.joinReorder.dp.star.filter false Applies star-join filter heuristics to cost based join enumeration.
spark.sql.cbo.joinReorder.dp.threshold 12 The maximum number of joined nodes allowed in the dynamic programming algorithm.
spark.sql.cbo.starSchemaDetection false When true, it enables join reordering based on star schema detection.

下文按照逻辑顺序分析spark cbo 源码。

统计信息类

CBO相关的统计信息类有两个,一个是ColumnStat,代表的是表中列的详细,例如最大值,最小值,空值个数,平均长度,最大长度。另外一个类是Statistics,这个类是对应一个LogicalPlan的统计信息,例如join,aggregate,logicalRelation。

case class Statistics(
    sizeInBytes: BigInt,
    rowCount: Option[BigInt] = None,
    attributeStats: AttributeMap[ColumnStat] = AttributeMap(Nil),
    hints: HintInfo = HintInfo()) 

case class ColumnStat(
    distinctCount: BigInt,
    min: Option[Any],
    max: Option[Any],
    nullCount: BigInt,
    avgLen: Long,
    maxLen: Long,
    histogram: Option[Histogram] = None) 

如上所示,可以看到ColumnStat表示列的详细信息。

而Statistics,中的sizeInBytes和rowCount就代表这个logicalPlan输出数据的大小和行数,而attributeStats 代表这个logicalPlan涉及到的列的统计信息(一个expressID到列信息的映射),和hints。

对于join来说,它的Statistics里的信息就代表join操作输出的大小,行数以及attributeStats。

对于logicalRelation,它的Statistics代表其对应表中schema相关数据的大小,行数,attributeStats。

CatalogStatistics这个类表示存储在外部catalog(例如hive metastore)中的表的信息.

case class CatalogStatistics(
    sizeInBytes: BigInt,
    rowCount: Option[BigInt] = None,
    colStats: Map[String, ColumnStat] = Map.empty)

这些表的信息需要使用 analyze table命令来计算,然后存储到catalog里。

每种LogicalPlan计算Statistics的方法是不同的。

对于LogicalRelation来说,它是读取对应表中schema,使用CatalogStatistics类的toPlanStats可以生成Statistics。

def toPlanStats(planOutput: Seq[Attribute], cboEnabled: Boolean): Statistics = {
  if (cboEnabled && rowCount.isDefined) {
    val attrStats = AttributeMap(planOutput.flatMap(a => colStats.get(a.name).map(a -> _)))
    // Estimate size as number of rows * row size.
    val size = EstimationUtils.getOutputSize(planOutput, rowCount.get, attrStats)
    Statistics(sizeInBytes = size, rowCount = rowCount, attributeStats = attrStats)
  } else {
    // When CBO is disabled or the table doesn't have other statistics, we apply the size-only
    // estimation strategy and only propagate sizeInBytes in statistics.
    Statistics(sizeInBytes = sizeInBytes)
  }
}

下面将介绍其他LogicalPlan的Statistics计算。

Statistics的计算

看LogicalPlanStats类,可以看出,这里,判断cbo是否开启,如果cbo打开,则采用BasicStatsPlanVisitor类来计算相关的Statistics,如果没有cbo,则使用SizeInBytesOnlyStatsPlanVisitor来计算。

从类的名字就可以看出来,只有cbo开启,才会计算rowCount以及attributeStats信息,如果没有cbo,SizeInBytesOnlyStatsPlanVisitor只会计算 size信息。

trait LogicalPlanStats { self: LogicalPlan =>
  def stats: Statistics = statsCache.getOrElse {
    if (conf.cboEnabled) {
      statsCache = Option(BasicStatsPlanVisitor.visit(self))
    } else {
      statsCache = Option(SizeInBytesOnlyStatsPlanVisitor.visit(self))
    }
    statsCache.get
  }
}

其实在BasicStatsPlanVisitor类中对于大部分类型的LogicalPlan都还是调用SizeInBytesOnlyStatsPlanVisitor的方法来计算。

只有针对Aggregate,Join,Filter,Project有另外的计算方法。

这里讲下join操作的Statistics计算过程。

如果没有开启CBO,join操作首先判断是否是 leftAntiJoin或者是LeftSemiJoin,如果是,则把leftChild的sizeInBytes作为计算结果,因为对于leftAntiJoin和leftSemiJoin来说,join之后表的大小是小于leftChild的。而对于其他类型的join,把左右child的sizeInBytes相乘作为join之后的大小,并且关闭掉broadcastHint,因为这些join类型可能造成很大的output。而这种粗糙的代价估计造成的结果就是,对代价估计不准确,如果该join是可以进行broadcastjoin,也可能由于粗糙的代价估计变得不可进行。

如果开启了CBO,对于join操作就不止计算sizeInBytes,还需要计算rowCount,AttributeStats。

代码如下,首先是判断join类型,如果是 inner,cross,leftOuter,RightOuter,FullOuter中的一种,则使用estimateInnerOuterJoin方法。

def estimate: Option[Statistics] = {
  join.joinType match {
    case Inner | Cross | LeftOuter | RightOuter | FullOuter =>
      estimateInnerOuterJoin()
    case LeftSemi | LeftAnti =>
      estimateLeftSemiAntiJoin()
    case _ =>
      logDebug(s"[CBO] Unsupported join type: ${join.joinType}")
      None
  }

这里只针对针对estimateInnerOuterJoin方法,用语言描述一下:

如果是equiJoin:

1、首先估算被equi条件选择的记录条数,即等于innerJoin选择的条数,命名为numInnerJoinedRows;以及这些equi涉及的key在join之后的stats。

即在join中,存在类似 a.co1=b.co1, a.co2=b.co2 这些类似条件,现在是估计满足这些相等条件的记录条数。

使用的公式是: T(A J B) = T(A) * T(B) / max(V(A.ki), V(B.ki)).

2、 预估得到结果的行数。

因为即使满足这些相等条件,也不会只输出这些满足条件的记录。

如果是leftOuterJoin,则会对左边表中所有记录都会输出,不管右边匹配是否为空。

因此,对于leftOuterJoin来说,输出的记录条数等于max(左边表条数,numInnerJoinedRows)。

同样还有rightOuterJoin,输出记录条数=max(右边表条数,numInnerJoinedRows)。

对于全连接,输出记录条数=max(左边表条数,numInnerJoinedRows)+max(右边表条数,numInnerJoinedRows)-numInnerJoinedRows。即类似于A与B的并集-A与B的交集。

3、然后是根据前面的计算结果更新Statistics,包括attributeStats。

如果不是equiJoin:

则按照笛卡尔积来计算,输出行数为两个表行数的乘积

拿到数据之后怎么用

这些Statistics的结果,会怎么运用呢?

spark sql中plan的处理过程可以参考Spark sql catalyst过程详解.

在unresolvedLogicalPlan->resolvedLogicalPlan过程中收集Statistics,然后在

resolvedLogicalPlan->optimizedLogicalPlan过程中,基于这些统计信息,进行costBasedJoinRecorder,即基于统计信息,对join顺序重排序,寻求最优join方案。

在optimizedLogicalPlan->phsicalPlan过程中,基于Statistics中的sizeInBytes信息以及hint选择合适的join策略(broadcastJoin,hashShuffledJoin,sortMergeJoin).

CostBasedJoinReorder

这是一个使用plan的stats信息,来选择合适的join顺序的类。

Optimizer中有两个跟join 顺序有关的rule,一个是reoderJoin,另外一个是CostBasedJoinRecorder。reorderjoin是没有cbo也会触发的rule,这个不会使用统计的信息,只是负责将filter下推,这样最底层的join至少会有一个filter。如果这些join已经每个都有一条condition,那么这些plan就不会变化,因此reorder join不涉及基于代价的优化。

首先看下对cost的定义。cost是有一个基数,是rowCount,然后一个sizeInBytes。

/**
 * This class defines the cost model for a plan.
 * @param card Cardinality (number of rows).
 * @param size Size in bytes.
 */
case class Cost(card: BigInt, size: BigInt) {
  def +(other: Cost): Cost = Cost(this.card + other.card, this.size + other.size)
}

而判断cost的方法是:

A: Cost(ac,as) B: Cost(bc,bs)

如果

(ac/bc)*joinReorderCardWeight +(as/bs)*(1-joinReorderCardWeight)<1,

则认为A比B好。spark.sql.cbo.joinReorder.card.weight默认为0.7。代码如下:

def betterThan(other: JoinPlan, conf: SQLConf): Boolean = {
  if (other.planCost.card == 0 || other.planCost.size == 0) {
    false
  } else {
    val relativeRows = BigDecimal(this.planCost.card) / BigDecimal(other.planCost.card)
    val relativeSize = BigDecimal(this.planCost.size) / BigDecimal(other.planCost.size)
    relativeRows * conf.joinReorderCardWeight +
      relativeSize * (1 - conf.joinReorderCardWeight) < 1
  }
}

costBasedJoinReorder是使用一个动态规划来进行选择合适的join顺序。

下面讲一个这个动态规划算法。

假设有 a j b j c j d a.k1=b.k1 and b.k2 = c.k2 and c.k3=d.k3

将会分为4层来进行:

level 0: p({A}), p({B}), p({C}), p({D}) level 1: p({A, B}), p({B, C}), p({C, D}) level 2: p({A, B, C}), p({B, C, D}) level 3: p({A, B, C, D})

首先就是生成第0层,第0层的founfPlans={p{a},p{b},p{c},p{d}}.

如果设层级为level,那么每层的任务就是找到(level+1)个plan进行join最优的版本。

因此 k层和level-k层的所包含的表的个数之和,就是(k+1+level-k+1)=level+2,也就是说是level+1层所需要的foundPlan。

而我们在每次生成新的join之后,就判断他的itemSet是否已经存在,如果不存在就存储;如果存在,就取出其对应的plan,对比看是不是优于之前的plan(betterThan),保存最优的。

这样。每个level里面保存的都是相应个数个多join最优的plan,最终也得到了最优的plan。

当然,在形成plan时有很多判断,比如在level1 里面,就不能形成p({A,C})。

因为不存在condition 使得A,C可以进行join。

当然,在动态规划进行search的时候,有一个filter。

spark.sql.cbo.joinReorder.dp.star.filter

这是一个星型join过滤器,用来确保star schema 中的tables是被plan在一起。

spark.sql.cbo.joinReorder.dp.threshold

代表在dp进行costbasedReorder时,最多支持的表的数量。

**spark.sql.cbo.starSchemaDetection **

这个参数是在reorderJoin中触发,而且只在

spark.sql.cbo.starSchemaDetection=true spark.sql.cbo.enabled=false时才触发,很奇怪,这个参数以cbo命名,但是却在cbo.enable=false才触发。

这个是用来观察是否存在starJoin。

JoinSelection

在SparkPlanner类中,有几个优化策略会对LogicalPlan进行优化。

class SparkPlanner(
    val sparkContext: SparkContext,
    val conf: SQLConf,
    val experimentalMethods: ExperimentalMethods)
  extends SparkStrategies {
	...
  override def strategies: Seq[Strategy] =
    experimentalMethods.extraStrategies ++
      extraPlanningStrategies ++ (
      DataSourceV2Strategy ::
      FileSourceStrategy ::
      DataSourceStrategy(conf) ::
      SpecialLimits ::
      Aggregation ::
      JoinSelection ::
      InMemoryScans ::
      BasicOperators :: Nil)
      ...
  }

里面有一个JoinSelection方法,这个方法是主要是用来判断是否可以使用broadcastjoin,然后决定是使用broadcastJoin,还是shuffledHashJoin还是sortMergeJoin。

broadcastjoin可以避免shuffle,如果使用得当,可以提升程序的性能。这是针对一个大表和一个极小表在spark中有一个参数是,spark.sql.autoBroadcastJoinThreshold,这个参数是一个数字,单位字节,代表如果一个表的szie小于这个数值,就可以进行broadcastjoin。但是这里只使用size作为估计是不准确的,还应该使用rowCount作为参考,因为在join中,join的结果是与两个表的条数强相关,只使用size做判断是不准确的。

在spark中,有BroadCastHint,前面也提到过,如果没有开启cbo,那么如果判断join类型是非leftAntiJoin和leftSemiJoin,则会觉得join之后的大小无法估测,可能会爆炸式增长,因此会关掉BroadcastHint。

对于shuffledHashJoin,这是针对一个大表和一个小表(判断标准为a.stats.sizeInBytes * 3 <= b.stats.sizeInBytes),简单描述一下过程就是两个表A和B,首先,选择一个表进行shuffle write操作,即针对每个分区,按照key的hash值进行排序,将相同hash值的key放在一起,形成一个partitionFile,然后在read端拉取write端所有相应key的数据,作为localhashMap和另外一个标的分区进行join。

这里也使用stats进行判断,如果plan.stats.sizeInBytes < conf.autoBroadcastJoinThreshold * conf.numShufflePartitions,则判断该表的size可以满足每个分区构建localhashMap的可能,可以看到这里也是以autoBroadcastJoinThreshold作为衡量标准。

如果是两张大表,则需要使用sortmergeJoin,类似于先排序,即按照keypair排序,然后进行归并。

这些join selection的操作,不管是否开启CBO都会进行。但是和CBO相关的是,这些数据的统计是和CBO有关,前面提过,如果开启CBO则使用BasicStatsPlanVisitor来进行统计。

上述的这些估测,都是基于size信息。但是即使是基于size信息,如果没有开启cbo,这些信息也是粗糙的,没有CBO那种更细致的估计,因此可能会造成Join种类选择不合适。

上述的判断,很多是基于spark.sql.autoBroadcastJoinThreshold,因此在运行环境中,一定要结合集群环境设置合适的值。

而且,在joinSelection中,也应该基于rowCount来判断join的种类。