From 21b8a3fafc7733dba0931e78c79e0ff88bf93012 Mon Sep 17 00:00:00 2001 From: Albert Skalt Date: Tue, 13 Jan 2026 17:10:23 +0300 Subject: [PATCH 1/2] add fast-path for `with_new_children` This patch aims to implement a fast-path for the ExecutionPlan::with_new_children function for some plans, moving closer to a physical plan re-use implementation and improving planning performance. If the passed children properties are the same as in self, we do not actually recompute self's properties (which could be costly if projection mapping is required). Instead, we just replace the children and re-use self's properties as-is. To be able to compare two different properties -- ExecutionPlan::properties(...) signature is modified and now returns `&Arc`. If `children` properties are the same in `with_new_children` -- we clone our properties arc and then a parent plan will consider our properties as unchanged, doing the same. Also, there are other improvements. The patch includes the following changes: - Return `&Arc` from `ExecutionPlan::properties(...)` instead of a reference. - Implement `with_new_children` fast-path if there is no children properties changes for all major plans. - Store `Arc<[usize]>` instead of vector within `FilterExec`. - Store `Arc<[usize]>` instead of vector within projection of `HashJoinExec`. - Store `Arc<[Arc]>` instead of vec for aggr expr and filters. - Store `Arc<[ProjectionExpr]> instead of vec in `ProjectionExprs` struct. - Get `Option<&[usize]>` instead of option vec ref in `project_schema` -- it makes API more flexible. Note: currently, `reset_plan_states` does not allow to re-use plan in general: it is not supported for dynamic filters and recursive queries features, as in this case state reset should update pointers in the children plans. Closes https://github.com/apache/datafusion/issues/19796 --- .../custom_data_source/custom_datasource.rs | 9 +- .../memory_pool_execution_plan.rs | 4 +- .../proto/composed_extension_codec.rs | 4 +- .../examples/relation_planner/table_sample.rs | 6 +- datafusion/catalog-listing/src/table.rs | 2 +- datafusion/catalog/src/memory/table.rs | 6 +- datafusion/common/src/stats.rs | 17 ++-- datafusion/common/src/utils/mod.rs | 4 +- datafusion/core/benches/reset_plan_states.rs | 2 + datafusion/core/src/datasource/empty.rs | 3 +- datafusion/core/src/physical_planner.rs | 14 +-- .../core/tests/custom_sources_cases/mod.rs | 11 ++- .../provider_filter_pushdown.rs | 9 +- .../tests/custom_sources_cases/statistics.rs | 6 +- datafusion/core/tests/fuzz_cases/once_exec.rs | 6 +- .../enforce_distribution.rs | 6 +- .../physical_optimizer/filter_pushdown/mod.rs | 4 +- .../filter_pushdown/util.rs | 2 +- .../physical_optimizer/join_selection.rs | 17 ++-- .../tests/physical_optimizer/test_utils.rs | 8 +- .../tests/user_defined/insert_operation.rs | 20 ++-- .../tests/user_defined/user_defined_plan.rs | 10 +- datafusion/datasource/src/memory.rs | 2 +- datafusion/datasource/src/sink.rs | 6 +- datafusion/datasource/src/source.rs | 24 +++-- datafusion/ffi/src/execution_plan.rs | 20 ++-- datafusion/ffi/src/tests/async_provider.rs | 8 +- .../src/equivalence/properties/mod.rs | 7 +- datafusion/physical-expr/src/projection.rs | 37 +++----- .../src/enforce_distribution.rs | 4 +- .../physical-optimizer/src/join_selection.rs | 6 +- .../src/output_requirements.rs | 6 +- .../src/update_aggr_exprs.rs | 2 +- .../physical-plan/src/aggregates/mod.rs | 62 +++++++----- .../src/aggregates/no_grouping.rs | 4 +- .../physical-plan/src/aggregates/row_hash.rs | 6 +- datafusion/physical-plan/src/analyze.rs | 6 +- datafusion/physical-plan/src/async_func.rs | 26 +++-- .../physical-plan/src/coalesce_batches.rs | 27 ++++-- .../physical-plan/src/coalesce_partitions.rs | 30 ++++-- datafusion/physical-plan/src/common.rs | 2 +- datafusion/physical-plan/src/coop.rs | 26 +++-- datafusion/physical-plan/src/display.rs | 2 +- datafusion/physical-plan/src/empty.rs | 8 +- .../physical-plan/src/execution_plan.rs | 57 +++++++++-- datafusion/physical-plan/src/explain.rs | 6 +- datafusion/physical-plan/src/filter.rs | 55 ++++++----- .../physical-plan/src/joins/cross_join.rs | 32 +++++-- .../physical-plan/src/joins/hash_join/exec.rs | 54 ++++++----- .../src/joins/nested_loop_join.rs | 39 ++++++-- .../src/joins/piecewise_merge_join/exec.rs | 94 +++++++++++++------ .../src/joins/sort_merge_join/exec.rs | 27 +++++- .../src/joins/symmetric_hash_join.rs | 26 ++++- datafusion/physical-plan/src/joins/utils.rs | 4 +- datafusion/physical-plan/src/limit.rs | 51 +++++++--- datafusion/physical-plan/src/memory.rs | 19 ++-- .../physical-plan/src/placeholder_row.rs | 8 +- datafusion/physical-plan/src/projection.rs | 24 +++-- .../physical-plan/src/recursive_query.rs | 6 +- .../physical-plan/src/repartition/mod.rs | 34 +++++-- .../physical-plan/src/sorts/partial_sort.rs | 27 ++++-- datafusion/physical-plan/src/sorts/sort.rs | 53 ++++++----- .../src/sorts/sort_preserving_merge.rs | 34 +++++-- datafusion/physical-plan/src/streaming.rs | 8 +- datafusion/physical-plan/src/test.rs | 16 ++-- datafusion/physical-plan/src/test/exec.rs | 38 ++++---- datafusion/physical-plan/src/union.rs | 41 ++++++-- datafusion/physical-plan/src/unnest.rs | 25 +++-- .../src/windows/bounded_window_agg_exec.rs | 21 ++++- .../src/windows/window_agg_exec.rs | 26 +++-- datafusion/physical-plan/src/work_table.rs | 8 +- datafusion/proto/src/physical_plan/mod.rs | 4 +- .../tests/cases/roundtrip_physical_plan.rs | 2 +- .../custom-table-providers.md | 10 +- 74 files changed, 882 insertions(+), 458 deletions(-) diff --git a/datafusion-examples/examples/custom_data_source/custom_datasource.rs b/datafusion-examples/examples/custom_data_source/custom_datasource.rs index b276ae32cf247..9cda46fb9640a 100644 --- a/datafusion-examples/examples/custom_data_source/custom_datasource.rs +++ b/datafusion-examples/examples/custom_data_source/custom_datasource.rs @@ -192,7 +192,7 @@ impl TableProvider for CustomDataSource { struct CustomExec { db: CustomDataSource, projected_schema: SchemaRef, - cache: PlanProperties, + cache: Arc, } impl CustomExec { @@ -202,12 +202,13 @@ impl CustomExec { schema: SchemaRef, db: CustomDataSource, ) -> Self { - let projected_schema = project_schema(&schema, projections).unwrap(); + let projected_schema = + project_schema(&schema, projections.map(AsRef::as_ref)).unwrap(); let cache = Self::compute_properties(projected_schema.clone()); Self { db, projected_schema, - cache, + cache: Arc::new(cache), } } @@ -238,7 +239,7 @@ impl ExecutionPlan for CustomExec { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.cache } diff --git a/datafusion-examples/examples/execution_monitoring/memory_pool_execution_plan.rs b/datafusion-examples/examples/execution_monitoring/memory_pool_execution_plan.rs index 48475acbb1542..6f377ea1ce3b9 100644 --- a/datafusion-examples/examples/execution_monitoring/memory_pool_execution_plan.rs +++ b/datafusion-examples/examples/execution_monitoring/memory_pool_execution_plan.rs @@ -199,7 +199,7 @@ impl ExternalBatchBufferer { struct BufferingExecutionPlan { schema: SchemaRef, input: Arc, - properties: PlanProperties, + properties: Arc, } impl BufferingExecutionPlan { @@ -233,7 +233,7 @@ impl ExecutionPlan for BufferingExecutionPlan { self.schema.clone() } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.properties } diff --git a/datafusion-examples/examples/proto/composed_extension_codec.rs b/datafusion-examples/examples/proto/composed_extension_codec.rs index f3910d461b6a8..b4f3d4f098996 100644 --- a/datafusion-examples/examples/proto/composed_extension_codec.rs +++ b/datafusion-examples/examples/proto/composed_extension_codec.rs @@ -106,7 +106,7 @@ impl ExecutionPlan for ParentExec { self } - fn properties(&self) -> &datafusion::physical_plan::PlanProperties { + fn properties(&self) -> &Arc { unreachable!() } @@ -182,7 +182,7 @@ impl ExecutionPlan for ChildExec { self } - fn properties(&self) -> &datafusion::physical_plan::PlanProperties { + fn properties(&self) -> &Arc { unreachable!() } diff --git a/datafusion-examples/examples/relation_planner/table_sample.rs b/datafusion-examples/examples/relation_planner/table_sample.rs index 657432ef31362..895f2fdd4ff3a 100644 --- a/datafusion-examples/examples/relation_planner/table_sample.rs +++ b/datafusion-examples/examples/relation_planner/table_sample.rs @@ -618,7 +618,7 @@ pub struct SampleExec { upper_bound: f64, seed: u64, metrics: ExecutionPlanMetricsSet, - cache: PlanProperties, + cache: Arc, } impl SampleExec { @@ -656,7 +656,7 @@ impl SampleExec { upper_bound, seed, metrics: ExecutionPlanMetricsSet::new(), - cache, + cache: Arc::new(cache), }) } @@ -686,7 +686,7 @@ impl ExecutionPlan for SampleExec { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.cache } diff --git a/datafusion/catalog-listing/src/table.rs b/datafusion/catalog-listing/src/table.rs index 38456944075fc..e081280825135 100644 --- a/datafusion/catalog-listing/src/table.rs +++ b/datafusion/catalog-listing/src/table.rs @@ -522,7 +522,7 @@ impl TableProvider for ListingTable { // if no files need to be read, return an `EmptyExec` if partitioned_file_lists.is_empty() { - let projected_schema = project_schema(&self.schema(), projection.as_ref())?; + let projected_schema = project_schema(&self.schema(), projection.as_deref())?; return Ok(ScanResult::new(Arc::new(EmptyExec::new(projected_schema)))); } diff --git a/datafusion/catalog/src/memory/table.rs b/datafusion/catalog/src/memory/table.rs index 7865eb016bee1..484b5f805e547 100644 --- a/datafusion/catalog/src/memory/table.rs +++ b/datafusion/catalog/src/memory/table.rs @@ -549,7 +549,7 @@ fn evaluate_filters_to_mask( struct DmlResultExec { rows_affected: u64, schema: SchemaRef, - properties: PlanProperties, + properties: Arc, } impl DmlResultExec { @@ -570,7 +570,7 @@ impl DmlResultExec { Self { rows_affected, schema, - properties, + properties: Arc::new(properties), } } } @@ -604,7 +604,7 @@ impl ExecutionPlan for DmlResultExec { Arc::clone(&self.schema) } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.properties } diff --git a/datafusion/common/src/stats.rs b/datafusion/common/src/stats.rs index ba13ef392d912..751dff86efb66 100644 --- a/datafusion/common/src/stats.rs +++ b/datafusion/common/src/stats.rs @@ -391,7 +391,7 @@ impl Statistics { /// For example, if we had statistics for columns `{"a", "b", "c"}`, /// projecting to `vec![2, 1]` would return statistics for columns `{"c", /// "b"}`. - pub fn project(mut self, projection: Option<&Vec>) -> Self { + pub fn project(mut self, projection: Option<&[usize]>) -> Self { let Some(projection) = projection else { return self; }; @@ -1066,29 +1066,28 @@ mod tests { #[test] fn test_project_none() { - let projection = None; - let stats = make_stats(vec![10, 20, 30]).project(projection.as_ref()); + let stats = make_stats(vec![10, 20, 30]).project(None); assert_eq!(stats, make_stats(vec![10, 20, 30])); } #[test] fn test_project_empty() { - let projection = Some(vec![]); - let stats = make_stats(vec![10, 20, 30]).project(projection.as_ref()); + let projection: Option<&[_]> = Some(&[]); + let stats = make_stats(vec![10, 20, 30]).project(projection); assert_eq!(stats, make_stats(vec![])); } #[test] fn test_project_swap() { - let projection = Some(vec![2, 1]); - let stats = make_stats(vec![10, 20, 30]).project(projection.as_ref()); + let projection: Option<&[_]> = Some(&[2, 1]); + let stats = make_stats(vec![10, 20, 30]).project(projection); assert_eq!(stats, make_stats(vec![30, 20])); } #[test] fn test_project_repeated() { - let projection = Some(vec![1, 2, 1, 1, 0, 2]); - let stats = make_stats(vec![10, 20, 30]).project(projection.as_ref()); + let projection: Option<&[_]> = Some(&[1, 2, 1, 1, 0, 2]); + let stats = make_stats(vec![10, 20, 30]).project(projection); assert_eq!(stats, make_stats(vec![20, 30, 20, 20, 10, 30])); } diff --git a/datafusion/common/src/utils/mod.rs b/datafusion/common/src/utils/mod.rs index 03310a7bde193..545dc10af031a 100644 --- a/datafusion/common/src/utils/mod.rs +++ b/datafusion/common/src/utils/mod.rs @@ -59,7 +59,7 @@ use std::thread::available_parallelism; /// /// // Pick columns 'c' and 'b' /// let projection = Some(vec![2, 1]); -/// let projected_schema = project_schema(&schema, projection.as_ref()).unwrap(); +/// let projected_schema = project_schema(&schema, projection.as_deref()).unwrap(); /// /// let expected_schema = SchemaRef::new(Schema::new(vec![ /// Field::new("c", DataType::Utf8, true), @@ -70,7 +70,7 @@ use std::thread::available_parallelism; /// ``` pub fn project_schema( schema: &SchemaRef, - projection: Option<&Vec>, + projection: Option<&[usize]>, ) -> Result { let schema = match projection { Some(columns) => Arc::new(schema.project(columns)?), diff --git a/datafusion/core/benches/reset_plan_states.rs b/datafusion/core/benches/reset_plan_states.rs index f2f81f755b96e..5afae7f43242d 100644 --- a/datafusion/core/benches/reset_plan_states.rs +++ b/datafusion/core/benches/reset_plan_states.rs @@ -166,6 +166,8 @@ fn run_reset_states(b: &mut criterion::Bencher, plan: &Arc) { /// making an independent instance of the execution plan to re-execute it, avoiding /// re-planning stage. fn bench_reset_plan_states(c: &mut Criterion) { + env_logger::init(); + let rt = Runtime::new().unwrap(); let ctx = SessionContext::new(); ctx.register_table( diff --git a/datafusion/core/src/datasource/empty.rs b/datafusion/core/src/datasource/empty.rs index 5aeca92b1626d..882abd921a155 100644 --- a/datafusion/core/src/datasource/empty.rs +++ b/datafusion/core/src/datasource/empty.rs @@ -77,7 +77,8 @@ impl TableProvider for EmptyTable { _limit: Option, ) -> Result> { // even though there is no data, projections apply - let projected_schema = project_schema(&self.schema, projection)?; + let projected_schema = + project_schema(&self.schema, projection.map(AsRef::as_ref))?; Ok(Arc::new( EmptyExec::new(projected_schema).with_partitions(self.partitions), )) diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index 2715ad98202cb..c145d837f98e8 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -3661,13 +3661,15 @@ mod tests { #[derive(Debug)] struct NoOpExecutionPlan { - cache: PlanProperties, + cache: Arc, } impl NoOpExecutionPlan { fn new(schema: SchemaRef) -> Self { let cache = Self::compute_properties(schema); - Self { cache } + Self { + cache: Arc::new(cache), + } } /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc. @@ -3705,7 +3707,7 @@ mod tests { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.cache } @@ -3859,7 +3861,7 @@ digraph { fn children(&self) -> Vec<&Arc> { self.0.iter().collect::>() } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { unimplemented!() } fn execute( @@ -3908,7 +3910,7 @@ digraph { fn children(&self) -> Vec<&Arc> { unimplemented!() } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { unimplemented!() } fn execute( @@ -4029,7 +4031,7 @@ digraph { fn children(&self) -> Vec<&Arc> { vec![] } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { unimplemented!() } fn execute( diff --git a/datafusion/core/tests/custom_sources_cases/mod.rs b/datafusion/core/tests/custom_sources_cases/mod.rs index 8453615c2886b..5978b54edf296 100644 --- a/datafusion/core/tests/custom_sources_cases/mod.rs +++ b/datafusion/core/tests/custom_sources_cases/mod.rs @@ -79,16 +79,19 @@ struct CustomTableProvider; #[derive(Debug, Clone)] struct CustomExecutionPlan { projection: Option>, - cache: PlanProperties, + cache: Arc, } impl CustomExecutionPlan { fn new(projection: Option>) -> Self { let schema = TEST_CUSTOM_SCHEMA_REF!(); let schema = - project_schema(&schema, projection.as_ref()).expect("projected schema"); + project_schema(&schema, projection.as_deref()).expect("projected schema"); let cache = Self::compute_properties(schema); - Self { projection, cache } + Self { + projection, + cache: Arc::new(cache), + } } /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc. @@ -157,7 +160,7 @@ impl ExecutionPlan for CustomExecutionPlan { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.cache } diff --git a/datafusion/core/tests/custom_sources_cases/provider_filter_pushdown.rs b/datafusion/core/tests/custom_sources_cases/provider_filter_pushdown.rs index ca1eaa1f958ea..7a624d0636530 100644 --- a/datafusion/core/tests/custom_sources_cases/provider_filter_pushdown.rs +++ b/datafusion/core/tests/custom_sources_cases/provider_filter_pushdown.rs @@ -62,13 +62,16 @@ fn create_batch(value: i32, num_rows: usize) -> Result { #[derive(Debug)] struct CustomPlan { batches: Vec, - cache: PlanProperties, + cache: Arc, } impl CustomPlan { fn new(schema: SchemaRef, batches: Vec) -> Self { let cache = Self::compute_properties(schema); - Self { batches, cache } + Self { + batches, + cache: Arc::new(cache), + } } /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc. @@ -109,7 +112,7 @@ impl ExecutionPlan for CustomPlan { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.cache } diff --git a/datafusion/core/tests/custom_sources_cases/statistics.rs b/datafusion/core/tests/custom_sources_cases/statistics.rs index 820c2a470b376..6964d72eacffe 100644 --- a/datafusion/core/tests/custom_sources_cases/statistics.rs +++ b/datafusion/core/tests/custom_sources_cases/statistics.rs @@ -45,7 +45,7 @@ use async_trait::async_trait; struct StatisticsValidation { stats: Statistics, schema: Arc, - cache: PlanProperties, + cache: Arc, } impl StatisticsValidation { @@ -59,7 +59,7 @@ impl StatisticsValidation { Self { stats, schema, - cache, + cache: Arc::new(cache), } } @@ -158,7 +158,7 @@ impl ExecutionPlan for StatisticsValidation { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.cache } diff --git a/datafusion/core/tests/fuzz_cases/once_exec.rs b/datafusion/core/tests/fuzz_cases/once_exec.rs index 49e2caaa7417c..69edf9be1d825 100644 --- a/datafusion/core/tests/fuzz_cases/once_exec.rs +++ b/datafusion/core/tests/fuzz_cases/once_exec.rs @@ -32,7 +32,7 @@ use std::sync::{Arc, Mutex}; pub struct OnceExec { /// the results to send back stream: Mutex>, - cache: PlanProperties, + cache: Arc, } impl Debug for OnceExec { @@ -46,7 +46,7 @@ impl OnceExec { let cache = Self::compute_properties(stream.schema()); Self { stream: Mutex::new(Some(stream)), - cache, + cache: Arc::new(cache), } } @@ -83,7 +83,7 @@ impl ExecutionPlan for OnceExec { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.cache } diff --git a/datafusion/core/tests/physical_optimizer/enforce_distribution.rs b/datafusion/core/tests/physical_optimizer/enforce_distribution.rs index 94ae82a9ad755..b3a3d29d070b1 100644 --- a/datafusion/core/tests/physical_optimizer/enforce_distribution.rs +++ b/datafusion/core/tests/physical_optimizer/enforce_distribution.rs @@ -120,7 +120,7 @@ macro_rules! assert_plan { struct SortRequiredExec { input: Arc, expr: LexOrdering, - cache: PlanProperties, + cache: Arc, } impl SortRequiredExec { @@ -132,7 +132,7 @@ impl SortRequiredExec { Self { input, expr: requirement, - cache, + cache: Arc::new(cache), } } @@ -174,7 +174,7 @@ impl ExecutionPlan for SortRequiredExec { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.cache } diff --git a/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs b/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs index d265841246867..cb1784943764f 100644 --- a/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs +++ b/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs @@ -482,7 +482,7 @@ fn test_filter_with_projection() { let plan = Arc::new( FilterExec::try_new(predicate, Arc::clone(&scan)) .unwrap() - .with_projection(Some(projection)) + .with_projection(Some(projection.into())) .unwrap(), ); @@ -507,7 +507,7 @@ fn test_filter_with_projection() { let plan = Arc::new( FilterExec::try_new(predicate, scan) .unwrap() - .with_projection(Some(projection)) + .with_projection(Some(projection.into())) .unwrap(), ); insta::assert_snapshot!( diff --git a/datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs b/datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs index 1afdc4823f0a4..18861792f90d1 100644 --- a/datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs +++ b/datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs @@ -447,7 +447,7 @@ impl ExecutionPlan for TestNode { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { self.input.properties() } diff --git a/datafusion/core/tests/physical_optimizer/join_selection.rs b/datafusion/core/tests/physical_optimizer/join_selection.rs index 9234a95591baa..137ffc4702a16 100644 --- a/datafusion/core/tests/physical_optimizer/join_selection.rs +++ b/datafusion/core/tests/physical_optimizer/join_selection.rs @@ -762,7 +762,10 @@ async fn test_hash_join_swap_on_joins_with_projections( "ProjectionExec won't be added above if HashJoinExec contains embedded projection", ); - assert_eq!(swapped_join.projection, Some(vec![0_usize])); + assert_eq!( + swapped_join.projection.as_ref().map(|p| p.to_vec()), + Some(vec![0_usize]) + ); assert_eq!(swapped.schema().fields.len(), 1); assert_eq!(swapped.schema().fields[0].name(), "small_col"); Ok(()) @@ -979,7 +982,7 @@ impl RecordBatchStream for UnboundedStream { pub struct UnboundedExec { batch_produce: Option, batch: RecordBatch, - cache: PlanProperties, + cache: Arc, } impl UnboundedExec { @@ -995,7 +998,7 @@ impl UnboundedExec { Self { batch_produce, batch, - cache, + cache: Arc::new(cache), } } @@ -1052,7 +1055,7 @@ impl ExecutionPlan for UnboundedExec { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.cache } @@ -1091,7 +1094,7 @@ pub enum SourceType { pub struct StatisticsExec { stats: Statistics, schema: Arc, - cache: PlanProperties, + cache: Arc, } impl StatisticsExec { @@ -1105,7 +1108,7 @@ impl StatisticsExec { Self { stats, schema: Arc::new(schema), - cache, + cache: Arc::new(cache), } } @@ -1153,7 +1156,7 @@ impl ExecutionPlan for StatisticsExec { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.cache } diff --git a/datafusion/core/tests/physical_optimizer/test_utils.rs b/datafusion/core/tests/physical_optimizer/test_utils.rs index feac8190ffde4..f8c91ba272a9f 100644 --- a/datafusion/core/tests/physical_optimizer/test_utils.rs +++ b/datafusion/core/tests/physical_optimizer/test_utils.rs @@ -454,7 +454,7 @@ impl ExecutionPlan for RequirementsTestExec { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { self.input.properties() } @@ -825,7 +825,7 @@ pub fn sort_expr_named(name: &str, index: usize) -> PhysicalSortExpr { pub struct TestScan { schema: SchemaRef, output_ordering: Vec, - plan_properties: PlanProperties, + plan_properties: Arc, // Store the requested ordering for display requested_ordering: Option, } @@ -859,7 +859,7 @@ impl TestScan { Self { schema, output_ordering, - plan_properties, + plan_properties: Arc::new(plan_properties), requested_ordering: None, } } @@ -915,7 +915,7 @@ impl ExecutionPlan for TestScan { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.plan_properties } diff --git a/datafusion/core/tests/user_defined/insert_operation.rs b/datafusion/core/tests/user_defined/insert_operation.rs index 7ad00dece1b24..4d2a31ca1f960 100644 --- a/datafusion/core/tests/user_defined/insert_operation.rs +++ b/datafusion/core/tests/user_defined/insert_operation.rs @@ -122,20 +122,22 @@ impl TableProvider for TestInsertTableProvider { #[derive(Debug)] struct TestInsertExec { op: InsertOp, - plan_properties: PlanProperties, + plan_properties: Arc, } impl TestInsertExec { fn new(op: InsertOp) -> Self { Self { op, - plan_properties: PlanProperties::new( - EquivalenceProperties::new(make_count_schema()), - Partitioning::UnknownPartitioning(1), - EmissionType::Incremental, - Boundedness::Bounded, - ) - .with_scheduling_type(SchedulingType::Cooperative), + plan_properties: Arc::new( + PlanProperties::new( + EquivalenceProperties::new(make_count_schema()), + Partitioning::UnknownPartitioning(1), + EmissionType::Incremental, + Boundedness::Bounded, + ) + .with_scheduling_type(SchedulingType::Cooperative), + ), } } } @@ -159,7 +161,7 @@ impl ExecutionPlan for TestInsertExec { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.plan_properties } diff --git a/datafusion/core/tests/user_defined/user_defined_plan.rs b/datafusion/core/tests/user_defined/user_defined_plan.rs index d53e076739608..c2533e73d2be9 100644 --- a/datafusion/core/tests/user_defined/user_defined_plan.rs +++ b/datafusion/core/tests/user_defined/user_defined_plan.rs @@ -653,13 +653,17 @@ struct TopKExec { input: Arc, /// The maximum number of values k: usize, - cache: PlanProperties, + cache: Arc, } impl TopKExec { fn new(input: Arc, k: usize) -> Self { let cache = Self::compute_properties(input.schema()); - Self { input, k, cache } + Self { + input, + k, + cache: Arc::new(cache), + } } /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc. @@ -704,7 +708,7 @@ impl ExecutionPlan for TopKExec { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.cache } diff --git a/datafusion/datasource/src/memory.rs b/datafusion/datasource/src/memory.rs index 1d12bb3200309..3fc388cd3c4ad 100644 --- a/datafusion/datasource/src/memory.rs +++ b/datafusion/datasource/src/memory.rs @@ -262,7 +262,7 @@ impl MemorySourceConfig { schema: SchemaRef, projection: Option>, ) -> Result { - let projected_schema = project_schema(&schema, projection.as_ref())?; + let projected_schema = project_schema(&schema, projection.as_deref())?; Ok(Self { partitions: partitions.to_vec(), schema, diff --git a/datafusion/datasource/src/sink.rs b/datafusion/datasource/src/sink.rs index 5460a0ffdc3df..0533a83b82896 100644 --- a/datafusion/datasource/src/sink.rs +++ b/datafusion/datasource/src/sink.rs @@ -89,7 +89,7 @@ pub struct DataSinkExec { count_schema: SchemaRef, /// Optional required sort order for output data. sort_order: Option, - cache: PlanProperties, + cache: Arc, } impl Debug for DataSinkExec { @@ -117,7 +117,7 @@ impl DataSinkExec { sink, count_schema: make_count_schema(), sort_order, - cache, + cache: Arc::new(cache), } } @@ -174,7 +174,7 @@ impl ExecutionPlan for DataSinkExec { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.cache } diff --git a/datafusion/datasource/src/source.rs b/datafusion/datasource/src/source.rs index a3892dfac9778..5d7a3d67b195d 100644 --- a/datafusion/datasource/src/source.rs +++ b/datafusion/datasource/src/source.rs @@ -74,8 +74,8 @@ use datafusion_physical_plan::filter_pushdown::{ /// ```text /// ┌─────────────────────┐ -----► execute path /// │ │ ┄┄┄┄┄► init path -/// │ DataSourceExec │ -/// │ │ +/// │ DataSourceExec │ +/// │ │ /// └───────▲─────────────┘ /// ┊ │ /// ┊ │ @@ -229,7 +229,7 @@ pub struct DataSourceExec { /// The source of the data -- for example, `FileScanConfig` or `MemorySourceConfig` data_source: Arc, /// Cached plan properties such as sort order - cache: PlanProperties, + cache: Arc, } impl DisplayAs for DataSourceExec { @@ -253,7 +253,7 @@ impl ExecutionPlan for DataSourceExec { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.cache } @@ -323,7 +323,7 @@ impl ExecutionPlan for DataSourceExec { fn with_fetch(&self, limit: Option) -> Option> { let data_source = self.data_source.with_fetch(limit)?; - let cache = self.cache.clone(); + let cache = Arc::clone(&self.cache); Some(Arc::new(Self { data_source, cache })) } @@ -367,7 +367,8 @@ impl ExecutionPlan for DataSourceExec { let mut new_node = self.clone(); new_node.data_source = data_source; // Re-compute properties since we have new filters which will impact equivalence info - new_node.cache = Self::compute_properties(&new_node.data_source); + new_node.cache = + Arc::new(Self::compute_properties(&new_node.data_source)); Ok(FilterPushdownPropagation { filters: res.filters, @@ -403,7 +404,10 @@ impl DataSourceExec { // Default constructor for `DataSourceExec`, setting the `cooperative` flag to `true`. pub fn new(data_source: Arc) -> Self { let cache = Self::compute_properties(&data_source); - Self { data_source, cache } + Self { + data_source, + cache: Arc::new(cache), + } } /// Return the source object @@ -412,20 +416,20 @@ impl DataSourceExec { } pub fn with_data_source(mut self, data_source: Arc) -> Self { - self.cache = Self::compute_properties(&data_source); + self.cache = Arc::new(Self::compute_properties(&data_source)); self.data_source = data_source; self } /// Assign constraints pub fn with_constraints(mut self, constraints: Constraints) -> Self { - self.cache = self.cache.with_constraints(constraints); + Arc::make_mut(&mut self.cache).set_constraints(constraints); self } /// Assign output partitioning pub fn with_partitioning(mut self, partitioning: Partitioning) -> Self { - self.cache = self.cache.with_partitioning(partitioning); + Arc::make_mut(&mut self.cache).partitioning = partitioning; self } diff --git a/datafusion/ffi/src/execution_plan.rs b/datafusion/ffi/src/execution_plan.rs index c879b022067c3..ec8f7538fee1a 100644 --- a/datafusion/ffi/src/execution_plan.rs +++ b/datafusion/ffi/src/execution_plan.rs @@ -90,7 +90,7 @@ impl FFI_ExecutionPlan { unsafe extern "C" fn properties_fn_wrapper( plan: &FFI_ExecutionPlan, ) -> FFI_PlanProperties { - plan.inner().properties().into() + plan.inner().properties().as_ref().into() } unsafe extern "C" fn children_fn_wrapper( @@ -192,7 +192,7 @@ impl Drop for FFI_ExecutionPlan { pub struct ForeignExecutionPlan { name: String, plan: FFI_ExecutionPlan, - properties: PlanProperties, + properties: Arc, children: Vec>, } @@ -244,7 +244,7 @@ impl TryFrom<&FFI_ExecutionPlan> for Arc { let plan = ForeignExecutionPlan { name, plan: plan.clone(), - properties, + properties: Arc::new(properties), children, }; @@ -262,7 +262,7 @@ impl ExecutionPlan for ForeignExecutionPlan { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.properties } @@ -278,7 +278,7 @@ impl ExecutionPlan for ForeignExecutionPlan { plan: self.plan.clone(), name: self.name.clone(), children, - properties: self.properties.clone(), + properties: Arc::clone(&self.properties), })) } @@ -305,19 +305,19 @@ pub(crate) mod tests { #[derive(Debug)] pub struct EmptyExec { - props: PlanProperties, + props: Arc, children: Vec>, } impl EmptyExec { pub fn new(schema: arrow::datatypes::SchemaRef) -> Self { Self { - props: PlanProperties::new( + props: Arc::new(PlanProperties::new( datafusion::physical_expr::EquivalenceProperties::new(schema), Partitioning::UnknownPartitioning(3), EmissionType::Incremental, Boundedness::Bounded, - ), + )), children: Vec::default(), } } @@ -342,7 +342,7 @@ pub(crate) mod tests { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.props } @@ -355,7 +355,7 @@ pub(crate) mod tests { children: Vec>, ) -> Result> { Ok(Arc::new(EmptyExec { - props: self.props.clone(), + props: Arc::clone(&self.props), children, })) } diff --git a/datafusion/ffi/src/tests/async_provider.rs b/datafusion/ffi/src/tests/async_provider.rs index 6149736c58555..8370cf19e6589 100644 --- a/datafusion/ffi/src/tests/async_provider.rs +++ b/datafusion/ffi/src/tests/async_provider.rs @@ -162,7 +162,7 @@ impl Drop for AsyncTableProvider { #[derive(Debug)] struct AsyncTestExecutionPlan { - properties: datafusion_physical_plan::PlanProperties, + properties: Arc, batch_request: mpsc::Sender, batch_receiver: broadcast::Receiver>, } @@ -173,12 +173,12 @@ impl AsyncTestExecutionPlan { batch_receiver: broadcast::Receiver>, ) -> Self { Self { - properties: datafusion_physical_plan::PlanProperties::new( + properties: Arc::new(datafusion_physical_plan::PlanProperties::new( EquivalenceProperties::new(super::create_test_schema()), Partitioning::UnknownPartitioning(3), datafusion_physical_plan::execution_plan::EmissionType::Incremental, datafusion_physical_plan::execution_plan::Boundedness::Bounded, - ), + )), batch_request, batch_receiver, } @@ -194,7 +194,7 @@ impl ExecutionPlan for AsyncTestExecutionPlan { self } - fn properties(&self) -> &datafusion_physical_plan::PlanProperties { + fn properties(&self) -> &Arc { &self.properties } diff --git a/datafusion/physical-expr/src/equivalence/properties/mod.rs b/datafusion/physical-expr/src/equivalence/properties/mod.rs index 70f97139f8af4..f643445a449ae 100644 --- a/datafusion/physical-expr/src/equivalence/properties/mod.rs +++ b/datafusion/physical-expr/src/equivalence/properties/mod.rs @@ -207,8 +207,13 @@ impl EquivalenceProperties { } /// Adds constraints to the properties. - pub fn with_constraints(mut self, constraints: Constraints) -> Self { + pub fn set_constraints(&mut self, constraints: Constraints) { self.constraints = constraints; + } + + /// Adds constraints to the properties. + pub fn with_constraints(mut self, constraints: Constraints) -> Self { + self.set_constraints(constraints); self } diff --git a/datafusion/physical-expr/src/projection.rs b/datafusion/physical-expr/src/projection.rs index 540fd620c92ce..f0e3ce4b79d48 100644 --- a/datafusion/physical-expr/src/projection.rs +++ b/datafusion/physical-expr/src/projection.rs @@ -125,7 +125,7 @@ impl From for (Arc, String) { /// indices. #[derive(Debug, Clone, PartialEq, Eq)] pub struct ProjectionExprs { - exprs: Vec, + exprs: Arc<[ProjectionExpr]>, } impl std::fmt::Display for ProjectionExprs { @@ -137,14 +137,16 @@ impl std::fmt::Display for ProjectionExprs { impl From> for ProjectionExprs { fn from(value: Vec) -> Self { - Self { exprs: value } + Self { + exprs: value.into_iter().collect(), + } } } impl From<&[ProjectionExpr]> for ProjectionExprs { fn from(value: &[ProjectionExpr]) -> Self { Self { - exprs: value.to_vec(), + exprs: value.iter().cloned().collect(), } } } @@ -152,7 +154,7 @@ impl From<&[ProjectionExpr]> for ProjectionExprs { impl FromIterator for ProjectionExprs { fn from_iter>(exprs: T) -> Self { Self { - exprs: exprs.into_iter().collect::>(), + exprs: exprs.into_iter().collect(), } } } @@ -169,7 +171,7 @@ impl ProjectionExprs { I: IntoIterator, { Self { - exprs: exprs.into_iter().collect::>(), + exprs: exprs.into_iter().collect(), } } @@ -285,13 +287,13 @@ impl ProjectionExprs { { let exprs = self .exprs - .into_iter() - .map(|mut proj| { - proj.expr = f(proj.expr)?; - Ok(proj) + .iter() + .map(|proj| { + let expr = f(Arc::clone(&proj.expr))?; + Ok(ProjectionExpr::new(expr, proj.alias.clone())) }) - .collect::>>()?; - Ok(Self::new(exprs)) + .collect::>()?; + Ok(Self { exprs }) } /// Apply another projection on top of this projection, returning the combined projection. @@ -361,7 +363,7 @@ impl ProjectionExprs { /// applied on top of this projection. pub fn try_merge(&self, other: &ProjectionExprs) -> Result { let mut new_exprs = Vec::with_capacity(other.exprs.len()); - for proj_expr in &other.exprs { + for proj_expr in other.exprs.iter() { let new_expr = update_expr(&proj_expr.expr, &self.exprs, true)? .ok_or_else(|| { internal_datafusion_err!( @@ -607,7 +609,7 @@ impl ProjectionExprs { ) -> Result { let mut column_statistics = vec![]; - for proj_expr in &self.exprs { + for proj_expr in self.exprs.iter() { let expr = &proj_expr.expr; let col_stats = if let Some(col) = expr.as_any().downcast_ref::() { std::mem::take(&mut stats.column_statistics[col.index()]) @@ -754,15 +756,6 @@ impl Projector { } } -impl IntoIterator for ProjectionExprs { - type Item = ProjectionExpr; - type IntoIter = std::vec::IntoIter; - - fn into_iter(self) -> Self::IntoIter { - self.exprs.into_iter() - } -} - /// The function operates in two modes: /// /// 1) When `sync_with_child` is `true`: diff --git a/datafusion/physical-optimizer/src/enforce_distribution.rs b/datafusion/physical-optimizer/src/enforce_distribution.rs index f3ec083efb240..957359e2f05c9 100644 --- a/datafusion/physical-optimizer/src/enforce_distribution.rs +++ b/datafusion/physical-optimizer/src/enforce_distribution.rs @@ -312,7 +312,7 @@ pub fn adjust_input_keys_ordering( filter.clone(), join_type, // TODO: although projection is not used in the join here, because projection pushdown is after enforce_distribution. Maybe we need to handle it later. Same as filter. - projection.clone(), + projection.clone().map(|p| p.to_vec()), PartitionMode::Partitioned, *null_equality, *null_aware, @@ -644,7 +644,7 @@ pub fn reorder_join_keys_to_inputs( new_join_on, filter.clone(), join_type, - projection.clone(), + projection.clone().map(|p| p.to_vec()), PartitionMode::Partitioned, *null_equality, *null_aware, diff --git a/datafusion/physical-optimizer/src/join_selection.rs b/datafusion/physical-optimizer/src/join_selection.rs index 7412d0ba97812..5c16f45ee72f1 100644 --- a/datafusion/physical-optimizer/src/join_selection.rs +++ b/datafusion/physical-optimizer/src/join_selection.rs @@ -197,7 +197,7 @@ pub(crate) fn try_collect_left( hash_join.on().to_vec(), hash_join.filter().cloned(), hash_join.join_type(), - hash_join.projection.clone(), + hash_join.projection.clone().map(|p| p.to_vec()), PartitionMode::CollectLeft, hash_join.null_equality(), hash_join.null_aware, @@ -210,7 +210,7 @@ pub(crate) fn try_collect_left( hash_join.on().to_vec(), hash_join.filter().cloned(), hash_join.join_type(), - hash_join.projection.clone(), + hash_join.projection.clone().map(|p| p.to_vec()), PartitionMode::CollectLeft, hash_join.null_equality(), hash_join.null_aware, @@ -260,7 +260,7 @@ pub(crate) fn partitioned_hash_join( hash_join.on().to_vec(), hash_join.filter().cloned(), hash_join.join_type(), - hash_join.projection.clone(), + hash_join.projection.clone().map(|p| p.to_vec()), partition_mode, hash_join.null_equality(), hash_join.null_aware, diff --git a/datafusion/physical-optimizer/src/output_requirements.rs b/datafusion/physical-optimizer/src/output_requirements.rs index 0dc6a25fbc0b7..9c4169ec654f8 100644 --- a/datafusion/physical-optimizer/src/output_requirements.rs +++ b/datafusion/physical-optimizer/src/output_requirements.rs @@ -98,7 +98,7 @@ pub struct OutputRequirementExec { input: Arc, order_requirement: Option, dist_requirement: Distribution, - cache: PlanProperties, + cache: Arc, fetch: Option, } @@ -114,7 +114,7 @@ impl OutputRequirementExec { input, order_requirement: requirements, dist_requirement, - cache, + cache: Arc::new(cache), fetch, } } @@ -200,7 +200,7 @@ impl ExecutionPlan for OutputRequirementExec { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.cache } diff --git a/datafusion/physical-optimizer/src/update_aggr_exprs.rs b/datafusion/physical-optimizer/src/update_aggr_exprs.rs index c0aab4080da77..19d46c8e5e6e9 100644 --- a/datafusion/physical-optimizer/src/update_aggr_exprs.rs +++ b/datafusion/physical-optimizer/src/update_aggr_exprs.rs @@ -108,7 +108,7 @@ impl PhysicalOptimizerRule for OptimizeAggregateOrder { input.equivalence_properties(), )?; - let aggr_exec = aggr_exec.with_new_aggr_exprs(aggr_exprs); + let aggr_exec = aggr_exec.with_new_aggr_exprs(aggr_exprs.into()); Ok(Transformed::yes(Arc::new(aggr_exec) as _)) } else { diff --git a/datafusion/physical-plan/src/aggregates/mod.rs b/datafusion/physical-plan/src/aggregates/mod.rs index 4dd9482ac4322..5219a2bd2afce 100644 --- a/datafusion/physical-plan/src/aggregates/mod.rs +++ b/datafusion/physical-plan/src/aggregates/mod.rs @@ -25,7 +25,9 @@ use crate::aggregates::{ no_grouping::AggregateStream, row_hash::GroupedHashAggregateStream, topk_stream::GroupedTopKAggregateStream, }; -use crate::execution_plan::{CardinalityEffect, EmissionType}; +use crate::execution_plan::{ + CardinalityEffect, EmissionType, has_same_children_properties, +}; use crate::filter_pushdown::{ ChildFilterDescription, ChildPushdownResult, FilterDescription, FilterPushdownPhase, FilterPushdownPropagation, PushedDownPredicate, @@ -33,7 +35,7 @@ use crate::filter_pushdown::{ use crate::metrics::{ExecutionPlanMetricsSet, MetricsSet}; use crate::{ DisplayFormatType, Distribution, ExecutionPlan, InputOrderMode, - SendableRecordBatchStream, Statistics, + SendableRecordBatchStream, Statistics, check_if_same_properties, }; use datafusion_common::config::ConfigOptions; use datafusion_physical_expr::utils::collect_columns; @@ -510,9 +512,9 @@ pub struct AggregateExec { /// Group by expressions group_by: PhysicalGroupBy, /// Aggregate expressions - aggr_expr: Vec>, + aggr_expr: Arc<[Arc]>, /// FILTER (WHERE clause) expression for each aggregate expression - filter_expr: Vec>>, + filter_expr: Arc<[Option>]>, /// Set if the output of this aggregation is truncated by a upstream sort/limit clause limit: Option, /// Input plan, could be a partial aggregate or the input to the aggregate @@ -530,7 +532,7 @@ pub struct AggregateExec { required_input_ordering: Option, /// Describes how the input is ordered relative to the group by columns input_order_mode: InputOrderMode, - cache: PlanProperties, + cache: Arc, /// During initialization, if the plan supports dynamic filtering (see [`AggrDynFilter`]), /// it is set to `Some(..)` regardless of whether it can be pushed down to a child node. /// @@ -546,7 +548,7 @@ impl AggregateExec { /// Rewrites aggregate exec with new aggregate expressions. pub fn with_new_aggr_exprs( &self, - aggr_expr: Vec>, + aggr_expr: Arc<[Arc]>, ) -> Self { Self { aggr_expr, @@ -554,10 +556,10 @@ impl AggregateExec { required_input_ordering: self.required_input_ordering.clone(), metrics: ExecutionPlanMetricsSet::new(), input_order_mode: self.input_order_mode.clone(), - cache: self.cache.clone(), + cache: Arc::clone(&self.cache), mode: self.mode, group_by: self.group_by.clone(), - filter_expr: self.filter_expr.clone(), + filter_expr: Arc::clone(&self.filter_expr), limit: self.limit, input: Arc::clone(&self.input), schema: Arc::clone(&self.schema), @@ -574,11 +576,11 @@ impl AggregateExec { required_input_ordering: self.required_input_ordering.clone(), metrics: ExecutionPlanMetricsSet::new(), input_order_mode: self.input_order_mode.clone(), - cache: self.cache.clone(), + cache: Arc::clone(&self.cache), mode: self.mode, group_by: self.group_by.clone(), - aggr_expr: self.aggr_expr.clone(), - filter_expr: self.filter_expr.clone(), + aggr_expr: Arc::clone(&self.aggr_expr), + filter_expr: Arc::clone(&self.filter_expr), input: Arc::clone(&self.input), schema: Arc::clone(&self.schema), input_schema: Arc::clone(&self.input_schema), @@ -702,8 +704,8 @@ impl AggregateExec { let mut exec = AggregateExec { mode, group_by, - aggr_expr, - filter_expr, + aggr_expr: aggr_expr.into(), + filter_expr: filter_expr.into(), input, schema, input_schema, @@ -711,7 +713,7 @@ impl AggregateExec { required_input_ordering, limit: None, input_order_mode, - cache, + cache: Arc::new(cache), dynamic_filter: None, }; @@ -1060,6 +1062,17 @@ impl AggregateExec { _ => Precision::Absent, } } + + fn with_new_children_and_same_properties( + &self, + mut children: Vec>, + ) -> Self { + Self { + input: children.swap_remove(0), + metrics: ExecutionPlanMetricsSet::new(), + ..Self::clone(self) + } + } } impl DisplayAs for AggregateExec { @@ -1195,7 +1208,7 @@ impl ExecutionPlan for AggregateExec { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.cache } @@ -1236,14 +1249,16 @@ impl ExecutionPlan for AggregateExec { fn with_new_children( self: Arc, - children: Vec>, + mut children: Vec>, ) -> Result> { + check_if_same_properties!(self, children); + let mut me = AggregateExec::try_new_with_schema( self.mode, self.group_by.clone(), - self.aggr_expr.clone(), - self.filter_expr.clone(), - Arc::clone(&children[0]), + self.aggr_expr.to_vec(), + self.filter_expr.to_vec(), + children.swap_remove(0), Arc::clone(&self.input_schema), Arc::clone(&self.schema), )?; @@ -2281,14 +2296,17 @@ mod tests { struct TestYieldingExec { /// True if this exec should yield back to runtime the first time it is polled pub yield_first: bool, - cache: PlanProperties, + cache: Arc, } impl TestYieldingExec { fn new(yield_first: bool) -> Self { let schema = some_data().0; let cache = Self::compute_properties(schema); - Self { yield_first, cache } + Self { + yield_first, + cache: Arc::new(cache), + } } /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc. @@ -2329,7 +2347,7 @@ mod tests { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.cache } diff --git a/datafusion/physical-plan/src/aggregates/no_grouping.rs b/datafusion/physical-plan/src/aggregates/no_grouping.rs index a55d70ca6fb27..576fe25e07669 100644 --- a/datafusion/physical-plan/src/aggregates/no_grouping.rs +++ b/datafusion/physical-plan/src/aggregates/no_grouping.rs @@ -276,7 +276,7 @@ impl AggregateStream { partition: usize, ) -> Result { let agg_schema = Arc::clone(&agg.schema); - let agg_filter_expr = agg.filter_expr.clone(); + let agg_filter_expr = Arc::clone(&agg.filter_expr); let baseline_metrics = BaselineMetrics::new(&agg.metrics, partition); let input = agg.input.execute(partition, Arc::clone(context))?; @@ -285,7 +285,7 @@ impl AggregateStream { let filter_expressions = match agg.mode { AggregateMode::Partial | AggregateMode::Single - | AggregateMode::SinglePartitioned => agg_filter_expr, + | AggregateMode::SinglePartitioned => agg_filter_expr.to_vec(), AggregateMode::Final | AggregateMode::FinalPartitioned => { vec![None; agg.aggr_expr.len()] } diff --git a/datafusion/physical-plan/src/aggregates/row_hash.rs b/datafusion/physical-plan/src/aggregates/row_hash.rs index 1ae7202711112..0df66145aa357 100644 --- a/datafusion/physical-plan/src/aggregates/row_hash.rs +++ b/datafusion/physical-plan/src/aggregates/row_hash.rs @@ -466,7 +466,7 @@ impl GroupedHashAggregateStream { debug!("Creating GroupedHashAggregateStream"); let agg_schema = Arc::clone(&agg.schema); let agg_group_by = agg.group_by.clone(); - let agg_filter_expr = agg.filter_expr.clone(); + let agg_filter_expr = Arc::clone(&agg.filter_expr); let batch_size = context.session_config().batch_size(); let input = agg.input.execute(partition, Arc::clone(context))?; @@ -475,7 +475,7 @@ impl GroupedHashAggregateStream { let timer = baseline_metrics.elapsed_compute().timer(); - let aggregate_exprs = agg.aggr_expr.clone(); + let aggregate_exprs = Arc::clone(&agg.aggr_expr); // arguments for each aggregate, one vec of expressions per // aggregate @@ -494,7 +494,7 @@ impl GroupedHashAggregateStream { let filter_expressions = match agg.mode { AggregateMode::Partial | AggregateMode::Single - | AggregateMode::SinglePartitioned => agg_filter_expr, + | AggregateMode::SinglePartitioned => agg_filter_expr.to_vec(), AggregateMode::Final | AggregateMode::FinalPartitioned => { vec![None; agg.aggr_expr.len()] } diff --git a/datafusion/physical-plan/src/analyze.rs b/datafusion/physical-plan/src/analyze.rs index 1fb8f93a38782..eca31ea0e194f 100644 --- a/datafusion/physical-plan/src/analyze.rs +++ b/datafusion/physical-plan/src/analyze.rs @@ -51,7 +51,7 @@ pub struct AnalyzeExec { pub(crate) input: Arc, /// The output schema for RecordBatches of this exec node schema: SchemaRef, - cache: PlanProperties, + cache: Arc, } impl AnalyzeExec { @@ -70,7 +70,7 @@ impl AnalyzeExec { metric_types, input, schema, - cache, + cache: Arc::new(cache), } } @@ -131,7 +131,7 @@ impl ExecutionPlan for AnalyzeExec { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.cache } diff --git a/datafusion/physical-plan/src/async_func.rs b/datafusion/physical-plan/src/async_func.rs index a61fd95949d1a..93701a25b3bf4 100644 --- a/datafusion/physical-plan/src/async_func.rs +++ b/datafusion/physical-plan/src/async_func.rs @@ -16,10 +16,12 @@ // under the License. use crate::coalesce::LimitedBatchCoalescer; +use crate::execution_plan::has_same_children_properties; use crate::metrics::{ExecutionPlanMetricsSet, MetricsSet}; use crate::stream::RecordBatchStreamAdapter; use crate::{ DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, PlanProperties, + check_if_same_properties, }; use arrow::array::RecordBatch; use arrow_schema::{Fields, Schema, SchemaRef}; @@ -45,12 +47,12 @@ use std::task::{Context, Poll, ready}; /// /// The schema of the output of the AsyncFuncExec is: /// Input columns followed by one column for each async expression -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct AsyncFuncExec { /// The async expressions to evaluate async_exprs: Vec>, input: Arc, - cache: PlanProperties, + cache: Arc, metrics: ExecutionPlanMetricsSet, } @@ -84,7 +86,7 @@ impl AsyncFuncExec { Ok(Self { input, async_exprs, - cache, + cache: Arc::new(cache), metrics: ExecutionPlanMetricsSet::new(), }) } @@ -113,6 +115,17 @@ impl AsyncFuncExec { pub fn input(&self) -> &Arc { &self.input } + + fn with_new_children_and_same_properties( + &self, + mut children: Vec>, + ) -> Self { + Self { + input: children.swap_remove(0), + metrics: ExecutionPlanMetricsSet::new(), + ..Self::clone(self) + } + } } impl DisplayAs for AsyncFuncExec { @@ -149,7 +162,7 @@ impl ExecutionPlan for AsyncFuncExec { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.cache } @@ -159,16 +172,17 @@ impl ExecutionPlan for AsyncFuncExec { fn with_new_children( self: Arc, - children: Vec>, + mut children: Vec>, ) -> Result> { assert_eq_or_internal_err!( children.len(), 1, "AsyncFuncExec wrong number of children" ); + check_if_same_properties!(self, children); Ok(Arc::new(AsyncFuncExec::try_new( self.async_exprs.clone(), - Arc::clone(&children[0]), + children.swap_remove(0), )?)) } diff --git a/datafusion/physical-plan/src/coalesce_batches.rs b/datafusion/physical-plan/src/coalesce_batches.rs index dfcd3cb0bcae7..27a521ec6bc5d 100644 --- a/datafusion/physical-plan/src/coalesce_batches.rs +++ b/datafusion/physical-plan/src/coalesce_batches.rs @@ -27,6 +27,7 @@ use super::{DisplayAs, ExecutionPlanProperties, PlanProperties, Statistics}; use crate::projection::ProjectionExec; use crate::{ DisplayFormatType, ExecutionPlan, RecordBatchStream, SendableRecordBatchStream, + check_if_same_properties, }; use arrow::datatypes::SchemaRef; @@ -36,7 +37,7 @@ use datafusion_execution::TaskContext; use datafusion_physical_expr::PhysicalExpr; use crate::coalesce::{LimitedBatchCoalescer, PushBatchStatus}; -use crate::execution_plan::CardinalityEffect; +use crate::execution_plan::{CardinalityEffect, has_same_children_properties}; use crate::filter_pushdown::{ ChildPushdownResult, FilterDescription, FilterPushdownPhase, FilterPushdownPropagation, @@ -71,7 +72,7 @@ pub struct CoalesceBatchesExec { fetch: Option, /// Execution metrics metrics: ExecutionPlanMetricsSet, - cache: PlanProperties, + cache: Arc, } #[expect(deprecated)] @@ -84,7 +85,7 @@ impl CoalesceBatchesExec { target_batch_size, fetch: None, metrics: ExecutionPlanMetricsSet::new(), - cache, + cache: Arc::new(cache), } } @@ -115,6 +116,17 @@ impl CoalesceBatchesExec { input.boundedness(), ) } + + fn with_new_children_and_same_properties( + &self, + mut children: Vec>, + ) -> Self { + Self { + input: children.swap_remove(0), + metrics: ExecutionPlanMetricsSet::new(), + ..Self::clone(self) + } + } } #[expect(deprecated)] @@ -159,7 +171,7 @@ impl ExecutionPlan for CoalesceBatchesExec { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.cache } @@ -177,10 +189,11 @@ impl ExecutionPlan for CoalesceBatchesExec { fn with_new_children( self: Arc, - children: Vec>, + mut children: Vec>, ) -> Result> { + check_if_same_properties!(self, children); Ok(Arc::new( - CoalesceBatchesExec::new(Arc::clone(&children[0]), self.target_batch_size) + CoalesceBatchesExec::new(children.swap_remove(0), self.target_batch_size) .with_fetch(self.fetch), )) } @@ -222,7 +235,7 @@ impl ExecutionPlan for CoalesceBatchesExec { target_batch_size: self.target_batch_size, fetch: limit, metrics: self.metrics.clone(), - cache: self.cache.clone(), + cache: Arc::clone(&self.cache), })) } diff --git a/datafusion/physical-plan/src/coalesce_partitions.rs b/datafusion/physical-plan/src/coalesce_partitions.rs index d83f90eb3d8c1..8210a152b2e8f 100644 --- a/datafusion/physical-plan/src/coalesce_partitions.rs +++ b/datafusion/physical-plan/src/coalesce_partitions.rs @@ -27,11 +27,13 @@ use super::{ DisplayAs, ExecutionPlanProperties, PlanProperties, SendableRecordBatchStream, Statistics, }; -use crate::execution_plan::{CardinalityEffect, EvaluationType, SchedulingType}; +use crate::execution_plan::{ + CardinalityEffect, EvaluationType, SchedulingType, has_same_children_properties, +}; use crate::filter_pushdown::{FilterDescription, FilterPushdownPhase}; use crate::projection::{ProjectionExec, make_with_child}; use crate::sort_pushdown::SortOrderPushdownResult; -use crate::{DisplayFormatType, ExecutionPlan, Partitioning}; +use crate::{DisplayFormatType, ExecutionPlan, Partitioning, check_if_same_properties}; use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr; use datafusion_common::config::ConfigOptions; @@ -47,7 +49,7 @@ pub struct CoalescePartitionsExec { input: Arc, /// Execution metrics metrics: ExecutionPlanMetricsSet, - cache: PlanProperties, + cache: Arc, /// Optional number of rows to fetch. Stops producing rows after this fetch pub(crate) fetch: Option, } @@ -59,7 +61,7 @@ impl CoalescePartitionsExec { CoalescePartitionsExec { input, metrics: ExecutionPlanMetricsSet::new(), - cache, + cache: Arc::new(cache), fetch: None, } } @@ -100,6 +102,17 @@ impl CoalescePartitionsExec { .with_evaluation_type(drive) .with_scheduling_type(scheduling) } + + fn with_new_children_and_same_properties( + &self, + mut children: Vec>, + ) -> Self { + Self { + input: children.swap_remove(0), + metrics: ExecutionPlanMetricsSet::new(), + ..Self::clone(self) + } + } } impl DisplayAs for CoalescePartitionsExec { @@ -135,7 +148,7 @@ impl ExecutionPlan for CoalescePartitionsExec { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.cache } @@ -149,9 +162,10 @@ impl ExecutionPlan for CoalescePartitionsExec { fn with_new_children( self: Arc, - children: Vec>, + mut children: Vec>, ) -> Result> { - let mut plan = CoalescePartitionsExec::new(Arc::clone(&children[0])); + check_if_same_properties!(self, children); + let mut plan = CoalescePartitionsExec::new(children.swap_remove(0)); plan.fetch = self.fetch; Ok(Arc::new(plan)) } @@ -274,7 +288,7 @@ impl ExecutionPlan for CoalescePartitionsExec { input: Arc::clone(&self.input), fetch: limit, metrics: self.metrics.clone(), - cache: self.cache.clone(), + cache: Arc::clone(&self.cache), })) } diff --git a/datafusion/physical-plan/src/common.rs b/datafusion/physical-plan/src/common.rs index 32dc60b56ad48..590f6f09e8b9e 100644 --- a/datafusion/physical-plan/src/common.rs +++ b/datafusion/physical-plan/src/common.rs @@ -181,7 +181,7 @@ pub fn compute_record_batch_statistics( /// Checks if the given projection is valid for the given schema. pub fn can_project( schema: &arrow::datatypes::SchemaRef, - projection: Option<&Vec>, + projection: Option<&[usize]>, ) -> Result<()> { match projection { Some(columns) => { diff --git a/datafusion/physical-plan/src/coop.rs b/datafusion/physical-plan/src/coop.rs index a1fad86777408..6740ed43d35ec 100644 --- a/datafusion/physical-plan/src/coop.rs +++ b/datafusion/physical-plan/src/coop.rs @@ -82,14 +82,14 @@ use crate::filter_pushdown::{ use crate::projection::ProjectionExec; use crate::{ DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, RecordBatchStream, - SendableRecordBatchStream, SortOrderPushdownResult, + SendableRecordBatchStream, SortOrderPushdownResult, check_if_same_properties, }; use arrow::record_batch::RecordBatch; use arrow_schema::Schema; use datafusion_common::{Result, Statistics, assert_eq_or_internal_err}; use datafusion_execution::TaskContext; -use crate::execution_plan::SchedulingType; +use crate::execution_plan::{SchedulingType, has_same_children_properties}; use crate::stream::RecordBatchStreamAdapter; use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr; use futures::{Stream, StreamExt}; @@ -212,16 +212,15 @@ where #[derive(Debug, Clone)] pub struct CooperativeExec { input: Arc, - properties: PlanProperties, + properties: Arc, } impl CooperativeExec { /// Creates a new `CooperativeExec` operator that wraps the given input execution plan. pub fn new(input: Arc) -> Self { - let properties = input - .properties() - .clone() - .with_scheduling_type(SchedulingType::Cooperative); + let properties = PlanProperties::clone(input.properties()) + .with_scheduling_type(SchedulingType::Cooperative) + .into(); Self { input, properties } } @@ -230,6 +229,16 @@ impl CooperativeExec { pub fn input(&self) -> &Arc { &self.input } + + fn with_new_children_and_same_properties( + &self, + mut children: Vec>, + ) -> Self { + Self { + input: children.swap_remove(0), + ..Self::clone(self) + } + } } impl DisplayAs for CooperativeExec { @@ -255,7 +264,7 @@ impl ExecutionPlan for CooperativeExec { self.input.schema() } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.properties } @@ -276,6 +285,7 @@ impl ExecutionPlan for CooperativeExec { 1, "CooperativeExec requires exactly one child" ); + check_if_same_properties!(self, children); Ok(Arc::new(CooperativeExec::new(children.swap_remove(0)))) } diff --git a/datafusion/physical-plan/src/display.rs b/datafusion/physical-plan/src/display.rs index 52c37a106b39e..d525f44541d5f 100644 --- a/datafusion/physical-plan/src/display.rs +++ b/datafusion/physical-plan/src/display.rs @@ -1153,7 +1153,7 @@ mod tests { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { unimplemented!() } diff --git a/datafusion/physical-plan/src/empty.rs b/datafusion/physical-plan/src/empty.rs index fcfbcfa3e8277..d3e62f91c0570 100644 --- a/datafusion/physical-plan/src/empty.rs +++ b/datafusion/physical-plan/src/empty.rs @@ -43,7 +43,7 @@ pub struct EmptyExec { schema: SchemaRef, /// Number of partitions partitions: usize, - cache: PlanProperties, + cache: Arc, } impl EmptyExec { @@ -53,7 +53,7 @@ impl EmptyExec { EmptyExec { schema, partitions: 1, - cache, + cache: Arc::new(cache), } } @@ -62,7 +62,7 @@ impl EmptyExec { self.partitions = partitions; // Changing partitions may invalidate output partitioning, so update it: let output_partitioning = Self::output_partitioning_helper(self.partitions); - self.cache = self.cache.with_partitioning(output_partitioning); + Arc::make_mut(&mut self.cache).partitioning = output_partitioning; self } @@ -114,7 +114,7 @@ impl ExecutionPlan for EmptyExec { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.cache } diff --git a/datafusion/physical-plan/src/execution_plan.rs b/datafusion/physical-plan/src/execution_plan.rs index 8d72921d06735..4fa1367d17fce 100644 --- a/datafusion/physical-plan/src/execution_plan.rs +++ b/datafusion/physical-plan/src/execution_plan.rs @@ -128,7 +128,7 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync { /// /// This information is available via methods on [`ExecutionPlanProperties`] /// trait, which is implemented for all `ExecutionPlan`s. - fn properties(&self) -> &PlanProperties; + fn properties(&self) -> &Arc; /// Returns an error if this individual node does not conform to its invariants. /// These invariants are typically only checked in debug mode. @@ -1047,12 +1047,17 @@ impl PlanProperties { self } - /// Overwrite equivalence properties with its new value. - pub fn with_eq_properties(mut self, eq_properties: EquivalenceProperties) -> Self { + /// Set equivalence properties having mut reference. + pub fn set_eq_properties(&mut self, eq_properties: EquivalenceProperties) { // Changing equivalence properties also changes output ordering, so // make sure to overwrite it: self.output_ordering = eq_properties.output_ordering(); self.eq_properties = eq_properties; + } + + /// Overwrite equivalence properties with its new value. + pub fn with_eq_properties(mut self, eq_properties: EquivalenceProperties) -> Self { + self.set_eq_properties(eq_properties); self } @@ -1084,9 +1089,14 @@ impl PlanProperties { self } + /// Set constraints having mut reference. + pub fn set_constraints(&mut self, constraints: Constraints) { + self.eq_properties.set_constraints(constraints); + } + /// Overwrite constraints with its new value. pub fn with_constraints(mut self, constraints: Constraints) -> Self { - self.eq_properties = self.eq_properties.with_constraints(constraints); + self.set_constraints(constraints); self } @@ -1409,6 +1419,41 @@ pub fn reset_plan_states(plan: Arc) -> Result, + children: &[Arc], +) -> Result { + let old_children = plan.children(); + assert_eq_or_internal_err!( + children.len(), + old_children.len(), + "Wrong number of children" + ); + for (lhs, rhs) in plan.children().iter().zip(children.iter()) { + if !Arc::ptr_eq(lhs.properties(), rhs.properties()) { + return Ok(false); + } + } + Ok(true) +} + +/// Helper macro to avoid properties re-computation if passed children properties +/// the same as plan already has. Could be used to implement fast-path for method +/// [`ExecutionPlan::with_new_children`]. +#[macro_export] +macro_rules! check_if_same_properties { + ($plan: expr, $children: expr) => { + if has_same_children_properties(&$plan, &$children)? { + let plan = $plan.with_new_children_and_same_properties($children); + return Ok(Arc::new(plan)); + } + }; +} + /// Utility function yielding a string representation of the given [`ExecutionPlan`]. pub fn get_plan_string(plan: &Arc) -> Vec { let formatted = displayable(plan.as_ref()).indent(true).to_string(); @@ -1471,7 +1516,7 @@ mod tests { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { unimplemented!() } @@ -1538,7 +1583,7 @@ mod tests { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { unimplemented!() } diff --git a/datafusion/physical-plan/src/explain.rs b/datafusion/physical-plan/src/explain.rs index aa3c0afefe8b5..bf21b0484689a 100644 --- a/datafusion/physical-plan/src/explain.rs +++ b/datafusion/physical-plan/src/explain.rs @@ -44,7 +44,7 @@ pub struct ExplainExec { stringified_plans: Vec, /// control which plans to print verbose: bool, - cache: PlanProperties, + cache: Arc, } impl ExplainExec { @@ -59,7 +59,7 @@ impl ExplainExec { schema, stringified_plans, verbose, - cache, + cache: Arc::new(cache), } } @@ -112,7 +112,7 @@ impl ExecutionPlan for ExplainExec { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.cache } diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index e78e50c9868fb..4adc4c8ce4da9 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -26,9 +26,10 @@ use super::{ ColumnStatistics, DisplayAs, ExecutionPlanProperties, PlanProperties, RecordBatchStream, SendableRecordBatchStream, Statistics, }; +use crate::check_if_same_properties; use crate::coalesce::{LimitedBatchCoalescer, PushBatchStatus}; use crate::common::can_project; -use crate::execution_plan::CardinalityEffect; +use crate::execution_plan::{CardinalityEffect, has_same_children_properties}; use crate::filter_pushdown::{ ChildFilterDescription, ChildPushdownResult, FilterDescription, FilterPushdownPhase, FilterPushdownPropagation, PushedDown, PushedDownPredicate, @@ -83,9 +84,9 @@ pub struct FilterExec { /// Selectivity for statistics. 0 = no rows, 100 = all rows default_selectivity: u8, /// Properties equivalence properties, partitioning, etc. - cache: PlanProperties, + cache: Arc, /// The projection indices of the columns in the output schema of join - projection: Option>, + projection: Option>, /// Target batch size for output batches batch_size: usize, /// Number of rows to fetch @@ -113,7 +114,7 @@ impl FilterExec { input: Arc::clone(&input), metrics: ExecutionPlanMetricsSet::new(), default_selectivity, - cache, + cache: Arc::new(cache), projection: None, batch_size: FILTER_EXEC_DEFAULT_BATCH_SIZE, fetch: None, @@ -139,9 +140,9 @@ impl FilterExec { } /// Return new instance of [FilterExec] with the given projection. - pub fn with_projection(&self, projection: Option>) -> Result { + pub fn with_projection(&self, projection: Option>) -> Result { // Check if the projection is valid - can_project(&self.schema(), projection.as_ref())?; + can_project(&self.schema(), projection.as_deref())?; let projection = match projection { Some(projection) => match &self.projection { @@ -155,14 +156,14 @@ impl FilterExec { &self.input, &self.predicate, self.default_selectivity, - projection.as_ref(), + projection.as_deref(), )?; Ok(Self { predicate: Arc::clone(&self.predicate), input: Arc::clone(&self.input), metrics: self.metrics.clone(), default_selectivity: self.default_selectivity, - cache, + cache: Arc::new(cache), projection, batch_size: self.batch_size, fetch: self.fetch, @@ -175,7 +176,7 @@ impl FilterExec { input: Arc::clone(&self.input), metrics: self.metrics.clone(), default_selectivity: self.default_selectivity, - cache: self.cache.clone(), + cache: Arc::clone(&self.cache), projection: self.projection.clone(), batch_size, fetch: self.fetch, @@ -198,8 +199,8 @@ impl FilterExec { } /// Projection - pub fn projection(&self) -> Option<&Vec> { - self.projection.as_ref() + pub fn projection(&self) -> Option<&[usize]> { + self.projection.as_ref().map(|p| p.as_ref()) } /// Calculates `Statistics` for `FilterExec`, by applying selectivity (either default, or estimated) to input statistics. @@ -276,7 +277,7 @@ impl FilterExec { input: &Arc, predicate: &Arc, default_selectivity: u8, - projection: Option<&Vec>, + projection: Option<&[usize]>, ) -> Result { // Combine the equal predicates with the input equivalence properties // to construct the equivalence properties: @@ -328,6 +329,17 @@ impl FilterExec { input.boundedness(), )) } + + fn with_new_children_and_same_properties( + &self, + mut children: Vec>, + ) -> Self { + Self { + input: children.swap_remove(0), + metrics: ExecutionPlanMetricsSet::new(), + ..Self::clone(self) + } + } } impl DisplayAs for FilterExec { @@ -382,7 +394,7 @@ impl ExecutionPlan for FilterExec { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.cache } @@ -399,12 +411,13 @@ impl ExecutionPlan for FilterExec { self: Arc, mut children: Vec>, ) -> Result> { + check_if_same_properties!(self, children); FilterExec::try_new(Arc::clone(&self.predicate), children.swap_remove(0)) .and_then(|e| { let selectivity = e.default_selectivity(); e.with_default_selectivity(selectivity) }) - .and_then(|e| e.with_projection(self.projection().cloned())) + .and_then(|e| e.with_projection(self.projection.as_ref().map(Arc::clone))) .map(|e| e.with_fetch(self.fetch).unwrap()) } @@ -425,7 +438,7 @@ impl ExecutionPlan for FilterExec { predicate: Arc::clone(&self.predicate), input: self.input.execute(partition, context)?, metrics, - projection: self.projection.clone(), + projection: self.projection.as_ref().map(|p| p.to_vec()), batch_coalescer: LimitedBatchCoalescer::new( self.schema(), self.batch_size, @@ -453,7 +466,7 @@ impl ExecutionPlan for FilterExec { self.predicate(), self.default_selectivity, )?; - Ok(stats.project(self.projection.as_ref())) + Ok(stats.project(self.projection.as_ref().map(|p| p.as_ref()))) } fn cardinality_effect(&self) -> CardinalityEffect { @@ -583,12 +596,12 @@ impl ExecutionPlan for FilterExec { input: Arc::clone(&filter_input), metrics: self.metrics.clone(), default_selectivity: self.default_selectivity, - cache: Self::compute_properties( + cache: Arc::new(Self::compute_properties( &filter_input, &new_predicate, self.default_selectivity, - self.projection.as_ref(), - )?, + self.projection.as_ref().map(|p| p.as_ref()), + )?), projection: None, batch_size: self.batch_size, fetch: self.fetch, @@ -608,7 +621,7 @@ impl ExecutionPlan for FilterExec { input: Arc::clone(&self.input), metrics: self.metrics.clone(), default_selectivity: self.default_selectivity, - cache: self.cache.clone(), + cache: Arc::clone(&self.cache), projection: self.projection.clone(), batch_size: self.batch_size, fetch, @@ -618,7 +631,7 @@ impl ExecutionPlan for FilterExec { impl EmbeddedProjection for FilterExec { fn with_projection(&self, projection: Option>) -> Result { - self.with_projection(projection) + self.with_projection(projection.map(|p| p.into_iter().collect())) } } diff --git a/datafusion/physical-plan/src/joins/cross_join.rs b/datafusion/physical-plan/src/joins/cross_join.rs index 4f32b6176ec39..4df2cf04032b9 100644 --- a/datafusion/physical-plan/src/joins/cross_join.rs +++ b/datafusion/physical-plan/src/joins/cross_join.rs @@ -25,7 +25,9 @@ use super::utils::{ OnceAsync, OnceFut, StatefulStreamResult, adjust_right_output_partitioning, reorder_output_after_swap, }; -use crate::execution_plan::{EmissionType, boundedness_from_children}; +use crate::execution_plan::{ + EmissionType, boundedness_from_children, has_same_children_properties, +}; use crate::metrics::{ExecutionPlanMetricsSet, MetricsSet}; use crate::projection::{ ProjectionExec, join_allows_pushdown, join_table_borders, new_join_children, @@ -34,7 +36,7 @@ use crate::projection::{ use crate::{ ColumnStatistics, DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, ExecutionPlanProperties, PlanProperties, RecordBatchStream, - SendableRecordBatchStream, Statistics, handle_state, + SendableRecordBatchStream, Statistics, check_if_same_properties, handle_state, }; use arrow::array::{RecordBatch, RecordBatchOptions}; @@ -94,7 +96,7 @@ pub struct CrossJoinExec { /// Execution plan metrics metrics: ExecutionPlanMetricsSet, /// Properties such as schema, equivalence properties, ordering, partitioning, etc. - cache: PlanProperties, + cache: Arc, } impl CrossJoinExec { @@ -125,7 +127,7 @@ impl CrossJoinExec { schema, left_fut: Default::default(), metrics: ExecutionPlanMetricsSet::default(), - cache, + cache: Arc::new(cache), } } @@ -192,6 +194,23 @@ impl CrossJoinExec { &self.right.schema(), ) } + + fn with_new_children_and_same_properties( + &self, + mut children: Vec>, + ) -> Self { + let left = children.swap_remove(0); + let right = children.swap_remove(0); + + Self { + left, + right, + metrics: ExecutionPlanMetricsSet::new(), + left_fut: Default::default(), + cache: Arc::clone(&self.cache), + schema: Arc::clone(&self.schema), + } + } } /// Asynchronously collect the result of the left child @@ -256,7 +275,7 @@ impl ExecutionPlan for CrossJoinExec { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.cache } @@ -272,6 +291,7 @@ impl ExecutionPlan for CrossJoinExec { self: Arc, children: Vec>, ) -> Result> { + check_if_same_properties!(self, children); Ok(Arc::new(CrossJoinExec::new( Arc::clone(&children[0]), Arc::clone(&children[1]), @@ -285,7 +305,7 @@ impl ExecutionPlan for CrossJoinExec { schema: Arc::clone(&self.schema), left_fut: Default::default(), // reset the build side! metrics: ExecutionPlanMetricsSet::default(), - cache: self.cache.clone(), + cache: Arc::clone(&self.cache), }; Ok(Arc::new(new_exec)) } diff --git a/datafusion/physical-plan/src/joins/hash_join/exec.rs b/datafusion/physical-plan/src/joins/hash_join/exec.rs index 131b07461ebe5..73d2446937616 100644 --- a/datafusion/physical-plan/src/joins/hash_join/exec.rs +++ b/datafusion/physical-plan/src/joins/hash_join/exec.rs @@ -22,7 +22,9 @@ use std::sync::{Arc, OnceLock}; use std::{any::Any, vec}; use crate::ExecutionPlanProperties; -use crate::execution_plan::{EmissionType, boundedness_from_children}; +use crate::execution_plan::{ + EmissionType, boundedness_from_children, has_same_children_properties, +}; use crate::filter_pushdown::{ ChildPushdownResult, FilterDescription, FilterPushdownPhase, FilterPushdownPropagation, @@ -466,7 +468,7 @@ pub struct HashJoinExec { /// Execution metrics metrics: ExecutionPlanMetricsSet, /// The projection indices of the columns in the output schema of join - pub projection: Option>, + pub projection: Option>, /// Information of index and left / right placement of columns column_indices: Vec, /// The equality null-handling behavior of the join algorithm. @@ -474,7 +476,7 @@ pub struct HashJoinExec { /// Flag to indicate if this is a null-aware anti join pub null_aware: bool, /// Cache holding plan properties like equivalences, output partitioning etc. - cache: PlanProperties, + cache: Arc, /// Dynamic filter for pushing down to the probe side /// Set when dynamic filter pushdown is detected in handle_child_pushdown_result. /// HashJoinExec also needs to keep a shared bounds accumulator for coordinating updates. @@ -566,7 +568,7 @@ impl HashJoinExec { let join_schema = Arc::new(join_schema); // check if the projection is valid - can_project(&join_schema, projection.as_ref())?; + can_project(&join_schema, projection.as_deref())?; let cache = Self::compute_properties( &left, @@ -575,7 +577,7 @@ impl HashJoinExec { *join_type, &on, partition_mode, - projection.as_ref(), + projection.as_deref(), )?; // Initialize both dynamic filter and bounds accumulator to None @@ -592,11 +594,11 @@ impl HashJoinExec { random_state, mode: partition_mode, metrics: ExecutionPlanMetricsSet::new(), - projection, + projection: projection.map(Into::into), column_indices, null_equality, null_aware, - cache, + cache: Arc::new(cache), dynamic_filter: None, }) } @@ -688,7 +690,7 @@ impl HashJoinExec { /// Return new instance of [HashJoinExec] with the given projection. pub fn with_projection(&self, projection: Option>) -> Result { // check if the projection is valid - can_project(&self.schema(), projection.as_ref())?; + can_project(&self.schema(), projection.as_deref())?; let projection = match projection { Some(projection) => match &self.projection { Some(p) => Some(projection.iter().map(|i| p[*i]).collect()), @@ -717,7 +719,7 @@ impl HashJoinExec { join_type: JoinType, on: JoinOnRef, mode: PartitionMode, - projection: Option<&Vec>, + projection: Option<&[usize]>, ) -> Result { // Calculate equivalence properties: let mut eq_properties = join_equivalence_properties( @@ -824,7 +826,7 @@ impl HashJoinExec { swap_join_projection( left.schema().fields().len(), right.schema().fields().len(), - self.projection.as_ref(), + self.projection.as_deref(), self.join_type(), ), partition_mode, @@ -937,7 +939,7 @@ impl ExecutionPlan for HashJoinExec { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.cache } @@ -998,6 +1000,20 @@ impl ExecutionPlan for HashJoinExec { self: Arc, children: Vec>, ) -> Result> { + let cache = if has_same_children_properties(&self, &children)? { + Arc::clone(&self.cache) + } else { + Arc::new(Self::compute_properties( + &children[0], + &children[1], + &self.join_schema, + self.join_type, + &self.on, + self.mode, + self.projection.as_deref(), + )?) + }; + Ok(Arc::new(HashJoinExec { left: Arc::clone(&children[0]), right: Arc::clone(&children[1]), @@ -1013,15 +1029,7 @@ impl ExecutionPlan for HashJoinExec { column_indices: self.column_indices.clone(), null_equality: self.null_equality, null_aware: self.null_aware, - cache: Self::compute_properties( - &children[0], - &children[1], - &self.join_schema, - self.join_type, - &self.on, - self.mode, - self.projection.as_ref(), - )?, + cache, // Keep the dynamic filter, bounds accumulator will be reset dynamic_filter: self.dynamic_filter.clone(), })) @@ -1044,7 +1052,7 @@ impl ExecutionPlan for HashJoinExec { column_indices: self.column_indices.clone(), null_equality: self.null_equality, null_aware: self.null_aware, - cache: self.cache.clone(), + cache: Arc::clone(&self.cache), // Reset dynamic filter and bounds accumulator to initial state dynamic_filter: None, })) @@ -1240,7 +1248,7 @@ impl ExecutionPlan for HashJoinExec { &self.join_schema, )?; // Project statistics if there is a projection - Ok(stats.project(self.projection.as_ref())) + Ok(stats.project(self.projection.as_deref())) } /// Tries to push `projection` down through `hash_join`. If possible, performs the @@ -1373,7 +1381,7 @@ impl ExecutionPlan for HashJoinExec { column_indices: self.column_indices.clone(), null_equality: self.null_equality, null_aware: self.null_aware, - cache: self.cache.clone(), + cache: Arc::clone(&self.cache), dynamic_filter: Some(HashJoinExecDynamicFilter { filter: dynamic_filter, build_accumulator: OnceLock::new(), diff --git a/datafusion/physical-plan/src/joins/nested_loop_join.rs b/datafusion/physical-plan/src/joins/nested_loop_join.rs index 44637321a7e35..8d36f73ea9c28 100644 --- a/datafusion/physical-plan/src/joins/nested_loop_join.rs +++ b/datafusion/physical-plan/src/joins/nested_loop_join.rs @@ -29,7 +29,9 @@ use super::utils::{ reorder_output_after_swap, swap_join_projection, }; use crate::common::can_project; -use crate::execution_plan::{EmissionType, boundedness_from_children}; +use crate::execution_plan::{ + EmissionType, boundedness_from_children, has_same_children_properties, +}; use crate::joins::SharedBitmapBuilder; use crate::joins::utils::{ BuildProbeJoinMetrics, ColumnIndex, JoinFilter, OnceAsync, OnceFut, @@ -46,6 +48,7 @@ use crate::projection::{ use crate::{ DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, ExecutionPlanProperties, PlanProperties, RecordBatchStream, SendableRecordBatchStream, + check_if_same_properties, }; use arrow::array::{ @@ -197,7 +200,7 @@ pub struct NestedLoopJoinExec { /// Execution metrics metrics: ExecutionPlanMetricsSet, /// Cache holding plan properties like equivalences, output partitioning etc. - cache: PlanProperties, + cache: Arc, } impl NestedLoopJoinExec { @@ -233,7 +236,7 @@ impl NestedLoopJoinExec { column_indices, projection, metrics: Default::default(), - cache, + cache: Arc::new(cache), }) } @@ -335,7 +338,7 @@ impl NestedLoopJoinExec { pub fn with_projection(&self, projection: Option>) -> Result { // check if the projection is valid - can_project(&self.schema(), projection.as_ref())?; + can_project(&self.schema(), projection.as_deref())?; let projection = match projection { Some(projection) => match &self.projection { Some(p) => Some(projection.iter().map(|i| p[*i]).collect()), @@ -371,7 +374,7 @@ impl NestedLoopJoinExec { swap_join_projection( left.schema().fields().len(), right.schema().fields().len(), - self.projection.as_ref(), + self.projection.as_deref(), self.join_type(), ), )?; @@ -399,6 +402,27 @@ impl NestedLoopJoinExec { Ok(plan) } + + fn with_new_children_and_same_properties( + &self, + mut children: Vec>, + ) -> Self { + let left = children.swap_remove(0); + let right = children.swap_remove(0); + + Self { + left, + right, + metrics: ExecutionPlanMetricsSet::new(), + build_side_data: Default::default(), + cache: Arc::clone(&self.cache), + filter: self.filter.clone(), + join_type: self.join_type, + join_schema: Arc::clone(&self.join_schema), + column_indices: self.column_indices.clone(), + projection: self.projection.clone(), + } + } } impl DisplayAs for NestedLoopJoinExec { @@ -453,7 +477,7 @@ impl ExecutionPlan for NestedLoopJoinExec { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.cache } @@ -476,6 +500,7 @@ impl ExecutionPlan for NestedLoopJoinExec { self: Arc, children: Vec>, ) -> Result> { + check_if_same_properties!(self, children); Ok(Arc::new(NestedLoopJoinExec::try_new( Arc::clone(&children[0]), Arc::clone(&children[1]), @@ -577,7 +602,7 @@ impl ExecutionPlan for NestedLoopJoinExec { &self.join_schema, )?; - Ok(stats.project(self.projection.as_ref())) + Ok(stats.project(self.projection.as_deref())) } /// Tries to push `projection` down through `nested_loop_join`. If possible, performs the diff --git a/datafusion/physical-plan/src/joins/piecewise_merge_join/exec.rs b/datafusion/physical-plan/src/joins/piecewise_merge_join/exec.rs index 508be2e3984f4..235f29f03ffe2 100644 --- a/datafusion/physical-plan/src/joins/piecewise_merge_join/exec.rs +++ b/datafusion/physical-plan/src/joins/piecewise_merge_join/exec.rs @@ -41,7 +41,9 @@ use std::fmt::Formatter; use std::sync::Arc; use std::sync::atomic::AtomicUsize; -use crate::execution_plan::{EmissionType, boundedness_from_children}; +use crate::execution_plan::{ + EmissionType, boundedness_from_children, has_same_children_properties, +}; use crate::joins::piecewise_merge_join::classic_join::{ ClassicPWMJStream, PiecewiseMergeJoinStreamState, @@ -51,7 +53,9 @@ use crate::joins::piecewise_merge_join::utils::{ }; use crate::joins::utils::asymmetric_join_output_partitioning; use crate::metrics::MetricsSet; -use crate::{DisplayAs, DisplayFormatType, ExecutionPlanProperties}; +use crate::{ + DisplayAs, DisplayFormatType, ExecutionPlanProperties, check_if_same_properties, +}; use crate::{ ExecutionPlan, PlanProperties, joins::{ @@ -86,7 +90,7 @@ use crate::{ /// Both sides are sorted so that we can iterate from index 0 to the end on each side. This ordering ensures /// that when we find the first matching pair of rows, we can emit the current stream row joined with all remaining /// probe rows from the match position onward, without rescanning earlier probe rows. -/// +/// /// For `<` and `<=` operators, both inputs are sorted in **descending** order, while for `>` and `>=` operators /// they are sorted in **ascending** order. This choice ensures that the pointer on the buffered side can advance /// monotonically as we stream new batches from the stream side. @@ -129,34 +133,34 @@ use crate::{ /// /// Processing Row 1: /// -/// Sorted Buffered Side Sorted Streamed Side -/// ┌──────────────────┐ ┌──────────────────┐ -/// 1 │ 100 │ 1 │ 100 │ -/// ├──────────────────┤ ├──────────────────┤ -/// 2 │ 200 │ ─┐ 2 │ 200 │ -/// ├──────────────────┤ │ For row 1 on streamed side with ├──────────────────┤ -/// 3 │ 200 │ │ value 100, we emit rows 2 - 5. 3 │ 500 │ +/// Sorted Buffered Side Sorted Streamed Side +/// ┌──────────────────┐ ┌──────────────────┐ +/// 1 │ 100 │ 1 │ 100 │ +/// ├──────────────────┤ ├──────────────────┤ +/// 2 │ 200 │ ─┐ 2 │ 200 │ +/// ├──────────────────┤ │ For row 1 on streamed side with ├──────────────────┤ +/// 3 │ 200 │ │ value 100, we emit rows 2 - 5. 3 │ 500 │ /// ├──────────────────┤ │ as matches when the operator is └──────────────────┘ /// 4 │ 300 │ │ `Operator::Lt` (<) Emitting all /// ├──────────────────┤ │ rows after the first match (row /// 5 │ 400 │ ─┘ 2 buffered side; 100 < 200) -/// └──────────────────┘ +/// └──────────────────┘ /// /// Processing Row 2: /// By sorting the streamed side we know /// -/// Sorted Buffered Side Sorted Streamed Side -/// ┌──────────────────┐ ┌──────────────────┐ -/// 1 │ 100 │ 1 │ 100 │ -/// ├──────────────────┤ ├──────────────────┤ -/// 2 │ 200 │ <- Start here when probing for the 2 │ 200 │ -/// ├──────────────────┤ streamed side row 2. ├──────────────────┤ -/// 3 │ 200 │ 3 │ 500 │ +/// Sorted Buffered Side Sorted Streamed Side +/// ┌──────────────────┐ ┌──────────────────┐ +/// 1 │ 100 │ 1 │ 100 │ +/// ├──────────────────┤ ├──────────────────┤ +/// 2 │ 200 │ <- Start here when probing for the 2 │ 200 │ +/// ├──────────────────┤ streamed side row 2. ├──────────────────┤ +/// 3 │ 200 │ 3 │ 500 │ /// ├──────────────────┤ └──────────────────┘ -/// 4 │ 300 │ -/// ├──────────────────┤ +/// 4 │ 300 │ +/// ├──────────────────┤ /// 5 │ 400 │ -/// └──────────────────┘ +/// └──────────────────┘ /// ``` /// /// ## Existence Joins (Semi, Anti, Mark) @@ -202,10 +206,10 @@ use crate::{ /// 1 │ 100 │ 1 │ 500 │ /// ├──────────────────┤ ├──────────────────┤ /// 2 │ 200 │ 2 │ 200 │ -/// ├──────────────────┤ ├──────────────────┤ +/// ├──────────────────┤ ├──────────────────┤ /// 3 │ 200 │ 3 │ 300 │ /// ├──────────────────┤ └──────────────────┘ -/// 4 │ 300 │ ─┐ +/// 4 │ 300 │ ─┐ /// ├──────────────────┤ | We emit matches for row 4 - 5 /// 5 │ 400 │ ─┘ on the buffered side. /// └──────────────────┘ @@ -236,11 +240,11 @@ use crate::{ /// /// # Mark Join: /// Sorts the probe side, then computes the min/max range of the probe keys and scans the buffered side only -/// within that range. +/// within that range. /// Complexity: `O(|S| + scan(R[range]))`. /// /// ## Nested Loop Join -/// Compares every row from `S` with every row from `R`. +/// Compares every row from `S` with every row from `R`. /// Complexity: `O(|S| * |R|)`. /// /// ## Nested Loop Join @@ -273,13 +277,12 @@ pub struct PiecewiseMergeJoinExec { left_child_plan_required_order: LexOrdering, /// The right sort order, descending for `<`, `<=` operations + ascending for `>`, `>=` operations /// Unsorted for mark joins - #[expect(dead_code)] right_batch_required_orders: LexOrdering, /// This determines the sort order of all join columns used in sorting the stream and buffered execution plans. sort_options: SortOptions, /// Cache holding plan properties like equivalences, output partitioning etc. - cache: PlanProperties, + cache: Arc, /// Number of partitions to process num_partitions: usize, } @@ -373,7 +376,7 @@ impl PiecewiseMergeJoinExec { left_child_plan_required_order, right_batch_required_orders, sort_options, - cache, + cache: Arc::new(cache), num_partitions, }) } @@ -466,6 +469,31 @@ impl PiecewiseMergeJoinExec { pub fn swap_inputs(&self) -> Result> { todo!() } + + fn with_new_children_and_same_properties( + &self, + mut children: Vec>, + ) -> Self { + let buffered = children.swap_remove(0); + let streamed = children.swap_remove(0); + Self { + buffered, + streamed, + on: self.on.clone(), + operator: self.operator, + join_type: self.join_type, + schema: Arc::clone(&self.schema), + left_child_plan_required_order: self.left_child_plan_required_order.clone(), + right_batch_required_orders: self.right_batch_required_orders.clone(), + sort_options: self.sort_options, + cache: Arc::clone(&self.cache), + num_partitions: self.num_partitions, + + // Re-set state. + metrics: ExecutionPlanMetricsSet::new(), + buffered_fut: Default::default(), + } + } } impl ExecutionPlan for PiecewiseMergeJoinExec { @@ -477,7 +505,7 @@ impl ExecutionPlan for PiecewiseMergeJoinExec { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.cache } @@ -511,6 +539,7 @@ impl ExecutionPlan for PiecewiseMergeJoinExec { self: Arc, children: Vec>, ) -> Result> { + check_if_same_properties!(self, children); match &children[..] { [left, right] => Ok(Arc::new(PiecewiseMergeJoinExec::try_new( Arc::clone(left), @@ -527,6 +556,13 @@ impl ExecutionPlan for PiecewiseMergeJoinExec { } } + fn reset_state(self: Arc) -> Result> { + Ok(Arc::new(self.with_new_children_and_same_properties(vec![ + Arc::clone(&self.buffered), + Arc::clone(&self.streamed), + ]))) + } + fn execute( &self, partition: usize, diff --git a/datafusion/physical-plan/src/joins/sort_merge_join/exec.rs b/datafusion/physical-plan/src/joins/sort_merge_join/exec.rs index ae7a5fa764bcc..be4c83fc2d35c 100644 --- a/datafusion/physical-plan/src/joins/sort_merge_join/exec.rs +++ b/datafusion/physical-plan/src/joins/sort_merge_join/exec.rs @@ -23,7 +23,9 @@ use std::any::Any; use std::fmt::Formatter; use std::sync::Arc; -use crate::execution_plan::{EmissionType, boundedness_from_children}; +use crate::execution_plan::{ + EmissionType, boundedness_from_children, has_same_children_properties, +}; use crate::expressions::PhysicalSortExpr; use crate::joins::sort_merge_join::metrics::SortMergeJoinMetrics; use crate::joins::sort_merge_join::stream::SortMergeJoinStream; @@ -39,7 +41,7 @@ use crate::projection::{ }; use crate::{ DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, ExecutionPlanProperties, - PlanProperties, SendableRecordBatchStream, Statistics, + PlanProperties, SendableRecordBatchStream, Statistics, check_if_same_properties, }; use arrow::compute::SortOptions; @@ -127,7 +129,7 @@ pub struct SortMergeJoinExec { /// Defines the null equality for the join. pub null_equality: NullEquality, /// Cache holding plan properties like equivalences, output partitioning etc. - cache: PlanProperties, + cache: Arc, } impl SortMergeJoinExec { @@ -198,7 +200,7 @@ impl SortMergeJoinExec { right_sort_exprs, sort_options, null_equality, - cache, + cache: Arc::new(cache), }) } @@ -340,6 +342,20 @@ impl SortMergeJoinExec { reorder_output_after_swap(Arc::new(new_join), &left.schema(), &right.schema()) } } + + fn with_new_children_and_same_properties( + &self, + mut children: Vec>, + ) -> Self { + let left = children.swap_remove(0); + let right = children.swap_remove(0); + Self { + left, + right, + metrics: ExecutionPlanMetricsSet::new(), + ..Self::clone(self) + } + } } impl DisplayAs for SortMergeJoinExec { @@ -405,7 +421,7 @@ impl ExecutionPlan for SortMergeJoinExec { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.cache } @@ -440,6 +456,7 @@ impl ExecutionPlan for SortMergeJoinExec { self: Arc, children: Vec>, ) -> Result> { + check_if_same_properties!(self, children); match &children[..] { [left, right] => Ok(Arc::new(SortMergeJoinExec::try_new( Arc::clone(left), diff --git a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs index 1f6bc703a0300..bfb8df1df8095 100644 --- a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs +++ b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs @@ -32,8 +32,11 @@ use std::sync::Arc; use std::task::{Context, Poll}; use std::vec; +use crate::check_if_same_properties; use crate::common::SharedMemoryReservation; -use crate::execution_plan::{boundedness_from_children, emission_type_from_children}; +use crate::execution_plan::{ + boundedness_from_children, emission_type_from_children, has_same_children_properties, +}; use crate::joins::stream_join_utils::{ PruningJoinHashMap, SortedFilterExpr, StreamJoinMetrics, calculate_filter_expr_intervals, combine_two_batches, @@ -197,7 +200,7 @@ pub struct SymmetricHashJoinExec { /// Partition Mode mode: StreamJoinPartitionMode, /// Cache holding plan properties like equivalences, output partitioning etc. - cache: PlanProperties, + cache: Arc, } impl SymmetricHashJoinExec { @@ -253,7 +256,7 @@ impl SymmetricHashJoinExec { left_sort_exprs, right_sort_exprs, mode, - cache, + cache: Arc::new(cache), }) } @@ -360,6 +363,20 @@ impl SymmetricHashJoinExec { } Ok(false) } + + fn with_new_children_and_same_properties( + &self, + mut children: Vec>, + ) -> Self { + let left = children.swap_remove(0); + let right = children.swap_remove(0); + Self { + left, + right, + metrics: ExecutionPlanMetricsSet::new(), + ..Self::clone(self) + } + } } impl DisplayAs for SymmetricHashJoinExec { @@ -411,7 +428,7 @@ impl ExecutionPlan for SymmetricHashJoinExec { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.cache } @@ -453,6 +470,7 @@ impl ExecutionPlan for SymmetricHashJoinExec { self: Arc, children: Vec>, ) -> Result> { + check_if_same_properties!(self, children); Ok(Arc::new(SymmetricHashJoinExec::try_new( Arc::clone(&children[0]), Arc::clone(&children[1]), diff --git a/datafusion/physical-plan/src/joins/utils.rs b/datafusion/physical-plan/src/joins/utils.rs index a9243fe04e28d..bca55df192e99 100644 --- a/datafusion/physical-plan/src/joins/utils.rs +++ b/datafusion/physical-plan/src/joins/utils.rs @@ -1674,7 +1674,7 @@ fn swap_reverting_projection( pub fn swap_join_projection( left_schema_len: usize, right_schema_len: usize, - projection: Option<&Vec>, + projection: Option<&[usize]>, join_type: &JoinType, ) -> Option> { match join_type { @@ -1685,7 +1685,7 @@ pub fn swap_join_projection( | JoinType::RightAnti | JoinType::RightSemi | JoinType::LeftMark - | JoinType::RightMark => projection.cloned(), + | JoinType::RightMark => projection.map(ToOwned::to_owned), _ => projection.map(|p| { p.iter() .map(|i| { diff --git a/datafusion/physical-plan/src/limit.rs b/datafusion/physical-plan/src/limit.rs index 05d6882821477..9925567a6c827 100644 --- a/datafusion/physical-plan/src/limit.rs +++ b/datafusion/physical-plan/src/limit.rs @@ -27,8 +27,13 @@ use super::{ DisplayAs, ExecutionPlanProperties, PlanProperties, RecordBatchStream, SendableRecordBatchStream, Statistics, }; -use crate::execution_plan::{Boundedness, CardinalityEffect}; -use crate::{DisplayFormatType, Distribution, ExecutionPlan, Partitioning}; +use crate::execution_plan::{ + Boundedness, CardinalityEffect, has_same_children_properties, +}; +use crate::{ + DisplayFormatType, Distribution, ExecutionPlan, Partitioning, + check_if_same_properties, +}; use arrow::datatypes::SchemaRef; use arrow::record_batch::RecordBatch; @@ -50,7 +55,7 @@ pub struct GlobalLimitExec { fetch: Option, /// Execution metrics metrics: ExecutionPlanMetricsSet, - cache: PlanProperties, + cache: Arc, } impl GlobalLimitExec { @@ -62,7 +67,7 @@ impl GlobalLimitExec { skip, fetch, metrics: ExecutionPlanMetricsSet::new(), - cache, + cache: Arc::new(cache), } } @@ -91,6 +96,17 @@ impl GlobalLimitExec { Boundedness::Bounded, ) } + + fn with_new_children_and_same_properties( + &self, + mut children: Vec>, + ) -> Self { + Self { + input: children.swap_remove(0), + metrics: ExecutionPlanMetricsSet::new(), + ..Self::clone(self) + } + } } impl DisplayAs for GlobalLimitExec { @@ -129,7 +145,7 @@ impl ExecutionPlan for GlobalLimitExec { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.cache } @@ -151,10 +167,11 @@ impl ExecutionPlan for GlobalLimitExec { fn with_new_children( self: Arc, - children: Vec>, + mut children: Vec>, ) -> Result> { + check_if_same_properties!(self, children); Ok(Arc::new(GlobalLimitExec::new( - Arc::clone(&children[0]), + children.swap_remove(0), self.skip, self.fetch, ))) @@ -214,7 +231,7 @@ impl ExecutionPlan for GlobalLimitExec { } /// LocalLimitExec applies a limit to a single partition -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct LocalLimitExec { /// Input execution plan input: Arc, @@ -222,7 +239,7 @@ pub struct LocalLimitExec { fetch: usize, /// Execution metrics metrics: ExecutionPlanMetricsSet, - cache: PlanProperties, + cache: Arc, } impl LocalLimitExec { @@ -233,7 +250,7 @@ impl LocalLimitExec { input, fetch, metrics: ExecutionPlanMetricsSet::new(), - cache, + cache: Arc::new(cache), } } @@ -257,6 +274,17 @@ impl LocalLimitExec { Boundedness::Bounded, ) } + + fn with_new_children_and_same_properties( + &self, + mut children: Vec>, + ) -> Self { + Self { + input: children.swap_remove(0), + metrics: ExecutionPlanMetricsSet::new(), + ..Self::clone(self) + } + } } impl DisplayAs for LocalLimitExec { @@ -286,7 +314,7 @@ impl ExecutionPlan for LocalLimitExec { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.cache } @@ -306,6 +334,7 @@ impl ExecutionPlan for LocalLimitExec { self: Arc, children: Vec>, ) -> Result> { + check_if_same_properties!(self, children); match children.len() { 1 => Ok(Arc::new(LocalLimitExec::new( Arc::clone(&children[0]), diff --git a/datafusion/physical-plan/src/memory.rs b/datafusion/physical-plan/src/memory.rs index 4a406ca648d57..5c684bec98f9b 100644 --- a/datafusion/physical-plan/src/memory.rs +++ b/datafusion/physical-plan/src/memory.rs @@ -161,7 +161,7 @@ pub struct LazyMemoryExec { /// Functions to generate batches for each partition batch_generators: Vec>>, /// Plan properties cache storing equivalence properties, partitioning, and execution mode - cache: PlanProperties, + cache: Arc, /// Execution metrics metrics: ExecutionPlanMetricsSet, } @@ -200,7 +200,8 @@ impl LazyMemoryExec { EmissionType::Incremental, boundedness, ) - .with_scheduling_type(SchedulingType::Cooperative); + .with_scheduling_type(SchedulingType::Cooperative) + .into(); Ok(Self { schema, @@ -215,9 +216,9 @@ impl LazyMemoryExec { match projection.as_ref() { Some(columns) => { let projected = Arc::new(self.schema.project(columns).unwrap()); - self.cache = self.cache.with_eq_properties(EquivalenceProperties::new( - Arc::clone(&projected), - )); + Arc::make_mut(&mut self.cache).set_eq_properties( + EquivalenceProperties::new(Arc::clone(&projected)), + ); self.schema = projected; self.projection = projection; self @@ -236,12 +237,12 @@ impl LazyMemoryExec { partition_count, generator_count ); - self.cache.partitioning = partitioning; + Arc::make_mut(&mut self.cache).partitioning = partitioning; Ok(()) } pub fn add_ordering(&mut self, ordering: impl IntoIterator) { - self.cache + Arc::make_mut(&mut self.cache) .eq_properties .add_orderings(std::iter::once(ordering)); } @@ -306,7 +307,7 @@ impl ExecutionPlan for LazyMemoryExec { Arc::clone(&self.schema) } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.cache } @@ -365,7 +366,7 @@ impl ExecutionPlan for LazyMemoryExec { Ok(Arc::new(LazyMemoryExec { schema: Arc::clone(&self.schema), batch_generators: generators, - cache: self.cache.clone(), + cache: Arc::clone(&self.cache), metrics: ExecutionPlanMetricsSet::new(), projection: self.projection.clone(), })) diff --git a/datafusion/physical-plan/src/placeholder_row.rs b/datafusion/physical-plan/src/placeholder_row.rs index 4d00b73cff39c..f95b10771c02f 100644 --- a/datafusion/physical-plan/src/placeholder_row.rs +++ b/datafusion/physical-plan/src/placeholder_row.rs @@ -43,7 +43,7 @@ pub struct PlaceholderRowExec { schema: SchemaRef, /// Number of partitions partitions: usize, - cache: PlanProperties, + cache: Arc, } impl PlaceholderRowExec { @@ -54,7 +54,7 @@ impl PlaceholderRowExec { PlaceholderRowExec { schema, partitions, - cache, + cache: Arc::new(cache), } } @@ -63,7 +63,7 @@ impl PlaceholderRowExec { self.partitions = partitions; // Update output partitioning when updating partitions: let output_partitioning = Self::output_partitioning_helper(self.partitions); - self.cache = self.cache.with_partitioning(output_partitioning); + Arc::make_mut(&mut self.cache).partitioning = output_partitioning; self } @@ -132,7 +132,7 @@ impl ExecutionPlan for PlaceholderRowExec { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.cache } diff --git a/datafusion/physical-plan/src/projection.rs b/datafusion/physical-plan/src/projection.rs index e8608f17a1b20..c377993ee9799 100644 --- a/datafusion/physical-plan/src/projection.rs +++ b/datafusion/physical-plan/src/projection.rs @@ -26,13 +26,13 @@ use super::{ DisplayAs, ExecutionPlanProperties, PlanProperties, RecordBatchStream, SendableRecordBatchStream, SortOrderPushdownResult, Statistics, }; -use crate::execution_plan::CardinalityEffect; +use crate::execution_plan::{CardinalityEffect, has_same_children_properties}; use crate::filter_pushdown::{ ChildPushdownResult, FilterDescription, FilterPushdownPhase, FilterPushdownPropagation, }; use crate::joins::utils::{ColumnIndex, JoinFilter, JoinOn, JoinOnRef}; -use crate::{DisplayFormatType, ExecutionPlan, PhysicalExpr}; +use crate::{DisplayFormatType, ExecutionPlan, PhysicalExpr, check_if_same_properties}; use std::any::Any; use std::collections::HashMap; use std::pin::Pin; @@ -77,7 +77,7 @@ pub struct ProjectionExec { /// Execution metrics metrics: ExecutionPlanMetricsSet, /// Cache holding plan properties like equivalences, output partitioning etc. - cache: PlanProperties, + cache: Arc, } impl ProjectionExec { @@ -152,7 +152,7 @@ impl ProjectionExec { projector, input, metrics: ExecutionPlanMetricsSet::new(), - cache, + cache: Arc::new(cache), }) } @@ -192,6 +192,17 @@ impl ProjectionExec { input.boundedness(), )) } + + fn with_new_children_and_same_properties( + &self, + mut children: Vec>, + ) -> Self { + Self { + input: children.swap_remove(0), + metrics: ExecutionPlanMetricsSet::new(), + ..Self::clone(self) + } + } } impl DisplayAs for ProjectionExec { @@ -245,7 +256,7 @@ impl ExecutionPlan for ProjectionExec { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.cache } @@ -277,8 +288,9 @@ impl ExecutionPlan for ProjectionExec { self: Arc, mut children: Vec>, ) -> Result> { + check_if_same_properties!(self, children); ProjectionExec::try_new( - self.projector.projection().clone(), + self.projector.projection().into_iter().cloned(), children.swap_remove(0), ) .map(|p| Arc::new(p) as _) diff --git a/datafusion/physical-plan/src/recursive_query.rs b/datafusion/physical-plan/src/recursive_query.rs index 936a02581e89c..f8847cbacefb5 100644 --- a/datafusion/physical-plan/src/recursive_query.rs +++ b/datafusion/physical-plan/src/recursive_query.rs @@ -74,7 +74,7 @@ pub struct RecursiveQueryExec { /// Execution metrics metrics: ExecutionPlanMetricsSet, /// Cache holding plan properties like equivalences, output partitioning etc. - cache: PlanProperties, + cache: Arc, } impl RecursiveQueryExec { @@ -97,7 +97,7 @@ impl RecursiveQueryExec { is_distinct, work_table, metrics: ExecutionPlanMetricsSet::new(), - cache, + cache: Arc::new(cache), }) } @@ -143,7 +143,7 @@ impl ExecutionPlan for RecursiveQueryExec { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.cache } diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs index d50404c8fc1e8..7b1c57f66134e 100644 --- a/datafusion/physical-plan/src/repartition/mod.rs +++ b/datafusion/physical-plan/src/repartition/mod.rs @@ -31,7 +31,9 @@ use super::{ DisplayAs, ExecutionPlanProperties, RecordBatchStream, SendableRecordBatchStream, }; use crate::coalesce::LimitedBatchCoalescer; -use crate::execution_plan::{CardinalityEffect, EvaluationType, SchedulingType}; +use crate::execution_plan::{ + CardinalityEffect, EvaluationType, SchedulingType, has_same_children_properties, +}; use crate::hash_utils::create_hashes; use crate::metrics::{BaselineMetrics, SpillMetrics}; use crate::projection::{ProjectionExec, all_columns, make_with_child, update_expr}; @@ -39,7 +41,10 @@ use crate::sorts::streaming_merge::StreamingMergeBuilder; use crate::spill::spill_manager::SpillManager; use crate::spill::spill_pool::{self, SpillPoolWriter}; use crate::stream::RecordBatchStreamAdapter; -use crate::{DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties, Statistics}; +use crate::{ + DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties, Statistics, + check_if_same_properties, +}; use arrow::array::{PrimitiveArray, RecordBatch, RecordBatchOptions}; use arrow::compute::take_arrays; @@ -744,7 +749,7 @@ pub struct RepartitionExec { /// `SortPreservingRepartitionExec`, false means `RepartitionExec`. preserve_order: bool, /// Cache holding plan properties like equivalences, output partitioning etc. - cache: PlanProperties, + cache: Arc, } #[derive(Debug, Clone)] @@ -813,6 +818,18 @@ impl RepartitionExec { pub fn name(&self) -> &str { "RepartitionExec" } + + fn with_new_children_and_same_properties( + &self, + mut children: Vec>, + ) -> Self { + Self { + input: children.swap_remove(0), + metrics: ExecutionPlanMetricsSet::new(), + state: Default::default(), + ..Self::clone(self) + } + } } impl DisplayAs for RepartitionExec { @@ -872,7 +889,7 @@ impl ExecutionPlan for RepartitionExec { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.cache } @@ -884,6 +901,7 @@ impl ExecutionPlan for RepartitionExec { self: Arc, mut children: Vec>, ) -> Result> { + check_if_same_properties!(self, children); let mut repartition = RepartitionExec::try_new( children.swap_remove(0), self.partitioning().clone(), @@ -1185,7 +1203,7 @@ impl ExecutionPlan for RepartitionExec { _config: &ConfigOptions, ) -> Result>> { use Partitioning::*; - let mut new_properties = self.cache.clone(); + let mut new_properties = PlanProperties::clone(&self.cache); new_properties.partitioning = match new_properties.partitioning { RoundRobinBatch(_) => RoundRobinBatch(target_partitions), Hash(hash, _) => Hash(hash, target_partitions), @@ -1196,7 +1214,7 @@ impl ExecutionPlan for RepartitionExec { state: Arc::clone(&self.state), metrics: self.metrics.clone(), preserve_order: self.preserve_order, - cache: new_properties, + cache: new_properties.into(), }))) } } @@ -1216,7 +1234,7 @@ impl RepartitionExec { state: Default::default(), metrics: ExecutionPlanMetricsSet::new(), preserve_order, - cache, + cache: Arc::new(cache), }) } @@ -1277,7 +1295,7 @@ impl RepartitionExec { // to maintain order self.input.output_partitioning().partition_count() > 1; let eq_properties = Self::eq_properties_helper(&self.input, self.preserve_order); - self.cache = self.cache.with_eq_properties(eq_properties); + Arc::make_mut(&mut self.cache).set_eq_properties(eq_properties); self } diff --git a/datafusion/physical-plan/src/sorts/partial_sort.rs b/datafusion/physical-plan/src/sorts/partial_sort.rs index 73ba889c9e40b..8dd498f10368b 100644 --- a/datafusion/physical-plan/src/sorts/partial_sort.rs +++ b/datafusion/physical-plan/src/sorts/partial_sort.rs @@ -57,11 +57,13 @@ use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; +use crate::execution_plan::has_same_children_properties; use crate::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet}; use crate::sorts::sort::sort_batch; use crate::{ DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, ExecutionPlanProperties, Partitioning, PlanProperties, SendableRecordBatchStream, Statistics, + check_if_same_properties, }; use arrow::compute::concat_batches; @@ -93,7 +95,7 @@ pub struct PartialSortExec { /// Fetch highest/lowest n results fetch: Option, /// Cache holding plan properties like equivalences, output partitioning etc. - cache: PlanProperties, + cache: Arc, } impl PartialSortExec { @@ -114,7 +116,7 @@ impl PartialSortExec { metrics_set: ExecutionPlanMetricsSet::new(), preserve_partitioning, fetch: None, - cache, + cache: Arc::new(cache), } } @@ -132,12 +134,8 @@ impl PartialSortExec { /// input partitions producing a single, sorted partition. pub fn with_preserve_partitioning(mut self, preserve_partitioning: bool) -> Self { self.preserve_partitioning = preserve_partitioning; - self.cache = self - .cache - .with_partitioning(Self::output_partitioning_helper( - &self.input, - self.preserve_partitioning, - )); + Arc::make_mut(&mut self.cache).partitioning = + Self::output_partitioning_helper(&self.input, self.preserve_partitioning); self } @@ -207,6 +205,16 @@ impl PartialSortExec { input.boundedness(), )) } + + fn with_new_children_and_same_properties( + &self, + mut children: Vec>, + ) -> Self { + Self { + input: children.swap_remove(0), + ..Self::clone(self) + } + } } impl DisplayAs for PartialSortExec { @@ -255,7 +263,7 @@ impl ExecutionPlan for PartialSortExec { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.cache } @@ -283,6 +291,7 @@ impl ExecutionPlan for PartialSortExec { self: Arc, children: Vec>, ) -> Result> { + check_if_same_properties!(self, children); let new_partial_sort = PartialSortExec::new( self.expr.clone(), Arc::clone(&children[0]), diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs index 3e8fdf1f3ed7e..b071c784951cd 100644 --- a/datafusion/physical-plan/src/sorts/sort.rs +++ b/datafusion/physical-plan/src/sorts/sort.rs @@ -27,7 +27,9 @@ use std::sync::Arc; use parking_lot::RwLock; use crate::common::spawn_buffered; -use crate::execution_plan::{Boundedness, CardinalityEffect, EmissionType}; +use crate::execution_plan::{ + Boundedness, CardinalityEffect, EmissionType, has_same_children_properties, +}; use crate::expressions::PhysicalSortExpr; use crate::filter_pushdown::{ ChildFilterDescription, FilterDescription, FilterPushdownPhase, @@ -951,7 +953,7 @@ pub struct SortExec { /// Normalized common sort prefix between the input and the sort expressions (only used with fetch) common_sort_prefix: Vec, /// Cache holding plan properties like equivalences, output partitioning etc. - cache: PlanProperties, + cache: Arc, /// Filter matching the state of the sort for dynamic filter pushdown. /// If `fetch` is `Some`, this will also be set and a TopK operator may be used. /// If `fetch` is `None`, this will be `None`. @@ -973,7 +975,7 @@ impl SortExec { preserve_partitioning, fetch: None, common_sort_prefix: sort_prefix, - cache, + cache: Arc::new(cache), filter: None, } } @@ -992,12 +994,8 @@ impl SortExec { /// input partitions producing a single, sorted partition. pub fn with_preserve_partitioning(mut self, preserve_partitioning: bool) -> Self { self.preserve_partitioning = preserve_partitioning; - self.cache = self - .cache - .with_partitioning(Self::output_partitioning_helper( - &self.input, - self.preserve_partitioning, - )); + Arc::make_mut(&mut self.cache).partitioning = + Self::output_partitioning_helper(&self.input, self.preserve_partitioning); self } @@ -1021,7 +1019,7 @@ impl SortExec { preserve_partitioning: self.preserve_partitioning, common_sort_prefix: self.common_sort_prefix.clone(), fetch: self.fetch, - cache: self.cache.clone(), + cache: Arc::clone(&self.cache), filter: self.filter.clone(), } } @@ -1034,12 +1032,12 @@ impl SortExec { /// operation since rows that are not going to be included /// can be dropped. pub fn with_fetch(&self, fetch: Option) -> Self { - let mut cache = self.cache.clone(); + let mut cache = PlanProperties::clone(&self.cache); // If the SortExec can emit incrementally (that means the sort requirements // and properties of the input match), the SortExec can generate its result // without scanning the entire input when a fetch value exists. let is_pipeline_friendly = matches!( - self.cache.emission_type, + cache.emission_type, EmissionType::Incremental | EmissionType::Both ); if fetch.is_some() && is_pipeline_friendly { @@ -1051,7 +1049,7 @@ impl SortExec { }); let mut new_sort = self.cloned(); new_sort.fetch = fetch; - new_sort.cache = cache; + new_sort.cache = cache.into(); new_sort.filter = filter; new_sort } @@ -1206,7 +1204,7 @@ impl ExecutionPlan for SortExec { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.cache } @@ -1235,14 +1233,17 @@ impl ExecutionPlan for SortExec { let mut new_sort = self.cloned(); assert_eq!(children.len(), 1, "SortExec should have exactly one child"); new_sort.input = Arc::clone(&children[0]); - // Recompute the properties based on the new input since they may have changed - let (cache, sort_prefix) = Self::compute_properties( - &new_sort.input, - new_sort.expr.clone(), - new_sort.preserve_partitioning, - )?; - new_sort.cache = cache; - new_sort.common_sort_prefix = sort_prefix; + + if !has_same_children_properties(&self, &children)? { + // Recompute the properties based on the new input since they may have changed + let (cache, sort_prefix) = Self::compute_properties( + &new_sort.input, + new_sort.expr.clone(), + new_sort.preserve_partitioning, + )?; + new_sort.cache = Arc::new(cache); + new_sort.common_sort_prefix = sort_prefix; + } Ok(Arc::new(new_sort)) } @@ -1466,7 +1467,7 @@ mod tests { pub struct SortedUnboundedExec { schema: Schema, batch_size: u64, - cache: PlanProperties, + cache: Arc, } impl DisplayAs for SortedUnboundedExec { @@ -1506,7 +1507,7 @@ mod tests { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.cache } @@ -2259,7 +2260,9 @@ mod tests { let source = SortedUnboundedExec { schema: schema.clone(), batch_size: 2, - cache: SortedUnboundedExec::compute_properties(Arc::new(schema.clone())), + cache: Arc::new(SortedUnboundedExec::compute_properties(Arc::new( + schema.clone(), + ))), }; let mut plan = SortExec::new( [PhysicalSortExpr::new_default(Arc::new(Column::new( diff --git a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs index 0ddea90a98bf3..c1d7432126b09 100644 --- a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs +++ b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs @@ -28,6 +28,7 @@ use crate::sorts::streaming_merge::StreamingMergeBuilder; use crate::{ DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, ExecutionPlanProperties, Partitioning, PlanProperties, SendableRecordBatchStream, Statistics, + check_if_same_properties, }; use datafusion_common::{Result, assert_eq_or_internal_err, internal_err}; @@ -35,7 +36,9 @@ use datafusion_execution::TaskContext; use datafusion_execution::memory_pool::MemoryConsumer; use datafusion_physical_expr_common::sort_expr::{LexOrdering, OrderingRequirements}; -use crate::execution_plan::{EvaluationType, SchedulingType}; +use crate::execution_plan::{ + EvaluationType, SchedulingType, has_same_children_properties, +}; use log::{debug, trace}; /// Sort preserving merge execution plan @@ -93,7 +96,7 @@ pub struct SortPreservingMergeExec { /// Optional number of rows to fetch. Stops producing rows after this fetch fetch: Option, /// Cache holding plan properties like equivalences, output partitioning etc. - cache: PlanProperties, + cache: Arc, /// Use round-robin selection of tied winners of loser tree /// /// See [`Self::with_round_robin_repartition`] for more information. @@ -109,7 +112,7 @@ impl SortPreservingMergeExec { expr, metrics: ExecutionPlanMetricsSet::new(), fetch: None, - cache, + cache: Arc::new(cache), enable_round_robin_repartition: true, } } @@ -180,6 +183,16 @@ impl SortPreservingMergeExec { .with_evaluation_type(drive) .with_scheduling_type(scheduling) } + + fn with_new_children_and_same_properties( + &self, + mut children: Vec>, + ) -> Self { + Self { + input: children.swap_remove(0), + ..Self::clone(self) + } + } } impl DisplayAs for SortPreservingMergeExec { @@ -225,7 +238,7 @@ impl ExecutionPlan for SortPreservingMergeExec { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.cache } @@ -240,7 +253,7 @@ impl ExecutionPlan for SortPreservingMergeExec { expr: self.expr.clone(), metrics: self.metrics.clone(), fetch: limit, - cache: self.cache.clone(), + cache: Arc::clone(&self.cache), enable_round_robin_repartition: true, })) } @@ -267,10 +280,11 @@ impl ExecutionPlan for SortPreservingMergeExec { fn with_new_children( self: Arc, - children: Vec>, + mut children: Vec>, ) -> Result> { + check_if_same_properties!(self, children); Ok(Arc::new( - SortPreservingMergeExec::new(self.expr.clone(), Arc::clone(&children[0])) + SortPreservingMergeExec::new(self.expr.clone(), children.swap_remove(0)) .with_fetch(self.fetch), )) } @@ -1350,7 +1364,7 @@ mod tests { #[derive(Debug, Clone)] struct CongestedExec { schema: Schema, - cache: PlanProperties, + cache: Arc, congestion: Arc, } @@ -1386,7 +1400,7 @@ mod tests { fn as_any(&self) -> &dyn Any { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.cache } fn children(&self) -> Vec<&Arc> { @@ -1479,7 +1493,7 @@ mod tests { }; let source = CongestedExec { schema: schema.clone(), - cache: properties, + cache: Arc::new(properties), congestion: Arc::new(Congestion::new(partition_count)), }; let spm = SortPreservingMergeExec::new( diff --git a/datafusion/physical-plan/src/streaming.rs b/datafusion/physical-plan/src/streaming.rs index c8b8d95718cb8..1535482374110 100644 --- a/datafusion/physical-plan/src/streaming.rs +++ b/datafusion/physical-plan/src/streaming.rs @@ -67,7 +67,7 @@ pub struct StreamingTableExec { projected_output_ordering: Vec, infinite: bool, limit: Option, - cache: PlanProperties, + cache: Arc, metrics: ExecutionPlanMetricsSet, } @@ -111,7 +111,7 @@ impl StreamingTableExec { projected_output_ordering, infinite, limit, - cache, + cache: Arc::new(cache), metrics: ExecutionPlanMetricsSet::new(), }) } @@ -236,7 +236,7 @@ impl ExecutionPlan for StreamingTableExec { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.cache } @@ -335,7 +335,7 @@ impl ExecutionPlan for StreamingTableExec { projected_output_ordering: self.projected_output_ordering.clone(), infinite: self.infinite, limit, - cache: self.cache.clone(), + cache: Arc::clone(&self.cache), metrics: self.metrics.clone(), })) } diff --git a/datafusion/physical-plan/src/test.rs b/datafusion/physical-plan/src/test.rs index c6d0940c35480..4df4b14ce26df 100644 --- a/datafusion/physical-plan/src/test.rs +++ b/datafusion/physical-plan/src/test.rs @@ -75,7 +75,7 @@ pub struct TestMemoryExec { /// The maximum number of records to read from this plan. If `None`, /// all records after filtering are returned. fetch: Option, - cache: PlanProperties, + cache: Arc, } impl DisplayAs for TestMemoryExec { @@ -134,7 +134,7 @@ impl ExecutionPlan for TestMemoryExec { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.cache } @@ -235,11 +235,11 @@ impl TestMemoryExec { schema: SchemaRef, projection: Option>, ) -> Result { - let projected_schema = project_schema(&schema, projection.as_ref())?; + let projected_schema = project_schema(&schema, projection.as_deref())?; Ok(Self { partitions: partitions.to_vec(), schema, - cache: PlanProperties::new( + cache: Arc::new(PlanProperties::new( EquivalenceProperties::new_with_orderings( Arc::clone(&projected_schema), Vec::::new(), @@ -247,7 +247,7 @@ impl TestMemoryExec { Partitioning::UnknownPartitioning(partitions.len()), EmissionType::Incremental, Boundedness::Bounded, - ), + )), projected_schema, projection, sort_information: vec![], @@ -265,7 +265,7 @@ impl TestMemoryExec { ) -> Result> { let mut source = Self::try_new(partitions, schema, projection)?; let cache = source.compute_properties(); - source.cache = cache; + source.cache = Arc::new(cache); Ok(Arc::new(source)) } @@ -273,7 +273,7 @@ impl TestMemoryExec { pub fn update_cache(source: &Arc) -> TestMemoryExec { let cache = source.compute_properties(); let mut source = (**source).clone(); - source.cache = cache; + source.cache = Arc::new(cache); source } @@ -342,7 +342,7 @@ impl TestMemoryExec { } self.sort_information = sort_information; - self.cache = self.compute_properties(); + self.cache = Arc::new(self.compute_properties()); Ok(self) } diff --git a/datafusion/physical-plan/src/test/exec.rs b/datafusion/physical-plan/src/test/exec.rs index 4507cccba05a9..a8b21f70f7760 100644 --- a/datafusion/physical-plan/src/test/exec.rs +++ b/datafusion/physical-plan/src/test/exec.rs @@ -125,7 +125,7 @@ pub struct MockExec { /// if true (the default), sends data using a separate task to ensure the /// batches are not available without this stream yielding first use_task: bool, - cache: PlanProperties, + cache: Arc, } impl MockExec { @@ -142,7 +142,7 @@ impl MockExec { data, schema, use_task: true, - cache, + cache: Arc::new(cache), } } @@ -192,7 +192,7 @@ impl ExecutionPlan for MockExec { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.cache } @@ -299,7 +299,7 @@ pub struct BarrierExec { /// all streams wait on this barrier to produce barrier: Arc, - cache: PlanProperties, + cache: Arc, } impl BarrierExec { @@ -312,7 +312,7 @@ impl BarrierExec { data, schema, barrier, - cache, + cache: Arc::new(cache), } } @@ -364,7 +364,7 @@ impl ExecutionPlan for BarrierExec { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.cache } @@ -429,7 +429,7 @@ impl ExecutionPlan for BarrierExec { /// A mock execution plan that errors on a call to execute #[derive(Debug)] pub struct ErrorExec { - cache: PlanProperties, + cache: Arc, } impl Default for ErrorExec { @@ -446,7 +446,9 @@ impl ErrorExec { true, )])); let cache = Self::compute_properties(schema); - Self { cache } + Self { + cache: Arc::new(cache), + } } /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc. @@ -487,7 +489,7 @@ impl ExecutionPlan for ErrorExec { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.cache } @@ -517,7 +519,7 @@ impl ExecutionPlan for ErrorExec { pub struct StatisticsExec { stats: Statistics, schema: Arc, - cache: PlanProperties, + cache: Arc, } impl StatisticsExec { pub fn new(stats: Statistics, schema: Schema) -> Self { @@ -530,7 +532,7 @@ impl StatisticsExec { Self { stats, schema: Arc::new(schema), - cache, + cache: Arc::new(cache), } } @@ -577,7 +579,7 @@ impl ExecutionPlan for StatisticsExec { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.cache } @@ -623,7 +625,7 @@ pub struct BlockingExec { /// Ref-counting helper to check if the plan and the produced stream are still in memory. refs: Arc<()>, - cache: PlanProperties, + cache: Arc, } impl BlockingExec { @@ -633,7 +635,7 @@ impl BlockingExec { Self { schema, refs: Default::default(), - cache, + cache: Arc::new(cache), } } @@ -684,7 +686,7 @@ impl ExecutionPlan for BlockingExec { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.cache } @@ -766,7 +768,7 @@ pub struct PanicExec { /// Number of output partitions. Each partition will produce this /// many empty output record batches prior to panicking batches_until_panics: Vec, - cache: PlanProperties, + cache: Arc, } impl PanicExec { @@ -778,7 +780,7 @@ impl PanicExec { Self { schema, batches_until_panics, - cache, + cache: Arc::new(cache), } } @@ -830,7 +832,7 @@ impl ExecutionPlan for PanicExec { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.cache } diff --git a/datafusion/physical-plan/src/union.rs b/datafusion/physical-plan/src/union.rs index d27c81b968490..6c9fe33bad04b 100644 --- a/datafusion/physical-plan/src/union.rs +++ b/datafusion/physical-plan/src/union.rs @@ -32,9 +32,10 @@ use super::{ SendableRecordBatchStream, Statistics, metrics::{ExecutionPlanMetricsSet, MetricsSet}, }; +use crate::check_if_same_properties; use crate::execution_plan::{ InvariantLevel, boundedness_from_children, check_default_invariants, - emission_type_from_children, + emission_type_from_children, has_same_children_properties, }; use crate::filter_pushdown::{FilterDescription, FilterPushdownPhase}; use crate::metrics::BaselineMetrics; @@ -100,7 +101,7 @@ pub struct UnionExec { /// Execution metrics metrics: ExecutionPlanMetricsSet, /// Cache holding plan properties like equivalences, output partitioning etc. - cache: PlanProperties, + cache: Arc, } impl UnionExec { @@ -118,7 +119,7 @@ impl UnionExec { UnionExec { inputs, metrics: ExecutionPlanMetricsSet::new(), - cache, + cache: Arc::new(cache), } } @@ -147,7 +148,7 @@ impl UnionExec { Ok(Arc::new(UnionExec { inputs, metrics: ExecutionPlanMetricsSet::new(), - cache, + cache: Arc::new(cache), })) } } @@ -183,6 +184,17 @@ impl UnionExec { boundedness_from_children(inputs), )) } + + fn with_new_children_and_same_properties( + &self, + children: Vec>, + ) -> Self { + Self { + inputs: children, + metrics: ExecutionPlanMetricsSet::new(), + ..Self::clone(self) + } + } } impl DisplayAs for UnionExec { @@ -210,7 +222,7 @@ impl ExecutionPlan for UnionExec { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.cache } @@ -259,6 +271,7 @@ impl ExecutionPlan for UnionExec { self: Arc, children: Vec>, ) -> Result> { + check_if_same_properties!(self, children); UnionExec::try_new(children) } @@ -411,7 +424,7 @@ pub struct InterleaveExec { /// Execution metrics metrics: ExecutionPlanMetricsSet, /// Cache holding plan properties like equivalences, output partitioning etc. - cache: PlanProperties, + cache: Arc, } impl InterleaveExec { @@ -425,7 +438,7 @@ impl InterleaveExec { Ok(InterleaveExec { inputs, metrics: ExecutionPlanMetricsSet::new(), - cache, + cache: Arc::new(cache), }) } @@ -447,6 +460,17 @@ impl InterleaveExec { boundedness_from_children(inputs), )) } + + fn with_new_children_and_same_properties( + &self, + children: Vec>, + ) -> Self { + Self { + inputs: children, + metrics: ExecutionPlanMetricsSet::new(), + ..Self::clone(self) + } + } } impl DisplayAs for InterleaveExec { @@ -474,7 +498,7 @@ impl ExecutionPlan for InterleaveExec { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.cache } @@ -495,6 +519,7 @@ impl ExecutionPlan for InterleaveExec { can_interleave(children.iter()), "Can not create InterleaveExec: new children can not be interleaved" ); + check_if_same_properties!(self, children); Ok(Arc::new(InterleaveExec::try_new(children)?)) } diff --git a/datafusion/physical-plan/src/unnest.rs b/datafusion/physical-plan/src/unnest.rs index 5fef754e80780..d8a8937c46b5e 100644 --- a/datafusion/physical-plan/src/unnest.rs +++ b/datafusion/physical-plan/src/unnest.rs @@ -26,9 +26,10 @@ use super::metrics::{ RecordOutput, }; use super::{DisplayAs, ExecutionPlanProperties, PlanProperties}; +use crate::execution_plan::has_same_children_properties; use crate::{ DisplayFormatType, Distribution, ExecutionPlan, RecordBatchStream, - SendableRecordBatchStream, + SendableRecordBatchStream, check_if_same_properties, }; use arrow::array::{ @@ -74,7 +75,7 @@ pub struct UnnestExec { /// Execution metrics metrics: ExecutionPlanMetricsSet, /// Cache holding plan properties like equivalences, output partitioning etc. - cache: PlanProperties, + cache: Arc, } impl UnnestExec { @@ -100,7 +101,7 @@ impl UnnestExec { struct_column_indices, options, metrics: Default::default(), - cache, + cache: Arc::new(cache), }) } @@ -193,6 +194,17 @@ impl UnnestExec { pub fn options(&self) -> &UnnestOptions { &self.options } + + fn with_new_children_and_same_properties( + &self, + mut children: Vec>, + ) -> Self { + Self { + input: children.swap_remove(0), + metrics: ExecutionPlanMetricsSet::new(), + ..Self::clone(self) + } + } } impl DisplayAs for UnnestExec { @@ -221,7 +233,7 @@ impl ExecutionPlan for UnnestExec { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.cache } @@ -231,10 +243,11 @@ impl ExecutionPlan for UnnestExec { fn with_new_children( self: Arc, - children: Vec>, + mut children: Vec>, ) -> Result> { + check_if_same_properties!(self, children); Ok(Arc::new(UnnestExec::new( - Arc::clone(&children[0]), + children.swap_remove(0), self.list_column_indices.clone(), self.struct_column_indices.clone(), Arc::clone(&self.schema), diff --git a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs index 987a400ec369e..9dd61b2170cf6 100644 --- a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs +++ b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs @@ -28,6 +28,7 @@ use std::sync::Arc; use std::task::{Context, Poll}; use super::utils::create_schema; +use crate::execution_plan::has_same_children_properties; use crate::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet}; use crate::windows::{ calc_requirements, get_ordered_partition_by_indices, get_partition_by_sort_exprs, @@ -36,7 +37,7 @@ use crate::windows::{ use crate::{ ColumnStatistics, DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, ExecutionPlanProperties, InputOrderMode, PlanProperties, RecordBatchStream, - SendableRecordBatchStream, Statistics, WindowExpr, + SendableRecordBatchStream, Statistics, WindowExpr, check_if_same_properties, }; use arrow::compute::take_record_batch; @@ -93,7 +94,7 @@ pub struct BoundedWindowAggExec { // See `get_ordered_partition_by_indices` for more details. ordered_partition_by_indices: Vec, /// Cache holding plan properties like equivalences, output partitioning etc. - cache: PlanProperties, + cache: Arc, /// If `can_rerepartition` is false, partition_keys is always empty. can_repartition: bool, } @@ -134,7 +135,7 @@ impl BoundedWindowAggExec { metrics: ExecutionPlanMetricsSet::new(), input_order_mode, ordered_partition_by_indices, - cache, + cache: Arc::new(cache), can_repartition, }) } @@ -248,6 +249,17 @@ impl BoundedWindowAggExec { total_byte_size: Precision::Absent, }) } + + fn with_new_children_and_same_properties( + &self, + mut children: Vec>, + ) -> Self { + Self { + input: children.swap_remove(0), + metrics: ExecutionPlanMetricsSet::new(), + ..Self::clone(self) + } + } } impl DisplayAs for BoundedWindowAggExec { @@ -304,7 +316,7 @@ impl ExecutionPlan for BoundedWindowAggExec { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.cache } @@ -339,6 +351,7 @@ impl ExecutionPlan for BoundedWindowAggExec { self: Arc, children: Vec>, ) -> Result> { + check_if_same_properties!(self, children); Ok(Arc::new(BoundedWindowAggExec::try_new( self.window_expr.clone(), Arc::clone(&children[0]), diff --git a/datafusion/physical-plan/src/windows/window_agg_exec.rs b/datafusion/physical-plan/src/windows/window_agg_exec.rs index aa99f4f49885a..734a20800641b 100644 --- a/datafusion/physical-plan/src/windows/window_agg_exec.rs +++ b/datafusion/physical-plan/src/windows/window_agg_exec.rs @@ -23,7 +23,7 @@ use std::sync::Arc; use std::task::{Context, Poll}; use super::utils::create_schema; -use crate::execution_plan::EmissionType; +use crate::execution_plan::{EmissionType, has_same_children_properties}; use crate::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet}; use crate::windows::{ calc_requirements, get_ordered_partition_by_indices, get_partition_by_sort_exprs, @@ -32,7 +32,7 @@ use crate::windows::{ use crate::{ ColumnStatistics, DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, ExecutionPlanProperties, PhysicalExpr, PlanProperties, RecordBatchStream, - SendableRecordBatchStream, Statistics, WindowExpr, + SendableRecordBatchStream, Statistics, WindowExpr, check_if_same_properties, }; use arrow::array::ArrayRef; @@ -65,7 +65,7 @@ pub struct WindowAggExec { // see `get_ordered_partition_by_indices` for more details. ordered_partition_by_indices: Vec, /// Cache holding plan properties like equivalences, output partitioning etc. - cache: PlanProperties, + cache: Arc, /// If `can_partition` is false, partition_keys is always empty. can_repartition: bool, } @@ -89,7 +89,7 @@ impl WindowAggExec { schema, metrics: ExecutionPlanMetricsSet::new(), ordered_partition_by_indices, - cache, + cache: Arc::new(cache), can_repartition, }) } @@ -158,6 +158,17 @@ impl WindowAggExec { .unwrap_or_else(Vec::new) } } + + fn with_new_children_and_same_properties( + &self, + mut children: Vec>, + ) -> Self { + Self { + input: children.swap_remove(0), + metrics: ExecutionPlanMetricsSet::new(), + ..Self::clone(self) + } + } } impl DisplayAs for WindowAggExec { @@ -206,7 +217,7 @@ impl ExecutionPlan for WindowAggExec { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.cache } @@ -242,11 +253,12 @@ impl ExecutionPlan for WindowAggExec { fn with_new_children( self: Arc, - children: Vec>, + mut children: Vec>, ) -> Result> { + check_if_same_properties!(self, children); Ok(Arc::new(WindowAggExec::try_new( self.window_expr.clone(), - Arc::clone(&children[0]), + children.swap_remove(0), true, )?)) } diff --git a/datafusion/physical-plan/src/work_table.rs b/datafusion/physical-plan/src/work_table.rs index f1b9e3e88d123..67e7bb86a3d81 100644 --- a/datafusion/physical-plan/src/work_table.rs +++ b/datafusion/physical-plan/src/work_table.rs @@ -109,7 +109,7 @@ pub struct WorkTableExec { /// Execution metrics metrics: ExecutionPlanMetricsSet, /// Cache holding plan properties like equivalences, output partitioning etc. - cache: PlanProperties, + cache: Arc, } impl WorkTableExec { @@ -129,7 +129,7 @@ impl WorkTableExec { projection, work_table: Arc::new(WorkTable::new(name)), metrics: ExecutionPlanMetricsSet::new(), - cache, + cache: Arc::new(cache), }) } @@ -181,7 +181,7 @@ impl ExecutionPlan for WorkTableExec { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.cache } @@ -263,7 +263,7 @@ impl ExecutionPlan for WorkTableExec { projection: self.projection.clone(), metrics: ExecutionPlanMetricsSet::new(), work_table, - cache: self.cache.clone(), + cache: Arc::clone(&self.cache), })) } } diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs index 45868df4ced6c..c5a28b5cbb7d4 100644 --- a/datafusion/proto/src/physical_plan/mod.rs +++ b/datafusion/proto/src/physical_plan/mod.rs @@ -588,8 +588,8 @@ impl protobuf::PhysicalPlanNode { None }; - let filter = - FilterExec::try_new(predicate, input)?.with_projection(projection)?; + let filter = FilterExec::try_new(predicate, input)? + .with_projection(projection.map(Into::into))?; match filter_selectivity { Ok(filter_selectivity) => Ok(Arc::new( filter.with_default_selectivity(filter_selectivity)?, diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs index 31878e2e34b3d..39e591fdb7744 100644 --- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs @@ -1825,7 +1825,7 @@ async fn roundtrip_projection_source() -> Result<()> { Arc::new(BinaryExpr::new(col("c", &schema)?, Operator::Eq, lit(1))), DataSourceExec::from_data_source(scan_config), )? - .with_projection(Some(vec![0, 1]))?, + .with_projection(Some(vec![0, 1].into()))?, ); roundtrip_test(filter) diff --git a/docs/source/library-user-guide/custom-table-providers.md b/docs/source/library-user-guide/custom-table-providers.md index 8e1dee9e843ac..b4da91e6e3eb9 100644 --- a/docs/source/library-user-guide/custom-table-providers.md +++ b/docs/source/library-user-guide/custom-table-providers.md @@ -108,7 +108,7 @@ impl ExecutionPlan for CustomExec { } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { unreachable!() } @@ -232,7 +232,7 @@ The `scan` method of the `TableProvider` returns a `Result &PlanProperties { +# fn properties(&self) -> &Arc { # unreachable!() # } # @@ -291,7 +291,7 @@ impl CustomExec { schema: SchemaRef, db: CustomDataSource, ) -> Self { - let projected_schema = project_schema(&schema, projections).unwrap(); + let projected_schema = project_schema(&schema, projections.map(AsRef::as_ref)).unwrap(); Self { db, projected_schema, @@ -424,7 +424,7 @@ This will allow you to use the custom table provider in DataFusion. For example, # } # # -# fn properties(&self) -> &PlanProperties { +# fn properties(&self) -> &Arc { # unreachable!() # } # @@ -483,7 +483,7 @@ This will allow you to use the custom table provider in DataFusion. For example, # schema: SchemaRef, # db: CustomDataSource, # ) -> Self { -# let projected_schema = project_schema(&schema, projections).unwrap(); +# let projected_schema = project_schema(&schema, projections.map(AsRef::as_ref)).unwrap(); # Self { # db, # projected_schema, From 1c29dfebc1668f90397532eb173a632a10556ac3 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Thu, 15 Jan 2026 14:38:26 -0500 Subject: [PATCH 2/2] Avoid changing project_schema signature --- .../custom_data_source/custom_datasource.rs | 3 +-- datafusion/catalog-listing/src/table.rs | 2 +- datafusion/common/src/stats.rs | 18 ++++++++++-------- datafusion/common/src/utils/mod.rs | 6 +++--- datafusion/core/src/datasource/empty.rs | 3 +-- .../core/tests/custom_sources_cases/mod.rs | 2 +- datafusion/datasource/src/memory.rs | 2 +- 7 files changed, 18 insertions(+), 18 deletions(-) diff --git a/datafusion-examples/examples/custom_data_source/custom_datasource.rs b/datafusion-examples/examples/custom_data_source/custom_datasource.rs index 9cda46fb9640a..7abb39e1a7130 100644 --- a/datafusion-examples/examples/custom_data_source/custom_datasource.rs +++ b/datafusion-examples/examples/custom_data_source/custom_datasource.rs @@ -202,8 +202,7 @@ impl CustomExec { schema: SchemaRef, db: CustomDataSource, ) -> Self { - let projected_schema = - project_schema(&schema, projections.map(AsRef::as_ref)).unwrap(); + let projected_schema = project_schema(&schema, projections).unwrap(); let cache = Self::compute_properties(projected_schema.clone()); Self { db, diff --git a/datafusion/catalog-listing/src/table.rs b/datafusion/catalog-listing/src/table.rs index e081280825135..38456944075fc 100644 --- a/datafusion/catalog-listing/src/table.rs +++ b/datafusion/catalog-listing/src/table.rs @@ -522,7 +522,7 @@ impl TableProvider for ListingTable { // if no files need to be read, return an `EmptyExec` if partitioned_file_lists.is_empty() { - let projected_schema = project_schema(&self.schema(), projection.as_deref())?; + let projected_schema = project_schema(&self.schema(), projection.as_ref())?; return Ok(ScanResult::new(Arc::new(EmptyExec::new(projected_schema)))); } diff --git a/datafusion/common/src/stats.rs b/datafusion/common/src/stats.rs index 751dff86efb66..b19c49772ad46 100644 --- a/datafusion/common/src/stats.rs +++ b/datafusion/common/src/stats.rs @@ -391,10 +391,11 @@ impl Statistics { /// For example, if we had statistics for columns `{"a", "b", "c"}`, /// projecting to `vec![2, 1]` would return statistics for columns `{"c", /// "b"}`. - pub fn project(mut self, projection: Option<&[usize]>) -> Self { + pub fn project(mut self, projection: Option>) -> Self { let Some(projection) = projection else { return self; }; + let projection = projection.as_ref(); #[expect(clippy::large_enum_variant)] enum Slot { @@ -1066,28 +1067,29 @@ mod tests { #[test] fn test_project_none() { - let stats = make_stats(vec![10, 20, 30]).project(None); + let projection: Option> = None; + let stats = make_stats(vec![10, 20, 30]).project(projection); assert_eq!(stats, make_stats(vec![10, 20, 30])); } #[test] fn test_project_empty() { - let projection: Option<&[_]> = Some(&[]); - let stats = make_stats(vec![10, 20, 30]).project(projection); + let projection = Some(vec![]); + let stats = make_stats(vec![10, 20, 30]).project(projection.as_ref()); assert_eq!(stats, make_stats(vec![])); } #[test] fn test_project_swap() { - let projection: Option<&[_]> = Some(&[2, 1]); - let stats = make_stats(vec![10, 20, 30]).project(projection); + let projection = Some(vec![2, 1]); + let stats = make_stats(vec![10, 20, 30]).project(projection.as_ref()); assert_eq!(stats, make_stats(vec![30, 20])); } #[test] fn test_project_repeated() { - let projection: Option<&[_]> = Some(&[1, 2, 1, 1, 0, 2]); - let stats = make_stats(vec![10, 20, 30]).project(projection); + let projection = Some(vec![1, 2, 1, 1, 0, 2]); + let stats = make_stats(vec![10, 20, 30]).project(projection.as_ref()); assert_eq!(stats, make_stats(vec![20, 30, 20, 20, 10, 30])); } diff --git a/datafusion/common/src/utils/mod.rs b/datafusion/common/src/utils/mod.rs index 545dc10af031a..60d5ee42ce8cb 100644 --- a/datafusion/common/src/utils/mod.rs +++ b/datafusion/common/src/utils/mod.rs @@ -59,7 +59,7 @@ use std::thread::available_parallelism; /// /// // Pick columns 'c' and 'b' /// let projection = Some(vec![2, 1]); -/// let projected_schema = project_schema(&schema, projection.as_deref()).unwrap(); +/// let projected_schema = project_schema(&schema, projection.as_ref()).unwrap(); /// /// let expected_schema = SchemaRef::new(Schema::new(vec![ /// Field::new("c", DataType::Utf8, true), @@ -70,10 +70,10 @@ use std::thread::available_parallelism; /// ``` pub fn project_schema( schema: &SchemaRef, - projection: Option<&[usize]>, + projection: Option>, ) -> Result { let schema = match projection { - Some(columns) => Arc::new(schema.project(columns)?), + Some(columns) => Arc::new(schema.project(columns.as_ref())?), None => Arc::clone(schema), }; Ok(schema) diff --git a/datafusion/core/src/datasource/empty.rs b/datafusion/core/src/datasource/empty.rs index 882abd921a155..5aeca92b1626d 100644 --- a/datafusion/core/src/datasource/empty.rs +++ b/datafusion/core/src/datasource/empty.rs @@ -77,8 +77,7 @@ impl TableProvider for EmptyTable { _limit: Option, ) -> Result> { // even though there is no data, projections apply - let projected_schema = - project_schema(&self.schema, projection.map(AsRef::as_ref))?; + let projected_schema = project_schema(&self.schema, projection)?; Ok(Arc::new( EmptyExec::new(projected_schema).with_partitions(self.partitions), )) diff --git a/datafusion/core/tests/custom_sources_cases/mod.rs b/datafusion/core/tests/custom_sources_cases/mod.rs index 5978b54edf296..6dd09ebd8832f 100644 --- a/datafusion/core/tests/custom_sources_cases/mod.rs +++ b/datafusion/core/tests/custom_sources_cases/mod.rs @@ -86,7 +86,7 @@ impl CustomExecutionPlan { fn new(projection: Option>) -> Self { let schema = TEST_CUSTOM_SCHEMA_REF!(); let schema = - project_schema(&schema, projection.as_deref()).expect("projected schema"); + project_schema(&schema, projection.as_ref()).expect("projected schema"); let cache = Self::compute_properties(schema); Self { projection, diff --git a/datafusion/datasource/src/memory.rs b/datafusion/datasource/src/memory.rs index 3fc388cd3c4ad..1d12bb3200309 100644 --- a/datafusion/datasource/src/memory.rs +++ b/datafusion/datasource/src/memory.rs @@ -262,7 +262,7 @@ impl MemorySourceConfig { schema: SchemaRef, projection: Option>, ) -> Result { - let projected_schema = project_schema(&schema, projection.as_deref())?; + let projected_schema = project_schema(&schema, projection.as_ref())?; Ok(Self { partitions: partitions.to_vec(), schema,