Skip to content

Commit c58a4fe

Browse files
pengbo and mingbo_pb
authored and committed
[SPARK-27351][SQL] Wrong outputRows estimation after AggregateEstimation wit…
## What changes were proposed in this pull request? The upper bound of the group-by row count is the product of the distinct counts of the group-by columns. However, a column containing only null values has a distinct count of 0, which incorrectly drives the estimated output row count to 0. Ex: col1 (distinct: 2, rowCount 2), col2 (distinct: 0, rowCount 2) => group by col1, col2. Actual: output rows: 0. Expected: output rows: 2. ## How was this patch tested? A corresponding unit test has been added, and a manual test has been done in our TPC-DS benchmark environment. Closes apache#24286 from pengbo/master. Lead-authored-by: pengbo <[email protected]> Co-authored-by: mingbo_pb <[email protected]> Signed-off-by: Dongjoon Hyun <[email protected]>
1 parent d35e81f commit c58a4fe

File tree

2 files changed

+21
-3
lines changed

2 files changed

+21
-3
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/AggregateEstimation.scala

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,16 @@ object AggregateEstimation {
3939
// Multiply distinct counts of group-by columns. This is an upper bound, which assumes
4040
// the data contains all combinations of distinct values of group-by columns.
4141
var outputRows: BigInt = agg.groupingExpressions.foldLeft(BigInt(1))(
42-
(res, expr) => res *
43-
childStats.attributeStats(expr.asInstanceOf[Attribute]).distinctCount.get)
42+
(res, expr) => {
43+
val columnStat = childStats.attributeStats(expr.asInstanceOf[Attribute])
44+
val distinctCount = columnStat.distinctCount.get
45+
val distinctValue: BigInt = if (distinctCount == 0 && columnStat.nullCount.get > 0) {
46+
1
47+
} else {
48+
distinctCount
49+
}
50+
res * distinctValue
51+
})
4452

4553
outputRows = if (agg.groupingExpressions.isEmpty) {
4654
// If there's no group-by columns, the output is a single row containing values of aggregate

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/AggregateEstimationSuite.scala

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,9 @@ class AggregateEstimationSuite extends StatsEstimationTestBase with PlanTest {
3838
attr("key22") -> ColumnStat(distinctCount = Some(2), min = Some(10), max = Some(20),
3939
nullCount = Some(0), avgLen = Some(4), maxLen = Some(4)),
4040
attr("key31") -> ColumnStat(distinctCount = Some(0), min = None, max = None,
41-
nullCount = Some(0), avgLen = Some(4), maxLen = Some(4))
41+
nullCount = Some(0), avgLen = Some(4), maxLen = Some(4)),
42+
attr("key32") -> ColumnStat(distinctCount = Some(0), min = None, max = None,
43+
nullCount = Some(4), avgLen = Some(4), maxLen = Some(4))
4244
))
4345

4446
private val nameToAttr: Map[String, Attribute] = columnInfo.map(kv => kv._1.name -> kv._1)
@@ -116,6 +118,14 @@ class AggregateEstimationSuite extends StatsEstimationTestBase with PlanTest {
116118
expectedOutputRowCount = 0)
117119
}
118120

121+
test("group-by column with only null value") {
122+
checkAggStats(
123+
tableColumns = Seq("key22", "key32"),
124+
tableRowCount = 6,
125+
groupByColumns = Seq("key22", "key32"),
126+
expectedOutputRowCount = nameToColInfo("key22")._2.distinctCount.get)
127+
}
128+
119129
test("non-cbo estimation") {
120130
val attributes = Seq("key12").map(nameToAttr)
121131
val child = StatsTestPlan(

0 commit comments

Comments
 (0)