Skip to content

Commit 150b653

Browse files
author
Robert Kruszewski
committed
[SPARK-23802][SQL] PropagateEmptyRelation can leave query plan in unresolved state
## What changes were proposed in this pull request? Add cast to nulls introduced by PropagateEmptyRelation so in cases they're part of coalesce they will not break its type checking rules ## How was this patch tested? Added unit test Author: Robert Kruszewski <[email protected]> Closes apache#20914 from robert3005/rk/propagate-empty-fix.
1 parent c781454 commit 150b653

File tree

2 files changed

+25
-9
lines changed

2 files changed

+25
-9
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,12 @@
1717

1818
package org.apache.spark.sql.catalyst.optimizer
1919

20+
import org.apache.spark.sql.catalyst.analysis.CastSupport
2021
import org.apache.spark.sql.catalyst.expressions._
2122
import org.apache.spark.sql.catalyst.plans._
2223
import org.apache.spark.sql.catalyst.plans.logical._
2324
import org.apache.spark.sql.catalyst.rules._
25+
import org.apache.spark.sql.internal.SQLConf
2426

2527
/**
2628
* Collapse plans consisting empty local relations generated by [[PruneFilters]].
@@ -32,7 +34,7 @@ import org.apache.spark.sql.catalyst.rules._
3234
* - Aggregate with all empty children and at least one grouping expression.
3335
* - Generate(Explode) with all empty children. Others like Hive UDTF may return results.
3436
*/
35-
object PropagateEmptyRelation extends Rule[LogicalPlan] with PredicateHelper {
37+
object PropagateEmptyRelation extends Rule[LogicalPlan] with PredicateHelper with CastSupport {
3638
private def isEmptyLocalRelation(plan: LogicalPlan): Boolean = plan match {
3739
case p: LocalRelation => p.data.isEmpty
3840
case _ => false
@@ -43,7 +45,9 @@ object PropagateEmptyRelation extends Rule[LogicalPlan] with PredicateHelper {
4345

4446
// Construct a project list from plan's output, while the value is always NULL.
4547
private def nullValueProjectList(plan: LogicalPlan): Seq[NamedExpression] =
46-
plan.output.map{ a => Alias(Literal(null), a.name)(a.exprId) }
48+
plan.output.map{ a => Alias(cast(Literal(null), a.dataType), a.name)(a.exprId) }
49+
50+
override def conf: SQLConf = SQLConf.get
4751

4852
def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
4953
case p: Union if p.children.forall(isEmptyLocalRelation) =>

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelationSuite.scala

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.expressions.Literal
2525
import org.apache.spark.sql.catalyst.plans._
2626
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan, Project}
2727
import org.apache.spark.sql.catalyst.rules.RuleExecutor
28-
import org.apache.spark.sql.types.StructType
28+
import org.apache.spark.sql.types.{IntegerType, StructType}
2929

3030
class PropagateEmptyRelationSuite extends PlanTest {
3131
object Optimize extends RuleExecutor[LogicalPlan] {
@@ -37,7 +37,8 @@ class PropagateEmptyRelationSuite extends PlanTest {
3737
ReplaceIntersectWithSemiJoin,
3838
PushDownPredicate,
3939
PruneFilters,
40-
PropagateEmptyRelation) :: Nil
40+
PropagateEmptyRelation,
41+
CollapseProject) :: Nil
4142
}
4243

4344
object OptimizeWithoutPropagateEmptyRelation extends RuleExecutor[LogicalPlan] {
@@ -48,7 +49,8 @@ class PropagateEmptyRelationSuite extends PlanTest {
4849
ReplaceExceptWithAntiJoin,
4950
ReplaceIntersectWithSemiJoin,
5051
PushDownPredicate,
51-
PruneFilters) :: Nil
52+
PruneFilters,
53+
CollapseProject) :: Nil
5254
}
5355

5456
val testRelation1 = LocalRelation.fromExternalRows(Seq('a.int), data = Seq(Row(1)))
@@ -79,18 +81,21 @@ class PropagateEmptyRelationSuite extends PlanTest {
7981

8082
(true, false, Inner, Some(LocalRelation('a.int, 'b.int))),
8183
(true, false, Cross, Some(LocalRelation('a.int, 'b.int))),
82-
(true, false, LeftOuter, Some(Project(Seq('a, Literal(null).as('b)), testRelation1).analyze)),
84+
(true, false, LeftOuter,
85+
Some(Project(Seq('a, Literal(null).cast(IntegerType).as('b)), testRelation1).analyze)),
8386
(true, false, RightOuter, Some(LocalRelation('a.int, 'b.int))),
84-
(true, false, FullOuter, Some(Project(Seq('a, Literal(null).as('b)), testRelation1).analyze)),
87+
(true, false, FullOuter,
88+
Some(Project(Seq('a, Literal(null).cast(IntegerType).as('b)), testRelation1).analyze)),
8589
(true, false, LeftAnti, Some(testRelation1)),
8690
(true, false, LeftSemi, Some(LocalRelation('a.int))),
8791

8892
(false, true, Inner, Some(LocalRelation('a.int, 'b.int))),
8993
(false, true, Cross, Some(LocalRelation('a.int, 'b.int))),
9094
(false, true, LeftOuter, Some(LocalRelation('a.int, 'b.int))),
9195
(false, true, RightOuter,
92-
Some(Project(Seq(Literal(null).as('a), 'b), testRelation2).analyze)),
93-
(false, true, FullOuter, Some(Project(Seq(Literal(null).as('a), 'b), testRelation2).analyze)),
96+
Some(Project(Seq(Literal(null).cast(IntegerType).as('a), 'b), testRelation2).analyze)),
97+
(false, true, FullOuter,
98+
Some(Project(Seq(Literal(null).cast(IntegerType).as('a), 'b), testRelation2).analyze)),
9499
(false, true, LeftAnti, Some(LocalRelation('a.int))),
95100
(false, true, LeftSemi, Some(LocalRelation('a.int))),
96101

@@ -209,4 +214,11 @@ class PropagateEmptyRelationSuite extends PlanTest {
209214

210215
comparePlans(optimized, correctAnswer)
211216
}
217+
218+
test("propagate empty relation keeps the plan resolved") {
219+
val query = testRelation1.join(
220+
LocalRelation('a.int, 'b.int), UsingJoin(FullOuter, "a" :: Nil), None)
221+
val optimized = Optimize.execute(query.analyze)
222+
assert(optimized.resolved)
223+
}
212224
}

0 commit comments

Comments
 (0)