
Parsing JOIN Statements in Spark SQL

The Spark SQL grammar is defined in SqlBase.g4 at v2.4.4; the rules related to JOIN statements are as follows:

fromClause
    : FROM relation (',' relation)* lateralView* pivotClause?
    ;

relation
    : relationPrimary joinRelation*
    ;

joinRelation
    : (joinType) JOIN right=relationPrimary joinCriteria?
    | NATURAL joinType JOIN right=relationPrimary
    ;
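To make the rules concrete, here is a made-up query annotated (in a small Scala snippet) with the grammar rules each part matches; the table names `a`, `b`, `c` are hypothetical:

```scala
// A hypothetical query, annotated with the grammar rules it exercises.
val query = "SELECT * FROM a, b LEFT JOIN c ON b.id = c.id"
// FROM a, b ...   -> fromClause with two comma-separated `relation`s
// a               -> relation: a bare relationPrimary, no joinRelation
// b LEFT JOIN c ON b.id = c.id
//                 -> relation: relationPrimary `b` followed by one joinRelation
// LEFT            -> joinType; ON b.id = c.id -> joinCriteria
```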

The parsing of the FROM clause lives in AstBuilder.scala at v2.4.4:

/**
 * Create a logical plan for a given 'FROM' clause. Note that we support multiple (comma
 * separated) relations here, these get converted into a single plan by condition-less inner join.
 */
override def visitFromClause(ctx: FromClauseContext): LogicalPlan = withOrigin(ctx) {
  val from = ctx.relation.asScala.foldLeft(null: LogicalPlan) { (left, relation) =>
    val right = plan(relation.relationPrimary)
    val join = right.optionalMap(left)(Join(_, _, Inner, None))
    withJoinRelations(join, relation)
  }
  if (ctx.pivotClause() != null) {
    if (!ctx.lateralView.isEmpty) {
      throw new ParseException("LATERAL cannot be used together with PIVOT in FROM clause", ctx)
    }
    withPivot(ctx.pivotClause, from)
  } else {
    ctx.lateralView.asScala.foldLeft(from)(withGenerate)
  }
}

The visitFromClause method handles the multiple relation rules inside a fromClause. From the grammar above and the method's doc comment, we know that each relation is one of the comma-separated parts, and that foldLeft is used to process the relations one by one. For reference, here is the source of foldLeft:

def foldLeft[B](z: B)(op: (B, A) => B): B = {
  var result = z
  this foreach (x => result = op(result, x))
  result
}
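For example, foldLeft threads an accumulator left-to-right through the collection, starting from the seed value:

```scala
// foldLeft starts from the seed and applies op left-to-right.
val sum = Seq(1, 2, 3).foldLeft(0)((acc, x) => acc + x)
// sum == 6

// Folding string concatenation makes the left-to-right grouping visible.
val folded = Seq("a", "b", "c").foldLeft("seed")((acc, x) => s"($acc+$x)")
// folded == "(((seed+a)+b)+c)"
```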

Note that the initial value used is null. The fold then iterates over the relations, which at this level are the comma-separated table references to be joined. Because left starts out as null, the variable bound on the first line of the function passed to foldLeft is named right: on the first iteration, the left table is null and the right table is the first table that appears. The optionalMap method then decides what to do based on whether left is null; its source is in ParserUtils.scala at v2.4.4:

/** Some syntactic sugar which makes it easier to work with optional clauses for LogicalPlans. */
implicit class EnhancedLogicalPlan(val plan: LogicalPlan) extends AnyVal {

  /**
   * Map a [[LogicalPlan]] to another [[LogicalPlan]] if the passed context exists using the
   * passed function. The original plan is returned when the context does not exist.
   */
  def optionalMap[C](ctx: C)(f: (C, LogicalPlan) => LogicalPlan): LogicalPlan = {
    if (ctx != null) {
      f(ctx, plan)
    } else {
      plan
    }
  }
}

In the call to optionalMap, plan is right and ctx is left. It follows that on the first call the Join function passed in is not executed; instead plan (that is, right, the logical plan of the first table) is returned directly. From val join = right.optionalMap(left)(Join(_, _, Inner, None)) we can also see that when the different relations are joined together, the join type used is Inner and the join condition is None.
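The null seed and optionalMap together can be sketched with toy types (simplified stand-ins, not Spark's classes): comma-separated relations fold into a left-deep chain of condition-less inner joins.

```scala
// A toy sketch of visitFromClause's fold; Plan/Table/InnerJoin are
// simplified stand-ins for Spark's LogicalPlan hierarchy.
object FromFoldSketch {
  sealed trait Plan
  case class Table(name: String) extends Plan
  case class InnerJoin(left: Plan, right: Plan) extends Plan

  // Mirrors EnhancedLogicalPlan.optionalMap (minus the AnyVal part).
  implicit class EnhancedPlan(val plan: Plan) {
    def optionalMap[C](ctx: C)(f: (C, Plan) => Plan): Plan =
      if (ctx != null) f(ctx, plan) else plan
  }

  // Mirrors the foldLeft in visitFromClause with a null initial value:
  // the first relation is returned as-is, later ones are inner-joined on.
  def fromClause(relations: Seq[Plan]): Plan =
    relations.foldLeft(null: Plan) { (left, right) =>
      right.optionalMap(left)(InnerJoin(_, _))
    }
}
```

So `fromClause(Seq(Table("a"), Table("b"), Table("c")))` builds `InnerJoin(InnerJoin(Table("a"), Table("b")), Table("c"))`, matching the left-deep shape the real fold produces.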

The foldLeft over ctx.relation described above handles the comma-separated relation rules; within it, withJoinRelations(join, relation) handles the JOIN statements inside each relation rule. Its source is in AstBuilder.scala at v2.4.4:

/**
 * Join one more [[LogicalPlan]]s to the current logical plan.
 */
private def withJoinRelations(base: LogicalPlan, ctx: RelationContext): LogicalPlan = {
  ctx.joinRelation.asScala.foldLeft(base) { (left, join) =>
    withOrigin(join) {
      val baseJoinType = join.joinType match {
        case null => Inner
        case jt if jt.CROSS != null => Cross
        case jt if jt.FULL != null => FullOuter
        case jt if jt.SEMI != null => LeftSemi
        case jt if jt.ANTI != null => LeftAnti
        case jt if jt.LEFT != null => LeftOuter
        case jt if jt.RIGHT != null => RightOuter
        case _ => Inner
      }

      // Resolve the join type and join condition
      val (joinType, condition) = Option(join.joinCriteria) match {
        case Some(c) if c.USING != null =>
          (UsingJoin(baseJoinType, c.identifier.asScala.map(_.getText)), None)
        case Some(c) if c.booleanExpression != null =>
          (baseJoinType, Option(expression(c.booleanExpression)))
        case None if join.NATURAL != null =>
          if (baseJoinType == Cross) {
            throw new ParseException("NATURAL CROSS JOIN is not supported", ctx)
          }
          (NaturalJoin(baseJoinType), None)
        case None =>
          (baseJoinType, None)
      }
      Join(left, plan(join.right), joinType, condition)
    }
  }
}
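The joinType match above can be sketched with plain strings standing in for the ANTLR token contexts (a simplification; the real code tests which keyword token is present on the parse-tree node):

```scala
// A toy sketch of the join-type resolution; keyword is the join-type
// token text, or null when the query says just "JOIN".
object JoinTypeSketch {
  sealed trait JoinType
  case object Inner extends JoinType
  case object Cross extends JoinType
  case object FullOuter extends JoinType
  case object LeftSemi extends JoinType
  case object LeftAnti extends JoinType
  case object LeftOuter extends JoinType
  case object RightOuter extends JoinType

  def resolve(keyword: String): JoinType = keyword match {
    case null    => Inner      // plain JOIN defaults to an inner join
    case "CROSS" => Cross
    case "FULL"  => FullOuter
    case "SEMI"  => LeftSemi
    case "ANTI"  => LeftAnti
    case "LEFT"  => LeftOuter
    case "RIGHT" => RightOuter
    case _       => Inner
  }
}
```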

As you can see, the logic is similar to the fold above, except this time it is the multiple joinRelation rules inside a relation that are processed, chaining together the JOIN statements under a single relation. The Join class itself is defined in basicLogicalOperators.scala at v2.4.4:

case class Join(
    left: LogicalPlan,
    right: LogicalPlan,
    joinType: JoinType,
    condition: Option[Expression])
  extends BinaryNode with PredicateHelper {

  override def output: Seq[Attribute] = {
    joinType match {
      case j: ExistenceJoin =>
        left.output :+ j.exists
      case LeftExistence(_) =>
        left.output
      case LeftOuter =>
        left.output ++ right.output.map(_.withNullability(true))
      case RightOuter =>
        left.output.map(_.withNullability(true)) ++ right.output
      case FullOuter =>
        left.output.map(_.withNullability(true)) ++ right.output.map(_.withNullability(true))
      case _ =>
        left.output ++ right.output
    }
  }

  override protected def validConstraints: Set[Expression] = {
    joinType match {
      case _: InnerLike if condition.isDefined =>
        left.constraints
          .union(right.constraints)
          .union(splitConjunctivePredicates(condition.get).toSet)
      case LeftSemi if condition.isDefined =>
        left.constraints
          .union(splitConjunctivePredicates(condition.get).toSet)
      case j: ExistenceJoin =>
        left.constraints
      case _: InnerLike =>
        left.constraints.union(right.constraints)
      case LeftExistence(_) =>
        left.constraints
      case LeftOuter =>
        left.constraints
      case RightOuter =>
        right.constraints
      case FullOuter =>
        Set.empty[Expression]
    }
  }
}

So a Join instance consists of two logical plans, left and right, together with a joinType and a condition giving the join's type and condition. That completes the walkthrough of how JOIN statements are parsed. I have recently been building a SQL parsing and execution component, referring to Spark SQL's handling for guidance, and since I am not very familiar with Scala, I wrote down these brief notes.
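The output method's nullability handling can be illustrated with simplified attributes (toy types, assumed names): the side that may produce NULL rows under an outer join has its columns marked nullable.

```scala
// A toy sketch of Join.output's nullability logic; Attr is a simplified
// stand-in for Catalyst's Attribute.
object JoinOutputSketch {
  case class Attr(name: String, nullable: Boolean) {
    def withNullability(n: Boolean): Attr = copy(nullable = n)
  }

  sealed trait JoinType
  case object InnerLike extends JoinType
  case object LeftOuter extends JoinType
  case object RightOuter extends JoinType
  case object FullOuter extends JoinType

  // For an outer join, the non-preserved side's columns become nullable.
  def output(left: Seq[Attr], right: Seq[Attr], joinType: JoinType): Seq[Attr] =
    joinType match {
      case LeftOuter  => left ++ right.map(_.withNullability(true))
      case RightOuter => left.map(_.withNullability(true)) ++ right
      case FullOuter  => left.map(_.withNullability(true)) ++ right.map(_.withNullability(true))
      case _          => left ++ right
    }
}
```

For example, a LEFT OUTER JOIN keeps the left side's nullability but forces the right side's columns to nullable, since unmatched left rows get NULLs for the right side.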

Reference

Implicit Classes | Scala Documentation
《Spark SQL内核剖析》