Add one-step FilterExec creation with projection (#19608) #19619
This commit fixes issue apache#19612 where accumulators that don't implement retract_batch exhibit buggy behavior in window frame queries.

## Problem

When aggregate functions are used with window frames like `ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW`, DataFusion uses PlainAggregateWindowExpr which calls evaluate() multiple times on the same accumulator instance. Accumulators that use std::mem::take() in their evaluate() method consume their internal state, causing incorrect results on subsequent calls.

## Solution

1. **percentile_cont**: Modified evaluate() to use mutable reference instead of consuming the Vec. Added retract_batch() support for both PercentileContAccumulator and DistinctPercentileContAccumulator.
2. **string_agg**: Changed SimpleStringAggAccumulator::evaluate() to clone the accumulated string instead of taking it.

## Changes

- datafusion/functions-aggregate/src/percentile_cont.rs:
  - Changed calculate_percentile() to take &mut [T::Native] instead of Vec<T::Native>
  - Updated PercentileContAccumulator::evaluate() to pass reference
  - Updated DistinctPercentileContAccumulator::evaluate() to clone values
  - Added retract_batch() implementation using HashMap for efficient removal
  - Updated PercentileContGroupsAccumulator::evaluate() for consistency
- datafusion/functions-aggregate/src/string_agg.rs:
  - Changed evaluate() to use clone() instead of std::mem::take()
- datafusion/sqllogictest/test_files/aggregate.slt:
  - Added test cases for percentile_cont with window frames
  - Added test comparing median() vs percentile_cont(0.5) behavior
  - Added test for string_agg cumulative window frame
- docs/source/library-user-guide/functions/adding-udfs.md:
  - Added documentation about window-compatible accumulators
  - Explained evaluate() state preservation requirements
  - Documented retract_batch() implementation guidance

Closes apache#19612
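The state-consumption problem described in this commit message can be reproduced without DataFusion. The following is a minimal sketch, assuming a hypothetical `ConcatAccumulator` (not the real `SimpleStringAggAccumulator`) and a hypothetical `running_concat` helper that mimics a cumulative frame by calling evaluate after every row:

```rust
/// Hypothetical stand-in for a string-aggregating accumulator.
/// This is NOT DataFusion's SimpleStringAggAccumulator.
#[derive(Default)]
pub struct ConcatAccumulator {
    acc: String,
}

impl ConcatAccumulator {
    /// Appends one value, separated from earlier values by a comma.
    pub fn update(&mut self, value: &str) {
        if !self.acc.is_empty() {
            self.acc.push(',');
        }
        self.acc.push_str(value);
    }

    /// Moves the state out, leaving an empty string behind.
    /// Every later call therefore forgets all earlier rows.
    pub fn evaluate_taking(&mut self) -> String {
        std::mem::take(&mut self.acc)
    }

    /// Returns a copy and keeps the state, so repeated calls are safe.
    pub fn evaluate_cloning(&self) -> String {
        self.acc.clone()
    }
}

/// Mimics `ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW`:
/// one accumulator instance, evaluated once per row.
pub fn running_concat(rows: &[&str], take_state: bool) -> Vec<String> {
    let mut acc = ConcatAccumulator::default();
    rows.iter()
        .map(|row| {
            acc.update(row);
            if take_state {
                acc.evaluate_taking()
            } else {
                acc.evaluate_cloning()
            }
        })
        .collect()
}
```

With rows `a`, `b`, `c`, the cloning variant yields the cumulative values `a`, `a,b`, `a,b,c`, while the taking variant yields only `a`, `b`, `c` because each evaluation empties the state.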
nuno-faria
left a comment
Thanks @GaneshPatil7517 for taking a look at this. I left some comments below. I think it might be better to add a builder as proposed by @adriangb instead of the new constructor.
| /// )?; | ||
| /// ``` | ||
| #[expect(clippy::needless_pass_by_value)] | ||
| pub fn try_new_with_projection( |
@adriangb suggested adding a builder instead of a new constructor and I tend to agree. @GaneshPatil7517 could you create a FilterExecBuilder, with:
- a `new` function which would take the predicate and input, since they are mandatory.
- several `with_...` methods for the projection, default_selectivity, batch_size, and fetch.
- a `build` method which would return the `FilterExec`.
We then might even consider to have the original try_new use the builder instead, to reduce the amount of duplicate code.
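The builder shape requested above might look roughly like the following. This is only a sketch under stated assumptions: `Predicate` and `Input` are placeholder type aliases rather than DataFusion's physical expression and plan types, the default values are illustrative, and the error type is a plain `String` instead of `DataFusionError`.

```rust
use std::sync::Arc;

// Placeholder types for illustration; the real code would use
// DataFusion's physical expression and execution plan types.
pub type Predicate = Arc<str>;
pub type Input = Arc<str>;

/// Simplified, hypothetical model of the filter node.
#[derive(Debug)]
pub struct FilterExec {
    pub predicate: Predicate,
    pub input: Input,
    pub projection: Option<Vec<usize>>,
    pub default_selectivity: u8,
    pub batch_size: usize,
    pub fetch: Option<usize>,
}

pub struct FilterExecBuilder {
    predicate: Predicate,
    input: Input,
    projection: Option<Vec<usize>>,
    default_selectivity: u8,
    batch_size: usize,
    fetch: Option<usize>,
}

impl FilterExecBuilder {
    /// Mandatory arguments go in `new`; everything else gets a default.
    /// The defaults below are assumptions made for this sketch.
    pub fn new(predicate: Predicate, input: Input) -> Self {
        Self {
            predicate,
            input,
            projection: None,
            default_selectivity: 20,
            batch_size: 8192,
            fetch: None,
        }
    }

    pub fn with_projection(mut self, projection: Option<Vec<usize>>) -> Self {
        self.projection = projection;
        self
    }

    pub fn with_default_selectivity(mut self, default_selectivity: u8) -> Self {
        self.default_selectivity = default_selectivity;
        self
    }

    pub fn with_batch_size(mut self, batch_size: usize) -> Self {
        self.batch_size = batch_size;
        self
    }

    pub fn with_fetch(mut self, fetch: Option<usize>) -> Self {
        self.fetch = fetch;
        self
    }

    /// Validates once and produces the node in a single step.
    pub fn build(self) -> Result<FilterExec, String> {
        if self.default_selectivity > 100 {
            return Err("default selectivity must be <= 100".to_string());
        }
        Ok(FilterExec {
            predicate: self.predicate,
            input: self.input,
            projection: self.projection,
            default_selectivity: self.default_selectivity,
            batch_size: self.batch_size,
            fetch: self.fetch,
        })
    }
}
```

Keeping validation in `build` means the existing constructor could delegate to the builder without duplicating the checks.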
| @@ -0,0 +1,147 @@ | |||
| <!--- | |||
I don't think a new user guide is needed just for this change.
| size_of_val(self) + self.all_values.capacity() * size_of::<T::Native>() | ||
| } | ||
|
|
||
| fn retract_batch(&mut self, values: &[ArrayRef]) -> Result<()> { |
I'm not sure how this relates to this PR.
|
|
||
| statement ok | ||
| drop table distinct_avg; | ||
|
|
Also not sure why new aggregate tests are required.
| [`create_udaf`]: https://docs.rs/datafusion/latest/datafusion/logical_expr/fn.create_udaf.html | ||
| [`advanced_udaf.rs`]: https://github.com/apache/datafusion/blob/main/datafusion-examples/examples/udf/advanced_udaf.rs | ||
|
|
||
| ### Window Frame Compatible Accumulators |
This also doesn't fit in this PR.
- Adds FilterExecBuilder pattern with fluent API
- Allows setting projection, selectivity, batch_size, fetch in one build
- Refactors try_new to use builder internally (reduces duplication)
- Ensures compute_properties executes only once

Fixes apache#19608
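To illustrate why a one-step build avoids recomputing properties, here is a small sketch. It assumes a hypothetical `Node` type whose `compute_properties` helper increments a call counter. None of these names come from DataFusion; they only model the two construction paths.

```rust
use std::cell::Cell;

/// Hypothetical node that caches properties derived from its projection.
#[derive(Debug)]
pub struct Node {
    pub projection: Option<Vec<usize>>,
    pub properties: String,
}

/// Stand-in for an expensive derivation; counts how often it runs.
fn compute_properties(projection: &Option<Vec<usize>>, calls: &Cell<usize>) -> String {
    calls.set(calls.get() + 1);
    match projection {
        Some(cols) => format!("columns {cols:?}"),
        None => "all columns".to_string(),
    }
}

impl Node {
    /// Two-step path, step 1: construct without a projection.
    pub fn try_new(calls: &Cell<usize>) -> Node {
        Node {
            projection: None,
            properties: compute_properties(&None, calls),
        }
    }

    /// Two-step path, step 2: changing the projection recomputes properties.
    pub fn with_projection(self, projection: Option<Vec<usize>>, calls: &Cell<usize>) -> Node {
        let properties = compute_properties(&projection, calls);
        Node { projection, properties }
    }
}

/// One-step path: collect options first, compute properties once in `build`.
pub struct NodeBuilder {
    projection: Option<Vec<usize>>,
}

impl NodeBuilder {
    pub fn new() -> Self {
        Self { projection: None }
    }

    pub fn with_projection(mut self, projection: Option<Vec<usize>>) -> Self {
        self.projection = projection;
        self
    }

    pub fn build(self, calls: &Cell<usize>) -> Node {
        let properties = compute_properties(&self.projection, calls);
        Node { projection: self.projection, properties }
    }
}

/// Number of property computations on the constructor-then-modify path.
pub fn count_two_step() -> usize {
    let calls = Cell::new(0);
    let _node = Node::try_new(&calls).with_projection(Some(vec![0]), &calls);
    calls.get()
}

/// Number of property computations on the builder path.
pub fn count_builder() -> usize {
    let calls = Cell::new(0);
    let _node = NodeBuilder::new().with_projection(Some(vec![0])).build(&calls);
    calls.get()
}
```

In this model the two-step path computes properties twice and discards the first result, whereas the builder defers the computation until all options are known.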
74944f3 to
bf8ea99
Compare
|
@GaneshPatil7517 thanks for the changes. There are still some changes that I think are from a different PR, at |
|
OK @nuno-faria, I'll remove it... |
|
@nuno-faria Please Review this... |
|
@alamb could you please enable the workflows in this PR? Thanks. |
|
I’ve set them to run |
|
there are 2 failing and 30 successful checks, let me solve this.... |
|
Hi @nuno-faria & @adriangb , Thank you for your time and support! 🙌 |
adriangb
left a comment
Should we deprecate with_projection and other methods on FilterExec and re-implement them in terms of the builder until they are removed?
|
OK, I'll work on that... |
As suggested in PR review, deprecate with_projection(), with_default_selectivity(), and with_batch_size() methods on FilterExec. These methods now use FilterExecBuilder internally for backward compatibility while guiding users toward the builder pattern.

- Marked methods as deprecated since 51.0.0
- Re-implemented using FilterExecBuilder to maintain functionality
- All 114 filter tests passing
- Provides gentle migration path for users
5f35662 to
26e5c46
Compare
|
Hey @adriangb, could you please review it? I updated it. |
| /// | ||
| /// # Deprecated | ||
| /// Use [`FilterExecBuilder::with_default_selectivity`] instead | ||
| #[deprecated(since = "51.0.0", note = "Use FilterExecBuilder::with_default_selectivity instead")] |
Should be 52.0.0, or probably 53.0.0 at this point
adriangb
left a comment
Thanks, almost there! Can you add an entry to upgrading.md?
- Updated deprecation version from 51.0.0 to 52.0.0 for FilterExec methods
- Added comprehensive entry to upgrading.md explaining the migration path
- All 114 filter tests passing
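The deprecate-and-delegate approach discussed in this review can be sketched as follows. `Filter`, `FilterBuilder`, and `legacy_resize` are hypothetical names for illustration, and the `since` value simply mirrors the version mentioned in the commit message above.

```rust
/// Hypothetical simplified plan node; not DataFusion's FilterExec.
#[derive(Debug, Clone, PartialEq)]
pub struct Filter {
    projection: Option<Vec<usize>>,
    batch_size: usize,
}

pub struct FilterBuilder {
    projection: Option<Vec<usize>>,
    batch_size: usize,
}

impl FilterBuilder {
    /// The default batch size here is an assumption for this sketch.
    pub fn new() -> Self {
        Self { projection: None, batch_size: 8192 }
    }

    pub fn with_projection(mut self, projection: Option<Vec<usize>>) -> Self {
        self.projection = projection;
        self
    }

    pub fn with_batch_size(mut self, batch_size: usize) -> Self {
        self.batch_size = batch_size;
        self
    }

    pub fn build(self) -> Filter {
        Filter { projection: self.projection, batch_size: self.batch_size }
    }
}

impl Filter {
    pub fn batch_size(&self) -> usize {
        self.batch_size
    }

    /// Old entry point kept for backward compatibility.
    /// It carries the existing settings over and delegates to the builder.
    #[deprecated(since = "52.0.0", note = "Use FilterBuilder::with_batch_size instead")]
    pub fn with_batch_size(&self, batch_size: usize) -> Filter {
        FilterBuilder::new()
            .with_projection(self.projection.clone())
            .with_batch_size(batch_size)
            .build()
    }
}

/// Callers that cannot migrate yet can silence the warning locally.
#[allow(deprecated)]
pub fn legacy_resize(filter: &Filter, batch_size: usize) -> Filter {
    filter.with_batch_size(batch_size)
}
```

Existing callers keep compiling and see a deprecation warning that names the replacement, while the construction logic lives in one place.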
87dad10 to
0726087
Compare
|
Hey @adriangb, could you please check now? |
- Replace .clone() with Arc::clone() to follow Rust best practices
- Replace deprecated method calls in internal code with direct builder usage
- Update with_new_children, try_swapping_with_projection, and EmbeddedProjection impl
- All 114 filter tests passing
- No clippy warnings or errors
9596ceb to
aa4a892
Compare
| #[deprecated(since = "52.0.0", note = "Use FilterExecBuilder::with_default_selectivity instead")] | ||
| pub fn with_default_selectivity( | ||
| mut self, | ||
| self, | ||
| default_selectivity: u8, | ||
| ) -> Result<Self, DataFusionError> { | ||
| if default_selectivity > 100 { | ||
| return plan_err!( | ||
| "Default filter selectivity value needs to be less than or equal to 100" | ||
| ); | ||
| } | ||
| self.default_selectivity = default_selectivity; | ||
| Ok(self) | ||
| FilterExecBuilder::new(Arc::clone(&self.predicate), Arc::clone(&self.input)) | ||
| .with_projection(self.projection.clone()) | ||
| .with_default_selectivity(default_selectivity) | ||
| .with_batch_size(self.batch_size) | ||
| .with_fetch(self.fetch) | ||
| .build() | ||
| } |
I'm not sure if the with_default_selectivity method should be deprecated, since before it simply updated the field. Now we would need to create a new FilterExec from scratch.
OK, I'll work on it...
The reviewer pointed out that with_default_selectivity() simply updates a field and doesn't need the overhead of creating a new FilterExec via the builder. Restored the original efficient implementation. Only with_projection() and with_batch_size() remain deprecated, as they benefit from the builder pattern's single property computation.

- Restored original with_default_selectivity implementation
- Updated upgrading.md to reflect only 2 deprecated methods
- All 114 filter tests passing
- No clippy warnings
f961dc9 to
a601075
Compare
|
@adriangb could you run the workflows again? |
OK, I'll update it... |
Per reviewer feedback, updated all internal uses of deprecated with_projection() and with_batch_size() methods to use FilterExecBuilder instead:

- datafusion/core/src/physical_planner.rs: Use FilterExecBuilder for filter creation
- datafusion/proto/src/physical_plan/mod.rs: Use FilterExecBuilder in proto deserialization
- datafusion/physical-plan/src/filter.rs: Updated test to use builder pattern

Also restored with_default_selectivity() to non-deprecated status since it simply updates a field value without the overhead of rebuilding FilterExec.

All tests passing, no clippy warnings.
3701930 to
dac2557
Compare
|
hey @adriangb could you run the workflows again.....? |
|
Hi @nuno-faria & @adriangb , |
This PR introduces a new FilterExec constructor that accepts a projection at creation time,
ensuring compute_properties() executes only once.
What was added:
- `try_new_with_projection()` method for FilterExec
- `compute_properties()` now runs a single time

Fixes: #19608