Skip to content

Commit 1e3b876

Browse files
maryannxue authored and cloud-fan committed
[SPARK-21479][SQL] Outer join filter pushdown in null supplying table when condition is on one of the joined columns
## What changes were proposed in this pull request?

Added a `TransitPredicateInOuterJoin` optimization rule that transits constraints from the preserved side of an outer join to the null-supplying side. The constraints of the join operator will remain unchanged.

## How was this patch tested?

Added 3 tests in `InferFiltersFromConstraintsSuite`.

Author: maryannxue <[email protected]>

Closes apache#20816 from maryannxue/spark-21479.
1 parent 5fccdae commit 1e3b876

File tree

3 files changed

+96
-7
lines changed

3 files changed

+96
-7
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala

Lines changed: 38 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -637,8 +637,11 @@ object CollapseWindow extends Rule[LogicalPlan] {
637637
* constraints. These filters are currently inserted to the existing conditions in the Filter
638638
* operators and on either side of Join operators.
639639
*
640-
* Note: While this optimization is applicable to all types of join, it primarily benefits Inner and
641-
* LeftSemi joins.
640+
* In addition, for left/right outer joins, infer predicate from the preserved side of the Join
641+
* operator and push the inferred filter over to the null-supplying side. For example, if the
642+
* preserved side has constraints of the form 'a > 5' and the join condition is 'a = b', in
643+
* which 'b' is an attribute from the null-supplying side, a [[Filter]] operator of 'b > 5' will
644+
* be applied to the null-supplying side.
642645
*/
643646
object InferFiltersFromConstraints extends Rule[LogicalPlan] with PredicateHelper {
644647

@@ -671,11 +674,42 @@ object InferFiltersFromConstraints extends Rule[LogicalPlan] with PredicateHelpe
671674
val newConditionOpt = conditionOpt match {
672675
case Some(condition) =>
673676
val newFilters = additionalConstraints -- splitConjunctivePredicates(condition)
674-
if (newFilters.nonEmpty) Option(And(newFilters.reduce(And), condition)) else None
677+
if (newFilters.nonEmpty) Option(And(newFilters.reduce(And), condition)) else conditionOpt
675678
case None =>
676679
additionalConstraints.reduceOption(And)
677680
}
678-
if (newConditionOpt.isDefined) Join(left, right, joinType, newConditionOpt) else join
681+
// Infer filter for left/right outer joins
682+
val newLeftOpt = joinType match {
683+
case RightOuter if newConditionOpt.isDefined =>
684+
val inferredConstraints = left.getRelevantConstraints(
685+
left.constraints
686+
.union(right.constraints)
687+
.union(splitConjunctivePredicates(newConditionOpt.get).toSet))
688+
val newFilters = inferredConstraints
689+
.filterNot(left.constraints.contains)
690+
.reduceLeftOption(And)
691+
newFilters.map(Filter(_, left))
692+
case _ => None
693+
}
694+
val newRightOpt = joinType match {
695+
case LeftOuter if newConditionOpt.isDefined =>
696+
val inferredConstraints = right.getRelevantConstraints(
697+
right.constraints
698+
.union(left.constraints)
699+
.union(splitConjunctivePredicates(newConditionOpt.get).toSet))
700+
val newFilters = inferredConstraints
701+
.filterNot(right.constraints.contains)
702+
.reduceLeftOption(And)
703+
newFilters.map(Filter(_, right))
704+
case _ => None
705+
}
706+
707+
if ((newConditionOpt.isDefined && (newConditionOpt ne conditionOpt))
708+
|| newLeftOpt.isDefined || newRightOpt.isDefined) {
709+
Join(newLeftOpt.getOrElse(left), newRightOpt.getOrElse(right), joinType, newConditionOpt)
710+
} else {
711+
join
712+
}
679713
}
680714
}
681715

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/QueryPlanConstraints.scala

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,9 +41,7 @@ trait QueryPlanConstraints { self: LogicalPlan =>
4141
* example, if this set contains the expression `a = 2` then that expression is guaranteed to
4242
* evaluate to `true` for all rows produced.
4343
*/
44-
lazy val constraints: ExpressionSet = ExpressionSet(allConstraints.filter { c =>
45-
c.references.nonEmpty && c.references.subsetOf(outputSet) && c.deterministic
46-
})
44+
lazy val constraints: ExpressionSet = ExpressionSet(allConstraints.filter(selfReferenceOnly))
4745

4846
/**
4947
* This method can be overridden by any child class of QueryPlan to specify a set of constraints
@@ -55,6 +53,23 @@ trait QueryPlanConstraints { self: LogicalPlan =>
5553
*/
5654
protected def validConstraints: Set[Expression] = Set.empty
5755

56+
/**
57+
* Returns an [[ExpressionSet]] that contains an additional set of constraints, such as
58+
* equality constraints and `isNotNull` constraints, etc., and that only contains references
59+
* to this [[LogicalPlan]] node.
60+
*/
61+
def getRelevantConstraints(constraints: Set[Expression]): ExpressionSet = {
62+
val allRelevantConstraints =
63+
if (conf.constraintPropagationEnabled) {
64+
constraints
65+
.union(inferAdditionalConstraints(constraints))
66+
.union(constructIsNotNullConstraints(constraints))
67+
} else {
68+
constraints
69+
}
70+
ExpressionSet(allRelevantConstraints.filter(selfReferenceOnly))
71+
}
72+
5873
/**
5974
* Infers a set of `isNotNull` constraints from null intolerant expressions as well as
6075
* non-nullable attributes. For e.g., if an expression is of the form (`a > 5`), this
@@ -120,4 +135,8 @@ trait QueryPlanConstraints { self: LogicalPlan =>
120135
destination: Attribute): Set[Expression] = constraints.map(_ transform {
121136
case e: Expression if e.semanticEquals(source) => destination
122137
})
138+
139+
private def selfReferenceOnly(e: Expression): Boolean = {
140+
e.references.nonEmpty && e.references.subsetOf(outputSet) && e.deterministic
141+
}
123142
}

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersFromConstraintsSuite.scala

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -204,4 +204,40 @@ class InferFiltersFromConstraintsSuite extends PlanTest {
204204
val optimized = Optimize.execute(originalQuery)
205205
comparePlans(optimized, correctAnswer)
206206
}
207+
208+
test("SPARK-21479: Outer join after-join filters push down to null-supplying side") {
209+
val x = testRelation.subquery('x)
210+
val y = testRelation.subquery('y)
211+
val condition = Some("x.a".attr === "y.a".attr)
212+
val originalQuery = x.join(y, LeftOuter, condition).where("x.a".attr === 2).analyze
213+
val left = x.where(IsNotNull('a) && 'a === 2)
214+
val right = y.where(IsNotNull('a) && 'a === 2)
215+
val correctAnswer = left.join(right, LeftOuter, condition).analyze
216+
val optimized = Optimize.execute(originalQuery)
217+
comparePlans(optimized, correctAnswer)
218+
}
219+
220+
test("SPARK-21479: Outer join pre-existing filters push down to null-supplying side") {
221+
val x = testRelation.subquery('x)
222+
val y = testRelation.subquery('y)
223+
val condition = Some("x.a".attr === "y.a".attr)
224+
val originalQuery = x.join(y.where("y.a".attr > 5), RightOuter, condition).analyze
225+
val left = x.where(IsNotNull('a) && 'a > 5)
226+
val right = y.where(IsNotNull('a) && 'a > 5)
227+
val correctAnswer = left.join(right, RightOuter, condition).analyze
228+
val optimized = Optimize.execute(originalQuery)
229+
comparePlans(optimized, correctAnswer)
230+
}
231+
232+
test("SPARK-21479: Outer join no filter push down to preserved side") {
233+
val x = testRelation.subquery('x)
234+
val y = testRelation.subquery('y)
235+
val condition = Some("x.a".attr === "y.a".attr)
236+
val originalQuery = x.join(y.where("y.a".attr === 1), LeftOuter, condition).analyze
237+
val left = x
238+
val right = y.where(IsNotNull('a) && 'a === 1)
239+
val correctAnswer = left.join(right, LeftOuter, condition).analyze
240+
val optimized = Optimize.execute(originalQuery)
241+
comparePlans(optimized, correctAnswer)
242+
}
207243
}

0 commit comments

Comments (0)