Skip to content

Commit 04e53d2

Browse files
aayushmaanjain authored and gatorsmile committed
[SPARK-27342][SQL] Optimize Limit 0 queries
## What changes were proposed in this pull request?

With this change, unnecessary file scans are avoided for Limit 0 queries. I added a new optimizer rule, `OptimizeLimitZero`, which replaces `GlobalLimit 0` and `LocalLimit 0` nodes with an empty `LocalRelation`. This prunes the subtree under the Limit 0 node and further allows the rules of `PropagateEmptyRelation` to optimize the Logical Plan, while remaining semantically consistent with the Limit 0 query.

For instance:

**Query:**
`SELECT * FROM table1 INNER JOIN (SELECT * FROM table2 LIMIT 0) AS table2 ON table1.id = table2.id`

**Optimized Plan without fix:**
```
Join Inner, (id#79 = id#87)
:- Filter isnotnull(id#79)
:  +- Relation[id#79,num1#80] parquet
+- Filter isnotnull(id#87)
   +- GlobalLimit 0
      +- LocalLimit 0
         +- Relation[id#87,num2#88] parquet
```

**Optimized Plan with fix:**
`LocalRelation <empty>, [id#75, num1#76, id#77, num2#78]`

## How was this patch tested?

Added unit tests to verify the Limit 0 optimization for:
- A simple query containing Limit 0.
- Inner Join, Left Outer Join, Right Outer Join and Full Outer Join queries containing Limit 0 as one of their children.
- Nested Inner Joins between 3 tables, with one of them having a Limit 0 clause.
- An Intersect query in which one of the subqueries is a Limit 0 query.

Closes apache#24271 from aayushmaanjain/optimize-limit0.

Authored-by: Aayushmaan Jain <[email protected]>
Signed-off-by: gatorsmile <[email protected]>
1 parent 0e44a51 commit 04e53d2

File tree

2 files changed

+144
-0
lines changed

2 files changed

+144
-0
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,8 @@ abstract class Optimizer(sessionCatalog: SessionCatalog)
137137
// since the other rules might make two separate Unions operators adjacent.
138138
Batch("Union", Once,
139139
CombineUnions) ::
140+
Batch("OptimizeLimitZero", Once,
141+
OptimizeLimitZero) ::
140142
// Run this once earlier. This might simplify the plan and reduce cost of optimizer.
141143
// For example, a query such as Filter(LocalRelation) would go through all the heavy
142144
// optimizer rules that are triggered when there is a filter
@@ -1681,3 +1683,37 @@ object RemoveRepetitionFromGroupExpressions extends Rule[LogicalPlan] {
16811683
}
16821684
}
16831685
}
1686+
1687+
/**
 * Rewrites every `GlobalLimit 0` / `LocalLimit 0` node, together with the subtree below it, into
 * an empty [[LocalRelation]], because such a node never returns any rows.
 */
object OptimizeLimitZero extends Rule[LogicalPlan] {

  /**
   * Builds an empty [[LocalRelation]] that carries the same output attributes and the same
   * streaming flag as `node`.
   */
  private def emptyRelationFor(node: LogicalPlan): LocalRelation =
    LocalRelation(node.output, data = Seq.empty, isStreaming = node.isStreaming)

  def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp {
    // A subtree rooted at a Global/Local Limit whose limit value is the literal zero is
    // semantically equivalent to an empty relation, so everything underneath can be pruned.
    // Substituting an empty LocalRelation for the whole subtree also lets the rules of
    // PropagateEmptyRelation push the emptiness further up the logical plan.
    //
    // Note: for SQL queries a LocalLimit 0 node always comes with a GlobalLimit 0 node, so the
    // GlobalLimit alternative alone covers almost all cases. The LocalLimit alternative handles
    // logical plans that a user builds explicitly with a bare LocalLimit 0 node.
    case zeroLimit @ (GlobalLimit(IntegerLiteral(0), _) | LocalLimit(IntegerLiteral(0), _)) =>
      emptyRelationFor(zeroLimit)
  }
}
Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql.catalyst.optimizer
19+
20+
import org.apache.spark.sql.Row
21+
import org.apache.spark.sql.catalyst.dsl.expressions._
22+
import org.apache.spark.sql.catalyst.dsl.plans._
23+
import org.apache.spark.sql.catalyst.expressions.Literal
24+
import org.apache.spark.sql.catalyst.plans._
25+
import org.apache.spark.sql.catalyst.plans.logical.{Distinct, GlobalLimit, LocalLimit, LocalRelation, LogicalPlan, Project}
26+
import org.apache.spark.sql.catalyst.rules.RuleExecutor
27+
import org.apache.spark.sql.types.IntegerType
28+
29+
/**
 * Verifies that the [[OptimizeLimitZero]] rule works correctly in various scenarios: a plain
 * limit, stand-alone local/global limit nodes, joins of every tested join type, a 3-way join and
 * an intersect.
 */
class OptimizeLimitZeroSuite extends PlanTest {
  // Minimal optimizer under test: a single batch, executed once, that rewrites Intersect into a
  // semi join, applies OptimizeLimitZero, and then propagates the resulting empty relations.
  object Optimize extends RuleExecutor[LogicalPlan] {
    val batches =
      Batch("OptimizeLimitZero", Once,
        ReplaceIntersectWithSemiJoin,
        OptimizeLimitZero,
        PropagateEmptyRelation) :: Nil
  }

  // Single-row relations, so that an empty optimized plan can only come from the Limit 0 rewrite.
  val testRelation1 = LocalRelation.fromExternalRows(Seq('a.int), data = Seq(Row(1)))
  val testRelation2 = LocalRelation.fromExternalRows(Seq('b.int), data = Seq(Row(1)))

  test("Limit 0: return empty local relation") {
    val query = testRelation1.limit(0)

    val optimized = Optimize.execute(query.analyze)
    val correctAnswer = LocalRelation('a.int)

    comparePlans(optimized, correctAnswer)
  }

  test("Limit 0: individual LocalLimit 0 node") {
    val query = LocalLimit(0, testRelation1)

    val optimized = Optimize.execute(query.analyze)
    val correctAnswer = LocalRelation('a.int)

    comparePlans(optimized, correctAnswer)
  }

  test("Limit 0: individual GlobalLimit 0 node") {
    val query = GlobalLimit(0, testRelation1)

    val optimized = Optimize.execute(query.analyze)
    val correctAnswer = LocalRelation('a.int)

    comparePlans(optimized, correctAnswer)
  }

  // Each pair is (join type, expected optimized plan) when the right side of the join is LIMIT 0.
  Seq(
    (Inner, LocalRelation('a.int, 'b.int)),
    (LeftOuter, Project(Seq('a, Literal(null).cast(IntegerType).as('b)), testRelation1).analyze),
    (RightOuter, LocalRelation('a.int, 'b.int)),
    (FullOuter, Project(Seq('a, Literal(null).cast(IntegerType).as('b)), testRelation1).analyze)
  ).foreach { case (jt, correctAnswer) =>
    test(s"Limit 0: for join type $jt") {
      // `===` builds a Catalyst equality expression over the two attributes. Scala's `==` would
      // evaluate to a plain Boolean instead of a join predicate.
      val query = testRelation1
        .join(testRelation2.limit(0), joinType = jt, condition = Some('a.attr === 'b.attr))

      val optimized = Optimize.execute(query.analyze)

      comparePlans(optimized, correctAnswer)
    }
  }

  test("Limit 0: 3-way join") {
    val testRelation3 = LocalRelation.fromExternalRows(Seq('c.int), data = Seq(Row(1)))

    // Join conditions use `===` so that they are real equality predicates rather than Booleans.
    val subJoinQuery = testRelation1
      .join(testRelation2, joinType = Inner, condition = Some('a.attr === 'b.attr))
    val query = subJoinQuery
      .join(testRelation3.limit(0), joinType = Inner, condition = Some('a.attr === 'c.attr))

    val optimized = Optimize.execute(query.analyze)
    val correctAnswer = LocalRelation('a.int, 'b.int, 'c.int)

    comparePlans(optimized, correctAnswer)
  }

  test("Limit 0: intersect") {
    val query = testRelation1
      .intersect(testRelation1.limit(0), isAll = false)

    val optimized = Optimize.execute(query.analyze)
    val correctAnswer = Distinct(LocalRelation('a.int))

    comparePlans(optimized, correctAnswer)
  }
}

0 commit comments

Comments
 (0)