Skip to content
This repository was archived by the owner on Jan 9, 2020. It is now read-only.

Commit 8866174

Browse files
committed
[SPARK-22018][SQL] Preserve top-level alias metadata when collapsing projects
## What changes were proposed in this pull request? If there are two projects like as follows. ``` Project [a_with_metadata#27 AS b#26] +- Project [a#0 AS a_with_metadata#27] +- LocalRelation <empty>, [a#0, b#1] ``` Child Project has an output column with a metadata in it, and the parent Project has an alias that implicitly forwards the metadata. So this metadata is visible for higher operators. Upon applying CollapseProject optimizer rule, the metadata is not preserved. ``` Project [a#0 AS b#26] +- LocalRelation <empty>, [a#0, b#1] ``` This is incorrect, as downstream operators that expect certain metadata (e.g. watermark in structured streaming) to identify certain fields will fail to do so. This PR fixes it by preserving the metadata of top-level aliases. ## How was this patch tested? New unit test Author: Tathagata Das <[email protected]> Closes apache#19240 from tdas/SPARK-22018.
1 parent a28728a commit 8866174

File tree

2 files changed

+25
-3
lines changed

2 files changed

+25
-3
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2256,7 +2256,10 @@ object CleanupAliases extends Rule[LogicalPlan] {
22562256

22572257
def trimNonTopLevelAliases(e: Expression): Expression = e match {
22582258
case a: Alias =>
2259-
a.withNewChildren(trimAliases(a.child) :: Nil)
2259+
a.copy(child = trimAliases(a.child))(
2260+
exprId = a.exprId,
2261+
qualifier = a.qualifier,
2262+
explicitMetadata = Some(a.metadata))
22602263
case other => trimAliases(other)
22612264
}
22622265

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CollapseProjectSuite.scala

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,11 @@ package org.apache.spark.sql.catalyst.optimizer
2020
import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
2121
import org.apache.spark.sql.catalyst.dsl.expressions._
2222
import org.apache.spark.sql.catalyst.dsl.plans._
23-
import org.apache.spark.sql.catalyst.expressions.Rand
23+
import org.apache.spark.sql.catalyst.expressions.{Alias, Rand}
2424
import org.apache.spark.sql.catalyst.plans.PlanTest
25-
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
25+
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan, Project}
2626
import org.apache.spark.sql.catalyst.rules.RuleExecutor
27+
import org.apache.spark.sql.types.MetadataBuilder
2728

2829
class CollapseProjectSuite extends PlanTest {
2930
object Optimize extends RuleExecutor[LogicalPlan] {
@@ -119,4 +120,22 @@ class CollapseProjectSuite extends PlanTest {
119120

120121
comparePlans(optimized, correctAnswer)
121122
}
123+
124+
test("preserve top-level alias metadata while collapsing projects") {
125+
def hasMetadata(logicalPlan: LogicalPlan): Boolean = {
126+
logicalPlan.asInstanceOf[Project].projectList.exists(_.metadata.contains("key"))
127+
}
128+
129+
val metadata = new MetadataBuilder().putLong("key", 1).build()
130+
val analyzed =
131+
Project(Seq(Alias('a_with_metadata, "b")()),
132+
Project(Seq(Alias('a, "a_with_metadata")(explicitMetadata = Some(metadata))),
133+
testRelation.logicalPlan)).analyze
134+
require(hasMetadata(analyzed))
135+
136+
val optimized = Optimize.execute(analyzed)
137+
val projects = optimized.collect { case p: Project => p }
138+
assert(projects.size === 1)
139+
assert(hasMetadata(optimized))
140+
}
122141
}

0 commit comments

Comments (0)