Skip to content

Commit 1038540

Browse files
bogdanrdc authored
gatorsmile committed
[SPARK-25212][SQL] Support Filter in ConvertToLocalRelation
## What changes were proposed in this pull request?

Support Filter in ConvertToLocalRelation, similar to how Project works. Additionally, in Optimizer, run ConvertToLocalRelation earlier to simplify the plan. This is good for very short queries which often are queries on local relations.

## How was this patch tested?

New test. Manual benchmark.

Author: Bogdan Raducanu <[email protected]> Author: Shixiong Zhu <[email protected]> Author: Yinan Li <[email protected]> Author: Li Jin <[email protected]> Author: s71955 <[email protected]> Author: DB Tsai <[email protected]> Author: jaroslav chládek <[email protected]> Author: Huangweizhe <[email protected]> Author: Xiangrui Meng <[email protected]> Author: hyukjinkwon <[email protected]> Author: Kent Yao <[email protected]> Author: caoxuewen <[email protected]> Author: liuxian <[email protected]> Author: Adam Bradbury <[email protected]> Author: Jose Torres <[email protected]> Author: Yuming Wang <[email protected]> Author: Liang-Chi Hsieh <[email protected]>

Closes apache#22205 from bogdanrdc/local-relation-filter.
1 parent 7ad18ee commit 1038540

File tree

3 files changed

+36
-4
lines changed

3 files changed

+36
-4
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala

Lines changed: 14 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -130,6 +130,14 @@ abstract class Optimizer(sessionCatalog: SessionCatalog)
130130
// since the other rules might make two separate Unions operators adjacent.
131131
Batch("Union", Once,
132132
CombineUnions) ::
133+
// run this once earlier. this might simplify the plan and reduce cost of optimizer.
134+
// for example, a query such as Filter(LocalRelation) would go through all the heavy
135+
// optimizer rules that are triggered when there is a filter
136+
// (e.g. InferFiltersFromConstraints). if we run this batch earlier, the query becomes just
137+
// LocalRelation and does not trigger many rules
138+
Batch("LocalRelation early", fixedPoint,
139+
ConvertToLocalRelation,
140+
PropagateEmptyRelation) ::
133141
Batch("Pullup Correlated Expressions", Once,
134142
PullupCorrelatedPredicates) ::
135143
Batch("Subquery", Once,
@@ -1349,6 +1357,12 @@ object ConvertToLocalRelation extends Rule[LogicalPlan] {
13491357

13501358
case Limit(IntegerLiteral(limit), LocalRelation(output, data, isStreaming)) =>
13511359
LocalRelation(output, data.take(limit), isStreaming)
1360+
1361+
case Filter(condition, LocalRelation(output, data, isStreaming))
1362+
if !hasUnevaluableExpr(condition) =>
1363+
val predicate = InterpretedPredicate.create(condition, output)
1364+
predicate.initialize(0)
1365+
LocalRelation(output, data.filter(row => predicate.eval(row)), isStreaming)
13521366
}
13531367

13541368
private def hasUnevaluableExpr(expr: Expression): Boolean = {

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ConvertToLocalRelationSuite.scala

Lines changed: 18 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -21,6 +21,7 @@ import org.apache.spark.sql.catalyst.InternalRow
2121
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
2222
import org.apache.spark.sql.catalyst.dsl.expressions._
2323
import org.apache.spark.sql.catalyst.dsl.plans._
24+
import org.apache.spark.sql.catalyst.expressions.{LessThan, Literal}
2425
import org.apache.spark.sql.catalyst.plans.PlanTest
2526
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
2627
import org.apache.spark.sql.catalyst.rules.RuleExecutor
@@ -52,4 +53,21 @@ class ConvertToLocalRelationSuite extends PlanTest {
5253
comparePlans(optimized, correctAnswer)
5354
}
5455

56+
test("Filter on LocalRelation should be turned into a single LocalRelation") {
57+
val testRelation = LocalRelation(
58+
LocalRelation('a.int, 'b.int).output,
59+
InternalRow(1, 2) :: InternalRow(4, 5) :: Nil)
60+
61+
val correctAnswer = LocalRelation(
62+
LocalRelation('a1.int, 'b1.int).output,
63+
InternalRow(1, 3) :: Nil)
64+
65+
val filterAndProjectOnLocal = testRelation
66+
.select(UnresolvedAttribute("a").as("a1"), (UnresolvedAttribute("b") + 1).as("b1"))
67+
.where(LessThan(UnresolvedAttribute("b1"), Literal.create(6)))
68+
69+
val optimized = Optimize.execute(filterAndProjectOnLocal.analyze)
70+
71+
comparePlans(optimized, correctAnswer)
72+
}
5573
}

sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala

Lines changed: 4 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -196,15 +196,15 @@ class DataFrameJoinSuite extends QueryTest with SharedSQLContext {
196196
val df2 = Seq((1, 3, "1"), (5, 6, "5")).toDF("int", "int2", "str").as("b")
197197

198198
// outer -> left
199-
val outerJoin2Left = df.join(df2, $"a.int" === $"b.int", "outer").where($"a.int" === 3)
199+
val outerJoin2Left = df.join(df2, $"a.int" === $"b.int", "outer").where($"a.int" >= 3)
200200
assert(outerJoin2Left.queryExecution.optimizedPlan.collect {
201201
case j @ Join(_, _, LeftOuter, _) => j }.size === 1)
202202
checkAnswer(
203203
outerJoin2Left,
204204
Row(3, 4, "3", null, null, null) :: Nil)
205205

206206
// outer -> right
207-
val outerJoin2Right = df.join(df2, $"a.int" === $"b.int", "outer").where($"b.int" === 5)
207+
val outerJoin2Right = df.join(df2, $"a.int" === $"b.int", "outer").where($"b.int" >= 3)
208208
assert(outerJoin2Right.queryExecution.optimizedPlan.collect {
209209
case j @ Join(_, _, RightOuter, _) => j }.size === 1)
210210
checkAnswer(
@@ -221,15 +221,15 @@ class DataFrameJoinSuite extends QueryTest with SharedSQLContext {
221221
Row(1, 2, "1", 1, 3, "1") :: Nil)
222222

223223
// right -> inner
224-
val rightJoin2Inner = df.join(df2, $"a.int" === $"b.int", "right").where($"a.int" === 1)
224+
val rightJoin2Inner = df.join(df2, $"a.int" === $"b.int", "right").where($"a.int" > 0)
225225
assert(rightJoin2Inner.queryExecution.optimizedPlan.collect {
226226
case j @ Join(_, _, Inner, _) => j }.size === 1)
227227
checkAnswer(
228228
rightJoin2Inner,
229229
Row(1, 2, "1", 1, 3, "1") :: Nil)
230230

231231
// left -> inner
232-
val leftJoin2Inner = df.join(df2, $"a.int" === $"b.int", "left").where($"b.int2" === 3)
232+
val leftJoin2Inner = df.join(df2, $"a.int" === $"b.int", "left").where($"b.int2" > 0)
233233
assert(leftJoin2Inner.queryExecution.optimizedPlan.collect {
234234
case j @ Join(_, _, Inner, _) => j }.size === 1)
235235
checkAnswer(

0 commit comments

Comments (0)