Commit ab2eafb

viirya authored and cloud-fan committed
[SPARK-26085][SQL] Key attribute of non-struct type under typed aggregation should be named as "key" too
## What changes were proposed in this pull request?

When doing typed aggregation on a Dataset, for struct key type, the key attribute is named as "key". But for non-struct type, the key attribute is named as "value". This key attribute should also be named as "key" for non-struct type.

## How was this patch tested?

Added test.

Closes apache#23054 from viirya/SPARK-26085.

Authored-by: Liang-Chi Hsieh <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
1 parent 38628dd commit ab2eafb

File tree

4 files changed: +30 −1 lines changed

docs/sql-migration-guide-upgrade.md

Lines changed: 2 additions & 0 deletions

```diff
@@ -20,6 +20,8 @@ displayTitle: Spark SQL Upgrading Guide
   - The `ADD JAR` command previously returned a result set with the single value 0. It now returns an empty result set.
 
   - In Spark version 2.4 and earlier, users can create map values with map type key via built-in function like `CreateMap`, `MapFromArrays`, etc. Since Spark 3.0, it's not allowed to create map values with map type key with these built-in functions. Users can still read map values with map type key from data source or Java/Scala collections, though they are not very useful.
 
+  - In Spark version 2.4 and earlier, `Dataset.groupByKey` results to a grouped dataset with key attribute wrongly named as "value", if the key is non-struct type, e.g. int, string, array, etc. This is counterintuitive and makes the schema of aggregation queries weird. For example, the schema of `ds.groupByKey(...).count()` is `(value, count)`. Since Spark 3.0, we name the grouping attribute to "key". The old behaviour is preserved under a newly added configuration `spark.sql.legacy.dataset.nameNonStructGroupingKeyAsValue` with a default value of `false`.
+
 ## Upgrading From Spark SQL 2.3 to 2.4
```
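The behaviour change described in the migration note can be seen concretely. A minimal sketch (not part of the commit, assuming a live `SparkSession` named `spark` with `spark.implicits._` in scope):

```scala
import spark.implicits._

val ds = Seq("a", "b", "a").toDS()
val counts = ds.groupByKey(identity).count()

// In Spark 2.4 and earlier the grouping column of the result is named
// "value", so the schema reads roughly:
//   root
//    |-- value: string
//    |-- count(1): long
//
// Since Spark 3.0 (this commit), the grouping column is named "key":
//   root
//    |-- key: string
//    |-- count(1): long
counts.printSchema()
```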

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 12 additions & 0 deletions

```diff
@@ -1595,6 +1595,15 @@ object SQLConf {
     .booleanConf
     .createWithDefault(false)
 
+  val NAME_NON_STRUCT_GROUPING_KEY_AS_VALUE =
+    buildConf("spark.sql.legacy.dataset.nameNonStructGroupingKeyAsValue")
+      .internal()
+      .doc("When set to true, the key attribute resulted from running `Dataset.groupByKey` " +
+        "for non-struct key type, will be named as `value`, following the behavior of Spark " +
+        "version 2.4 and earlier.")
+      .booleanConf
+      .createWithDefault(false)
+
   val MAX_TO_STRING_FIELDS = buildConf("spark.sql.debug.maxToStringFields")
     .doc("Maximum number of fields of sequence-like entries can be converted to strings " +
       "in debug output. Any elements beyond the limit will be dropped and replaced by a" +
@@ -2016,6 +2025,9 @@ class SQLConf extends Serializable with Logging {
 
   def integralDivideReturnLong: Boolean = getConf(SQLConf.LEGACY_INTEGRALDIVIDE_RETURN_LONG)
 
+  def nameNonStructGroupingKeyAsValue: Boolean =
+    getConf(SQLConf.NAME_NON_STRUCT_GROUPING_KEY_AS_VALUE)
+
   def maxToStringFields: Int = getConf(SQLConf.MAX_TO_STRING_FIELDS)
 
   /** ********************** SQLConf functionality methods ************ */
```
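Users who depend on the old column name can restore the legacy behaviour through the new flag. A hedged sketch of how one might set it (assuming a `SparkSession` named `spark`; note the conf is marked `.internal()`, so it is a migration aid rather than a documented public setting):

```scala
// Restore the pre-3.0 naming ("value") for non-struct grouping keys.
// Because the flag is internal, treat this as a temporary escape hatch.
spark.conf.set("spark.sql.legacy.dataset.nameNonStructGroupingKeyAsValue", "true")

// Equivalently, in SQL:
//   SET spark.sql.legacy.dataset.nameNonStructGroupingKeyAsValue=true;
```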

sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala

Lines changed: 6 additions & 1 deletion

```diff
@@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, CreateStruct
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.execution.QueryExecution
 import org.apache.spark.sql.expressions.ReduceAggregator
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode}
 
 /**
@@ -459,7 +460,11 @@ class KeyValueGroupedDataset[K, V] private[sql](
       columns.map(_.withInputType(vExprEnc, dataAttributes).named)
     val keyColumn = if (!kExprEnc.isSerializedAsStruct) {
       assert(groupingAttributes.length == 1)
-      groupingAttributes.head
+      if (SQLConf.get.nameNonStructGroupingKeyAsValue) {
+        groupingAttributes.head
+      } else {
+        Alias(groupingAttributes.head, "key")()
+      }
     } else {
       Alias(CreateStruct(groupingAttributes), "key")()
     }
```
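The new branch only touches the non-struct path; struct-typed keys were already aliased to "key" via `Alias(CreateStruct(groupingAttributes), "key")()` in the `else` branch. A small sketch of the now-consistent behaviour (illustrative only, assuming `spark.implicits._` is in scope):

```scala
// Tuple (struct) key: the grouping column was already named "key"
// before this change, because it goes through the CreateStruct branch.
val byPair = Seq((1, "a"), (2, "b")).toDS()
  .groupByKey { case (i, s) => (i, s) }
  .count()
// byPair.schema.head.name == "key" in both 2.4 and 3.0

// Int (non-struct) key: named "value" in 2.4, "key" since this commit,
// unless the legacy conf nameNonStructGroupingKeyAsValue is enabled.
val byInt = Seq(1, 2, 3).toDS().groupByKey(x => x).count()
// byInt.schema.head.name == "key" (3.0 default)
```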

sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala

Lines changed: 10 additions & 0 deletions

```diff
@@ -1572,6 +1572,16 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
     checkDatasetUnorderly(agg, ((1, 2), 1L, 3L), ((2, 3), 2L, 4L), ((3, 4), 3L, 5L))
   }
 
+  test("SPARK-26085: fix key attribute name for atomic type for typed aggregation") {
+    val ds = Seq(1, 2, 3).toDS()
+    assert(ds.groupByKey(x => x).count().schema.head.name == "key")
+
+    // Enable legacy flag to follow previous Spark behavior
+    withSQLConf(SQLConf.NAME_NON_STRUCT_GROUPING_KEY_AS_VALUE.key -> "true") {
+      assert(ds.groupByKey(x => x).count().schema.head.name == "value")
+    }
+  }
+
   test("SPARK-8288: class with only a companion object constructor") {
     val data = Seq(ScroogeLikeExample(1), ScroogeLikeExample(2))
     val ds = data.toDS
```
