Skip to content

Commit e316d28

Browse files
mihailoale-dbcloud-fan
authored andcommitted
[SPARK-54817][SQL] Refactor Unpivot resolution logic to UnpivotTransformer
### What changes were proposed in this pull request? In this PR I propose to refactor `Unpivot` resolution logic to `UnpivotTransformer`. ### Why are the changes needed? In order to reuse it in the single-pass implementation. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Existing tests. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #53577 from mihailoale-db/resolveunpivottransformer. Authored-by: mihailoale-db <[email protected]> Signed-off-by: Wenchen Fan <[email protected]>
1 parent 87a3b06 commit e316d28

File tree

2 files changed

+77
-26
lines changed

2 files changed

+77
-26
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala

Lines changed: 1 addition & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -952,32 +952,7 @@ class Analyzer(
952952
// TypeCoercionBase.UnpivotCoercion determines valueType
953953
// and casts values once values are set and resolved
954954
case Unpivot(Some(ids), Some(values), aliases, variableColumnName, valueColumnNames, child) =>
955-
956-
def toString(values: Seq[NamedExpression]): String =
957-
values.map(v => v.name).mkString("_")
958-
959-
// construct unpivot expressions for Expand
960-
val exprs: Seq[Seq[Expression]] =
961-
values.zip(aliases.getOrElse(values.map(_ => None))).map {
962-
case (vals, Some(alias)) => (ids :+ Literal(alias)) ++ vals
963-
case (Seq(value), None) => (ids :+ Literal(value.name)) :+ value
964-
// there are more than one value in vals
965-
case (vals, None) => (ids :+ Literal(toString(vals))) ++ vals
966-
}
967-
968-
// construct output attributes
969-
val variableAttr = AttributeReference(variableColumnName, StringType, nullable = false)()
970-
val valueAttrs = valueColumnNames.zipWithIndex.map {
971-
case (valueColumnName, idx) =>
972-
AttributeReference(
973-
valueColumnName,
974-
values.head(idx).dataType,
975-
values.map(_(idx)).exists(_.nullable))()
976-
}
977-
val output = (ids.map(_.toAttribute) :+ variableAttr) ++ valueAttrs
978-
979-
// expand the unpivot expressions
980-
Expand(exprs, output, child)
955+
UnpivotTransformer(ids, values, aliases, variableColumnName, valueColumnNames, child)
981956
}
982957
}
983958

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql.catalyst.analysis
19+
20+
import org.apache.spark.sql.catalyst.expressions.{
21+
AttributeReference,
22+
Expression,
23+
Literal,
24+
NamedExpression
25+
}
26+
import org.apache.spark.sql.catalyst.plans.logical.{Expand, LogicalPlan}
27+
import org.apache.spark.sql.types.StringType
28+
29+
/**
30+
* Object used to transform the given [[Unpivot]] node to an [[Expand]] node.
31+
*/
32+
object UnpivotTransformer {
33+
34+
/**
35+
* Construct an [[Expand]] node from the given [[Unpivot]] node. Do that by:
36+
* 1. Constructing expressions for [[Expand]] out of [[aliases]] and [[values]].
37+
* 2. Constructing output attributes.
38+
* 3. Creating the [[Expand]] node using the expressions, outputs and the [[Unpivot.child]].
39+
*/
40+
def apply(
41+
ids: Seq[NamedExpression],
42+
values: Seq[Seq[NamedExpression]],
43+
aliases: Option[Seq[Option[String]]],
44+
variableColumnName: String,
45+
valueColumnNames: Seq[String],
46+
child: LogicalPlan): Expand = {
47+
48+
val expressions: Seq[Seq[Expression]] =
49+
values.zip(aliases.getOrElse(values.map(_ => None))).map {
50+
case (values, Some(alias)) => (ids :+ Literal(alias)) ++ values
51+
case (Seq(value), None) => (ids :+ Literal(value.name)) :+ value
52+
case (values, None) =>
53+
val stringOfValues = values
54+
.map { value =>
55+
value.name
56+
}
57+
.mkString("_")
58+
(ids :+ Literal(stringOfValues)) ++ values
59+
}
60+
61+
val variableAttribute =
62+
AttributeReference(variableColumnName, StringType, nullable = false)()
63+
val valueAttributes = valueColumnNames.zipWithIndex.map {
64+
case (valueColumnName, index) =>
65+
AttributeReference(
66+
valueColumnName,
67+
values.head(index).dataType,
68+
values.map(_(index)).exists(_.nullable)
69+
)()
70+
}
71+
72+
val output = (ids.map(_.toAttribute) :+ variableAttribute) ++ valueAttributes
73+
74+
Expand(expressions, output, child)
75+
}
76+
}

0 commit comments

Comments
 (0)