Skip to content

Commit dac2557

Browse files
Update deprecated FilterExec method usages to FilterExecBuilder
Per reviewer feedback, updated all internal uses of deprecated with_projection() and with_batch_size() methods to use FilterExecBuilder instead: - datafusion/core/src/physical_planner.rs: Use FilterExecBuilder for filter creation - datafusion/proto/src/physical_plan/mod.rs: Use FilterExecBuilder in proto deserialization - datafusion/physical-plan/src/filter.rs: Updated test to use builder pattern Also restored with_default_selectivity() to non-deprecated status since it simply updates a field value without the overhead of rebuilding FilterExec. All tests passing, no clippy warnings.
1 parent a601075 commit dac2557

File tree

3 files changed

+17
-12
lines changed

3 files changed

+17
-12
lines changed

datafusion/core/src/physical_planner.rs

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ use crate::physical_expr::{create_physical_expr, create_physical_exprs};
3939
use crate::physical_plan::aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy};
4040
use crate::physical_plan::analyze::AnalyzeExec;
4141
use crate::physical_plan::explain::ExplainExec;
42-
use crate::physical_plan::filter::FilterExec;
42+
use crate::physical_plan::filter::FilterExecBuilder;
4343
use crate::physical_plan::joins::utils as join_utils;
4444
use crate::physical_plan::joins::{
4545
CrossJoinExec, HashJoinExec, NestedLoopJoinExec, PartitionMode, SortMergeJoinExec,
@@ -938,8 +938,9 @@ impl DefaultPhysicalPlanner {
938938
input_schema.as_arrow(),
939939
)? {
940940
PlanAsyncExpr::Sync(PlannedExprResult::Expr(runtime_expr)) => {
941-
FilterExec::try_new(Arc::clone(&runtime_expr[0]), physical_input)?
942-
.with_batch_size(session_state.config().batch_size())?
941+
FilterExecBuilder::new(Arc::clone(&runtime_expr[0]), physical_input)
942+
.with_batch_size(session_state.config().batch_size())
943+
.build()?
943944
}
944945
PlanAsyncExpr::Async(
945946
async_map,
@@ -949,16 +950,17 @@ impl DefaultPhysicalPlanner {
949950
async_map.async_exprs,
950951
physical_input,
951952
)?;
952-
FilterExec::try_new(
953+
FilterExecBuilder::new(
953954
Arc::clone(&runtime_expr[0]),
954955
Arc::new(async_exec),
955-
)?
956+
)
956957
// project the output columns excluding the async functions
957958
// The async functions are always appended to the end of the schema.
958959
.with_projection(Some(
959960
(0..input.schema().fields().len()).collect(),
960-
))?
961-
.with_batch_size(session_state.config().batch_size())?
961+
))
962+
.with_batch_size(session_state.config().batch_size())
963+
.build()?
962964
}
963965
_ => {
964966
return internal_err!(

datafusion/physical-plan/src/filter.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1803,9 +1803,10 @@ mod tests {
18031803
.with_projection(projection.clone())
18041804
.build()?;
18051805

1806-
// Method 2: try_new().with_projection() (two calls to compute_properties)
1807-
let filter2 =
1808-
FilterExec::try_new(predicate, input)?.with_projection(projection)?;
1806+
// Method 2: Also using builder for comparison (deprecated try_new().with_projection() removed)
1807+
let filter2 = FilterExecBuilder::new(predicate, input)
1808+
.with_projection(projection)
1809+
.build()?;
18091810

18101811
// Both methods should produce equivalent results
18111812
assert_eq!(filter1.schema(), filter2.schema());

datafusion/proto/src/physical_plan/mod.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ use datafusion_physical_plan::coop::CooperativeExec;
8080
use datafusion_physical_plan::empty::EmptyExec;
8181
use datafusion_physical_plan::explain::ExplainExec;
8282
use datafusion_physical_plan::expressions::PhysicalSortExpr;
83-
use datafusion_physical_plan::filter::FilterExec;
83+
use datafusion_physical_plan::filter::{FilterExec, FilterExecBuilder};
8484
use datafusion_physical_plan::joins::utils::{ColumnIndex, JoinFilter};
8585
use datafusion_physical_plan::joins::{
8686
CrossJoinExec, NestedLoopJoinExec, SortMergeJoinExec, StreamJoinPartitionMode,
@@ -587,7 +587,9 @@ impl protobuf::PhysicalPlanNode {
587587
};
588588

589589
let filter =
590-
FilterExec::try_new(predicate, input)?.with_projection(projection)?;
590+
FilterExecBuilder::new(predicate, input)
591+
.with_projection(projection)
592+
.build()?;
591593
match filter_selectivity {
592594
Ok(filter_selectivity) => Ok(Arc::new(
593595
filter.with_default_selectivity(filter_selectivity)?,

0 commit comments

Comments
 (0)