Skip to content

Commit b8b5acd

Browse files
dilipbiswalcloud-fan
authored andcommitted
[SPARK-19712][SQL][FOLLOW-UP] Don't do partial pushdown when pushing down LeftAnti joins below Aggregate or Window operators.
## What changes were proposed in this pull request? After [23750](apache#23750), we may pushdown left anti joins below aggregate and window operators with a partial join condition. This is not correct and was pointed out by hvanhovell and cloud-fan [here](apache#23750 (comment)). This pr addresses their comments. ## How was this patch tested? Added two new tests to verify the behaviour. Closes apache#24253 from dilipbiswal/SPARK-19712-followup. Authored-by: Dilip Biswal <[email protected]> Signed-off-by: Wenchen Fan <[email protected]>
1 parent 3628242 commit b8b5acd

File tree

2 files changed

+73
-6
lines changed

2 files changed

+73
-6
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PushDownLeftSemiAntiJoin.scala

Lines changed: 32 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,18 @@ object PushDownLeftSemiAntiJoin extends Rule[LogicalPlan] with PredicateHelper {
8282
val newAgg = agg.copy(child = Join(agg.child, rightOp, joinType, Option(replaced), hint))
8383
// If there is no more filter to stay up, just return the Aggregate over Join.
8484
// Otherwise, create "Filter(stayUp) <- Aggregate <- Join(pushDownPredicate)".
85-
if (stayUp.isEmpty) newAgg else Filter(stayUp.reduce(And), newAgg)
85+
if (stayUp.isEmpty) {
86+
newAgg
87+
} else {
88+
joinType match {
89+
// In case of Left semi join, the part of the join condition which does not refer to
90+
// to child attributes of the aggregate operator are kept as a Filter over window.
91+
case LeftSemi => Filter(stayUp.reduce(And), newAgg)
92+
// In case of left anti join, the join is pushed down when the entire join condition
93+
// is eligible to be pushed down to preserve the semantics of left anti join.
94+
case _ => join
95+
}
96+
}
8697
} else {
8798
// The join condition is not a subset of the Aggregate's GROUP BY columns,
8899
// no push down.
@@ -114,7 +125,18 @@ object PushDownLeftSemiAntiJoin extends Rule[LogicalPlan] with PredicateHelper {
114125
if (pushDown.nonEmpty && rightOpColumns.isEmpty) {
115126
val predicate = pushDown.reduce(And)
116127
val newPlan = w.copy(child = Join(w.child, rightOp, joinType, Option(predicate), hint))
117-
if (stayUp.isEmpty) newPlan else Filter(stayUp.reduce(And), newPlan)
128+
if (stayUp.isEmpty) {
129+
newPlan
130+
} else {
131+
joinType match {
132+
// In case of Left semi join, the part of the join condition which does not refer to
133+
// to partition attributes of the window operator are kept as a Filter over window.
134+
case LeftSemi => Filter(stayUp.reduce(And), newPlan)
135+
// In case of left anti join, the join is pushed down when the entire join condition
136+
// is eligible to be pushed down to preserve the semantics of left anti join.
137+
case _ => join
138+
}
139+
}
118140
} else {
119141
// The join condition is not a subset of the Window's PARTITION BY clause,
120142
// no push down.
@@ -184,7 +206,14 @@ object PushDownLeftSemiAntiJoin extends Rule[LogicalPlan] with PredicateHelper {
184206
if (pushDown.nonEmpty && rightOpColumns.isEmpty) {
185207
val newChild = insertJoin(Option(pushDown.reduceLeft(And)))
186208
if (stayUp.nonEmpty) {
187-
Filter(stayUp.reduceLeft(And), newChild)
209+
join.joinType match {
210+
// In case of Left semi join, the part of the join condition which does not refer to
211+
// to attributes of the grandchild are kept as a Filter over window.
212+
case LeftSemi => Filter(stayUp.reduce(And), newChild)
213+
// In case of left anti join, the join is pushed down when the entire join condition
214+
// is eligible to be pushed down to preserve the semantics of left anti join.
215+
case _ => join
216+
}
188217
} else {
189218
newChild
190219
}

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LeftSemiAntiJoinPushDownSuite.scala

Lines changed: 41 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ class LeftSemiPushdownSuite extends PlanTest {
117117
comparePlans(optimized, originalQuery.analyze)
118118
}
119119

120-
test("Aggregate: LeftSemiAnti join partial pushdown") {
120+
test("Aggregate: LeftSemi join partial pushdown") {
121121
val originalQuery = testRelation
122122
.groupBy('b)('b, sum('c).as('sum))
123123
.join(testRelation1, joinType = LeftSemi, condition = Some('b === 'd && 'sum === 10))
@@ -132,6 +132,15 @@ class LeftSemiPushdownSuite extends PlanTest {
132132
comparePlans(optimized, correctAnswer)
133133
}
134134

135+
test("Aggregate: LeftAnti join no pushdown") {
136+
val originalQuery = testRelation
137+
.groupBy('b)('b, sum('c).as('sum))
138+
.join(testRelation1, joinType = LeftAnti, condition = Some('b === 'd && 'sum === 10))
139+
140+
val optimized = Optimize.execute(originalQuery.analyze)
141+
comparePlans(optimized, originalQuery.analyze)
142+
}
143+
135144
test("LeftSemiAnti join over aggregate - no pushdown") {
136145
val originalQuery = testRelation
137146
.groupBy('b)('b, sum('c).as('sum))
@@ -174,7 +183,7 @@ class LeftSemiPushdownSuite extends PlanTest {
174183
comparePlans(optimized, correctAnswer)
175184
}
176185

177-
test("Window: LeftSemiAnti partial pushdown") {
186+
test("Window: LeftSemi partial pushdown") {
178187
// Attributes from join condition which does not refer to the window partition spec
179188
// are kept up in the plan as a Filter operator above Window.
180189
val winExpr = windowExpr(count('b), windowSpec('a :: Nil, 'b.asc :: Nil, UnspecifiedFrame))
@@ -195,6 +204,25 @@ class LeftSemiPushdownSuite extends PlanTest {
195204
comparePlans(optimized, correctAnswer)
196205
}
197206

207+
test("Window: LeftAnti no pushdown") {
208+
// Attributes from join condition which does not refer to the window partition spec
209+
// are kept up in the plan as a Filter operator above Window.
210+
val winExpr = windowExpr(count('b), windowSpec('a :: Nil, 'b.asc :: Nil, UnspecifiedFrame))
211+
212+
val originalQuery = testRelation
213+
.select('a, 'b, 'c, winExpr.as('window))
214+
.join(testRelation1, joinType = LeftAnti, condition = Some('a === 'd && 'b > 5))
215+
216+
val optimized = Optimize.execute(originalQuery.analyze)
217+
218+
val correctAnswer = testRelation
219+
.select('a, 'b, 'c)
220+
.window(winExpr.as('window) :: Nil, 'a :: Nil, 'b.asc :: Nil)
221+
.join(testRelation1, joinType = LeftAnti, condition = Some('a === 'd && 'b > 5))
222+
.select('a, 'b, 'c, 'window).analyze
223+
comparePlans(optimized, correctAnswer)
224+
}
225+
198226
test("Union: LeftSemiAnti join pushdown") {
199227
val testRelation2 = LocalRelation('x.int, 'y.int, 'z.int)
200228

@@ -251,7 +279,7 @@ class LeftSemiPushdownSuite extends PlanTest {
251279
comparePlans(optimized, correctAnswer)
252280
}
253281

254-
test("Unary: LeftSemiAnti join pushdown - partial pushdown") {
282+
test("Unary: LeftSemi join pushdown - partial pushdown") {
255283
val testRelationWithArrayType = LocalRelation('a.int, 'b.int, 'c_arr.array(IntegerType))
256284
val originalQuery = testRelationWithArrayType
257285
.generate(Explode('c_arr), alias = Some("arr"), outputNames = Seq("out_col"))
@@ -267,6 +295,16 @@ class LeftSemiPushdownSuite extends PlanTest {
267295
comparePlans(optimized, correctAnswer)
268296
}
269297

298+
test("Unary: LeftAnti join pushdown - no pushdown") {
299+
val testRelationWithArrayType = LocalRelation('a.int, 'b.int, 'c_arr.array(IntegerType))
300+
val originalQuery = testRelationWithArrayType
301+
.generate(Explode('c_arr), alias = Some("arr"), outputNames = Seq("out_col"))
302+
.join(testRelation1, joinType = LeftAnti, condition = Some('b === 'd && 'b === 'out_col))
303+
304+
val optimized = Optimize.execute(originalQuery.analyze)
305+
comparePlans(optimized, originalQuery.analyze)
306+
}
307+
270308
test("Unary: LeftSemiAnti join pushdown - no pushdown") {
271309
val testRelationWithArrayType = LocalRelation('a.int, 'b.int, 'c_arr.array(IntegerType))
272310
val originalQuery = testRelationWithArrayType

0 commit comments

Comments
 (0)