Skip to content

Conversation

@andygrove
Copy link
Member

@andygrove andygrove commented Sep 16, 2025

Which issue does this PR close?

Rationale for this change

Use DataFusion's count_udaf instead of translating count(expr) to sum(if(expr is not null, 1, 0)).

When we tried to use count_udaf (about a year ago - see #744) we ran into performance issues, which is why we added the sum workaround, but this is no longer needed.

What changes are included in this PR?

How are these changes tested?

I ran CometAggregateSuite:

================================================================================================
Grouped Aggregate (single group key + single aggregate COUNT)
================================================================================================

OpenJDK 64-Bit Server VM 17.0.16+8-Ubuntu-0ubuntu122.04.1 on Linux 6.8.0-79-generic
AMD Ryzen 9 7950X3D 16-Core Processor
Grouped HashAgg Exec: single group key (cardinality 100), single aggregate COUNT:  Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
----------------------------------------------------------------------------------------------------------------------------------------------------------------
SQL Parquet - Spark (COUNT)                                                                 239            248          15         43.9          22.8       1.0X
SQL Parquet - Comet (COUNT)                                                                 191            195           3         54.9          18.2       1.2X

OpenJDK 64-Bit Server VM 17.0.16+8-Ubuntu-0ubuntu122.04.1 on Linux 6.8.0-79-generic
AMD Ryzen 9 7950X3D 16-Core Processor
Grouped HashAgg Exec: single group key (cardinality 1024), single aggregate COUNT:  Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
-----------------------------------------------------------------------------------------------------------------------------------------------------------------
SQL Parquet - Spark (COUNT)                                                                  239            248          15         43.9          22.8       1.0X
SQL Parquet - Comet (COUNT)                                                                  198            202           4         52.8          18.9       1.2X

OpenJDK 64-Bit Server VM 17.0.16+8-Ubuntu-0ubuntu122.04.1 on Linux 6.8.0-79-generic
AMD Ryzen 9 7950X3D 16-Core Processor
Grouped HashAgg Exec: single group key (cardinality 1048576), single aggregate COUNT:  Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
--------------------------------------------------------------------------------------------------------------------------------------------------------------------
SQL Parquet - Spark (COUNT)                                                                    2471           2472           1          4.2         235.7       1.0X
SQL Parquet - Comet (COUNT)                                                                    1043           1044           2         10.1          99.4       2.4X


================================================================================================
Grouped Aggregate (multiple group keys + single aggregate COUNT)
================================================================================================

OpenJDK 64-Bit Server VM 17.0.16+8-Ubuntu-0ubuntu122.04.1 on Linux 6.8.0-79-generic
AMD Ryzen 9 7950X3D 16-Core Processor
Grouped HashAgg Exec: multiple group keys (cardinality 100), single aggregate COUNT:  Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
-------------------------------------------------------------------------------------------------------------------------------------------------------------------
SQL Parquet - Spark (COUNT)                                                                    516            532          25         20.3          49.2       1.0X
SQL Parquet - Comet (COUNT)                                                                    263            269           5         39.9          25.0       2.0X

OpenJDK 64-Bit Server VM 17.0.16+8-Ubuntu-0ubuntu122.04.1 on Linux 6.8.0-79-generic
AMD Ryzen 9 7950X3D 16-Core Processor
Grouped HashAgg Exec: multiple group keys (cardinality 1024), single aggregate COUNT:  Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
--------------------------------------------------------------------------------------------------------------------------------------------------------------------
SQL Parquet - Spark (COUNT)                                                                    2811           2865          76          3.7         268.0       1.0X
SQL Parquet - Comet (COUNT)                                                                    1228           1236          11          8.5         117.1       2.3X

OpenJDK 64-Bit Server VM 17.0.16+8-Ubuntu-0ubuntu122.04.1 on Linux 6.8.0-79-generic
AMD Ryzen 9 7950X3D 16-Core Processor
Grouped HashAgg Exec: multiple group keys (cardinality 1048576), single aggregate COUNT:  Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------
SQL Parquet - Spark (COUNT)                                                                       5837           5845          11          1.8         556.7       1.0X
SQL Parquet - Comet (COUNT)                                                                       4615           4727         158          2.3         440.2       1.3X


================================================================================================
Grouped Aggregate (single group key + multiple aggregates COUNT)
================================================================================================

OpenJDK 64-Bit Server VM 17.0.16+8-Ubuntu-0ubuntu122.04.1 on Linux 6.8.0-79-generic
AMD Ryzen 9 7950X3D 16-Core Processor
Grouped HashAgg Exec: single group key (cardinality 100), multiple aggregates COUNT:  Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
-------------------------------------------------------------------------------------------------------------------------------------------------------------------
SQL Parquet - Spark (COUNT)                                                                    307            320          27         34.2          29.3       1.0X
SQL Parquet - Comet (COUNT)                                                                    225            233           6         46.5          21.5       1.4X

OpenJDK 64-Bit Server VM 17.0.16+8-Ubuntu-0ubuntu122.04.1 on Linux 6.8.0-79-generic
AMD Ryzen 9 7950X3D 16-Core Processor
Grouped HashAgg Exec: single group key (cardinality 1024), multiple aggregates COUNT:  Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
--------------------------------------------------------------------------------------------------------------------------------------------------------------------
SQL Parquet - Spark (COUNT)                                                                     321            335          18         32.7          30.6       1.0X
SQL Parquet - Comet (COUNT)                                                                     240            253          10         43.7          22.9       1.3X

OpenJDK 64-Bit Server VM 17.0.16+8-Ubuntu-0ubuntu122.04.1 on Linux 6.8.0-79-generic
AMD Ryzen 9 7950X3D 16-Core Processor
Grouped HashAgg Exec: single group key (cardinality 1048576), multiple aggregates COUNT:  Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------
SQL Parquet - Spark (COUNT)                                                                       2898           2926          40          3.6         276.4       1.0X
SQL Parquet - Comet (COUNT)                                                                       1440           1445           8          7.3         137.3       2.0X


================================================================================================
Grouped Aggregate (single group key + single aggregate COUNT on decimal)
================================================================================================

OpenJDK 64-Bit Server VM 17.0.16+8-Ubuntu-0ubuntu122.04.1 on Linux 6.8.0-79-generic
AMD Ryzen 9 7950X3D 16-Core Processor
Grouped HashAgg Exec: single group key (cardinality 100), single aggregate COUNT on decimal:  Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------
SQL Parquet - Spark (COUNT)                                                                            295            309          25         35.5          28.2       1.0X
SQL Parquet - Comet (COUNT)                                                                            176            184           6         59.4          16.8       1.7X

OpenJDK 64-Bit Server VM 17.0.16+8-Ubuntu-0ubuntu122.04.1 on Linux 6.8.0-79-generic
AMD Ryzen 9 7950X3D 16-Core Processor
Grouped HashAgg Exec: single group key (cardinality 1024), single aggregate COUNT on decimal:  Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------
SQL Parquet - Spark (COUNT)                                                                             308            311           4         34.0          29.4       1.0X
SQL Parquet - Comet (COUNT)                                                                             191            196           4         55.0          18.2       1.6X

OpenJDK 64-Bit Server VM 17.0.16+8-Ubuntu-0ubuntu122.04.1 on Linux 6.8.0-79-generic
AMD Ryzen 9 7950X3D 16-Core Processor
Grouped HashAgg Exec: single group key (cardinality 1048576), single aggregate COUNT on decimal:  Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
SQL Parquet - Spark (COUNT)                                                                               2473           2492          27          4.2         235.9       1.0X
SQL Parquet - Comet (COUNT)                                                                                989           1001          17         10.6          94.3       2.5X

@codecov-commenter
Copy link

codecov-commenter commented Sep 16, 2025

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 57.52%. Comparing base (f09f8af) to head (a25f0a8).
⚠️ Report is 511 commits behind head on main.

Additional details and impacted files
@@             Coverage Diff              @@
##               main    #2407      +/-   ##
============================================
+ Coverage     56.12%   57.52%   +1.39%     
- Complexity      976     1296     +320     
============================================
  Files           119      147      +28     
  Lines         11743    13469    +1726     
  Branches       2251     2352     +101     
============================================
+ Hits           6591     7748    +1157     
- Misses         4012     4457     +445     
- Partials       1140     1264     +124     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

@andygrove
Copy link
Member Author

I ran TPC-H locally and saw no regression in performance

Comment on lines -67 to -71
benchmark.addCase(s"SQL Parquet - Comet (Scan) ($aggregateFunction)") { _ =>
withSQLConf(CometConf.COMET_ENABLED.key -> "true") {
spark.sql(query).noop()
}
}
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It no longer makes sense to benchmark Comet with Scan vs Scan & Exec configurations, especially for aggregates. Comet scans won't accelerate aggregates.

Copy link
Contributor

@mbutrovich mbutrovich left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You know how I love a good negative line count. Thanks for following up on my comment in #2397. Glad to see we can rely on DF for this.

Copy link
Contributor

@comphead comphead left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @andygrove it is nice removing unnecessary evaluations

@andygrove andygrove merged commit 0e7c26b into apache:main Sep 16, 2025
136 of 137 checks passed
@andygrove andygrove deleted the use-df-count branch September 16, 2025 23:38
@andygrove
Copy link
Member Author

Merged. Thanks for the reviews @mbutrovich and @comphead

coderfender pushed a commit to coderfender/datafusion-comet that referenced this pull request Dec 13, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants