Skip to content

Commit cb6abec

Browse files
committed
fix: Adding more fuzz tests for count(distinct)
1 parent 31fa46f commit cb6abec

File tree

2 files changed

+22
-9
lines changed

2 files changed

+22
-9
lines changed

spark/src/test/scala/org/apache/comet/CometFuzzAggregateSuite.scala

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -26,25 +26,40 @@ class CometFuzzAggregateSuite extends CometFuzzTestBase {
2626
df.createOrReplaceTempView("t1")
2727
for (col <- df.columns) {
2828
val sql = s"SELECT count(distinct $col) FROM t1"
29-
// Comet does not support count distinct yet
30-
// https://github.com/apache/datafusion-comet/issues/2292
3129
val (_, cometPlan) = checkSparkAnswer(sql)
3230
if (usingDataSourceExec) {
3331
assert(1 == collectNativeScans(cometPlan).length)
3432
}
33+
34+
checkSparkAnswerAndOperator(sql)
35+
}
36+
}
37+
38+
test("count distinct group by multiple columns") {
39+
val df = spark.read.parquet(filename)
40+
df.createOrReplaceTempView("t1")
41+
for (col <- df.columns) {
42+
val sql = s"SELECT c1, c2, c3, count(distinct $col) FROM t1 group by c1, c2, c3"
43+
val (_, cometPlan) = checkSparkAnswer(sql)
44+
if (usingDataSourceExec) {
45+
assert(1 == collectNativeScans(cometPlan).length)
46+
}
47+
48+
checkSparkAnswerAndOperator(sql)
3549
}
3650
}
3751

3852
test("count(*) group by single column") {
3953
val df = spark.read.parquet(filename)
4054
df.createOrReplaceTempView("t1")
4155
for (col <- df.columns) {
42-
// cannot run fully natively due to range partitioning and sort
4356
val sql = s"SELECT $col, count(*) FROM t1 GROUP BY $col ORDER BY $col"
4457
val (_, cometPlan) = checkSparkAnswer(sql)
4558
if (usingDataSourceExec) {
4659
assert(1 == collectNativeScans(cometPlan).length)
4760
}
61+
62+
checkSparkAnswerAndOperator(sql)
4863
}
4964
}
5065

@@ -53,12 +68,13 @@ class CometFuzzAggregateSuite extends CometFuzzTestBase {
5368
df.createOrReplaceTempView("t1")
5469
val groupCol = df.columns.head
5570
for (col <- df.columns.drop(1)) {
56-
// cannot run fully natively due to range partitioning and sort
5771
val sql = s"SELECT $groupCol, count($col) FROM t1 GROUP BY $groupCol ORDER BY $groupCol"
5872
val (_, cometPlan) = checkSparkAnswer(sql)
5973
if (usingDataSourceExec) {
6074
assert(1 == collectNativeScans(cometPlan).length)
6175
}
76+
77+
checkSparkAnswerAndOperator(sql)
6278
}
6379
}
6480

@@ -67,13 +83,14 @@ class CometFuzzAggregateSuite extends CometFuzzTestBase {
6783
df.createOrReplaceTempView("t1")
6884
val groupCol = df.columns.head
6985
val otherCol = df.columns.drop(1)
70-
// cannot run fully natively due to range partitioning and sort
7186
val sql = s"SELECT $groupCol, count(${otherCol.mkString(", ")}) FROM t1 " +
7287
s"GROUP BY $groupCol ORDER BY $groupCol"
7388
val (_, cometPlan) = checkSparkAnswer(sql)
7489
if (usingDataSourceExec) {
7590
assert(1 == collectNativeScans(cometPlan).length)
7691
}
92+
93+
checkSparkAnswerAndOperator(sql)
7794
}
7895

7996
test("min/max aggregate") {
@@ -88,5 +105,4 @@ class CometFuzzAggregateSuite extends CometFuzzTestBase {
88105
}
89106
}
90107
}
91-
92108
}

spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1031,9 +1031,6 @@ class CometExecSuite extends CometTestBase {
10311031
|GROUP BY key
10321032
""".stripMargin)
10331033

1034-
// The above query uses COUNT(DISTINCT) which Comet doesn't support yet, so the plan will
1035-
// have a mix of `HashAggregate` and `CometHashAggregate`. In the following we check all
1036-
// operators starting from `CometHashAggregate` are native.
10371034
checkSparkAnswer(df)
10381035
val subPlan = stripAQEPlan(df.queryExecution.executedPlan).collectFirst {
10391036
case s: CometHashAggregateExec => s

0 commit comments

Comments
 (0)