Skip to content

Commit aaa21d8

Browse files
mgaido91cloud-fan
authored andcommitted
[SPARK-26057][SQL] Transform also analyzed plans when dedup references
## What changes were proposed in this pull request? In SPARK-24865 `AnalysisBarrier` was removed and in order to improve resolution speed, the `analyzed` flag was (re-)introduced in order to process only plans which are not yet analyzed. This should not be the case when performing attribute deduplication as in that case we need to transform also the plans which were already analyzed, otherwise we can miss to rewrite some attributes leading to invalid plans. ## How was this patch tested? added UT Please review http://spark.apache.org/contributing.html before opening a pull request. Closes apache#23035 from mgaido91/SPARK-26057. Authored-by: Marco Gaido <[email protected]> Signed-off-by: Wenchen Fan <[email protected]> (cherry picked from commit b46f75a) Signed-off-by: Wenchen Fan <[email protected]>
1 parent ba638a7 commit aaa21d8

File tree

2 files changed

+26
-1
lines changed

2 files changed

+26
-1
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -871,7 +871,7 @@ class Analyzer(
871871
private def dedupOuterReferencesInSubquery(
872872
plan: LogicalPlan,
873873
attrMap: AttributeMap[Attribute]): LogicalPlan = {
874-
plan resolveOperatorsDown { case currentFragment =>
874+
plan transformDown { case currentFragment =>
875875
currentFragment transformExpressions {
876876
case OuterReference(a: Attribute) =>
877877
OuterReference(dedupAttr(a, attrMap))

sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2597,4 +2597,29 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
25972597

25982598
checkAnswer(swappedDf.filter($"key"($"map") > "a"), Row(2, Map(2 -> "b")))
25992599
}
2600+
2601+
test("SPARK-26057: attribute deduplication on already analyzed plans") {
2602+
withTempView("a", "b", "v") {
2603+
val df1 = Seq(("1-1", 6)).toDF("id", "n")
2604+
df1.createOrReplaceTempView("a")
2605+
val df3 = Seq("1-1").toDF("id")
2606+
df3.createOrReplaceTempView("b")
2607+
spark.sql(
2608+
"""
2609+
|SELECT a.id, n as m
2610+
|FROM a
2611+
|WHERE EXISTS(
2612+
| SELECT 1
2613+
| FROM b
2614+
| WHERE b.id = a.id)
2615+
""".stripMargin).createOrReplaceTempView("v")
2616+
val res = spark.sql(
2617+
"""
2618+
|SELECT a.id, n, m
2619+
| FROM a
2620+
| LEFT OUTER JOIN v ON v.id = a.id
2621+
""".stripMargin)
2622+
checkAnswer(res, Row("1-1", 6, 6))
2623+
}
2624+
}
26002625
}

0 commit comments

Comments
 (0)