
Conversation

@comphead
Contributor

Which issue does this PR close?

Just an experiment, related to #2292
Closes #.

Rationale for this change

What changes are included in this PR?

How are these changes tested?

@comphead comphead changed the title from "feat: do not fallback to Spark for distincts" to "feat: do not fallback to Spark for distinct aggregates" Sep 20, 2025
@comphead
Contributor Author

comphead commented Sep 20, 2025

Might be fixed by #2407
@andygrove WDYT, do you have anything in mind to double-test the count(distinct)?

Running CometFuzzAggregateSuite

@mbutrovich
Contributor

mbutrovich commented Sep 20, 2025

I think it was fixed by several changes: #2407 is one, but the native shuffle rewrite and bumping the Arrow Java version likely contributed as well. It's encouraging to see!

@comphead
Contributor Author

I'm adding more tests to make sure it is working now

      binding: Boolean,
      conf: SQLConf): Option[AggExpr] = {

    if (aggExpr.isDistinct) {
Member

Don't we need to pass the aggExpr.isDistinct value into the protobuf plan?

Contributor Author

This is a good point; I was thinking the same, but IMO Spark doesn't call count distinct in the partial phase.

                +-----------------------------+
                |           Driver            |
                |     COUNT(DISTINCT name)    |
                +--------------+--------------+
                               |
                               v
     +-------------------+          +------------------------+
     |    Executor 1     |          |      Executor 2        |
     | Partitions P0,P1  |          | Partitions P2,P3       |
     | Local distinct:   |          | Local distinct:        |
     | {Alice,Bob,Eve}   |          | {Mallory,Eve,Bob,Trent}|
     +---------+---------+          +-----------+------------+
               |                                |
               |            Shuffle             |
               v                                v
        +------------------+          +------------------+
        |    Reducer R0    |          |    Reducer R1    |
        | {Alice,Bob,Eve}  |          | {Mallory,Trent}  |
        +------------------+          +------------------+
                 \                             /
                  \                           /
                   \                         /
                    +-----------+-----------+
                                v
                      Driver Final Merge
                        DISTINCT = 5

The local distinct is done by a HashAggregate, so when count distinct is called as an aggExpr it might not need the flag, since the data has already been deduped on the reducers. Checking the Final stage, though.
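
As a minimal illustration (assuming a plain spark-shell session, where spark and its implicits are in scope, and a throwaway view name), the two-phase shape can be observed directly:

  // Sketch: inspect Spark's two-phase plan for a single COUNT(DISTINCT).
  // "people" is a hypothetical view used only for illustration.
  val people = Seq("Alice", "Bob", "Eve", "Mallory", "Eve", "Bob", "Trent").toDF("name")
  people.createOrReplaceTempView("people")

  // The physical plan shows HashAggregate(keys=[name]) stages that dedupe the
  // values locally and again after the shuffle; only then do the
  // partial_count(distinct name) / count(distinct name) stages run.
  spark.sql("SELECT count(DISTINCT name) FROM people").explain()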

Member

Thanks for explaining that. We should add tests for other distinct aggregates as well, such as sum and avg. I'm not sure if there are others?

Member

Spark has tests for the following aggregates with DISTINCT (a sketch follows the list):

  • count
  • sum
  • avg
  • first
  • last
  • corr
  • var_pop
  • var_samp
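
For reference, a sketch exercising a few of those forms in one query, against a hypothetical view t(k, v); count, sum, and avg are shown here, and per the list above first, last, corr, var_pop, and var_samp accept DISTINCT as well:

  // Sketch: several DISTINCT aggregates in one query, against a
  // hypothetical view t(k, v).
  val sql =
    """SELECT k,
      |       count(DISTINCT v),
      |       sum(DISTINCT v),
      |       avg(DISTINCT v)
      |FROM t
      |GROUP BY k""".stripMargin
  spark.sql(sql).show()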

Member

For this PR, we could just remove the fallback for COUNT?

Contributor Author

Yep, will do.

@codecov-commenter

codecov-commenter commented Sep 22, 2025

Codecov Report

❌ Patch coverage is 75.00000% with 1 line in your changes missing coverage. Please review.
✅ Project coverage is 58.48%. Comparing base (f09f8af) to head (9d3a40f).
⚠️ Report is 542 commits behind head on main.

Files with missing lines                                Patch %   Lines
.../scala/org/apache/comet/serde/QueryPlanSerde.scala   75.00%    0 Missing and 1 partial ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##               main    #2429      +/-   ##
============================================
+ Coverage     56.12%   58.48%   +2.35%     
- Complexity      976     1440     +464     
============================================
  Files           119      146      +27     
  Lines         11743    13519    +1776     
  Branches       2251     2352     +101     
============================================
+ Hits           6591     7906    +1315     
- Misses         4012     4379     +367     
- Partials       1140     1234      +94     


@comphead
Contributor Author

depends on #2258

val df = spark.read.parquet(filename)
df.createOrReplaceTempView("t1")
for (col <- df.columns) {
  val sql = s"SELECT count(distinct $col) FROM t1"
Member

Could you also add tests for count distinct with multiple columns, e.g. COUNT(DISTINCT col1, col2, col3)?

Contributor Author

Thanks @andygrove, I'll add them separately as well.
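
A sketch of what such a test could look like, following the single-column loop above (assumptions: the suite's checkSparkAnswerAndOperator helper is available and the parquet file has at least three columns):

  test("count distinct with multiple columns") {
    val df = spark.read.parquet(filename)
    df.createOrReplaceTempView("t1")
    // Assumption: the file has at least three columns to combine.
    val cols = df.columns.take(3).mkString(", ")
    val sql = s"SELECT count(distinct $cols) FROM t1"
    checkSparkAnswerAndOperator(sql)
  }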

@comphead
Contributor Author

Fuzz tests fall back to Spark

- count distinct (native_comet, native shuffle) *** FAILED *** (418 milliseconds)
  Expected only Comet native operators, but found HashAggregate.
  plan: HashAggregate(keys=[], functions=[count(distinct c0#92074)], output=[count(DISTINCT c0)#92169L])
  +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=716408]
     +- HashAggregate(keys=[], functions=[partial_count(distinct c0#92074)], output=[count#92173L])
        +- CometHashAggregate [c0#92074], [c0#92074]
           +- CometExchange hashpartitioning(c0#92074, 10), ENSURE_REQUIREMENTS, CometNativeShuffle, [plan_id=716405]
              +- CometHashAggregate [c0#92074], [c0#92074]
                 +- CometScan [native_comet] parquet [c0#92074] Batched: true, DataFilters: [], Format: CometParquet, Location: InMemoryFileIndex(1 paths)

checking

@comphead comphead changed the title from "feat: do not fallback to Spark for distinct aggregates" to "feat: do not fallback to Spark for COUNT(distinct)" Sep 24, 2025
@comphead comphead marked this pull request as ready for review September 24, 2025 17:54
@comphead comphead requested a review from andygrove September 24, 2025 18:45
@mbutrovich
Contributor

Can we add a case to CometAggregateBenchmark to tout our gains? :)

@comphead
Contributor Author

Can we add a case to CometAggregateBenchmark to tout our gains? :)

Added

@comphead comphead requested a review from mbutrovich September 24, 2025 23:13
Contributor

@mbutrovich mbutrovich left a comment

LGTM. I guess Comet already knew how to serde everything and it was just a matter of removing the fallback and beefing up tests. Thanks @comphead!

I also ran the benchmark locally: ~1.4x on most cases! 🚀

CometAggregateBenchmark.txt

@andygrove
Member

I found that we fall back to Spark when there are multiple count distinct expressions, e.g.

  test("mulitple count distinct with group column") {
    val df = spark.read.parquet(filename)
    df.createOrReplaceTempView("t1")
    val sql = s"SELECT c1, count(distinct c2), count(distinct c3) FROM t1 group by c1"
    val (_, cometPlan) = checkSparkAnswer(sql)
    if (usingDataSourceExec) {
      assert(1 == collectNativeScans(cometPlan).length)
    }
  }

This isn't an issue for this PR, but thought I should make a note of this.

Comet cannot accelerate HashAggregateExec because: Aggregate expression with filter is not supported
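
For context, I believe this comes from Spark's RewriteDistinctAggregates rule, which turns multiple distinct aggregates into a single aggregate over an Expand; the rewritten aggregate expressions can carry FILTER clauses, which is what the message points at. A minimal sketch of the same construct written explicitly, against the hypothetical t1 view from the test above:

  // Sketch: an aggregate expression with an explicit FILTER clause, the
  // construct the fallback message refers to (hypothetical view t1).
  val sql =
    """SELECT c1,
      |       count(c2) FILTER (WHERE c3 IS NOT NULL) AS cnt
      |FROM t1
      |GROUP BY c1""".stripMargin
  spark.sql(sql).explain()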

Member

@andygrove andygrove left a comment

LGTM. Thanks @comphead!

@comphead
Contributor Author

I found that we fall back to Spark when there are multiple count distinct expressions, e.g.

  test("mulitple count distinct with group column") {
    val df = spark.read.parquet(filename)
    df.createOrReplaceTempView("t1")
    val sql = s"SELECT c1, count(distinct c2), count(distinct c3) FROM t1 group by c1"
    val (_, cometPlan) = checkSparkAnswer(sql)
    if (usingDataSourceExec) {
      assert(1 == collectNativeScans(cometPlan).length)
    }
  }

This isn't an issue for this PR, but thought I should make a note of this.

Comet cannot accelerate HashAggregateExec because: Aggregate expression with filter is not supported

Filed #2456

@comphead comphead merged commit 5845227 into apache:main Sep 25, 2025
102 checks passed
mbutrovich added a commit to mbutrovich/datafusion-comet that referenced this pull request Sep 29, 2025
mbutrovich added a commit that referenced this pull request Sep 29, 2025
coderfender pushed a commit to coderfender/datafusion-comet that referenced this pull request Dec 13, 2025
* feat: do not fallback to Spark for distincts
coderfender pushed a commit to coderfender/datafusion-comet that referenced this pull request Dec 13, 2025