Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
16 commits
Select commit Hold shift + click to select a range
6954497
fix(accumulators): preserve state in evaluate() for window frame queries
GaneshPatil7517 Jan 3, 2026
2caf141
feat(physical-plan): add FilterExecBuilder for efficient construction
GaneshPatil7517 Jan 3, 2026
bf8ea99
fix: add missing closing brace for tests module
GaneshPatil7517 Jan 3, 2026
4823862
fix: update tests to use FilterExecBuilder pattern correctly
GaneshPatil7517 Jan 3, 2026
b393b38
fix(physical-plan): compute filter statistics using input schema
GaneshPatil7517 Jan 3, 2026
3dfc827
chore: remove unrelated changes from PR
GaneshPatil7517 Jan 3, 2026
fbda73e
fix: resolve cargo fmt and clippy issues in filter.rs
GaneshPatil7517 Jan 5, 2026
5273fe8
fix: use Arc::clone instead of .clone() on ref-counted pointers
GaneshPatil7517 Jan 5, 2026
26e5c46
Deprecate FilterExec with_* methods in favor of FilterExecBuilder
GaneshPatil7517 Jan 5, 2026
0726087
Fix deprecation version to 52.0.0 and add upgrading guide entry
GaneshPatil7517 Jan 5, 2026
aa4a892
Fix clippy warnings: use Arc::clone and builder pattern
GaneshPatil7517 Jan 5, 2026
a601075
Un-deprecate with_default_selectivity per reviewer feedback
GaneshPatil7517 Jan 5, 2026
dac2557
Update deprecated FilterExec method usages to FilterExecBuilder
GaneshPatil7517 Jan 6, 2026
30b1188
Merge branch 'main' into add-filterexec-with-projection
GaneshPatil7517 Jan 6, 2026
f9a2710
fix: address CI failures - format code and fix deprecated method usag…
GaneshPatil7517 Jan 6, 2026
6a341a9
Merge branch 'main' into add-filterexec-with-projection
GaneshPatil7517 Jan 6, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 12 additions & 7 deletions datafusion/core/src/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ use crate::physical_expr::{create_physical_expr, create_physical_exprs};
use crate::physical_plan::aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy};
use crate::physical_plan::analyze::AnalyzeExec;
use crate::physical_plan::explain::ExplainExec;
use crate::physical_plan::filter::FilterExec;
use crate::physical_plan::filter::FilterExecBuilder;
use crate::physical_plan::joins::utils as join_utils;
use crate::physical_plan::joins::{
CrossJoinExec, HashJoinExec, NestedLoopJoinExec, PartitionMode, SortMergeJoinExec,
Expand Down Expand Up @@ -938,8 +938,12 @@ impl DefaultPhysicalPlanner {
input_schema.as_arrow(),
)? {
PlanAsyncExpr::Sync(PlannedExprResult::Expr(runtime_expr)) => {
FilterExec::try_new(Arc::clone(&runtime_expr[0]), physical_input)?
.with_batch_size(session_state.config().batch_size())?
FilterExecBuilder::new(
Arc::clone(&runtime_expr[0]),
physical_input,
)
.with_batch_size(session_state.config().batch_size())
.build()?
}
PlanAsyncExpr::Async(
async_map,
Expand All @@ -949,16 +953,17 @@ impl DefaultPhysicalPlanner {
async_map.async_exprs,
physical_input,
)?;
FilterExec::try_new(
FilterExecBuilder::new(
Arc::clone(&runtime_expr[0]),
Arc::new(async_exec),
)?
)
// project the output columns excluding the async functions
// The async functions are always appended to the end of the schema.
.with_projection(Some(
(0..input.schema().fields().len()).collect(),
))?
.with_batch_size(session_state.config().batch_size())?
))
.with_batch_size(session_state.config().batch_size())
.build()?
}
_ => {
return internal_err!(
Expand Down
6 changes: 3 additions & 3 deletions datafusion/core/tests/parquet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -515,9 +515,9 @@ fn make_uint_batches(start: u8, end: u8) -> RecordBatch {
Field::new("u64", DataType::UInt64, true),
]));
let v8: Vec<u8> = (start..end).collect();
let v16: Vec<u16> = (start as _..end as _).collect();
let v32: Vec<u32> = (start as _..end as _).collect();
let v64: Vec<u64> = (start as _..end as _).collect();
let v16: Vec<u16> = (start as u16..end as u16).collect();
let v32: Vec<u32> = (start as u32..end as u32).collect();
let v64: Vec<u64> = (start as u64..end as u64).collect();
RecordBatch::try_new(
schema,
vec![
Expand Down
10 changes: 5 additions & 5 deletions datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ use datafusion_physical_plan::{
coalesce_batches::CoalesceBatchesExec,
coalesce_partitions::CoalescePartitionsExec,
collect,
filter::FilterExec,
filter::{FilterExec, FilterExecBuilder},
repartition::RepartitionExec,
sorts::sort::SortExec,
};
Expand Down Expand Up @@ -478,9 +478,9 @@ fn test_filter_with_projection() {
let projection = vec![1, 0];
let predicate = col_lit_predicate("a", "foo", &schema());
let plan = Arc::new(
FilterExec::try_new(predicate, Arc::clone(&scan))
.unwrap()
FilterExecBuilder::new(predicate, Arc::clone(&scan))
.with_projection(Some(projection))
.build()
.unwrap(),
);

Expand All @@ -503,9 +503,9 @@ fn test_filter_with_projection() {
let projection = vec![1];
let predicate = col_lit_predicate("a", "foo", &schema());
let plan = Arc::new(
FilterExec::try_new(predicate, scan)
.unwrap()
FilterExecBuilder::new(predicate, scan)
.with_projection(Some(projection))
.build()
.unwrap(),
);
insta::assert_snapshot!(
Expand Down
Loading