@GaneshPatil7517

This PR introduces a new FilterExec constructor that accepts a projection at creation time,
ensuring compute_properties() executes only once.

What was added:

  • New try_new_with_projection() method for FilterExec
  • Projection is applied during plan creation
  • compute_properties() now runs a single time
  • No regressions in existing query plans
  • Tests validate performance and correctness
  • Documentation updated to reflect usage

Fixes: #19608

This commit fixes issue apache#19612 where accumulators that don't implement
retract_batch exhibit buggy behavior in window frame queries.

## Problem
When aggregate functions are used with window frames like
`ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW`, DataFusion uses
PlainAggregateWindowExpr which calls evaluate() multiple times on the
same accumulator instance. Accumulators that use std::mem::take() in
their evaluate() method consume their internal state, causing incorrect
results on subsequent calls.
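
The failure mode described above can be sketched in a few lines. This is only an illustration under stated assumptions: `TakingAcc`, `CloningAcc`, and the two `running_*` helpers are hypothetical stand-ins written for this example, not DataFusion's `Accumulator` trait or its real accumulators, and the loop merely imitates a cumulative window frame by calling `evaluate()` after every row.

```rust
// Hypothetical stand-ins for illustration; NOT DataFusion's accumulator types.
struct TakingAcc {
    parts: Vec<String>,
}

impl TakingAcc {
    fn update(&mut self, s: &str) {
        self.parts.push(s.to_string());
    }

    // Consumes the internal state: the next call starts from an empty Vec.
    fn evaluate(&mut self) -> String {
        std::mem::take(&mut self.parts).join(",")
    }
}

struct CloningAcc {
    parts: Vec<String>,
}

impl CloningAcc {
    fn update(&mut self, s: &str) {
        self.parts.push(s.to_string());
    }

    // Only reads the state, so repeated calls keep seeing all earlier rows.
    fn evaluate(&self) -> String {
        self.parts.join(",")
    }
}

// Imitates a running frame: evaluate once after each new row.
fn running_taking(rows: &[&str]) -> Vec<String> {
    let mut acc = TakingAcc { parts: Vec::new() };
    let mut out = Vec::new();
    for &row in rows {
        acc.update(row);
        out.push(acc.evaluate());
    }
    out
}

fn running_cloning(rows: &[&str]) -> Vec<String> {
    let mut acc = CloningAcc { parts: Vec::new() };
    let mut out = Vec::new();
    for &row in rows {
        acc.update(row);
        out.push(acc.evaluate());
    }
    out
}

fn main() {
    let rows = ["a", "b", "c"];
    // The taking variant forgets earlier rows after every evaluate().
    println!("{:?}", running_taking(&rows));
    // The state-preserving variant yields the expected cumulative values.
    println!("{:?}", running_cloning(&rows));
}
```

With the consuming variant each frame only sees the rows added since the previous `evaluate()` call, which is the kind of incorrect result the issue describes.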

## Solution
1. **percentile_cont**: Modified evaluate() to use mutable reference
   instead of consuming the Vec. Added retract_batch() support for
   both PercentileContAccumulator and DistinctPercentileContAccumulator.

2. **string_agg**: Changed SimpleStringAggAccumulator::evaluate() to
   clone the accumulated string instead of taking it.

## Changes
- datafusion/functions-aggregate/src/percentile_cont.rs:
  - Changed calculate_percentile() to take &mut [T::Native] instead of Vec<T::Native>
  - Updated PercentileContAccumulator::evaluate() to pass reference
  - Updated DistinctPercentileContAccumulator::evaluate() to clone values
  - Added retract_batch() implementation using HashMap for efficient removal
  - Updated PercentileContGroupsAccumulator::evaluate() for consistency

- datafusion/functions-aggregate/src/string_agg.rs:
  - Changed evaluate() to use clone() instead of std::mem::take()

- datafusion/sqllogictest/test_files/aggregate.slt:
  - Added test cases for percentile_cont with window frames
  - Added test comparing median() vs percentile_cont(0.5) behavior
  - Added test for string_agg cumulative window frame

- docs/source/library-user-guide/functions/adding-udfs.md:
  - Added documentation about window-compatible accumulators
  - Explained evaluate() state preservation requirements
  - Documented retract_batch() implementation guidance

Closes apache#19612
@github-actions bot added the documentation, sqllogictest, functions, and physical-plan labels Jan 3, 2026
Contributor

@nuno-faria left a comment

Thanks @GaneshPatil7517 for taking a look at this. I left some comments below. I think it might be better to add a builder as proposed by @adriangb instead of the new constructor.

/// )?;
/// ```
#[expect(clippy::needless_pass_by_value)]
pub fn try_new_with_projection(
Contributor

@adriangb suggested adding a builder instead of a new constructor, and I tend to agree. @GaneshPatil7517, could you create a FilterExecBuilder with:

  • a new function which would take the predicate and input, since they are mandatory.
  • several with_... methods for the projection, default_selectivity, batch_size, and fetch.
  • a build method which would return the FilterExec.

We might then even consider having the original try_new use the builder, to reduce the amount of duplicated code.
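
As a rough sketch of the shape being requested here (not the code that was merged): the types below are hypothetical, simplified stand-ins, with plain strings in place of the `Arc<dyn PhysicalExpr>` predicate and `Arc<dyn ExecutionPlan>` input that the real `FilterExec` holds. The default values and the `String` error type are placeholders chosen for the example.

```rust
// Hypothetical, simplified stand-ins for illustration only.
#[derive(Debug, Clone, PartialEq)]
struct FilterExec {
    predicate: String,
    input: String,
    projection: Option<Vec<usize>>,
    default_selectivity: u8,
    batch_size: usize,
    fetch: Option<usize>,
}

struct FilterExecBuilder {
    predicate: String,
    input: String,
    projection: Option<Vec<usize>>,
    default_selectivity: u8,
    batch_size: usize,
    fetch: Option<usize>,
}

impl FilterExecBuilder {
    /// Mandatory arguments go in `new`; everything else starts at a default.
    fn new(predicate: &str, input: &str) -> Self {
        Self {
            predicate: predicate.to_string(),
            input: input.to_string(),
            projection: None,
            default_selectivity: 20, // placeholder default for this sketch
            batch_size: 8192,        // placeholder default for this sketch
            fetch: None,
        }
    }

    fn with_projection(mut self, projection: Option<Vec<usize>>) -> Self {
        self.projection = projection;
        self
    }

    fn with_default_selectivity(mut self, default_selectivity: u8) -> Self {
        self.default_selectivity = default_selectivity;
        self
    }

    fn with_batch_size(mut self, batch_size: usize) -> Self {
        self.batch_size = batch_size;
        self
    }

    fn with_fetch(mut self, fetch: Option<usize>) -> Self {
        self.fetch = fetch;
        self
    }

    /// Validation happens once, here. In the real operator this is also
    /// where the plan properties would be computed a single time.
    fn build(self) -> Result<FilterExec, String> {
        if self.default_selectivity > 100 {
            return Err("default selectivity must be <= 100".to_string());
        }
        Ok(FilterExec {
            predicate: self.predicate,
            input: self.input,
            projection: self.projection,
            default_selectivity: self.default_selectivity,
            batch_size: self.batch_size,
            fetch: self.fetch,
        })
    }
}

fn main() {
    let filter = FilterExecBuilder::new("a > 1", "scan")
        .with_projection(Some(vec![0, 2]))
        .with_default_selectivity(30)
        .with_batch_size(4096)
        .with_fetch(Some(10))
        .build()
        .expect("valid configuration");
    println!("{filter:?}");
}
```

Because every option is collected before `build` runs, any expensive derived state only needs to be produced once, which is the motivation given for the builder in this thread.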

@@ -0,0 +1,147 @@
<!---
Contributor

I don't think a new user guide is needed just for this change.

size_of_val(self) + self.all_values.capacity() * size_of::<T::Native>()
}

fn retract_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
Contributor

I'm not sure how this relates to this PR.


statement ok
drop table distinct_avg;

Contributor

Also not sure why new aggregate tests are required.

[`create_udaf`]: https://docs.rs/datafusion/latest/datafusion/logical_expr/fn.create_udaf.html
[`advanced_udaf.rs`]: https://github.com/apache/datafusion/blob/main/datafusion-examples/examples/udf/advanced_udaf.rs

### Window Frame Compatible Accumulators
Contributor

This also doesn't fit in this PR.

Adds FilterExecBuilder pattern with fluent API

Allows setting projection, selectivity, batch_size, fetch in one build

Refactors try_new to use builder internally (reduces duplication)

Ensures compute_properties executes only once

Fixes apache#19608
@GaneshPatil7517 force-pushed the add-filterexec-with-projection branch from 74944f3 to bf8ea99 on January 3, 2026 12:08
@nuno-faria
Contributor

@GaneshPatil7517 thanks for the changes. There are still some changes that I think are from a different PR, in functions-aggregate, aggregate.slt, and adding-udfs.md. Could you remove those as well?

@GaneshPatil7517
Author

OK @nuno-faria, I'll remove them.

@github-actions bot removed the documentation, sqllogictest, and functions labels Jan 3, 2026
@GaneshPatil7517
Author

@nuno-faria please review this.

@nuno-faria
Contributor

@alamb could you please enable the workflows in this PR? Thanks.

@adriangb
Contributor

adriangb commented Jan 4, 2026

I’ve set them to run

@GaneshPatil7517
Author

There are 2 failing and 30 successful checks; let me fix this.

@GaneshPatil7517
Author

Hi @nuno-faria and @adriangb,
All checks have passed on this PR. I’m really excited to see it get merged 🚀
Could you please review and approve when you have a moment?

Thank you for your time and support! 🙌

Contributor

@adriangb left a comment

Should we deprecate with_projection and the other methods on FilterExec and re-implement them in terms of the builder until they are removed?

@GaneshPatil7517
Author

OK, I'll work on that.

As suggested in PR review, deprecate with_projection(), with_default_selectivity(),
and with_batch_size() methods on FilterExec. These methods now use FilterExecBuilder
internally for backward compatibility while guiding users toward the builder pattern.

- Marked methods as deprecated since 51.0.0
- Re-implemented using FilterExecBuilder to maintain functionality
- All 114 filter tests passing
- Provides gentle migration path for users
@GaneshPatil7517 force-pushed the add-filterexec-with-projection branch from 5f35662 to 26e5c46 on January 5, 2026 05:22
@GaneshPatil7517
Author

Hey @adriangb, could you please review it? I've updated it.

Contributor

@adriangb left a comment

Thanks, almost there! Can you add an entry to upgrading.md?

- Updated deprecation version from 51.0.0 to 52.0.0 for FilterExec methods
- Added comprehensive entry to upgrading.md explaining the migration path
- All 114 filter tests passing
@GaneshPatil7517 force-pushed the add-filterexec-with-projection branch from 87dad10 to 0726087 on January 5, 2026 06:13
@github-actions bot added the documentation label Jan 5, 2026
@GaneshPatil7517
Author

GaneshPatil7517 commented Jan 5, 2026

Hey @adriangb, could you please check now?

- Replace .clone() with Arc::clone() to follow Rust best practices
- Replace deprecated method calls in internal code with direct builder usage
- Update with_new_children, try_swapping_with_projection, and EmbeddedProjection impl
- All 114 filter tests passing
- No clippy warnings or errors
@GaneshPatil7517 force-pushed the add-filterexec-with-projection branch from 9596ceb to aa4a892 on January 5, 2026 06:46
Comment on lines 201 to 212
+    #[deprecated(since = "52.0.0", note = "Use FilterExecBuilder::with_default_selectivity instead")]
     pub fn with_default_selectivity(
-        mut self,
+        self,
         default_selectivity: u8,
     ) -> Result<Self, DataFusionError> {
-        if default_selectivity > 100 {
-            return plan_err!(
-                "Default filter selectivity value needs to be less than or equal to 100"
-            );
-        }
-        self.default_selectivity = default_selectivity;
-        Ok(self)
+        FilterExecBuilder::new(Arc::clone(&self.predicate), Arc::clone(&self.input))
+            .with_projection(self.projection.clone())
+            .with_default_selectivity(default_selectivity)
+            .with_batch_size(self.batch_size)
+            .with_fetch(self.fetch)
+            .build()
     }
Contributor

I'm not sure if the with_default_selectivity method should be deprecated, since before it simply updated the field. Now we would need to create a new FilterExec from scratch.

Author

@GaneshPatil7517 GaneshPatil7517 Jan 5, 2026

OK, I'll work on it.

The reviewer pointed out that with_default_selectivity() simply updates a field
and doesn't need the overhead of creating a new FilterExec via the builder.
Restored the original efficient implementation.

Only with_projection() and with_batch_size() remain deprecated, as they
benefit from the builder pattern's single property computation.

- Restored original with_default_selectivity implementation
- Updated upgrading.md to reflect only 2 deprecated methods
- All 114 filter tests passing
- No clippy warnings
@GaneshPatil7517 force-pushed the add-filterexec-with-projection branch from f961dc9 to a601075 on January 5, 2026 09:14
@nuno-faria
Contributor

@adriangb could you run the workflows again?
@GaneshPatil7517 since with_projection and with_batch_size are being deprecated, we also need to update those uses in DataFusion itself. There are uses at physical_planner.rs, physical_plan/mod.rs, filter_pushdown/mod.rs, filter.rs (at test_builder_vs_with_projection), and at roundtrip_physical_plan.rs.

@GaneshPatil7517
Author

OK, I'll update them.

Per reviewer feedback, updated all internal uses of deprecated with_projection()
and with_batch_size() methods to use FilterExecBuilder instead:

- datafusion/core/src/physical_planner.rs: Use FilterExecBuilder for filter creation
- datafusion/proto/src/physical_plan/mod.rs: Use FilterExecBuilder in proto deserialization
- datafusion/physical-plan/src/filter.rs: Updated test to use builder pattern

Also restored with_default_selectivity() to non-deprecated status since it
simply updates a field value without the overhead of rebuilding FilterExec.

All tests passing, no clippy warnings.
@GaneshPatil7517 force-pushed the add-filterexec-with-projection branch from 3701930 to dac2557 on January 6, 2026 01:18
@github-actions bot added the core and proto labels Jan 6, 2026
@GaneshPatil7517
Author

Hey @adriangb, could you run the workflows again?

@GaneshPatil7517
Author

Hi @nuno-faria and @adriangb,
All checks have passed on this PR. I’m really excited to see it get merged 🚀
Could you please review and approve when you have a moment?
Thank you for your time and support! 🙌

@alamb
Contributor

alamb commented Jan 7, 2026

run benchmark sql_planner

@alamb-ghbot

🤖 ./gh_compare_branch_bench.sh compare_branch_bench.sh Running
Linux aal-dev 6.14.0-1018-gcp #19~24.04.1-Ubuntu SMP Wed Sep 24 23:23:09 UTC 2025 x86_64 x86_64 x86_64 GNU/Linux
Comparing add-filterexec-with-projection (6a341a9) to 1037f0a diff
BENCH_NAME=sql_planner
BENCH_COMMAND=cargo bench --features=parquet --bench sql_planner
BENCH_FILTER=
BENCH_BRANCH_NAME=add-filterexec-with-projection
Results will be posted here when complete

@alamb changed the title from "Add one-step FilterExec creation with projection (#19608)" to "Add FilterExecBuilder (#19608)" Jan 7, 2026
Contributor

@alamb left a comment

Thanks @GaneshPatil7517 and @nuno-faria and @adriangb -- this looks really nice to me

/// # Deprecated
/// Use [`FilterExecBuilder::with_projection`] instead
#[deprecated(
since = "52.0.0",
Contributor

this technically will now go out in 53 I think


You can see the current [status of the `52.0.0`release here](https://github.com/apache/datafusion/issues/18566)

### `FilterExec` builder methods deprecated
Contributor

We need to move these to the 53 release section

@alamb-ghbot

Benchmark script failed with exit code 101.

Last 10 lines of output:

  3 (3.00%) high severe

Benchmarking physical_plan_clickbench_q50
Benchmarking physical_plan_clickbench_q50: Warming up for 3.0000 s

thread 'main' (580581) panicked at datafusion/core/benches/sql_planner.rs:62:14:
called `Result::unwrap()` on an `Err` value: Context("type_coercion", Internal("Expect TypeSignatureClass::Native(LogicalType(Native(String), String)) but received NativeType::Binary, DataType: BinaryView"))
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace

error: bench failed, to rerun pass `-p datafusion --bench sql_planner`

Contributor

@nuno-faria left a comment

Thanks @GaneshPatil7517. I still have comments, this time about the with_projection method.

Comment on lines +689 to +705
        // Check if the projection is valid
        can_project(&self.schema(), projection.as_ref())?;

        let projection = match projection {
            Some(projection) => match &self.projection {
                Some(p) => Some(projection.iter().map(|i| p[*i]).collect()),
                None => Some(projection),
            },
            None => None,
        };

        FilterExecBuilder::new(Arc::clone(&self.predicate), Arc::clone(&self.input))
            .with_projection(projection)
            .with_default_selectivity(self.default_selectivity)
            .with_batch_size(self.batch_size)
            .with_fetch(self.fetch)
            .build()
Contributor

I'm having some doubts about deprecating with_projection of FilterExec (I highlight here the with_projection of impl EmbeddedProjection, which has the same code). Mainly about this part here:

        let projection = match projection {
            Some(projection) => match &self.projection {
                Some(p) => Some(projection.iter().map(|i| p[*i]).collect()),
                None => Some(projection),
            },
            None => None,
        };

It appears that the projection passed to the method is applied on top of the existing projection. I'm not sure why this is the case, but I think that if we deprecate with_projection we lose the ability to do this. That said, I also haven't seen with_projection called on an existing FilterExec; it is always accompanied by a try_new.

@adriangb @alamb what do you think?

Contributor

I guess FilterExecBuilder could do the same thing? I can see why that would be the correct behavior.

Contributor

But in this case FilterExecBuilder always builds a new filter, while the previous with_projection could operate over an existing one.

Contributor

I still don't get it. The old with_projection also built a new FilterExec:

let cache = Self::compute_properties(
    &self.input,
    &self.predicate,
    self.default_selectivity,
    projection.as_ref(),
)?;
Ok(Self {
    predicate: Arc::clone(&self.predicate),
    input: Arc::clone(&self.input),
    metrics: self.metrics.clone(),
    default_selectivity: self.default_selectivity,
    cache,
    projection,
    batch_size: self.batch_size,
    fetch: self.fetch,
})

The only difference I see is that the old implementation composed projections, the new one replaces them. So maybe all we need to do is implement the projection composition in FilterExecBuilder::with_projection to match the existing behavior?
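
For reference, the composition discussed in this thread can be isolated into a small standalone function. This is only a sketch: `compose_projection` is a hypothetical helper name introduced for this example (it is not an existing DataFusion function), and it mirrors the `match` quoted above. Indices in the new projection are interpreted as positions in the output of the existing projection.

```rust
/// Compose `new` on top of `existing`.
/// Each index in `new` selects a position in the already-projected output,
/// so it is mapped back to an index into the original input schema.
/// Hypothetical helper for illustration; panics if an index in `new` is out
/// of range (the quoted code validates the projection beforehand).
fn compose_projection(
    existing: Option<&[usize]>,
    new: Option<Vec<usize>>,
) -> Option<Vec<usize>> {
    match new {
        Some(new) => match existing {
            Some(p) => Some(new.iter().map(|i| p[*i]).collect()),
            None => Some(new),
        },
        None => None,
    }
}

fn main() {
    // The filter already outputs input columns 2, 5 and 7.
    let existing = vec![2, 5, 7];
    // Selecting positions 0 and 2 of that output means input columns 2 and 7.
    let composed = compose_projection(Some(existing.as_slice()), Some(vec![0, 2]));
    println!("{composed:?}"); // prints Some([2, 7])
}
```

Replacing instead of composing would keep `[0, 2]` as-is, which refers to different input columns whenever a projection is already present.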

Author

Thanks for the detailed analysis @adriangb and @nuno-faria! You're right - the old implementation composed projections while the new one replaces them. I'll update FilterExecBuilder::with_projection to compose the projections instead of replacing, matching the original behavior. This way we maintain backward compatibility while still having the cleaner builder API.
