Skip to content

Commit 6d6b233

Browse files
maropudongjoon-hyun
authored andcommitted
[SPARK-29343][SQL][FOLLOW-UP] Remove floating-point Sum/Average/CentralMomentAgg from order-insensitive aggregates
### What changes were proposed in this pull request? This pr is to remove floating-point `Sum/Average/CentralMomentAgg` from order-insensitive aggregates in `EliminateSorts`. This pr comes from the gatorsmile suggestion: apache#26011 (comment) ### Why are the changes needed? Bug fix. ### Does this PR introduce any user-facing change? No. ### How was this patch tested? Added tests in `SubquerySuite`. Closes apache#26534 from maropu/SPARK-29343-FOLLOWUP. Authored-by: Takeshi Yamamuro <[email protected]> Signed-off-by: Dongjoon Hyun <[email protected]>
1 parent 16e7195 commit 6d6b233

File tree

2 files changed

+22
-6
lines changed

2 files changed

+22
-6
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1002,12 +1002,11 @@ object EliminateSorts extends Rule[LogicalPlan] {
10021002

10031003
private def isOrderIrrelevantAggs(aggs: Seq[NamedExpression]): Boolean = {
10041004
def isOrderIrrelevantAggFunction(func: AggregateFunction): Boolean = func match {
1005-
case _: Sum => true
1006-
case _: Min => true
1007-
case _: Max => true
1008-
case _: Count => true
1009-
case _: Average => true
1010-
case _: CentralMomentAgg => true
1005+
case _: Min | _: Max | _: Count => true
1006+
// Arithmetic operations for floating-point values are order-sensitive
1007+
// (they are not associative).
1008+
case _: Sum | _: Average | _: CentralMomentAgg =>
1009+
!Seq(FloatType, DoubleType).exists(_.sameType(func.children.head.dataType))
10111010
case _ => false
10121011
}
10131012

sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1271,6 +1271,23 @@ class SubquerySuite extends QueryTest with SharedSparkSession {
12711271
}
12721272
}
12731273

1274+
test("Cannot remove sort for floating-point order-sensitive aggregates from subquery") {
1275+
Seq("float", "double").foreach { typeName =>
1276+
Seq("SUM", "AVG", "KURTOSIS", "SKEWNESS", "STDDEV_POP", "STDDEV_SAMP",
1277+
"VAR_POP", "VAR_SAMP").foreach { aggName =>
1278+
val query =
1279+
s"""
1280+
|SELECT k, $aggName(v) FROM (
1281+
| SELECT k, v
1282+
| FROM VALUES (1, $typeName(2.0)), (2, $typeName(1.0)) t(k, v)
1283+
| ORDER BY v)
1284+
|GROUP BY k
1285+
""".stripMargin
1286+
assert(getNumSortsInQuery(query) == 1)
1287+
}
1288+
}
1289+
}
1290+
12741291
test("SPARK-25482: Forbid pushdown to datasources of filters containing subqueries") {
12751292
withTempView("t1", "t2") {
12761293
sql("create temporary view t1(a int) using parquet")

0 commit comments

Comments
 (0)