Skip to content

Commit e6e9031

Browse files
rxincloud-fan
authored and committed
[SPARK-24865] Remove AnalysisBarrier
## What changes were proposed in this pull request? AnalysisBarrier was introduced in SPARK-20392 to improve analysis speed (don't re-analyze nodes that have already been analyzed). Before AnalysisBarrier, we already had some infrastructure in place, with analysis specific functions (resolveOperators and resolveExpressions). These functions do not recursively traverse down subplans that are already analyzed (with a mutable boolean flag _analyzed). The issue with the old system was that developers started using transformDown, which does a top-down traversal of the plan tree, because there was no top-down resolution function, and as a result analyzer performance became pretty bad. In order to fix the issue in SPARK-20392, AnalysisBarrier was introduced as a special node and for this special node, transform/transformUp/transformDown don't traverse down. However, the introduction of this special node caused a lot more trouble than it solved. This implicit node breaks assumptions and code in a few places, and it's hard to know when analysis barrier would exist, and when it wouldn't. Just a simple search of AnalysisBarrier in PR discussions demonstrates it is a source of bugs and additional complexity. Instead, this pull request removes AnalysisBarrier and reverts back to the old approach. We added infrastructure in tests that fail explicitly if transform methods are used in the analyzer. ## How was this patch tested? Added a test suite AnalysisHelperSuite for testing the resolve* methods and transform* methods. Author: Reynold Xin <[email protected]> Author: Xiao Li <[email protected]> Closes apache#21822 from rxin/SPARK-24865.
1 parent f9c9d80 commit e6e9031

File tree

25 files changed

+460
-276
lines changed

25 files changed

+460
-276
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala

Lines changed: 50 additions & 68 deletions
Large diffs are not rendered by default.

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,9 @@ trait CheckAnalysis extends PredicateHelper {
7979
// We transform up and order the rules so as to catch the first possible failure instead
8080
// of the result of cascading resolution failures.
8181
plan.foreachUp {
82+
83+
case p if p.analyzed => // Skip already analyzed sub-plans
84+
8285
case u: UnresolvedRelation =>
8386
u.failAnalysis(s"Table or view not found: ${u.tableIdentifier}")
8487

@@ -364,10 +367,11 @@ trait CheckAnalysis extends PredicateHelper {
364367
}
365368
extendedCheckRules.foreach(_(plan))
366369
plan.foreachUp {
367-
case AnalysisBarrier(child) if !child.resolved => checkAnalysis(child)
368370
case o if !o.resolved => failAnalysis(s"unresolved operator ${o.simpleString}")
369371
case _ =>
370372
}
373+
374+
plan.setAnalyzed()
371375
}
372376

373377
/**
@@ -531,9 +535,8 @@ trait CheckAnalysis extends PredicateHelper {
531535

532536
var foundNonEqualCorrelatedPred: Boolean = false
533537

534-
// Simplify the predicates before validating any unsupported correlation patterns
535-
// in the plan.
536-
BooleanSimplification(sub).foreachUp {
538+
// Simplify the predicates before validating any unsupported correlation patterns in the plan.
539+
AnalysisHelper.allowInvokingTransformsInAnalyzer { BooleanSimplification(sub).foreachUp {
537540
// Whitelist operators allowed in a correlated subquery
538541
// There are 4 categories:
539542
// 1. Operators that are allowed anywhere in a correlated subquery, and,
@@ -635,6 +638,6 @@ trait CheckAnalysis extends PredicateHelper {
635638
// are not allowed to have any correlated expressions.
636639
case p =>
637640
failOnOuterReferenceInSubTree(p)
638-
}
641+
}}
639642
}
640643
}

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecision.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ object DecimalPrecision extends TypeCoercionRule {
8282
PromotePrecision(Cast(e, dataType))
8383
}
8484

85-
override protected def coerceTypes(plan: LogicalPlan): LogicalPlan = plan transformUp {
85+
override protected def coerceTypes(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
8686
// fix decimal precision for expressions
8787
case q => q.transformExpressionsUp(
8888
decimalAndDecimal.orElse(integralAndDecimalLiteral).orElse(nondecimalAndDecimal))

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ object ResolveHints {
8585
}
8686
}
8787

88-
def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
88+
def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
8989
case h: UnresolvedHint if BROADCAST_HINT_NAMES.contains(h.name.toUpperCase(Locale.ROOT)) =>
9090
if (h.parameters.isEmpty) {
9191
// If there is no table alias specified, turn the entire subtree into a BroadcastHint.
@@ -107,7 +107,7 @@ object ResolveHints {
107107
* This must be executed after all the other hint rules are executed.
108108
*/
109109
object RemoveAllHints extends Rule[LogicalPlan] {
110-
def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
110+
def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
111111
case h: UnresolvedHint => h.child
112112
}
113113
}

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTables.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ import org.apache.spark.sql.types.{StructField, StructType}
2929
* An analyzer rule that replaces [[UnresolvedInlineTable]] with [[LocalRelation]].
3030
*/
3131
case class ResolveInlineTables(conf: SQLConf) extends Rule[LogicalPlan] with CastSupport {
32-
override def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
32+
override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
3333
case table: UnresolvedInlineTable if table.expressionsResolved =>
3434
validateInputDimension(table)
3535
validateInputEvaluable(table)

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTableValuedFunctions.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ object ResolveTableValuedFunctions extends Rule[LogicalPlan] {
103103
})
104104
)
105105

106-
override def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
106+
override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
107107
case u: UnresolvedTableValuedFunction if u.functionArgs.forall(_.resolved) =>
108108
val resolvedFunc = builtinFunctions.get(u.functionName.toLowerCase(Locale.ROOT)) match {
109109
case Some(tvf) =>

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/SubstituteUnresolvedOrdinals.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ class SubstituteUnresolvedOrdinals(conf: SQLConf) extends Rule[LogicalPlan] {
3333
case _ => false
3434
}
3535

36-
def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
36+
def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
3737
case s: Sort if conf.orderByOrdinal && s.order.exists(o => isIntLiteral(o.child)) =>
3838
val newOrders = s.order.map {
3939
case order @ SortOrder(ordinal @ Literal(index: Int, IntegerType), _, _, _) =>

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala

Lines changed: 17 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -318,7 +318,7 @@ object TypeCoercion {
318318
*/
319319
object WidenSetOperationTypes extends Rule[LogicalPlan] {
320320

321-
def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
321+
def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
322322
case s @ SetOperation(left, right) if s.childrenResolved &&
323323
left.output.length == right.output.length && !s.resolved =>
324324
val newChildren: Seq[LogicalPlan] = buildNewChildrenWithWiderTypes(left :: right :: Nil)
@@ -391,7 +391,7 @@ object TypeCoercion {
391391
}
392392

393393
override protected def coerceTypes(
394-
plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {
394+
plan: LogicalPlan): LogicalPlan = plan resolveExpressions {
395395
// Skip nodes who's children have not been resolved yet.
396396
case e if !e.childrenResolved => e
397397

@@ -453,7 +453,7 @@ object TypeCoercion {
453453
}
454454

455455
override protected def coerceTypes(
456-
plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {
456+
plan: LogicalPlan): LogicalPlan = plan resolveExpressions {
457457
// Skip nodes who's children have not been resolved yet.
458458
case e if !e.childrenResolved => e
459459

@@ -512,7 +512,7 @@ object TypeCoercion {
512512
private val trueValues = Seq(1.toByte, 1.toShort, 1, 1L, Decimal.ONE)
513513
private val falseValues = Seq(0.toByte, 0.toShort, 0, 0L, Decimal.ZERO)
514514

515-
def apply(plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {
515+
def apply(plan: LogicalPlan): LogicalPlan = plan resolveExpressions {
516516
// Skip nodes who's children have not been resolved yet.
517517
case e if !e.childrenResolved => e
518518

@@ -555,7 +555,7 @@ object TypeCoercion {
555555
object FunctionArgumentConversion extends TypeCoercionRule {
556556

557557
override protected def coerceTypes(
558-
plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {
558+
plan: LogicalPlan): LogicalPlan = plan resolveExpressions {
559559
// Skip nodes who's children have not been resolved yet.
560560
case e if !e.childrenResolved => e
561561

@@ -670,7 +670,7 @@ object TypeCoercion {
670670
*/
671671
object Division extends TypeCoercionRule {
672672
override protected def coerceTypes(
673-
plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {
673+
plan: LogicalPlan): LogicalPlan = plan resolveExpressions {
674674
// Skip nodes who has not been resolved yet,
675675
// as this is an extra rule which should be applied at last.
676676
case e if !e.childrenResolved => e
@@ -693,7 +693,7 @@ object TypeCoercion {
693693
*/
694694
object CaseWhenCoercion extends TypeCoercionRule {
695695
override protected def coerceTypes(
696-
plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {
696+
plan: LogicalPlan): LogicalPlan = plan resolveExpressions {
697697
case c: CaseWhen if c.childrenResolved && !haveSameType(c.inputTypesForMerging) =>
698698
val maybeCommonType = findWiderCommonType(c.inputTypesForMerging)
699699
maybeCommonType.map { commonType =>
@@ -711,7 +711,7 @@ object TypeCoercion {
711711
*/
712712
object IfCoercion extends TypeCoercionRule {
713713
override protected def coerceTypes(
714-
plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {
714+
plan: LogicalPlan): LogicalPlan = plan resolveExpressions {
715715
case e if !e.childrenResolved => e
716716
// Find tightest common type for If, if the true value and false value have different types.
717717
case i @ If(pred, left, right) if !haveSameType(i.inputTypesForMerging) =>
@@ -731,7 +731,7 @@ object TypeCoercion {
731731
* Coerces NullTypes in the Stack expression to the column types of the corresponding positions.
732732
*/
733733
object StackCoercion extends TypeCoercionRule {
734-
override def coerceTypes(plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {
734+
override def coerceTypes(plan: LogicalPlan): LogicalPlan = plan resolveExpressions {
735735
case s @ Stack(children) if s.childrenResolved && s.hasFoldableNumRows =>
736736
Stack(children.zipWithIndex.map {
737737
// The first child is the number of rows for stack.
@@ -751,7 +751,8 @@ object TypeCoercion {
751751
*/
752752
case class ConcatCoercion(conf: SQLConf) extends TypeCoercionRule {
753753

754-
override protected def coerceTypes(plan: LogicalPlan): LogicalPlan = plan transform { case p =>
754+
override protected def coerceTypes(
755+
plan: LogicalPlan): LogicalPlan = plan resolveOperatorsDown { case p =>
755756
p transformExpressionsUp {
756757
// Skip nodes if unresolved or empty children
757758
case c @ Concat(children) if !c.childrenResolved || children.isEmpty => c
@@ -773,7 +774,8 @@ object TypeCoercion {
773774
*/
774775
case class EltCoercion(conf: SQLConf) extends TypeCoercionRule {
775776

776-
override protected def coerceTypes(plan: LogicalPlan): LogicalPlan = plan transform { case p =>
777+
override protected def coerceTypes(
778+
plan: LogicalPlan): LogicalPlan = plan resolveOperatorsDown { case p =>
777779
p transformExpressionsUp {
778780
// Skip nodes if unresolved or not enough children
779781
case c @ Elt(children) if !c.childrenResolved || children.size < 2 => c
@@ -801,7 +803,7 @@ object TypeCoercion {
801803

802804
private val acceptedTypes = Seq(DateType, TimestampType, StringType)
803805

804-
def apply(plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {
806+
def apply(plan: LogicalPlan): LogicalPlan = plan resolveExpressions {
805807
// Skip nodes who's children have not been resolved yet.
806808
case e if !e.childrenResolved => e
807809

@@ -822,7 +824,7 @@ object TypeCoercion {
822824
private def rejectTzInString = conf.getConf(SQLConf.REJECT_TIMEZONE_IN_STRING)
823825

824826
override protected def coerceTypes(
825-
plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {
827+
plan: LogicalPlan): LogicalPlan = plan resolveExpressions {
826828
// Skip nodes who's children have not been resolved yet.
827829
case e if !e.childrenResolved => e
828830

@@ -961,7 +963,7 @@ object TypeCoercion {
961963
*/
962964
object WindowFrameCoercion extends TypeCoercionRule {
963965
override protected def coerceTypes(
964-
plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {
966+
plan: LogicalPlan): LogicalPlan = plan resolveExpressions {
965967
case s @ WindowSpecDefinition(_, Seq(order), SpecifiedWindowFrame(RangeFrame, lower, upper))
966968
if order.resolved =>
967969
s.copy(frameSpecification = SpecifiedWindowFrame(
@@ -999,7 +1001,7 @@ trait TypeCoercionRule extends Rule[LogicalPlan] with Logging {
9991001

10001002
protected def coerceTypes(plan: LogicalPlan): LogicalPlan
10011003

1002-
private def propagateTypes(plan: LogicalPlan): LogicalPlan = plan transformUp {
1004+
private def propagateTypes(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
10031005
// No propagation required for leaf nodes.
10041006
case q: LogicalPlan if q.children.isEmpty => q
10051007

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/timeZoneAnalysis.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ case class ResolveTimeZone(conf: SQLConf) extends Rule[LogicalPlan] {
3838
}
3939

4040
override def apply(plan: LogicalPlan): LogicalPlan =
41-
plan.transformAllExpressions(transformTimeZoneExprs)
41+
plan.resolveExpressions(transformTimeZoneExprs)
4242

4343
def resolveTimeZones(e: Expression): Expression = e.transform(transformTimeZoneExprs)
4444
}

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/view.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ import org.apache.spark.sql.internal.SQLConf
4848
* completely resolved during the batch of Resolution.
4949
*/
5050
case class AliasViewChild(conf: SQLConf) extends Rule[LogicalPlan] with CastSupport {
51-
override def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
51+
override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
5252
case v @ View(desc, output, child) if child.resolved && output != child.output =>
5353
val resolver = conf.resolver
5454
val queryColumnNames = desc.viewQueryColumnNames

0 commit comments

Comments
 (0)