Skip to content

Commit e46e487

Browse files
imback82cloud-fan
authored andcommitted
[SPARK-29682][SQL] Resolve conflicting attributes in Expand correctly
### What changes were proposed in this pull request? This PR addresses issues where conflicting attributes in `Expand` are not correctly handled. ### Why are the changes needed? ```Scala val numsDF = Seq(1, 2, 3, 4, 5, 6).toDF("nums") val cubeDF = numsDF.cube("nums").agg(max(lit(0)).as("agcol")) cubeDF.join(cubeDF, "nums").show ``` fails with the following exception: ``` org.apache.spark.sql.AnalysisException: Failure when resolving conflicting references in Join: 'Join Inner :- Aggregate [nums#38, spark_grouping_id#36], [nums#38, max(0) AS agcol#35] : +- Expand [List(nums#3, nums#37, 0), List(nums#3, null, 1)], [nums#3, nums#38, spark_grouping_id#36] : +- Project [nums#3, nums#3 AS nums#37] : +- Project [value#1 AS nums#3] : +- LocalRelation [value#1] +- Aggregate [nums#38, spark_grouping_id#36], [nums#38, max(0) AS agcol#58] +- Expand [List(nums#3, nums#37, 0), List(nums#3, null, 1)], [nums#3, nums#38, spark_grouping_id#36] ^^^^^^^ +- Project [nums#3, nums#3 AS nums#37] +- Project [value#1 AS nums#3] +- LocalRelation [value#1] Conflicting attributes: nums#38 ``` As you can see from the above plan, `num#38`, the output of `Expand` on the right side of `Join`, should have been handled to produce new attribute. Since the conflict is not resolved in `Expand`, the failure is happening upstream at `Aggregate`. This PR addresses handling conflicting attributes in `Expand`. ### Does this PR introduce any user-facing change? Yes, the previous example now shows the following output: ``` +----+-----+-----+ |nums|agcol|agcol| +----+-----+-----+ | 1| 0| 0| | 6| 0| 0| | 4| 0| 0| | 2| 0| 0| | 5| 0| 0| | 3| 0| 0| +----+-----+-----+ ``` ### How was this patch tested? Added new unit test. Closes apache#26441 from imback82/spark-29682. Authored-by: Terry Kim <[email protected]> Signed-off-by: Wenchen Fan <[email protected]>
1 parent b5a02d3 commit e46e487

File tree

3 files changed

+23
-0
lines changed

3 files changed

+23
-0
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -972,6 +972,18 @@ class Analyzer(
972972
val newOutput = oldVersion.generatorOutput.map(_.newInstance())
973973
(oldVersion, oldVersion.copy(generatorOutput = newOutput))
974974

975+
case oldVersion: Expand
976+
if oldVersion.producedAttributes.intersect(conflictingAttributes).nonEmpty =>
977+
val producedAttributes = oldVersion.producedAttributes
978+
val newOutput = oldVersion.output.map { attr =>
979+
if (producedAttributes.contains(attr)) {
980+
attr.newInstance()
981+
} else {
982+
attr
983+
}
984+
}
985+
(oldVersion, oldVersion.copy(output = newOutput))
986+
975987
case oldVersion @ Window(windowExpressions, _, _, child)
976988
if AttributeSet(windowExpressions.map(_.toAttribute)).intersect(conflictingAttributes)
977989
.nonEmpty =>

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -672,6 +672,8 @@ case class Expand(
672672
override lazy val references: AttributeSet =
673673
AttributeSet(projections.flatten.flatMap(_.references))
674674

675+
override def producedAttributes: AttributeSet = AttributeSet(output diff child.output)
676+
675677
// This operator can reuse attributes (for example making them null when doing a roll up) so
676678
// the constraints of the child may no longer be valid.
677679
override protected lazy val validConstraints: Set[Expression] = Set.empty[Expression]

sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3304,6 +3304,15 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession {
33043304
""".stripMargin).collect()
33053305
}
33063306
}
3307+
3308+
test("SPARK-29682: Conflicting attributes in Expand are resolved") {
3309+
val numsDF = Seq(1, 2, 3).toDF("nums")
3310+
val cubeDF = numsDF.cube("nums").agg(max(lit(0)).as("agcol"))
3311+
3312+
checkAnswer(
3313+
cubeDF.join(cubeDF, "nums"),
3314+
Row(1, 0, 0) :: Row(2, 0, 0) :: Row(3, 0, 0) :: Nil)
3315+
}
33073316
}
33083317

33093318
case class Foo(bar: Option[String])

0 commit comments

Comments
 (0)