Skip to content

Commit 09ad28d

Browse files
authored
[spark] Handle NPE for pushdown aggregate when a datasplit has a null max/min value (#6611)
1 parent 2eba6b9 commit 09ad28d

File tree

2 files changed

+19
-0
lines changed

2 files changed

+19
-0
lines changed

paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/aggregate/AggFuncEvaluator.scala

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,9 @@ case class MinEvaluator(idx: Int, dataField: DataField, evolutions: SimpleStatsE
5959

6060
override def update(dataSplit: DataSplit): Unit = {
6161
val other = dataSplit.minValue(idx, dataField, evolutions)
62+
if (other == null) {
63+
return
64+
}
6265
if (_result == null || CompareUtils.compareLiteral(dataField.`type`(), _result, other) > 0) {
6366
_result = other;
6467
}
@@ -80,6 +83,9 @@ case class MaxEvaluator(idx: Int, dataField: DataField, evolutions: SimpleStatsE
8083

8184
override def update(dataSplit: DataSplit): Unit = {
8285
val other = dataSplit.maxValue(idx, dataField, evolutions)
86+
if (other == null) {
87+
return
88+
}
8389
if (_result == null || CompareUtils.compareLiteral(dataField.`type`(), _result, other) < 0) {
8490
_result = other
8591
}

paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PushDownAggregatesTest.scala

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -294,4 +294,17 @@ class PushDownAggregatesTest extends PaimonSparkTestBase with AdaptiveSparkPlanH
294294
Seq(Row(1, "t1"))
295295
)
296296
}
297+
298+
// https://github.com/apache/paimon/issues/6610
299+
test("Push down aggregate: aggregate a column in one partition is all null and another is not") {
300+
withTable("T") {
301+
spark.sql("CREATE TABLE T (c1 INT, c2 LONG) PARTITIONED BY(day STRING)")
302+
303+
spark.sql("INSERT INTO T VALUES (1, 2, '2025-11-10')")
304+
spark.sql("INSERT INTO T VALUES (null, 2, '2025-11-11')")
305+
306+
runAndCheckAggregate("SELECT MIN(c1) FROM T", Row(1) :: Nil, 0)
307+
runAndCheckAggregate("SELECT MAX(c1) FROM T", Row(1) :: Nil, 0)
308+
}
309+
}
297310
}

0 commit comments

Comments
 (0)