Skip to content

Conversation

@coderfender
Copy link
Contributor

@coderfender coderfender commented Oct 17, 2025

Which issue does this PR close?

Closes #531 .
(Partially closes 531) . The ANSI changes to support AVG will be tracked in a new PR

Rationale for this change

DataFusion's default SUM doesn't match Spark's overflow semantics. This implementation ensures we get the exact same behavior as Spark for integer overflow in all 3 eval modes (ANSI , Legacy and Try mode)

What changes are included in this PR?

This PR adds native Rust implementation for SUM aggregation on integer types (byte, short, int, long)

Native changes (Rust):
(Inspired from SumDecimal)

  1. New SumInteger aggregate function that handles SUM for all integer types (in coherence with Spark)
  2. Support all eval modes (ANSI , Legacy and Try)
    ( Implemented code in similar fashion of spark leveraging Option<i64> to represent NULL and numeric values for sum , and an additional parameter called has_all_nulls which is leveraged in Try mode to distinguish if NULL sum is caused by all NULL inputs or the fact that the sum overflowed. (Spark does this with shouldTrackIsEmpty and assigning NULL to running sum which is a long datatype) )
  3. Implemented groups accumulator (which should optimize group by op)

Scala side changes :

  1. Update CometSum to add ANSI support (ANSI and Try)
  2. Change proto schema to pass along eval_mode instead of fail_on_error to support Legacy, ANSI and Try eval modes
  3. Added tests for NULL (had to force cast nulls as java.lang.Long to avoid Scala's auto-boxing feature which auto casts objects to primitive types there by casting nulls to 0s) handling in both simple and GROUP BY agg .
  4. Added tests for both ANSI and non-ANSI mode tests for the input types Int, Short, Byte
  5. Added try_sum overflow tests with GROUP BY (mixed overflow/non-overflow groups)

How are these changes tested?

@coderfender coderfender marked this pull request as draft October 17, 2025 19:13
@coderfender
Copy link
Contributor Author

Draft PR to support sum function - WIP

@coderfender coderfender marked this pull request as ready for review October 28, 2025 07:53
@coderfender coderfender changed the title [WIP]: Support ANSI mode sum expr feat: Support ANSI mode sum expr Oct 29, 2025
@codecov-commenter
Copy link

codecov-commenter commented Oct 30, 2025

Codecov Report

❌ Patch coverage is 75.00000% with 1 line in your changes missing coverage. Please review.
✅ Project coverage is 45.96%. Comparing base (f09f8af) to head (6dcf47b).
⚠️ Report is 670 commits behind head on main.

Files with missing lines Patch % Lines
.../scala/org/apache/comet/serde/QueryPlanSerde.scala 50.00% 1 Missing ⚠️
Additional details and impacted files
@@              Coverage Diff              @@
##               main    #2600       +/-   ##
=============================================
- Coverage     56.12%   45.96%   -10.17%     
- Complexity      976     1201      +225     
=============================================
  Files           119      147       +28     
  Lines         11743    13811     +2068     
  Branches       2251     2370      +119     
=============================================
- Hits           6591     6348      -243     
- Misses         4012     6422     +2410     
+ Partials       1140     1041       -99     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

for i in 0..int_array.len() {
if !int_array.is_null(i) {
let v = int_array.value(i).to_i64().ok_or_else(|| {
DataFusionError::Internal("Failed to convert value to i64".to_string())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be helpful to print the problematic value in the error message.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for the review . I believe we wouldnt be needing these checks at all given how we check the data types multiple times (from planning phase , internal functions)

running_sum,
)?,
_ => {
panic!("Unsupported data type {}", values.data_type())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At line 138 it returns an Err when the conversion fails. Would it make sense to return an Err here too instead of panicking ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you .I made sure it is all error driven and not panic

@coderfender coderfender force-pushed the support_overflow_sum_function branch from 970d693 to c0715aa Compare November 7, 2025 01:16
@coderfender
Copy link
Contributor Author

@andygrove , @comphead I believe this should be ready for review (pending CI)

}

fn size(&self) -> usize {
std::mem::size_of_val(self)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the purpose of this method ?
This returns the size of the pointer/reference &self

"integer",
)))
} else {
return Ok(None);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
return Ok(None);
Ok(None)

fn new(eval_mode: EvalMode) -> Self {
if eval_mode == EvalMode::Try {
Self {
// Try mode starts with 0 (because if this is init to None we cant say if it is none due to all nulls or due to an overflow
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// Try mode starts with 0 (because if this is init to None we cant say if it is none due to all nulls or due to an overflow
// Try mode starts with 0 (because if this is init to None we cant say if it is none due to all nulls or due to an overflow)

(null.asInstanceOf[java.lang.Long], "b"),
(null.asInstanceOf[java.lang.Long], "b")),
"tbl") {
val res = sql("SELECT _2, sum(_1) FROM tbl group by 1")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
val res = sql("SELECT _2, sum(_1) FROM tbl group by 1")
val res = sql("SELECT _2, try_sum(_1) FROM tbl group by 1")

The name of the test says try_sum

case s: Sum =>
if (AggSerde.sumDataTypeSupported(s.dataType) && !s.dataType
.isInstanceOf[DecimalType]) {
.isInstanceOf[DecimalType] && !integerTypes.contains(s.dataType)) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why the integer types are excluded now ?

@comphead
Copy link
Contributor

comphead commented Nov 7, 2025

- Windows support *** FAILED *** (3 seconds, 973 milliseconds)
  Expected only Comet native operators, but found Project.
  plan: Project
  +- Window
     +- CometExchange
        +- CometScan [native_iceberg_compat] parquet

Thats actually interesting why would window fall back, we planning to fallback on windows in #2726 but the test would still preserve windows, so I think we need to check a fallback reason

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Add ANSI support in SUM and AVG

4 participants