@GaneshPatil7517
Contributor

@GaneshPatil7517 GaneshPatil7517 commented Jan 3, 2026

Part of #19612

Rationale for this change
When aggregate functions are used with window frames like ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, DataFusion uses PlainAggregateWindowExpr which calls evaluate() multiple times on the same accumulator instance. Accumulators that use std::mem::take() in their evaluate() method consume their internal state, causing incorrect results on subsequent calls.
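
To make the failure mode concrete, here is a minimal standard-library sketch. `ToyAccumulator` and `running_results` are hypothetical names, not DataFusion types; the loop only imitates a cumulative frame calling `evaluate()` once per row on the same accumulator instance.

```rust
/// Hypothetical stand-in for an accumulator that buffers its input.
/// Not a DataFusion type; it only illustrates the evaluate() contract.
#[derive(Default)]
struct ToyAccumulator {
    values: Vec<i64>,
}

impl ToyAccumulator {
    fn update(&mut self, v: i64) {
        self.values.push(v);
    }

    /// Consumes the buffer: correct once, wrong on every later call.
    fn evaluate_consuming(&mut self) -> i64 {
        let values = std::mem::take(&mut self.values);
        values.iter().sum()
    }

    /// Borrows the buffer: safe to call any number of times.
    fn evaluate_preserving(&mut self) -> i64 {
        self.values.iter().sum()
    }
}

/// Imitates a cumulative frame (UNBOUNDED PRECEDING to CURRENT ROW):
/// feed one row, then evaluate, reusing the same accumulator.
fn running_results(rows: &[i64], consuming: bool) -> Vec<i64> {
    let mut acc = ToyAccumulator::default();
    rows.iter()
        .map(|&v| {
            acc.update(v);
            if consuming {
                acc.evaluate_consuming()
            } else {
                acc.evaluate_preserving()
            }
        })
        .collect()
}
```

For the rows 1, 2, 3 the preserving variant yields the running sums 1, 3, 6, while the consuming variant yields 1, 2, 3 because each call empties the buffer before the next row arrives.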

What changes are included in this PR?
- percentile_cont: Modified evaluate() to use mutable reference instead of consuming the Vec. Added retract_batch() support.
- string_agg: Changed SimpleStringAggAccumulator::evaluate() to clone instead of take.
- Added comprehensive test cases in aggregate.slt
- Added documentation about window-compatible accumulators

Are these changes tested?
Yes, added sqllogictest cases that verify:

- median() and percentile_cont(0.5) produce identical results in window frames
- percentile_cont with different percentiles works correctly
- string_agg accumulates correctly across window frame evaluations
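
The first property in the list above can be sketched outside of SQL. This assumes the usual linear-interpolation definition of a continuous percentile; the helper names are hypothetical and this is not the code in `percentile_cont.rs`.

```rust
/// Continuous percentile with linear interpolation between neighbouring ranks.
/// Sorts the slice in place but leaves every element in the buffer.
fn percentile_cont(values: &mut [f64], p: f64) -> Option<f64> {
    if values.is_empty() {
        return None;
    }
    values.sort_by(|a, b| a.total_cmp(b));
    let rank = p * (values.len() - 1) as f64;
    let lower = rank.floor() as usize;
    let upper = rank.ceil() as usize;
    let fraction = rank - lower as f64;
    Some(values[lower] + (values[upper] - values[lower]) * fraction)
}

/// Textbook median: middle element, or the mean of the two middle elements.
fn median(values: &mut [f64]) -> Option<f64> {
    if values.is_empty() {
        return None;
    }
    values.sort_by(|a, b| a.total_cmp(b));
    let mid = values.len() / 2;
    if values.len() % 2 == 1 {
        Some(values[mid])
    } else {
        Some((values[mid - 1] + values[mid]) / 2.0)
    }
}

/// Evaluates `f` over every prefix of `rows`, reusing one buffer,
/// the way a cumulative window frame would.
fn cumulative(rows: &[f64], f: impl Fn(&mut [f64]) -> Option<f64>) -> Vec<Option<f64>> {
    let mut buffer: Vec<f64> = Vec::new();
    rows.iter()
        .map(|&v| {
            buffer.push(v);
            f(buffer.as_mut_slice())
        })
        .collect()
}
```

Running `cumulative` with `median` and with `|v| percentile_cont(v, 0.5)` over the same rows should give the same sequence, which is what the new sqllogictest cases check at the SQL level.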

Are there any user-facing changes?
No breaking changes. This is a bug fix that ensures aggregate functions work correctly in window contexts.

@github-actions github-actions bot added documentation Improvements or additions to documentation sqllogictest SQL Logic Tests (.slt) functions Changes to functions implementation labels Jan 3, 2026
@GaneshPatil7517
Contributor Author

GaneshPatil7517 commented Jan 3, 2026

Hey @nuno-faria, can you review this? All checks have passed, and I'm excited to see it merged.

@nuno-faria
Contributor

> Hey @nuno-faria, can you review this? All checks have passed, and I'm excited to see it merged.

I think it's better to wait for @Jefffrey's review, since I'm not too familiar with this part of the code.

@Jefffrey
Contributor

Jefffrey commented Jan 5, 2026

I'll try to take a look at this soon.

This commit fixes issue apache#19612 where accumulators that don't implement
retract_batch exhibit buggy behavior in window frame queries.

When aggregate functions are used with window frames like
`ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW`, DataFusion uses
PlainAggregateWindowExpr which calls evaluate() multiple times on the
same accumulator instance. Accumulators that use std::mem::take() in
their evaluate() method consume their internal state, causing incorrect
results on subsequent calls.

1. **percentile_cont**: Modified evaluate() to use mutable reference
   instead of consuming the Vec. Added retract_batch() support for
   both PercentileContAccumulator and DistinctPercentileContAccumulator.

2. **string_agg**: Changed SimpleStringAggAccumulator::evaluate() to
   clone the accumulated string instead of taking it.

- datafusion/functions-aggregate/src/percentile_cont.rs:
  - Changed calculate_percentile() to take &mut [T::Native] instead of Vec<T::Native>
  - Updated PercentileContAccumulator::evaluate() to pass reference
  - Updated DistinctPercentileContAccumulator::evaluate() to clone values
  - Added retract_batch() implementation using HashMap for efficient removal
  - Updated PercentileContGroupsAccumulator::evaluate() for consistency

- datafusion/functions-aggregate/src/string_agg.rs:
  - Changed evaluate() to use clone() instead of std::mem::take()

- datafusion/sqllogictest/test_files/aggregate.slt:
  - Added test cases for percentile_cont with window frames
  - Added test comparing median() vs percentile_cont(0.5) behavior
  - Added test for string_agg cumulative window frame

- docs/source/library-user-guide/functions/adding-udfs.md:
  - Added documentation about window-compatible accumulators
  - Explained evaluate() state preservation requirements
  - Documented retract_batch() implementation guidance

Closes apache#19612
@GaneshPatil7517 GaneshPatil7517 force-pushed the fix/accumulators-retract-batch-19612 branch from 2c2f792 to c5fe87b Compare January 6, 2026 04:28
Contributor

@Jefffrey Jefffrey left a comment

For the issue I was actually thinking of only fixing evaluate; since this PR goes above and beyond and implements retract batch for percentile_cont & string_agg, I think we need to add tests with different window frames for them. See this PR for an example:

Also I've edited the PR to not close the issue since it doesn't address distinct median accumulator.

Edit: I misread the code, seems it only implements retract_batch for percentile_cont

Comment on lines 447 to 469
let mut to_remove: HashMap<ScalarValue, usize> = HashMap::new();
for i in 0..values.len() {
    let v = ScalarValue::try_from_array(&values, i)?;
    if !v.is_null() {
        *to_remove.entry(v).or_default() += 1;
    }
}

let mut i = 0;
while i < self.all_values.len() {
    let k =
        ScalarValue::new_primitive::<T>(Some(self.all_values[i]), &T::DATA_TYPE)?;
    if let Some(count) = to_remove.get_mut(&k)
        && *count > 0
    {
        self.all_values.swap_remove(i);
        *count -= 1;
        if *count == 0 {
            to_remove.remove(&k);
            if to_remove.is_empty() {
                break;
            }
        }
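
The excerpt above is cut off by the review view. As a rough, self-contained sketch of the same count-then-`swap_remove` technique, the version below works on plain `i64` values instead of `ScalarValue`; the function name `retract` and its signature are assumptions made for illustration, not the PR's code.

```rust
use std::collections::HashMap;

/// Removes one stored occurrence for every value in `retracted`.
/// The order of `all_values` is not preserved, which is acceptable for
/// order-insensitive aggregates such as a median or percentile.
fn retract(all_values: &mut Vec<i64>, retracted: &[i64]) {
    // Count how many copies of each value must be removed.
    let mut to_remove: HashMap<i64, usize> = HashMap::new();
    for &v in retracted {
        *to_remove.entry(v).or_default() += 1;
    }

    let mut i = 0;
    while i < all_values.len() && !to_remove.is_empty() {
        let k = all_values[i];
        match to_remove.get_mut(&k) {
            Some(count) => {
                // swap_remove moves the last element into slot `i`, so `i`
                // is not advanced and the new occupant is checked next.
                all_values.swap_remove(i);
                *count -= 1;
                if *count == 0 {
                    to_remove.remove(&k);
                }
            }
            // Nothing to retract for this value; move on.
            None => i += 1,
        }
    }
}
```

Entries are dropped from the map as soon as their count reaches zero, so the loop can stop early once everything requested has been removed.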

Just for my own reference (and anyone else reviewing), this is the same code as from median:

ScalarValue::LargeUtf8(Some(std::mem::take(&mut self.accumulated_string)))
if self.has_value {
    Ok(ScalarValue::LargeUtf8(Some(
        self.accumulated_string.clone(),

I guess this is unavoidable 🙁

I might need to think on this a bit to see if there are ways around requiring this clone 🤔


I guess this approach is fine for now
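
For reference, a small sketch of what the clone buys in a cumulative frame. `ToyStringAgg` is a hypothetical miniature, not `SimpleStringAggAccumulator`; it only borrows the idea of an accumulated string plus a `has_value` flag seen in the diff, and uses `None` as a stand-in for SQL NULL.

```rust
/// Hypothetical miniature of a string-concatenating accumulator.
#[derive(Default)]
struct ToyStringAgg {
    delimiter: String,
    accumulated: String,
    has_value: bool,
}

impl ToyStringAgg {
    fn new(delimiter: &str) -> Self {
        Self {
            delimiter: delimiter.to_string(),
            ..Default::default()
        }
    }

    fn update(&mut self, value: &str) {
        if self.has_value {
            self.accumulated.push_str(&self.delimiter);
        }
        self.accumulated.push_str(value);
        self.has_value = true;
    }

    /// Returns a copy so the accumulated string survives for the next call.
    fn evaluate(&mut self) -> Option<String> {
        if self.has_value {
            Some(self.accumulated.clone())
        } else {
            None
        }
    }
}

/// One update and one evaluate per row, on the same accumulator.
fn cumulative_string_agg(rows: &[&str], delimiter: &str) -> Vec<Option<String>> {
    let mut acc = ToyStringAgg::new(delimiter);
    rows.iter()
        .map(|&row| {
            acc.update(row);
            acc.evaluate()
        })
        .collect()
}
```

The cost is that every call copies the whole string so far, so a cumulative frame copies a growing amount of data per row; that is the trade-off being accepted in this thread.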

@github-actions github-actions bot added logical-expr Logical plan and expressions and removed documentation Improvements or additions to documentation labels Jan 6, 2026
@Jefffrey
Contributor

Jefffrey commented Jan 6, 2026

I realised we aren't adding retract_batch for string_agg, my apologies. For that I think we can just stick with the unbounded preceding to current row test; we don't need the other window frame tests for it.

@GaneshPatil7517
Contributor Author

Hey @Jefffrey, can you run the workflow?

@GaneshPatil7517
Contributor Author

Hey @Jefffrey, all checks have passed, and I'm excited to see it merged.

Comment on lines 70 to 91
/// # Window Frame Queries
///
/// When used in a window context without [`Self::supports_retract_batch`],
/// `evaluate()` may be called multiple times on the same accumulator instance
/// (once per row in the partition). In this case, implementations **must not**
/// consume or modify internal state. Use references or clones to preserve state:
///
/// ```ignore
/// // GOOD: Preserves state for subsequent calls
/// fn evaluate(&mut self) -> Result<ScalarValue> {
///     calculate_result(&self.values) // Use reference
/// }
///
/// // BAD: Consumes state, breaks window queries
/// fn evaluate(&mut self) -> Result<ScalarValue> {
///     calculate_result(std::mem::take(&mut self.values))
/// }
/// ```
///
/// For efficient sliding window calculations, consider implementing
/// [`Self::retract_batch`] which allows DataFusion to incrementally
/// update state rather than calling `evaluate()` repeatedly.

There are some things to correct here:

  • They are still allowed to modify the internal state, technically; percentile_cont and median use select_nth_unstable_by which modifies the state but doesn't consume it
  • Implementing retract_batch has nothing to do with efficiency; it just allows support for more types of window frames

I think something like this would be more accurate:

/// ## Correctness
///
/// This function must not consume the internal state, as it is also used in window
/// aggregate functions where it can be executed multiple times depending on the
/// current window frame. Consuming the internal state can cause the next invocation
/// to have incorrect results.
///
/// - Even if this accumulator doesn't implement [`retract_batch`] it may still be used
///   in window aggregate functions where the window frame is
///   `ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW`
///
/// It is fine to modify the state (e.g. re-order elements within internal state vec) so long
/// as this doesn't cause an incorrect computation on the next call of evaluate.
///
/// [`retract_batch`]: Self::retract_batch
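
The distinction between reordering and consuming state can be illustrated with the standard library's `select_nth_unstable_by`, which the comment above says percentile_cont and median rely on. The function below is a hypothetical sketch on `i64` values, not the accumulator code itself.

```rust
/// Median via partial selection. `select_nth_unstable_by` reorders the
/// slice in place, but every element is still present afterwards.
fn median_in_place(values: &mut [i64]) -> Option<f64> {
    let len = values.len();
    if len == 0 {
        return None;
    }
    let mid = len / 2;
    let (lower_part, upper_mid, _) = values.select_nth_unstable_by(mid, |a, b| a.cmp(b));
    let upper_mid = *upper_mid;
    if len % 2 == 1 {
        Some(upper_mid as f64)
    } else {
        // Everything left of `mid` is <= the selected element, so the
        // maximum of that part is the lower of the two middle values.
        let lower_mid = *lower_part.iter().max()?;
        Some((lower_mid as f64 + upper_mid as f64) / 2.0)
    }
}
```

Calling it twice on the same buffer returns the same answer and leaves the length unchanged, even though the element order may differ between calls. That is the sense in which modifying state is acceptable while consuming it is not.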

- Clarify that consuming internal state is the issue, not modifying it
- Note that modifying state (e.g. reordering) is fine if it doesn't affect correctness
- Clarify that retract_batch is for supporting more window frame types, not efficiency
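
As a rough illustration of the last point, consider a frame whose start moves, such as `ROWS BETWEEN 1 PRECEDING AND CURRENT ROW`. Rows leave the frame as well as enter it, so the accumulator needs an inverse of `update`. The types and the driver loop below are hypothetical and are not how DataFusion schedules these calls.

```rust
/// Hypothetical buffering accumulator with an inverse of `update`.
#[derive(Default)]
struct ToyMax {
    values: Vec<i64>,
}

impl ToyMax {
    fn update(&mut self, v: i64) {
        self.values.push(v);
    }

    /// Removes one occurrence of `v`, if present.
    fn retract(&mut self, v: i64) {
        if let Some(pos) = self.values.iter().position(|&x| x == v) {
            self.values.swap_remove(pos);
        }
    }

    /// Reads the state without consuming it.
    fn evaluate(&mut self) -> Option<i64> {
        self.values.iter().copied().max()
    }
}

/// Frame: ROWS BETWEEN `preceding` PRECEDING AND CURRENT ROW.
fn sliding_max(rows: &[i64], preceding: usize) -> Vec<Option<i64>> {
    let mut acc = ToyMax::default();
    let mut out = Vec::with_capacity(rows.len());
    for (i, &v) in rows.iter().enumerate() {
        // The current row enters the frame.
        acc.update(v);
        // The row that just fell out of the frame is retracted.
        if i > preceding {
            acc.retract(rows[i - preceding - 1]);
        }
        out.push(acc.evaluate());
    }
    out
}
```

With an unbounded-preceding frame the `retract` call is never reached, which is why an accumulator without it can still serve cumulative frames as long as `evaluate` leaves its state intact.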
@Jefffrey Jefffrey added this pull request to the merge queue Jan 10, 2026
Merged via the queue into apache:main with commit 45fb0b4 Jan 10, 2026
32 checks passed
@Jefffrey
Contributor

Thanks @GaneshPatil7517

@GaneshPatil7517
Contributor Author

Always welcome @Jefffrey thank you for merging.... always open to contribute...
