diff --git a/datafusion-examples/examples/custom_data_source/custom_datasource.rs b/datafusion-examples/examples/custom_data_source/custom_datasource.rs index b276ae32cf247..9cda46fb9640a 100644 --- a/datafusion-examples/examples/custom_data_source/custom_datasource.rs +++ b/datafusion-examples/examples/custom_data_source/custom_datasource.rs @@ -192,7 +192,7 @@ impl TableProvider for CustomDataSource { struct CustomExec { db: CustomDataSource, projected_schema: SchemaRef, - cache: PlanProperties, + cache: Arc, } impl CustomExec { @@ -202,12 +202,13 @@ impl CustomExec { schema: SchemaRef, db: CustomDataSource, ) -> Self { - let projected_schema = project_schema(&schema, projections).unwrap(); + let projected_schema = + project_schema(&schema, projections.map(AsRef::as_ref)).unwrap(); let cache = Self::compute_properties(projected_schema.clone()); Self { db, projected_schema, - cache, + cache: Arc::new(cache), } } @@ -238,7 +239,7 @@ impl ExecutionPlan for CustomExec { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.cache } diff --git a/datafusion-examples/examples/execution_monitoring/memory_pool_execution_plan.rs b/datafusion-examples/examples/execution_monitoring/memory_pool_execution_plan.rs index 48475acbb1542..6f377ea1ce3b9 100644 --- a/datafusion-examples/examples/execution_monitoring/memory_pool_execution_plan.rs +++ b/datafusion-examples/examples/execution_monitoring/memory_pool_execution_plan.rs @@ -199,7 +199,7 @@ impl ExternalBatchBufferer { struct BufferingExecutionPlan { schema: SchemaRef, input: Arc, - properties: PlanProperties, + properties: Arc, } impl BufferingExecutionPlan { @@ -233,7 +233,7 @@ impl ExecutionPlan for BufferingExecutionPlan { self.schema.clone() } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.properties } diff --git a/datafusion-examples/examples/proto/composed_extension_codec.rs b/datafusion-examples/examples/proto/composed_extension_codec.rs index f3910d461b6a8..b4f3d4f098996 100644 --- a/datafusion-examples/examples/proto/composed_extension_codec.rs +++ b/datafusion-examples/examples/proto/composed_extension_codec.rs @@ -106,7 +106,7 @@ impl ExecutionPlan for ParentExec { self } - fn properties(&self) -> &datafusion::physical_plan::PlanProperties { + fn properties(&self) -> &Arc { unreachable!() } @@ -182,7 +182,7 @@ impl ExecutionPlan for ChildExec { self } - fn properties(&self) -> &datafusion::physical_plan::PlanProperties { + fn properties(&self) -> &Arc { unreachable!() } diff --git a/datafusion-examples/examples/relation_planner/table_sample.rs b/datafusion-examples/examples/relation_planner/table_sample.rs index 657432ef31362..895f2fdd4ff3a 100644 --- a/datafusion-examples/examples/relation_planner/table_sample.rs +++ b/datafusion-examples/examples/relation_planner/table_sample.rs @@ -618,7 +618,7 @@ pub struct SampleExec { upper_bound: f64, seed: u64, metrics: ExecutionPlanMetricsSet, - cache: PlanProperties, + cache: Arc, } impl SampleExec { @@ -656,7 +656,7 @@ impl SampleExec { upper_bound, seed, metrics: ExecutionPlanMetricsSet::new(), - cache, + cache: Arc::new(cache), }) } @@ -686,7 +686,7 @@ impl ExecutionPlan for SampleExec { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.cache } diff --git a/datafusion/catalog-listing/src/table.rs b/datafusion/catalog-listing/src/table.rs index 38456944075fc..e081280825135 100644 --- a/datafusion/catalog-listing/src/table.rs +++ b/datafusion/catalog-listing/src/table.rs @@ -522,7 +522,7 @@ impl TableProvider for ListingTable { // if no files need to be read, return an `EmptyExec` if partitioned_file_lists.is_empty() { - let projected_schema = project_schema(&self.schema(), projection.as_ref())?; + let projected_schema = project_schema(&self.schema(), projection.as_deref())?; return Ok(ScanResult::new(Arc::new(EmptyExec::new(projected_schema)))); } diff --git a/datafusion/catalog/src/memory/table.rs b/datafusion/catalog/src/memory/table.rs index 7865eb016bee1..484b5f805e547 100644 --- a/datafusion/catalog/src/memory/table.rs +++ b/datafusion/catalog/src/memory/table.rs @@ -549,7 +549,7 @@ fn evaluate_filters_to_mask( struct DmlResultExec { rows_affected: u64, schema: SchemaRef, - properties: PlanProperties, + properties: Arc, } impl DmlResultExec { @@ -570,7 +570,7 @@ impl DmlResultExec { Self { rows_affected, schema, - properties, + properties: Arc::new(properties), } } } @@ -604,7 +604,7 @@ impl ExecutionPlan for DmlResultExec { Arc::clone(&self.schema) } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.properties } diff --git a/datafusion/common/src/stats.rs b/datafusion/common/src/stats.rs index ba13ef392d912..751dff86efb66 100644 --- a/datafusion/common/src/stats.rs +++ b/datafusion/common/src/stats.rs @@ -391,7 +391,7 @@ impl Statistics { /// For example, if we had statistics for columns `{"a", "b", "c"}`, /// projecting to `vec![2, 1]` would return statistics for columns `{"c", /// "b"}`. - pub fn project(mut self, projection: Option<&Vec>) -> Self { + pub fn project(mut self, projection: Option<&[usize]>) -> Self { let Some(projection) = projection else { return self; }; @@ -1066,29 +1066,28 @@ mod tests { #[test] fn test_project_none() { - let projection = None; - let stats = make_stats(vec![10, 20, 30]).project(projection.as_ref()); + let stats = make_stats(vec![10, 20, 30]).project(None); assert_eq!(stats, make_stats(vec![10, 20, 30])); } #[test] fn test_project_empty() { - let projection = Some(vec![]); - let stats = make_stats(vec![10, 20, 30]).project(projection.as_ref()); + let projection: Option<&[_]> = Some(&[]); + let stats = make_stats(vec![10, 20, 30]).project(projection); assert_eq!(stats, make_stats(vec![])); } #[test] fn test_project_swap() { - let projection = Some(vec![2, 1]); - let stats = make_stats(vec![10, 20, 30]).project(projection.as_ref()); + let projection: Option<&[_]> = Some(&[2, 1]); + let stats = make_stats(vec![10, 20, 30]).project(projection); assert_eq!(stats, make_stats(vec![30, 20])); } #[test] fn test_project_repeated() { - let projection = Some(vec![1, 2, 1, 1, 0, 2]); - let stats = make_stats(vec![10, 20, 30]).project(projection.as_ref()); + let projection: Option<&[_]> = Some(&[1, 2, 1, 1, 0, 2]); + let stats = make_stats(vec![10, 20, 30]).project(projection); assert_eq!(stats, make_stats(vec![20, 30, 20, 20, 10, 30])); } diff --git a/datafusion/common/src/utils/mod.rs b/datafusion/common/src/utils/mod.rs index 03310a7bde193..545dc10af031a 100644 --- a/datafusion/common/src/utils/mod.rs +++ b/datafusion/common/src/utils/mod.rs @@ -59,7 +59,7 @@ use std::thread::available_parallelism; /// /// // Pick columns 'c' and 'b' /// let projection = Some(vec![2, 1]); -/// let projected_schema = project_schema(&schema, projection.as_ref()).unwrap(); +/// let projected_schema = project_schema(&schema, projection.as_deref()).unwrap(); /// /// let expected_schema = SchemaRef::new(Schema::new(vec![ /// Field::new("c", DataType::Utf8, true), @@ -70,7 +70,7 @@ use std::thread::available_parallelism; /// ``` pub fn project_schema( schema: &SchemaRef, - projection: Option<&Vec>, + projection: Option<&[usize]>, ) -> Result { let schema = match projection { Some(columns) => Arc::new(schema.project(columns)?), diff --git a/datafusion/core/benches/reset_plan_states.rs b/datafusion/core/benches/reset_plan_states.rs index f2f81f755b96e..5afae7f43242d 100644 --- a/datafusion/core/benches/reset_plan_states.rs +++ b/datafusion/core/benches/reset_plan_states.rs @@ -166,6 +166,8 @@ fn run_reset_states(b: &mut criterion::Bencher, plan: &Arc) { /// making an independent instance of the execution plan to re-execute it, avoiding /// re-planning stage. fn bench_reset_plan_states(c: &mut Criterion) { + env_logger::init(); + let rt = Runtime::new().unwrap(); let ctx = SessionContext::new(); ctx.register_table( diff --git a/datafusion/core/src/datasource/empty.rs b/datafusion/core/src/datasource/empty.rs index 5aeca92b1626d..882abd921a155 100644 --- a/datafusion/core/src/datasource/empty.rs +++ b/datafusion/core/src/datasource/empty.rs @@ -77,7 +77,8 @@ impl TableProvider for EmptyTable { _limit: Option, ) -> Result> { // even though there is no data, projections apply - let projected_schema = project_schema(&self.schema, projection)?; + let projected_schema = + project_schema(&self.schema, projection.map(AsRef::as_ref))?; Ok(Arc::new( EmptyExec::new(projected_schema).with_partitions(self.partitions), )) diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index 2715ad98202cb..c145d837f98e8 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -3661,13 +3661,15 @@ mod tests { #[derive(Debug)] struct NoOpExecutionPlan { - cache: PlanProperties, + cache: Arc, } impl NoOpExecutionPlan { fn new(schema: SchemaRef) -> Self { let cache = Self::compute_properties(schema); - Self { cache } + Self { + cache: Arc::new(cache), + } } /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc. @@ -3705,7 +3707,7 @@ mod tests { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.cache } @@ -3859,7 +3861,7 @@ digraph { fn children(&self) -> Vec<&Arc> { self.0.iter().collect::>() } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { unimplemented!() } fn execute( @@ -3908,7 +3910,7 @@ digraph { fn children(&self) -> Vec<&Arc> { unimplemented!() } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { unimplemented!() } fn execute( @@ -4029,7 +4031,7 @@ digraph { fn children(&self) -> Vec<&Arc> { vec![] } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { unimplemented!() } fn execute( diff --git a/datafusion/core/tests/custom_sources_cases/mod.rs b/datafusion/core/tests/custom_sources_cases/mod.rs index 8453615c2886b..5978b54edf296 100644 --- a/datafusion/core/tests/custom_sources_cases/mod.rs +++ b/datafusion/core/tests/custom_sources_cases/mod.rs @@ -79,16 +79,19 @@ struct CustomTableProvider; #[derive(Debug, Clone)] struct CustomExecutionPlan { projection: Option>, - cache: PlanProperties, + cache: Arc, } impl CustomExecutionPlan { fn new(projection: Option>) -> Self { let schema = TEST_CUSTOM_SCHEMA_REF!(); let schema = - project_schema(&schema, projection.as_ref()).expect("projected schema"); + project_schema(&schema, projection.as_deref()).expect("projected schema"); let cache = Self::compute_properties(schema); - Self { projection, cache } + Self { + projection, + cache: Arc::new(cache), + } } /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc. @@ -157,7 +160,7 @@ impl ExecutionPlan for CustomExecutionPlan { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.cache } diff --git a/datafusion/core/tests/custom_sources_cases/provider_filter_pushdown.rs b/datafusion/core/tests/custom_sources_cases/provider_filter_pushdown.rs index ca1eaa1f958ea..7a624d0636530 100644 --- a/datafusion/core/tests/custom_sources_cases/provider_filter_pushdown.rs +++ b/datafusion/core/tests/custom_sources_cases/provider_filter_pushdown.rs @@ -62,13 +62,16 @@ fn create_batch(value: i32, num_rows: usize) -> Result { #[derive(Debug)] struct CustomPlan { batches: Vec, - cache: PlanProperties, + cache: Arc, } impl CustomPlan { fn new(schema: SchemaRef, batches: Vec) -> Self { let cache = Self::compute_properties(schema); - Self { batches, cache } + Self { + batches, + cache: Arc::new(cache), + } } /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc. @@ -109,7 +112,7 @@ impl ExecutionPlan for CustomPlan { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.cache } diff --git a/datafusion/core/tests/custom_sources_cases/statistics.rs b/datafusion/core/tests/custom_sources_cases/statistics.rs index 820c2a470b376..6964d72eacffe 100644 --- a/datafusion/core/tests/custom_sources_cases/statistics.rs +++ b/datafusion/core/tests/custom_sources_cases/statistics.rs @@ -45,7 +45,7 @@ use async_trait::async_trait; struct StatisticsValidation { stats: Statistics, schema: Arc, - cache: PlanProperties, + cache: Arc, } impl StatisticsValidation { @@ -59,7 +59,7 @@ impl StatisticsValidation { Self { stats, schema, - cache, + cache: Arc::new(cache), } } @@ -158,7 +158,7 @@ impl ExecutionPlan for StatisticsValidation { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.cache } diff --git a/datafusion/core/tests/fuzz_cases/once_exec.rs b/datafusion/core/tests/fuzz_cases/once_exec.rs index 49e2caaa7417c..69edf9be1d825 100644 --- a/datafusion/core/tests/fuzz_cases/once_exec.rs +++ b/datafusion/core/tests/fuzz_cases/once_exec.rs @@ -32,7 +32,7 @@ use std::sync::{Arc, Mutex}; pub struct OnceExec { /// the results to send back stream: Mutex>, - cache: PlanProperties, + cache: Arc, } impl Debug for OnceExec { @@ -46,7 +46,7 @@ impl OnceExec { let cache = Self::compute_properties(stream.schema()); Self { stream: Mutex::new(Some(stream)), - cache, + cache: Arc::new(cache), } } @@ -83,7 +83,7 @@ impl ExecutionPlan for OnceExec { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.cache } diff --git a/datafusion/core/tests/physical_optimizer/enforce_distribution.rs b/datafusion/core/tests/physical_optimizer/enforce_distribution.rs index 94ae82a9ad755..b3a3d29d070b1 100644 --- a/datafusion/core/tests/physical_optimizer/enforce_distribution.rs +++ b/datafusion/core/tests/physical_optimizer/enforce_distribution.rs @@ -120,7 +120,7 @@ macro_rules! assert_plan { struct SortRequiredExec { input: Arc, expr: LexOrdering, - cache: PlanProperties, + cache: Arc, } impl SortRequiredExec { @@ -132,7 +132,7 @@ impl SortRequiredExec { Self { input, expr: requirement, - cache, + cache: Arc::new(cache), } } @@ -174,7 +174,7 @@ impl ExecutionPlan for SortRequiredExec { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.cache } diff --git a/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs b/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs index d265841246867..cb1784943764f 100644 --- a/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs +++ b/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs @@ -482,7 +482,7 @@ fn test_filter_with_projection() { let plan = Arc::new( FilterExec::try_new(predicate, Arc::clone(&scan)) .unwrap() - .with_projection(Some(projection)) + .with_projection(Some(projection.into())) .unwrap(), ); @@ -507,7 +507,7 @@ fn test_filter_with_projection() { let plan = Arc::new( FilterExec::try_new(predicate, scan) .unwrap() - .with_projection(Some(projection)) + .with_projection(Some(projection.into())) .unwrap(), ); insta::assert_snapshot!( diff --git a/datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs b/datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs index 1afdc4823f0a4..18861792f90d1 100644 --- a/datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs +++ b/datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs @@ -447,7 +447,7 @@ impl ExecutionPlan for TestNode { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { self.input.properties() } diff --git a/datafusion/core/tests/physical_optimizer/join_selection.rs b/datafusion/core/tests/physical_optimizer/join_selection.rs index 9234a95591baa..137ffc4702a16 100644 --- a/datafusion/core/tests/physical_optimizer/join_selection.rs +++ b/datafusion/core/tests/physical_optimizer/join_selection.rs @@ -762,7 +762,10 @@ async fn test_hash_join_swap_on_joins_with_projections( "ProjectionExec won't be added above if HashJoinExec contains embedded projection", ); - assert_eq!(swapped_join.projection, Some(vec![0_usize])); + assert_eq!( + swapped_join.projection.as_ref().map(|p| p.to_vec()), + Some(vec![0_usize]) + ); assert_eq!(swapped.schema().fields.len(), 1); assert_eq!(swapped.schema().fields[0].name(), "small_col"); Ok(()) @@ -979,7 +982,7 @@ impl RecordBatchStream for UnboundedStream { pub struct UnboundedExec { batch_produce: Option, batch: RecordBatch, - cache: PlanProperties, + cache: Arc, } impl UnboundedExec { @@ -995,7 +998,7 @@ impl UnboundedExec { Self { batch_produce, batch, - cache, + cache: Arc::new(cache), } } @@ -1052,7 +1055,7 @@ impl ExecutionPlan for UnboundedExec { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.cache } @@ -1091,7 +1094,7 @@ pub enum SourceType { pub struct StatisticsExec { stats: Statistics, schema: Arc, - cache: PlanProperties, + cache: Arc, } impl StatisticsExec { @@ -1105,7 +1108,7 @@ impl StatisticsExec { Self { stats, schema: Arc::new(schema), - cache, + cache: Arc::new(cache), } } @@ -1153,7 +1156,7 @@ impl ExecutionPlan for StatisticsExec { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.cache } diff --git a/datafusion/core/tests/physical_optimizer/test_utils.rs b/datafusion/core/tests/physical_optimizer/test_utils.rs index feac8190ffde4..f8c91ba272a9f 100644 --- a/datafusion/core/tests/physical_optimizer/test_utils.rs +++ b/datafusion/core/tests/physical_optimizer/test_utils.rs @@ -454,7 +454,7 @@ impl ExecutionPlan for RequirementsTestExec { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { self.input.properties() } @@ -825,7 +825,7 @@ pub fn sort_expr_named(name: &str, index: usize) -> PhysicalSortExpr { pub struct TestScan { schema: SchemaRef, output_ordering: Vec, - plan_properties: PlanProperties, + plan_properties: Arc, // Store the requested ordering for display requested_ordering: Option, } @@ -859,7 +859,7 @@ impl TestScan { Self { schema, output_ordering, - plan_properties, + plan_properties: Arc::new(plan_properties), requested_ordering: None, } } @@ -915,7 +915,7 @@ impl ExecutionPlan for TestScan { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.plan_properties } diff --git a/datafusion/core/tests/user_defined/insert_operation.rs b/datafusion/core/tests/user_defined/insert_operation.rs index 7ad00dece1b24..4d2a31ca1f960 100644 --- a/datafusion/core/tests/user_defined/insert_operation.rs +++ b/datafusion/core/tests/user_defined/insert_operation.rs @@ -122,20 +122,22 @@ impl TableProvider for TestInsertTableProvider { #[derive(Debug)] struct TestInsertExec { op: InsertOp, - plan_properties: PlanProperties, + plan_properties: Arc, } impl TestInsertExec { fn new(op: InsertOp) -> Self { Self { op, - plan_properties: PlanProperties::new( - EquivalenceProperties::new(make_count_schema()), - Partitioning::UnknownPartitioning(1), - EmissionType::Incremental, - Boundedness::Bounded, - ) - .with_scheduling_type(SchedulingType::Cooperative), + plan_properties: Arc::new( + PlanProperties::new( + EquivalenceProperties::new(make_count_schema()), + Partitioning::UnknownPartitioning(1), + EmissionType::Incremental, + Boundedness::Bounded, + ) + .with_scheduling_type(SchedulingType::Cooperative), + ), } } } @@ -159,7 +161,7 @@ impl ExecutionPlan for TestInsertExec { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.plan_properties } diff --git a/datafusion/core/tests/user_defined/user_defined_plan.rs b/datafusion/core/tests/user_defined/user_defined_plan.rs index d53e076739608..c2533e73d2be9 100644 --- a/datafusion/core/tests/user_defined/user_defined_plan.rs +++ b/datafusion/core/tests/user_defined/user_defined_plan.rs @@ -653,13 +653,17 @@ struct TopKExec { input: Arc, /// The maximum number of values k: usize, - cache: PlanProperties, + cache: Arc, } impl TopKExec { fn new(input: Arc, k: usize) -> Self { let cache = Self::compute_properties(input.schema()); - Self { input, k, cache } + Self { + input, + k, + cache: Arc::new(cache), + } } /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc. @@ -704,7 +708,7 @@ impl ExecutionPlan for TopKExec { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.cache } diff --git a/datafusion/datasource/src/memory.rs b/datafusion/datasource/src/memory.rs index 1d12bb3200309..3fc388cd3c4ad 100644 --- a/datafusion/datasource/src/memory.rs +++ b/datafusion/datasource/src/memory.rs @@ -262,7 +262,7 @@ impl MemorySourceConfig { schema: SchemaRef, projection: Option>, ) -> Result { - let projected_schema = project_schema(&schema, projection.as_ref())?; + let projected_schema = project_schema(&schema, projection.as_deref())?; Ok(Self { partitions: partitions.to_vec(), schema, diff --git a/datafusion/datasource/src/sink.rs b/datafusion/datasource/src/sink.rs index 5460a0ffdc3df..0533a83b82896 100644 --- a/datafusion/datasource/src/sink.rs +++ b/datafusion/datasource/src/sink.rs @@ -89,7 +89,7 @@ pub struct DataSinkExec { count_schema: SchemaRef, /// Optional required sort order for output data. sort_order: Option, - cache: PlanProperties, + cache: Arc, } impl Debug for DataSinkExec { @@ -117,7 +117,7 @@ impl DataSinkExec { sink, count_schema: make_count_schema(), sort_order, - cache, + cache: Arc::new(cache), } } @@ -174,7 +174,7 @@ impl ExecutionPlan for DataSinkExec { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.cache } diff --git a/datafusion/datasource/src/source.rs b/datafusion/datasource/src/source.rs index a3892dfac9778..5d7a3d67b195d 100644 --- a/datafusion/datasource/src/source.rs +++ b/datafusion/datasource/src/source.rs @@ -74,8 +74,8 @@ use datafusion_physical_plan::filter_pushdown::{ /// ```text /// ┌─────────────────────┐ -----► execute path /// │ │ ┄┄┄┄┄► init path -/// │ DataSourceExec │ -/// │ │ +/// │ DataSourceExec │ +/// │ │ /// └───────▲─────────────┘ /// ┊ │ /// ┊ │ @@ -229,7 +229,7 @@ pub struct DataSourceExec { /// The source of the data -- for example, `FileScanConfig` or `MemorySourceConfig` data_source: Arc, /// Cached plan properties such as sort order - cache: PlanProperties, + cache: Arc, } impl DisplayAs for DataSourceExec { @@ -253,7 +253,7 @@ impl ExecutionPlan for DataSourceExec { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.cache } @@ -323,7 +323,7 @@ impl ExecutionPlan for DataSourceExec { fn with_fetch(&self, limit: Option) -> Option> { let data_source = self.data_source.with_fetch(limit)?; - let cache = self.cache.clone(); + let cache = Arc::clone(&self.cache); Some(Arc::new(Self { data_source, cache })) } @@ -367,7 +367,8 @@ impl ExecutionPlan for DataSourceExec { let mut new_node = self.clone(); new_node.data_source = data_source; // Re-compute properties since we have new filters which will impact equivalence info - new_node.cache = Self::compute_properties(&new_node.data_source); + new_node.cache = + Arc::new(Self::compute_properties(&new_node.data_source)); Ok(FilterPushdownPropagation { filters: res.filters, @@ -403,7 +404,10 @@ impl DataSourceExec { // Default constructor for `DataSourceExec`, setting the `cooperative` flag to `true`. pub fn new(data_source: Arc) -> Self { let cache = Self::compute_properties(&data_source); - Self { data_source, cache } + Self { + data_source, + cache: Arc::new(cache), + } } /// Return the source object @@ -412,20 +416,20 @@ impl DataSourceExec { } pub fn with_data_source(mut self, data_source: Arc) -> Self { - self.cache = Self::compute_properties(&data_source); + self.cache = Arc::new(Self::compute_properties(&data_source)); self.data_source = data_source; self } /// Assign constraints pub fn with_constraints(mut self, constraints: Constraints) -> Self { - self.cache = self.cache.with_constraints(constraints); + Arc::make_mut(&mut self.cache).set_constraints(constraints); self } /// Assign output partitioning pub fn with_partitioning(mut self, partitioning: Partitioning) -> Self { - self.cache = self.cache.with_partitioning(partitioning); + Arc::make_mut(&mut self.cache).partitioning = partitioning; self } diff --git a/datafusion/ffi/src/execution_plan.rs b/datafusion/ffi/src/execution_plan.rs index c879b022067c3..ec8f7538fee1a 100644 --- a/datafusion/ffi/src/execution_plan.rs +++ b/datafusion/ffi/src/execution_plan.rs @@ -90,7 +90,7 @@ impl FFI_ExecutionPlan { unsafe extern "C" fn properties_fn_wrapper( plan: &FFI_ExecutionPlan, ) -> FFI_PlanProperties { - plan.inner().properties().into() + plan.inner().properties().as_ref().into() } unsafe extern "C" fn children_fn_wrapper( @@ -192,7 +192,7 @@ impl Drop for FFI_ExecutionPlan { pub struct ForeignExecutionPlan { name: String, plan: FFI_ExecutionPlan, - properties: PlanProperties, + properties: Arc, children: Vec>, } @@ -244,7 +244,7 @@ impl TryFrom<&FFI_ExecutionPlan> for Arc { let plan = ForeignExecutionPlan { name, plan: plan.clone(), - properties, + properties: Arc::new(properties), children, }; @@ -262,7 +262,7 @@ impl ExecutionPlan for ForeignExecutionPlan { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.properties } @@ -278,7 +278,7 @@ impl ExecutionPlan for ForeignExecutionPlan { plan: self.plan.clone(), name: self.name.clone(), children, - properties: self.properties.clone(), + properties: Arc::clone(&self.properties), })) } @@ -305,19 +305,19 @@ pub(crate) mod tests { #[derive(Debug)] pub struct EmptyExec { - props: PlanProperties, + props: Arc, children: Vec>, } impl EmptyExec { pub fn new(schema: arrow::datatypes::SchemaRef) -> Self { Self { - props: PlanProperties::new( + props: Arc::new(PlanProperties::new( datafusion::physical_expr::EquivalenceProperties::new(schema), Partitioning::UnknownPartitioning(3), EmissionType::Incremental, Boundedness::Bounded, - ), + )), children: Vec::default(), } } @@ -342,7 +342,7 @@ pub(crate) mod tests { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.props } @@ -355,7 +355,7 @@ pub(crate) mod tests { children: Vec>, ) -> Result> { Ok(Arc::new(EmptyExec { - props: self.props.clone(), + props: Arc::clone(&self.props), children, })) } diff --git a/datafusion/ffi/src/tests/async_provider.rs b/datafusion/ffi/src/tests/async_provider.rs index 6149736c58555..8370cf19e6589 100644 --- a/datafusion/ffi/src/tests/async_provider.rs +++ b/datafusion/ffi/src/tests/async_provider.rs @@ -162,7 +162,7 @@ impl Drop for AsyncTableProvider { #[derive(Debug)] struct AsyncTestExecutionPlan { - properties: datafusion_physical_plan::PlanProperties, + properties: Arc, batch_request: mpsc::Sender, batch_receiver: broadcast::Receiver>, } @@ -173,12 +173,12 @@ impl AsyncTestExecutionPlan { batch_receiver: broadcast::Receiver>, ) -> Self { Self { - properties: datafusion_physical_plan::PlanProperties::new( + properties: Arc::new(datafusion_physical_plan::PlanProperties::new( EquivalenceProperties::new(super::create_test_schema()), Partitioning::UnknownPartitioning(3), datafusion_physical_plan::execution_plan::EmissionType::Incremental, datafusion_physical_plan::execution_plan::Boundedness::Bounded, - ), + )), batch_request, batch_receiver, } @@ -194,7 +194,7 @@ impl ExecutionPlan for AsyncTestExecutionPlan { self } - fn properties(&self) -> &datafusion_physical_plan::PlanProperties { + fn properties(&self) -> &Arc { &self.properties } diff --git a/datafusion/physical-expr/src/equivalence/properties/mod.rs b/datafusion/physical-expr/src/equivalence/properties/mod.rs index 70f97139f8af4..f643445a449ae 100644 --- a/datafusion/physical-expr/src/equivalence/properties/mod.rs +++ b/datafusion/physical-expr/src/equivalence/properties/mod.rs @@ -207,8 +207,13 @@ impl EquivalenceProperties { } /// Adds constraints to the properties. - pub fn with_constraints(mut self, constraints: Constraints) -> Self { + pub fn set_constraints(&mut self, constraints: Constraints) { self.constraints = constraints; + } + + /// Adds constraints to the properties. + pub fn with_constraints(mut self, constraints: Constraints) -> Self { + self.set_constraints(constraints); self } diff --git a/datafusion/physical-expr/src/projection.rs b/datafusion/physical-expr/src/projection.rs index 540fd620c92ce..f0e3ce4b79d48 100644 --- a/datafusion/physical-expr/src/projection.rs +++ b/datafusion/physical-expr/src/projection.rs @@ -125,7 +125,7 @@ impl From for (Arc, String) { /// indices. #[derive(Debug, Clone, PartialEq, Eq)] pub struct ProjectionExprs { - exprs: Vec, + exprs: Arc<[ProjectionExpr]>, } impl std::fmt::Display for ProjectionExprs { @@ -137,14 +137,16 @@ impl std::fmt::Display for ProjectionExprs { impl From> for ProjectionExprs { fn from(value: Vec) -> Self { - Self { exprs: value } + Self { + exprs: value.into_iter().collect(), + } } } impl From<&[ProjectionExpr]> for ProjectionExprs { fn from(value: &[ProjectionExpr]) -> Self { Self { - exprs: value.to_vec(), + exprs: value.iter().cloned().collect(), } } } @@ -152,7 +154,7 @@ impl From<&[ProjectionExpr]> for ProjectionExprs { impl FromIterator for ProjectionExprs { fn from_iter>(exprs: T) -> Self { Self { - exprs: exprs.into_iter().collect::>(), + exprs: exprs.into_iter().collect(), } } } @@ -169,7 +171,7 @@ impl ProjectionExprs { I: IntoIterator, { Self { - exprs: exprs.into_iter().collect::>(), + exprs: exprs.into_iter().collect(), } } @@ -285,13 +287,13 @@ impl ProjectionExprs { { let exprs = self .exprs - .into_iter() - .map(|mut proj| { - proj.expr = f(proj.expr)?; - Ok(proj) + .iter() + .map(|proj| { + let expr = f(Arc::clone(&proj.expr))?; + Ok(ProjectionExpr::new(expr, proj.alias.clone())) }) - .collect::>>()?; - Ok(Self::new(exprs)) + .collect::>()?; + Ok(Self { exprs }) } /// Apply another projection on top of this projection, returning the combined projection. @@ -361,7 +363,7 @@ impl ProjectionExprs { /// applied on top of this projection. pub fn try_merge(&self, other: &ProjectionExprs) -> Result { let mut new_exprs = Vec::with_capacity(other.exprs.len()); - for proj_expr in &other.exprs { + for proj_expr in other.exprs.iter() { let new_expr = update_expr(&proj_expr.expr, &self.exprs, true)? .ok_or_else(|| { internal_datafusion_err!( @@ -607,7 +609,7 @@ impl ProjectionExprs { ) -> Result { let mut column_statistics = vec![]; - for proj_expr in &self.exprs { + for proj_expr in self.exprs.iter() { let expr = &proj_expr.expr; let col_stats = if let Some(col) = expr.as_any().downcast_ref::() { std::mem::take(&mut stats.column_statistics[col.index()]) @@ -754,15 +756,6 @@ impl Projector { } } -impl IntoIterator for ProjectionExprs { - type Item = ProjectionExpr; - type IntoIter = std::vec::IntoIter; - - fn into_iter(self) -> Self::IntoIter { - self.exprs.into_iter() - } -} - /// The function operates in two modes: /// /// 1) When `sync_with_child` is `true`: diff --git a/datafusion/physical-optimizer/src/enforce_distribution.rs b/datafusion/physical-optimizer/src/enforce_distribution.rs index f3ec083efb240..957359e2f05c9 100644 --- a/datafusion/physical-optimizer/src/enforce_distribution.rs +++ b/datafusion/physical-optimizer/src/enforce_distribution.rs @@ -312,7 +312,7 @@ pub fn adjust_input_keys_ordering( filter.clone(), join_type, // TODO: although projection is not used in the join here, because projection pushdown is after enforce_distribution. Maybe we need to handle it later. Same as filter. - projection.clone(), + projection.clone().map(|p| p.to_vec()), PartitionMode::Partitioned, *null_equality, *null_aware, @@ -644,7 +644,7 @@ pub fn reorder_join_keys_to_inputs( new_join_on, filter.clone(), join_type, - projection.clone(), + projection.clone().map(|p| p.to_vec()), PartitionMode::Partitioned, *null_equality, *null_aware, diff --git a/datafusion/physical-optimizer/src/join_selection.rs b/datafusion/physical-optimizer/src/join_selection.rs index 7412d0ba97812..5c16f45ee72f1 100644 --- a/datafusion/physical-optimizer/src/join_selection.rs +++ b/datafusion/physical-optimizer/src/join_selection.rs @@ -197,7 +197,7 @@ pub(crate) fn try_collect_left( hash_join.on().to_vec(), hash_join.filter().cloned(), hash_join.join_type(), - hash_join.projection.clone(), + hash_join.projection.clone().map(|p| p.to_vec()), PartitionMode::CollectLeft, hash_join.null_equality(), hash_join.null_aware, @@ -210,7 +210,7 @@ pub(crate) fn try_collect_left( hash_join.on().to_vec(), hash_join.filter().cloned(), hash_join.join_type(), - hash_join.projection.clone(), + hash_join.projection.clone().map(|p| p.to_vec()), PartitionMode::CollectLeft, hash_join.null_equality(), hash_join.null_aware, @@ -260,7 +260,7 @@ pub(crate) fn partitioned_hash_join( hash_join.on().to_vec(), hash_join.filter().cloned(), hash_join.join_type(), - hash_join.projection.clone(), + hash_join.projection.clone().map(|p| p.to_vec()), partition_mode, hash_join.null_equality(), hash_join.null_aware, diff --git a/datafusion/physical-optimizer/src/output_requirements.rs b/datafusion/physical-optimizer/src/output_requirements.rs index 0dc6a25fbc0b7..9c4169ec654f8 100644 --- a/datafusion/physical-optimizer/src/output_requirements.rs +++ b/datafusion/physical-optimizer/src/output_requirements.rs @@ -98,7 +98,7 @@ pub struct OutputRequirementExec { input: Arc, order_requirement: Option, dist_requirement: Distribution, - cache: PlanProperties, + cache: Arc, fetch: Option, } @@ -114,7 +114,7 @@ impl OutputRequirementExec { input, order_requirement: requirements, dist_requirement, - cache, + cache: Arc::new(cache), fetch, } } @@ -200,7 +200,7 @@ impl ExecutionPlan for OutputRequirementExec { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.cache } diff --git a/datafusion/physical-optimizer/src/update_aggr_exprs.rs b/datafusion/physical-optimizer/src/update_aggr_exprs.rs index c0aab4080da77..19d46c8e5e6e9 100644 --- a/datafusion/physical-optimizer/src/update_aggr_exprs.rs +++ b/datafusion/physical-optimizer/src/update_aggr_exprs.rs @@ -108,7 +108,7 @@ impl PhysicalOptimizerRule for OptimizeAggregateOrder { input.equivalence_properties(), )?; - let aggr_exec = aggr_exec.with_new_aggr_exprs(aggr_exprs); + let aggr_exec = aggr_exec.with_new_aggr_exprs(aggr_exprs.into()); Ok(Transformed::yes(Arc::new(aggr_exec) as _)) } else { diff --git a/datafusion/physical-plan/src/aggregates/mod.rs b/datafusion/physical-plan/src/aggregates/mod.rs index 4dd9482ac4322..5219a2bd2afce 100644 --- a/datafusion/physical-plan/src/aggregates/mod.rs +++ b/datafusion/physical-plan/src/aggregates/mod.rs @@ -25,7 +25,9 @@ use crate::aggregates::{ no_grouping::AggregateStream, row_hash::GroupedHashAggregateStream, topk_stream::GroupedTopKAggregateStream, }; -use crate::execution_plan::{CardinalityEffect, EmissionType}; +use crate::execution_plan::{ + CardinalityEffect, EmissionType, has_same_children_properties, +}; use crate::filter_pushdown::{ ChildFilterDescription, ChildPushdownResult, FilterDescription, FilterPushdownPhase, FilterPushdownPropagation, PushedDownPredicate, @@ -33,7 +35,7 @@ use crate::filter_pushdown::{ use crate::metrics::{ExecutionPlanMetricsSet, MetricsSet}; use crate::{ DisplayFormatType, Distribution, ExecutionPlan, InputOrderMode, - SendableRecordBatchStream, Statistics, + SendableRecordBatchStream, Statistics, check_if_same_properties, }; use datafusion_common::config::ConfigOptions; use datafusion_physical_expr::utils::collect_columns; @@ -510,9 +512,9 @@ pub struct AggregateExec { /// Group by expressions group_by: PhysicalGroupBy, /// Aggregate expressions - aggr_expr: Vec>, + aggr_expr: Arc<[Arc]>, /// FILTER (WHERE clause) expression for each aggregate expression - filter_expr: Vec>>, + filter_expr: Arc<[Option>]>, /// Set if the output of this aggregation is truncated by a upstream sort/limit clause limit: Option, /// Input plan, could be a partial aggregate or the input to the aggregate @@ -530,7 +532,7 @@ pub struct AggregateExec { required_input_ordering: Option, /// Describes how the input is ordered relative to the group by columns input_order_mode: InputOrderMode, - cache: PlanProperties, + cache: Arc, /// During initialization, if the plan supports dynamic filtering (see [`AggrDynFilter`]), /// it is set to `Some(..)` regardless of whether it can be pushed down to a child node. /// @@ -546,7 +548,7 @@ impl AggregateExec { /// Rewrites aggregate exec with new aggregate expressions. pub fn with_new_aggr_exprs( &self, - aggr_expr: Vec>, + aggr_expr: Arc<[Arc]>, ) -> Self { Self { aggr_expr, @@ -554,10 +556,10 @@ impl AggregateExec { required_input_ordering: self.required_input_ordering.clone(), metrics: ExecutionPlanMetricsSet::new(), input_order_mode: self.input_order_mode.clone(), - cache: self.cache.clone(), + cache: Arc::clone(&self.cache), mode: self.mode, group_by: self.group_by.clone(), - filter_expr: self.filter_expr.clone(), + filter_expr: Arc::clone(&self.filter_expr), limit: self.limit, input: Arc::clone(&self.input), schema: Arc::clone(&self.schema), @@ -574,11 +576,11 @@ impl AggregateExec { required_input_ordering: self.required_input_ordering.clone(), metrics: ExecutionPlanMetricsSet::new(), input_order_mode: self.input_order_mode.clone(), - cache: self.cache.clone(), + cache: Arc::clone(&self.cache), mode: self.mode, group_by: self.group_by.clone(), - aggr_expr: self.aggr_expr.clone(), - filter_expr: self.filter_expr.clone(), + aggr_expr: Arc::clone(&self.aggr_expr), + filter_expr: Arc::clone(&self.filter_expr), input: Arc::clone(&self.input), schema: Arc::clone(&self.schema), input_schema: Arc::clone(&self.input_schema), @@ -702,8 +704,8 @@ impl AggregateExec { let mut exec = AggregateExec { mode, group_by, - aggr_expr, - filter_expr, + aggr_expr: aggr_expr.into(), + filter_expr: filter_expr.into(), input, schema, input_schema, @@ -711,7 +713,7 @@ impl AggregateExec { required_input_ordering, limit: None, input_order_mode, - cache, + cache: Arc::new(cache), dynamic_filter: None, }; @@ -1060,6 +1062,17 @@ impl AggregateExec { _ => Precision::Absent, } } + + fn with_new_children_and_same_properties( + &self, + mut children: Vec>, + ) -> Self { + Self { + input: children.swap_remove(0), + metrics: ExecutionPlanMetricsSet::new(), + ..Self::clone(self) + } + } } impl DisplayAs for AggregateExec { @@ -1195,7 +1208,7 @@ impl ExecutionPlan for AggregateExec { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.cache } @@ -1236,14 +1249,16 @@ impl ExecutionPlan for AggregateExec { fn with_new_children( self: Arc, - children: Vec>, + mut children: Vec>, ) -> Result> { + check_if_same_properties!(self, children); + let mut me = AggregateExec::try_new_with_schema( self.mode, self.group_by.clone(), - self.aggr_expr.clone(), - self.filter_expr.clone(), - Arc::clone(&children[0]), + self.aggr_expr.to_vec(), + self.filter_expr.to_vec(), + children.swap_remove(0), Arc::clone(&self.input_schema), Arc::clone(&self.schema), )?; @@ -2281,14 +2296,17 @@ mod tests { struct TestYieldingExec { /// True if this exec should yield back to runtime the first time it is polled pub yield_first: bool, - cache: PlanProperties, + cache: Arc, } impl TestYieldingExec { fn new(yield_first: bool) -> Self { let schema = some_data().0; let cache = Self::compute_properties(schema); - Self { yield_first, cache } + Self { + yield_first, + cache: Arc::new(cache), + } } /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc. @@ -2329,7 +2347,7 @@ mod tests { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.cache } diff --git a/datafusion/physical-plan/src/aggregates/no_grouping.rs b/datafusion/physical-plan/src/aggregates/no_grouping.rs index a55d70ca6fb27..576fe25e07669 100644 --- a/datafusion/physical-plan/src/aggregates/no_grouping.rs +++ b/datafusion/physical-plan/src/aggregates/no_grouping.rs @@ -276,7 +276,7 @@ impl AggregateStream { partition: usize, ) -> Result { let agg_schema = Arc::clone(&agg.schema); - let agg_filter_expr = agg.filter_expr.clone(); + let agg_filter_expr = Arc::clone(&agg.filter_expr); let baseline_metrics = BaselineMetrics::new(&agg.metrics, partition); let input = agg.input.execute(partition, Arc::clone(context))?; @@ -285,7 +285,7 @@ impl AggregateStream { let filter_expressions = match agg.mode { AggregateMode::Partial | AggregateMode::Single - | AggregateMode::SinglePartitioned => agg_filter_expr, + | AggregateMode::SinglePartitioned => agg_filter_expr.to_vec(), AggregateMode::Final | AggregateMode::FinalPartitioned => { vec![None; agg.aggr_expr.len()] } diff --git a/datafusion/physical-plan/src/aggregates/row_hash.rs b/datafusion/physical-plan/src/aggregates/row_hash.rs index 1ae7202711112..0df66145aa357 100644 --- a/datafusion/physical-plan/src/aggregates/row_hash.rs +++ b/datafusion/physical-plan/src/aggregates/row_hash.rs @@ -466,7 +466,7 @@ impl GroupedHashAggregateStream { debug!("Creating GroupedHashAggregateStream"); let agg_schema = Arc::clone(&agg.schema); let agg_group_by = agg.group_by.clone(); - let agg_filter_expr = agg.filter_expr.clone(); + let agg_filter_expr = Arc::clone(&agg.filter_expr); let batch_size = context.session_config().batch_size(); let input = agg.input.execute(partition, Arc::clone(context))?; @@ -475,7 +475,7 @@ impl GroupedHashAggregateStream { let timer = baseline_metrics.elapsed_compute().timer(); - let aggregate_exprs = agg.aggr_expr.clone(); + let aggregate_exprs = Arc::clone(&agg.aggr_expr); // arguments for each aggregate, one vec of expressions per // aggregate @@ -494,7 +494,7 @@ impl GroupedHashAggregateStream { let filter_expressions = match agg.mode { AggregateMode::Partial | AggregateMode::Single - | AggregateMode::SinglePartitioned => agg_filter_expr, + | AggregateMode::SinglePartitioned => agg_filter_expr.to_vec(), AggregateMode::Final | AggregateMode::FinalPartitioned => { vec![None; agg.aggr_expr.len()] } diff --git a/datafusion/physical-plan/src/analyze.rs b/datafusion/physical-plan/src/analyze.rs index 1fb8f93a38782..eca31ea0e194f 100644 --- a/datafusion/physical-plan/src/analyze.rs +++ b/datafusion/physical-plan/src/analyze.rs @@ -51,7 +51,7 @@ pub struct AnalyzeExec { pub(crate) input: Arc, /// The output schema for RecordBatches of this exec node schema: SchemaRef, - cache: PlanProperties, + cache: Arc, } impl AnalyzeExec { @@ -70,7 +70,7 @@ impl AnalyzeExec { metric_types, input, schema, - cache, + cache: Arc::new(cache), } } @@ -131,7 +131,7 @@ impl ExecutionPlan for AnalyzeExec { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.cache } diff --git a/datafusion/physical-plan/src/async_func.rs b/datafusion/physical-plan/src/async_func.rs index a61fd95949d1a..93701a25b3bf4 100644 --- a/datafusion/physical-plan/src/async_func.rs +++ b/datafusion/physical-plan/src/async_func.rs @@ -16,10 +16,12 @@ // under the License. use crate::coalesce::LimitedBatchCoalescer; +use crate::execution_plan::has_same_children_properties; use crate::metrics::{ExecutionPlanMetricsSet, MetricsSet}; use crate::stream::RecordBatchStreamAdapter; use crate::{ DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, PlanProperties, + check_if_same_properties, }; use arrow::array::RecordBatch; use arrow_schema::{Fields, Schema, SchemaRef}; @@ -45,12 +47,12 @@ use std::task::{Context, Poll, ready}; /// /// The schema of the output of the AsyncFuncExec is: /// Input columns followed by one column for each async expression -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct AsyncFuncExec { /// The async expressions to evaluate async_exprs: Vec>, input: Arc, - cache: PlanProperties, + cache: Arc, metrics: ExecutionPlanMetricsSet, } @@ -84,7 +86,7 @@ impl AsyncFuncExec { Ok(Self { input, async_exprs, - cache, + cache: Arc::new(cache), metrics: ExecutionPlanMetricsSet::new(), }) } @@ -113,6 +115,17 @@ impl AsyncFuncExec { pub fn input(&self) -> &Arc { &self.input } + + fn with_new_children_and_same_properties( + &self, + mut children: Vec>, + ) -> Self { + Self { + input: children.swap_remove(0), + metrics: ExecutionPlanMetricsSet::new(), + ..Self::clone(self) + } + } } impl DisplayAs for AsyncFuncExec { @@ -149,7 +162,7 @@ impl ExecutionPlan for AsyncFuncExec { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.cache } @@ -159,16 +172,17 @@ impl ExecutionPlan for AsyncFuncExec { fn with_new_children( self: Arc, - children: Vec>, + mut children: Vec>, ) -> Result> { assert_eq_or_internal_err!( children.len(), 1, "AsyncFuncExec wrong number of children" ); + check_if_same_properties!(self, children); Ok(Arc::new(AsyncFuncExec::try_new( self.async_exprs.clone(), - Arc::clone(&children[0]), + children.swap_remove(0), )?)) } diff --git a/datafusion/physical-plan/src/coalesce_batches.rs b/datafusion/physical-plan/src/coalesce_batches.rs index dfcd3cb0bcae7..27a521ec6bc5d 100644 --- a/datafusion/physical-plan/src/coalesce_batches.rs +++ b/datafusion/physical-plan/src/coalesce_batches.rs @@ -27,6 +27,7 @@ use super::{DisplayAs, ExecutionPlanProperties, PlanProperties, Statistics}; use crate::projection::ProjectionExec; use crate::{ DisplayFormatType, ExecutionPlan, RecordBatchStream, SendableRecordBatchStream, + check_if_same_properties, }; use arrow::datatypes::SchemaRef; @@ -36,7 +37,7 @@ use datafusion_execution::TaskContext; use datafusion_physical_expr::PhysicalExpr; use crate::coalesce::{LimitedBatchCoalescer, PushBatchStatus}; -use crate::execution_plan::CardinalityEffect; +use crate::execution_plan::{CardinalityEffect, has_same_children_properties}; use crate::filter_pushdown::{ ChildPushdownResult, FilterDescription, FilterPushdownPhase, FilterPushdownPropagation, @@ -71,7 +72,7 @@ pub struct CoalesceBatchesExec { fetch: Option, /// Execution metrics metrics: ExecutionPlanMetricsSet, - cache: PlanProperties, + cache: Arc, } #[expect(deprecated)] @@ -84,7 +85,7 @@ impl CoalesceBatchesExec { target_batch_size, fetch: None, metrics: ExecutionPlanMetricsSet::new(), - cache, + cache: Arc::new(cache), } } @@ -115,6 +116,17 @@ impl CoalesceBatchesExec { input.boundedness(), ) } + + fn with_new_children_and_same_properties( + &self, + mut children: Vec>, + ) -> Self { + Self { + input: children.swap_remove(0), + metrics: ExecutionPlanMetricsSet::new(), + ..Self::clone(self) + } + } } #[expect(deprecated)] @@ -159,7 +171,7 @@ impl ExecutionPlan for CoalesceBatchesExec { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.cache } @@ -177,10 +189,11 @@ impl ExecutionPlan for CoalesceBatchesExec { fn with_new_children( self: Arc, - children: Vec>, + mut children: Vec>, ) -> Result> { + check_if_same_properties!(self, children); Ok(Arc::new( - CoalesceBatchesExec::new(Arc::clone(&children[0]), self.target_batch_size) + CoalesceBatchesExec::new(children.swap_remove(0), self.target_batch_size) .with_fetch(self.fetch), )) } @@ -222,7 +235,7 @@ impl ExecutionPlan for CoalesceBatchesExec { target_batch_size: self.target_batch_size, fetch: limit, metrics: self.metrics.clone(), - cache: self.cache.clone(), + cache: Arc::clone(&self.cache), })) } diff --git a/datafusion/physical-plan/src/coalesce_partitions.rs b/datafusion/physical-plan/src/coalesce_partitions.rs index d83f90eb3d8c1..8210a152b2e8f 100644 --- a/datafusion/physical-plan/src/coalesce_partitions.rs +++ b/datafusion/physical-plan/src/coalesce_partitions.rs @@ -27,11 +27,13 @@ use super::{ DisplayAs, ExecutionPlanProperties, PlanProperties, SendableRecordBatchStream, Statistics, }; -use crate::execution_plan::{CardinalityEffect, EvaluationType, SchedulingType}; +use crate::execution_plan::{ + CardinalityEffect, EvaluationType, SchedulingType, has_same_children_properties, +}; use crate::filter_pushdown::{FilterDescription, FilterPushdownPhase}; use crate::projection::{ProjectionExec, make_with_child}; use crate::sort_pushdown::SortOrderPushdownResult; -use crate::{DisplayFormatType, ExecutionPlan, Partitioning}; +use crate::{DisplayFormatType, ExecutionPlan, Partitioning, check_if_same_properties}; use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr; use datafusion_common::config::ConfigOptions; @@ -47,7 +49,7 @@ pub struct CoalescePartitionsExec { input: Arc, /// Execution metrics metrics: ExecutionPlanMetricsSet, - cache: PlanProperties, + cache: Arc, /// Optional number of rows to fetch. Stops producing rows after this fetch pub(crate) fetch: Option, } @@ -59,7 +61,7 @@ impl CoalescePartitionsExec { CoalescePartitionsExec { input, metrics: ExecutionPlanMetricsSet::new(), - cache, + cache: Arc::new(cache), fetch: None, } } @@ -100,6 +102,17 @@ impl CoalescePartitionsExec { .with_evaluation_type(drive) .with_scheduling_type(scheduling) } + + fn with_new_children_and_same_properties( + &self, + mut children: Vec>, + ) -> Self { + Self { + input: children.swap_remove(0), + metrics: ExecutionPlanMetricsSet::new(), + ..Self::clone(self) + } + } } impl DisplayAs for CoalescePartitionsExec { @@ -135,7 +148,7 @@ impl ExecutionPlan for CoalescePartitionsExec { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.cache } @@ -149,9 +162,10 @@ impl ExecutionPlan for CoalescePartitionsExec { fn with_new_children( self: Arc, - children: Vec>, + mut children: Vec>, ) -> Result> { - let mut plan = CoalescePartitionsExec::new(Arc::clone(&children[0])); + check_if_same_properties!(self, children); + let mut plan = CoalescePartitionsExec::new(children.swap_remove(0)); plan.fetch = self.fetch; Ok(Arc::new(plan)) } @@ -274,7 +288,7 @@ impl ExecutionPlan for CoalescePartitionsExec { input: Arc::clone(&self.input), fetch: limit, metrics: self.metrics.clone(), - cache: self.cache.clone(), + cache: Arc::clone(&self.cache), })) } diff --git a/datafusion/physical-plan/src/common.rs b/datafusion/physical-plan/src/common.rs index 32dc60b56ad48..590f6f09e8b9e 100644 --- a/datafusion/physical-plan/src/common.rs +++ b/datafusion/physical-plan/src/common.rs @@ -181,7 +181,7 @@ pub fn compute_record_batch_statistics( /// Checks if the given projection is valid for the given schema. pub fn can_project( schema: &arrow::datatypes::SchemaRef, - projection: Option<&Vec>, + projection: Option<&[usize]>, ) -> Result<()> { match projection { Some(columns) => { diff --git a/datafusion/physical-plan/src/coop.rs b/datafusion/physical-plan/src/coop.rs index a1fad86777408..6740ed43d35ec 100644 --- a/datafusion/physical-plan/src/coop.rs +++ b/datafusion/physical-plan/src/coop.rs @@ -82,14 +82,14 @@ use crate::filter_pushdown::{ use crate::projection::ProjectionExec; use crate::{ DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, RecordBatchStream, - SendableRecordBatchStream, SortOrderPushdownResult, + SendableRecordBatchStream, SortOrderPushdownResult, check_if_same_properties, }; use arrow::record_batch::RecordBatch; use arrow_schema::Schema; use datafusion_common::{Result, Statistics, assert_eq_or_internal_err}; use datafusion_execution::TaskContext; -use crate::execution_plan::SchedulingType; +use crate::execution_plan::{SchedulingType, has_same_children_properties}; use crate::stream::RecordBatchStreamAdapter; use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr; use futures::{Stream, StreamExt}; @@ -212,16 +212,15 @@ where #[derive(Debug, Clone)] pub struct CooperativeExec { input: Arc, - properties: PlanProperties, + properties: Arc, } impl CooperativeExec { /// Creates a new `CooperativeExec` operator that wraps the given input execution plan. pub fn new(input: Arc) -> Self { - let properties = input - .properties() - .clone() - .with_scheduling_type(SchedulingType::Cooperative); + let properties = PlanProperties::clone(input.properties()) + .with_scheduling_type(SchedulingType::Cooperative) + .into(); Self { input, properties } } @@ -230,6 +229,16 @@ impl CooperativeExec { pub fn input(&self) -> &Arc { &self.input } + + fn with_new_children_and_same_properties( + &self, + mut children: Vec>, + ) -> Self { + Self { + input: children.swap_remove(0), + ..Self::clone(self) + } + } } impl DisplayAs for CooperativeExec { @@ -255,7 +264,7 @@ impl ExecutionPlan for CooperativeExec { self.input.schema() } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.properties } @@ -276,6 +285,7 @@ impl ExecutionPlan for CooperativeExec { 1, "CooperativeExec requires exactly one child" ); + check_if_same_properties!(self, children); Ok(Arc::new(CooperativeExec::new(children.swap_remove(0)))) } diff --git a/datafusion/physical-plan/src/display.rs b/datafusion/physical-plan/src/display.rs index 52c37a106b39e..d525f44541d5f 100644 --- a/datafusion/physical-plan/src/display.rs +++ b/datafusion/physical-plan/src/display.rs @@ -1153,7 +1153,7 @@ mod tests { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { unimplemented!() } diff --git a/datafusion/physical-plan/src/empty.rs b/datafusion/physical-plan/src/empty.rs index fcfbcfa3e8277..d3e62f91c0570 100644 --- a/datafusion/physical-plan/src/empty.rs +++ b/datafusion/physical-plan/src/empty.rs @@ -43,7 +43,7 @@ pub struct EmptyExec { schema: SchemaRef, /// Number of partitions partitions: usize, - cache: PlanProperties, + cache: Arc, } impl EmptyExec { @@ -53,7 +53,7 @@ impl EmptyExec { EmptyExec { schema, partitions: 1, - cache, + cache: Arc::new(cache), } } @@ -62,7 +62,7 @@ impl EmptyExec { self.partitions = partitions; // Changing partitions may invalidate output partitioning, so update it: let output_partitioning = Self::output_partitioning_helper(self.partitions); - self.cache = self.cache.with_partitioning(output_partitioning); + Arc::make_mut(&mut self.cache).partitioning = output_partitioning; self } @@ -114,7 +114,7 @@ impl ExecutionPlan for EmptyExec { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.cache } diff --git a/datafusion/physical-plan/src/execution_plan.rs b/datafusion/physical-plan/src/execution_plan.rs index 8d72921d06735..4fa1367d17fce 100644 --- a/datafusion/physical-plan/src/execution_plan.rs +++ b/datafusion/physical-plan/src/execution_plan.rs @@ -128,7 +128,7 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync { /// /// This information is available via methods on [`ExecutionPlanProperties`] /// trait, which is implemented for all `ExecutionPlan`s. - fn properties(&self) -> &PlanProperties; + fn properties(&self) -> &Arc; /// Returns an error if this individual node does not conform to its invariants. /// These invariants are typically only checked in debug mode. @@ -1047,12 +1047,17 @@ impl PlanProperties { self } - /// Overwrite equivalence properties with its new value. - pub fn with_eq_properties(mut self, eq_properties: EquivalenceProperties) -> Self { + /// Set equivalence properties having mut reference. + pub fn set_eq_properties(&mut self, eq_properties: EquivalenceProperties) { // Changing equivalence properties also changes output ordering, so // make sure to overwrite it: self.output_ordering = eq_properties.output_ordering(); self.eq_properties = eq_properties; + } + + /// Overwrite equivalence properties with its new value. + pub fn with_eq_properties(mut self, eq_properties: EquivalenceProperties) -> Self { + self.set_eq_properties(eq_properties); self } @@ -1084,9 +1089,14 @@ impl PlanProperties { self } + /// Set constraints having mut reference. + pub fn set_constraints(&mut self, constraints: Constraints) { + self.eq_properties.set_constraints(constraints); + } + /// Overwrite constraints with its new value. pub fn with_constraints(mut self, constraints: Constraints) -> Self { - self.eq_properties = self.eq_properties.with_constraints(constraints); + self.set_constraints(constraints); self } @@ -1409,6 +1419,41 @@ pub fn reset_plan_states(plan: Arc) -> Result, + children: &[Arc], +) -> Result { + let old_children = plan.children(); + assert_eq_or_internal_err!( + children.len(), + old_children.len(), + "Wrong number of children" + ); + for (lhs, rhs) in plan.children().iter().zip(children.iter()) { + if !Arc::ptr_eq(lhs.properties(), rhs.properties()) { + return Ok(false); + } + } + Ok(true) +} + +/// Helper macro to avoid properties re-computation if passed children properties +/// the same as plan already has. Could be used to implement fast-path for method +/// [`ExecutionPlan::with_new_children`]. +#[macro_export] +macro_rules! check_if_same_properties { + ($plan: expr, $children: expr) => { + if has_same_children_properties(&$plan, &$children)? { + let plan = $plan.with_new_children_and_same_properties($children); + return Ok(Arc::new(plan)); + } + }; +} + /// Utility function yielding a string representation of the given [`ExecutionPlan`]. pub fn get_plan_string(plan: &Arc) -> Vec { let formatted = displayable(plan.as_ref()).indent(true).to_string(); @@ -1471,7 +1516,7 @@ mod tests { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { unimplemented!() } @@ -1538,7 +1583,7 @@ mod tests { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { unimplemented!() } diff --git a/datafusion/physical-plan/src/explain.rs b/datafusion/physical-plan/src/explain.rs index aa3c0afefe8b5..bf21b0484689a 100644 --- a/datafusion/physical-plan/src/explain.rs +++ b/datafusion/physical-plan/src/explain.rs @@ -44,7 +44,7 @@ pub struct ExplainExec { stringified_plans: Vec, /// control which plans to print verbose: bool, - cache: PlanProperties, + cache: Arc, } impl ExplainExec { @@ -59,7 +59,7 @@ impl ExplainExec { schema, stringified_plans, verbose, - cache, + cache: Arc::new(cache), } } @@ -112,7 +112,7 @@ impl ExecutionPlan for ExplainExec { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.cache } diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index e78e50c9868fb..4adc4c8ce4da9 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -26,9 +26,10 @@ use super::{ ColumnStatistics, DisplayAs, ExecutionPlanProperties, PlanProperties, RecordBatchStream, SendableRecordBatchStream, Statistics, }; +use crate::check_if_same_properties; use crate::coalesce::{LimitedBatchCoalescer, PushBatchStatus}; use crate::common::can_project; -use crate::execution_plan::CardinalityEffect; +use crate::execution_plan::{CardinalityEffect, has_same_children_properties}; use crate::filter_pushdown::{ ChildFilterDescription, ChildPushdownResult, FilterDescription, FilterPushdownPhase, FilterPushdownPropagation, PushedDown, PushedDownPredicate, @@ -83,9 +84,9 @@ pub struct FilterExec { /// Selectivity for statistics. 0 = no rows, 100 = all rows default_selectivity: u8, /// Properties equivalence properties, partitioning, etc. - cache: PlanProperties, + cache: Arc, /// The projection indices of the columns in the output schema of join - projection: Option>, + projection: Option>, /// Target batch size for output batches batch_size: usize, /// Number of rows to fetch @@ -113,7 +114,7 @@ impl FilterExec { input: Arc::clone(&input), metrics: ExecutionPlanMetricsSet::new(), default_selectivity, - cache, + cache: Arc::new(cache), projection: None, batch_size: FILTER_EXEC_DEFAULT_BATCH_SIZE, fetch: None, @@ -139,9 +140,9 @@ impl FilterExec { } /// Return new instance of [FilterExec] with the given projection. - pub fn with_projection(&self, projection: Option>) -> Result { + pub fn with_projection(&self, projection: Option>) -> Result { // Check if the projection is valid - can_project(&self.schema(), projection.as_ref())?; + can_project(&self.schema(), projection.as_deref())?; let projection = match projection { Some(projection) => match &self.projection { @@ -155,14 +156,14 @@ impl FilterExec { &self.input, &self.predicate, self.default_selectivity, - projection.as_ref(), + projection.as_deref(), )?; Ok(Self { predicate: Arc::clone(&self.predicate), input: Arc::clone(&self.input), metrics: self.metrics.clone(), default_selectivity: self.default_selectivity, - cache, + cache: Arc::new(cache), projection, batch_size: self.batch_size, fetch: self.fetch, @@ -175,7 +176,7 @@ impl FilterExec { input: Arc::clone(&self.input), metrics: self.metrics.clone(), default_selectivity: self.default_selectivity, - cache: self.cache.clone(), + cache: Arc::clone(&self.cache), projection: self.projection.clone(), batch_size, fetch: self.fetch, @@ -198,8 +199,8 @@ impl FilterExec { } /// Projection - pub fn projection(&self) -> Option<&Vec> { - self.projection.as_ref() + pub fn projection(&self) -> Option<&[usize]> { + self.projection.as_ref().map(|p| p.as_ref()) } /// Calculates `Statistics` for `FilterExec`, by applying selectivity (either default, or estimated) to input statistics. @@ -276,7 +277,7 @@ impl FilterExec { input: &Arc, predicate: &Arc, default_selectivity: u8, - projection: Option<&Vec>, + projection: Option<&[usize]>, ) -> Result { // Combine the equal predicates with the input equivalence properties // to construct the equivalence properties: @@ -328,6 +329,17 @@ impl FilterExec { input.boundedness(), )) } + + fn with_new_children_and_same_properties( + &self, + mut children: Vec>, + ) -> Self { + Self { + input: children.swap_remove(0), + metrics: ExecutionPlanMetricsSet::new(), + ..Self::clone(self) + } + } } impl DisplayAs for FilterExec { @@ -382,7 +394,7 @@ impl ExecutionPlan for FilterExec { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.cache } @@ -399,12 +411,13 @@ impl ExecutionPlan for FilterExec { self: Arc, mut children: Vec>, ) -> Result> { + check_if_same_properties!(self, children); FilterExec::try_new(Arc::clone(&self.predicate), children.swap_remove(0)) .and_then(|e| { let selectivity = e.default_selectivity(); e.with_default_selectivity(selectivity) }) - .and_then(|e| e.with_projection(self.projection().cloned())) + .and_then(|e| e.with_projection(self.projection.as_ref().map(Arc::clone))) .map(|e| e.with_fetch(self.fetch).unwrap()) } @@ -425,7 +438,7 @@ impl ExecutionPlan for FilterExec { predicate: Arc::clone(&self.predicate), input: self.input.execute(partition, context)?, metrics, - projection: self.projection.clone(), + projection: self.projection.as_ref().map(|p| p.to_vec()), batch_coalescer: LimitedBatchCoalescer::new( self.schema(), self.batch_size, @@ -453,7 +466,7 @@ impl ExecutionPlan for FilterExec { self.predicate(), self.default_selectivity, )?; - Ok(stats.project(self.projection.as_ref())) + Ok(stats.project(self.projection.as_ref().map(|p| p.as_ref()))) } fn cardinality_effect(&self) -> CardinalityEffect { @@ -583,12 +596,12 @@ impl ExecutionPlan for FilterExec { input: Arc::clone(&filter_input), metrics: self.metrics.clone(), default_selectivity: self.default_selectivity, - cache: Self::compute_properties( + cache: Arc::new(Self::compute_properties( &filter_input, &new_predicate, self.default_selectivity, - self.projection.as_ref(), - )?, + self.projection.as_ref().map(|p| p.as_ref()), + )?), projection: None, batch_size: self.batch_size, fetch: self.fetch, @@ -608,7 +621,7 @@ impl ExecutionPlan for FilterExec { input: Arc::clone(&self.input), metrics: self.metrics.clone(), default_selectivity: self.default_selectivity, - cache: self.cache.clone(), + cache: Arc::clone(&self.cache), projection: self.projection.clone(), batch_size: self.batch_size, fetch, @@ -618,7 +631,7 @@ impl ExecutionPlan for FilterExec { impl EmbeddedProjection for FilterExec { fn with_projection(&self, projection: Option>) -> Result { - self.with_projection(projection) + self.with_projection(projection.map(|p| p.into_iter().collect())) } } diff --git a/datafusion/physical-plan/src/joins/cross_join.rs b/datafusion/physical-plan/src/joins/cross_join.rs index 4f32b6176ec39..4df2cf04032b9 100644 --- a/datafusion/physical-plan/src/joins/cross_join.rs +++ b/datafusion/physical-plan/src/joins/cross_join.rs @@ -25,7 +25,9 @@ use super::utils::{ OnceAsync, OnceFut, StatefulStreamResult, adjust_right_output_partitioning, reorder_output_after_swap, }; -use crate::execution_plan::{EmissionType, boundedness_from_children}; +use crate::execution_plan::{ + EmissionType, boundedness_from_children, has_same_children_properties, +}; use crate::metrics::{ExecutionPlanMetricsSet, MetricsSet}; use crate::projection::{ ProjectionExec, join_allows_pushdown, join_table_borders, new_join_children, @@ -34,7 +36,7 @@ use crate::projection::{ use crate::{ ColumnStatistics, DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, ExecutionPlanProperties, PlanProperties, RecordBatchStream, - SendableRecordBatchStream, Statistics, handle_state, + SendableRecordBatchStream, Statistics, check_if_same_properties, handle_state, }; use arrow::array::{RecordBatch, RecordBatchOptions}; @@ -94,7 +96,7 @@ pub struct CrossJoinExec { /// Execution plan metrics metrics: ExecutionPlanMetricsSet, /// Properties such as schema, equivalence properties, ordering, partitioning, etc. - cache: PlanProperties, + cache: Arc, } impl CrossJoinExec { @@ -125,7 +127,7 @@ impl CrossJoinExec { schema, left_fut: Default::default(), metrics: ExecutionPlanMetricsSet::default(), - cache, + cache: Arc::new(cache), } } @@ -192,6 +194,23 @@ impl CrossJoinExec { &self.right.schema(), ) } + + fn with_new_children_and_same_properties( + &self, + mut children: Vec>, + ) -> Self { + let left = children.swap_remove(0); + let right = children.swap_remove(0); + + Self { + left, + right, + metrics: ExecutionPlanMetricsSet::new(), + left_fut: Default::default(), + cache: Arc::clone(&self.cache), + schema: Arc::clone(&self.schema), + } + } } /// Asynchronously collect the result of the left child @@ -256,7 +275,7 @@ impl ExecutionPlan for CrossJoinExec { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.cache } @@ -272,6 +291,7 @@ impl ExecutionPlan for CrossJoinExec { self: Arc, children: Vec>, ) -> Result> { + check_if_same_properties!(self, children); Ok(Arc::new(CrossJoinExec::new( Arc::clone(&children[0]), Arc::clone(&children[1]), @@ -285,7 +305,7 @@ impl ExecutionPlan for CrossJoinExec { schema: Arc::clone(&self.schema), left_fut: Default::default(), // reset the build side! metrics: ExecutionPlanMetricsSet::default(), - cache: self.cache.clone(), + cache: Arc::clone(&self.cache), }; Ok(Arc::new(new_exec)) } diff --git a/datafusion/physical-plan/src/joins/hash_join/exec.rs b/datafusion/physical-plan/src/joins/hash_join/exec.rs index 131b07461ebe5..73d2446937616 100644 --- a/datafusion/physical-plan/src/joins/hash_join/exec.rs +++ b/datafusion/physical-plan/src/joins/hash_join/exec.rs @@ -22,7 +22,9 @@ use std::sync::{Arc, OnceLock}; use std::{any::Any, vec}; use crate::ExecutionPlanProperties; -use crate::execution_plan::{EmissionType, boundedness_from_children}; +use crate::execution_plan::{ + EmissionType, boundedness_from_children, has_same_children_properties, +}; use crate::filter_pushdown::{ ChildPushdownResult, FilterDescription, FilterPushdownPhase, FilterPushdownPropagation, @@ -466,7 +468,7 @@ pub struct HashJoinExec { /// Execution metrics metrics: ExecutionPlanMetricsSet, /// The projection indices of the columns in the output schema of join - pub projection: Option>, + pub projection: Option>, /// Information of index and left / right placement of columns column_indices: Vec, /// The equality null-handling behavior of the join algorithm. @@ -474,7 +476,7 @@ pub struct HashJoinExec { /// Flag to indicate if this is a null-aware anti join pub null_aware: bool, /// Cache holding plan properties like equivalences, output partitioning etc. - cache: PlanProperties, + cache: Arc, /// Dynamic filter for pushing down to the probe side /// Set when dynamic filter pushdown is detected in handle_child_pushdown_result. /// HashJoinExec also needs to keep a shared bounds accumulator for coordinating updates. @@ -566,7 +568,7 @@ impl HashJoinExec { let join_schema = Arc::new(join_schema); // check if the projection is valid - can_project(&join_schema, projection.as_ref())?; + can_project(&join_schema, projection.as_deref())?; let cache = Self::compute_properties( &left, @@ -575,7 +577,7 @@ impl HashJoinExec { *join_type, &on, partition_mode, - projection.as_ref(), + projection.as_deref(), )?; // Initialize both dynamic filter and bounds accumulator to None @@ -592,11 +594,11 @@ impl HashJoinExec { random_state, mode: partition_mode, metrics: ExecutionPlanMetricsSet::new(), - projection, + projection: projection.map(Into::into), column_indices, null_equality, null_aware, - cache, + cache: Arc::new(cache), dynamic_filter: None, }) } @@ -688,7 +690,7 @@ impl HashJoinExec { /// Return new instance of [HashJoinExec] with the given projection. pub fn with_projection(&self, projection: Option>) -> Result { // check if the projection is valid - can_project(&self.schema(), projection.as_ref())?; + can_project(&self.schema(), projection.as_deref())?; let projection = match projection { Some(projection) => match &self.projection { Some(p) => Some(projection.iter().map(|i| p[*i]).collect()), @@ -717,7 +719,7 @@ impl HashJoinExec { join_type: JoinType, on: JoinOnRef, mode: PartitionMode, - projection: Option<&Vec>, + projection: Option<&[usize]>, ) -> Result { // Calculate equivalence properties: let mut eq_properties = join_equivalence_properties( @@ -824,7 +826,7 @@ impl HashJoinExec { swap_join_projection( left.schema().fields().len(), right.schema().fields().len(), - self.projection.as_ref(), + self.projection.as_deref(), self.join_type(), ), partition_mode, @@ -937,7 +939,7 @@ impl ExecutionPlan for HashJoinExec { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.cache } @@ -998,6 +1000,20 @@ impl ExecutionPlan for HashJoinExec { self: Arc, children: Vec>, ) -> Result> { + let cache = if has_same_children_properties(&self, &children)? { + Arc::clone(&self.cache) + } else { + Arc::new(Self::compute_properties( + &children[0], + &children[1], + &self.join_schema, + self.join_type, + &self.on, + self.mode, + self.projection.as_deref(), + )?) + }; + Ok(Arc::new(HashJoinExec { left: Arc::clone(&children[0]), right: Arc::clone(&children[1]), @@ -1013,15 +1029,7 @@ impl ExecutionPlan for HashJoinExec { column_indices: self.column_indices.clone(), null_equality: self.null_equality, null_aware: self.null_aware, - cache: Self::compute_properties( - &children[0], - &children[1], - &self.join_schema, - self.join_type, - &self.on, - self.mode, - self.projection.as_ref(), - )?, + cache, // Keep the dynamic filter, bounds accumulator will be reset dynamic_filter: self.dynamic_filter.clone(), })) @@ -1044,7 +1052,7 @@ impl ExecutionPlan for HashJoinExec { column_indices: self.column_indices.clone(), null_equality: self.null_equality, null_aware: self.null_aware, - cache: self.cache.clone(), + cache: Arc::clone(&self.cache), // Reset dynamic filter and bounds accumulator to initial state dynamic_filter: None, })) @@ -1240,7 +1248,7 @@ impl ExecutionPlan for HashJoinExec { &self.join_schema, )?; // Project statistics if there is a projection - Ok(stats.project(self.projection.as_ref())) + Ok(stats.project(self.projection.as_deref())) } /// Tries to push `projection` down through `hash_join`. If possible, performs the @@ -1373,7 +1381,7 @@ impl ExecutionPlan for HashJoinExec { column_indices: self.column_indices.clone(), null_equality: self.null_equality, null_aware: self.null_aware, - cache: self.cache.clone(), + cache: Arc::clone(&self.cache), dynamic_filter: Some(HashJoinExecDynamicFilter { filter: dynamic_filter, build_accumulator: OnceLock::new(), diff --git a/datafusion/physical-plan/src/joins/nested_loop_join.rs b/datafusion/physical-plan/src/joins/nested_loop_join.rs index 44637321a7e35..8d36f73ea9c28 100644 --- a/datafusion/physical-plan/src/joins/nested_loop_join.rs +++ b/datafusion/physical-plan/src/joins/nested_loop_join.rs @@ -29,7 +29,9 @@ use super::utils::{ reorder_output_after_swap, swap_join_projection, }; use crate::common::can_project; -use crate::execution_plan::{EmissionType, boundedness_from_children}; +use crate::execution_plan::{ + EmissionType, boundedness_from_children, has_same_children_properties, +}; use crate::joins::SharedBitmapBuilder; use crate::joins::utils::{ BuildProbeJoinMetrics, ColumnIndex, JoinFilter, OnceAsync, OnceFut, @@ -46,6 +48,7 @@ use crate::projection::{ use crate::{ DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, ExecutionPlanProperties, PlanProperties, RecordBatchStream, SendableRecordBatchStream, + check_if_same_properties, }; use arrow::array::{ @@ -197,7 +200,7 @@ pub struct NestedLoopJoinExec { /// Execution metrics metrics: ExecutionPlanMetricsSet, /// Cache holding plan properties like equivalences, output partitioning etc. - cache: PlanProperties, + cache: Arc, } impl NestedLoopJoinExec { @@ -233,7 +236,7 @@ impl NestedLoopJoinExec { column_indices, projection, metrics: Default::default(), - cache, + cache: Arc::new(cache), }) } @@ -335,7 +338,7 @@ impl NestedLoopJoinExec { pub fn with_projection(&self, projection: Option>) -> Result { // check if the projection is valid - can_project(&self.schema(), projection.as_ref())?; + can_project(&self.schema(), projection.as_deref())?; let projection = match projection { Some(projection) => match &self.projection { Some(p) => Some(projection.iter().map(|i| p[*i]).collect()), @@ -371,7 +374,7 @@ impl NestedLoopJoinExec { swap_join_projection( left.schema().fields().len(), right.schema().fields().len(), - self.projection.as_ref(), + self.projection.as_deref(), self.join_type(), ), )?; @@ -399,6 +402,27 @@ impl NestedLoopJoinExec { Ok(plan) } + + fn with_new_children_and_same_properties( + &self, + mut children: Vec>, + ) -> Self { + let left = children.swap_remove(0); + let right = children.swap_remove(0); + + Self { + left, + right, + metrics: ExecutionPlanMetricsSet::new(), + build_side_data: Default::default(), + cache: Arc::clone(&self.cache), + filter: self.filter.clone(), + join_type: self.join_type, + join_schema: Arc::clone(&self.join_schema), + column_indices: self.column_indices.clone(), + projection: self.projection.clone(), + } + } } impl DisplayAs for NestedLoopJoinExec { @@ -453,7 +477,7 @@ impl ExecutionPlan for NestedLoopJoinExec { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.cache } @@ -476,6 +500,7 @@ impl ExecutionPlan for NestedLoopJoinExec { self: Arc, children: Vec>, ) -> Result> { + check_if_same_properties!(self, children); Ok(Arc::new(NestedLoopJoinExec::try_new( Arc::clone(&children[0]), Arc::clone(&children[1]), @@ -577,7 +602,7 @@ impl ExecutionPlan for NestedLoopJoinExec { &self.join_schema, )?; - Ok(stats.project(self.projection.as_ref())) + Ok(stats.project(self.projection.as_deref())) } /// Tries to push `projection` down through `nested_loop_join`. If possible, performs the diff --git a/datafusion/physical-plan/src/joins/piecewise_merge_join/exec.rs b/datafusion/physical-plan/src/joins/piecewise_merge_join/exec.rs index 508be2e3984f4..235f29f03ffe2 100644 --- a/datafusion/physical-plan/src/joins/piecewise_merge_join/exec.rs +++ b/datafusion/physical-plan/src/joins/piecewise_merge_join/exec.rs @@ -41,7 +41,9 @@ use std::fmt::Formatter; use std::sync::Arc; use std::sync::atomic::AtomicUsize; -use crate::execution_plan::{EmissionType, boundedness_from_children}; +use crate::execution_plan::{ + EmissionType, boundedness_from_children, has_same_children_properties, +}; use crate::joins::piecewise_merge_join::classic_join::{ ClassicPWMJStream, PiecewiseMergeJoinStreamState, @@ -51,7 +53,9 @@ use crate::joins::piecewise_merge_join::utils::{ }; use crate::joins::utils::asymmetric_join_output_partitioning; use crate::metrics::MetricsSet; -use crate::{DisplayAs, DisplayFormatType, ExecutionPlanProperties}; +use crate::{ + DisplayAs, DisplayFormatType, ExecutionPlanProperties, check_if_same_properties, +}; use crate::{ ExecutionPlan, PlanProperties, joins::{ @@ -86,7 +90,7 @@ use crate::{ /// Both sides are sorted so that we can iterate from index 0 to the end on each side. This ordering ensures /// that when we find the first matching pair of rows, we can emit the current stream row joined with all remaining /// probe rows from the match position onward, without rescanning earlier probe rows. -/// +/// /// For `<` and `<=` operators, both inputs are sorted in **descending** order, while for `>` and `>=` operators /// they are sorted in **ascending** order. This choice ensures that the pointer on the buffered side can advance /// monotonically as we stream new batches from the stream side. @@ -129,34 +133,34 @@ use crate::{ /// /// Processing Row 1: /// -/// Sorted Buffered Side Sorted Streamed Side -/// ┌──────────────────┐ ┌──────────────────┐ -/// 1 │ 100 │ 1 │ 100 │ -/// ├──────────────────┤ ├──────────────────┤ -/// 2 │ 200 │ ─┐ 2 │ 200 │ -/// ├──────────────────┤ │ For row 1 on streamed side with ├──────────────────┤ -/// 3 │ 200 │ │ value 100, we emit rows 2 - 5. 3 │ 500 │ +/// Sorted Buffered Side Sorted Streamed Side +/// ┌──────────────────┐ ┌──────────────────┐ +/// 1 │ 100 │ 1 │ 100 │ +/// ├──────────────────┤ ├──────────────────┤ +/// 2 │ 200 │ ─┐ 2 │ 200 │ +/// ├──────────────────┤ │ For row 1 on streamed side with ├──────────────────┤ +/// 3 │ 200 │ │ value 100, we emit rows 2 - 5. 3 │ 500 │ /// ├──────────────────┤ │ as matches when the operator is └──────────────────┘ /// 4 │ 300 │ │ `Operator::Lt` (<) Emitting all /// ├──────────────────┤ │ rows after the first match (row /// 5 │ 400 │ ─┘ 2 buffered side; 100 < 200) -/// └──────────────────┘ +/// └──────────────────┘ /// /// Processing Row 2: /// By sorting the streamed side we know /// -/// Sorted Buffered Side Sorted Streamed Side -/// ┌──────────────────┐ ┌──────────────────┐ -/// 1 │ 100 │ 1 │ 100 │ -/// ├──────────────────┤ ├──────────────────┤ -/// 2 │ 200 │ <- Start here when probing for the 2 │ 200 │ -/// ├──────────────────┤ streamed side row 2. ├──────────────────┤ -/// 3 │ 200 │ 3 │ 500 │ +/// Sorted Buffered Side Sorted Streamed Side +/// ┌──────────────────┐ ┌──────────────────┐ +/// 1 │ 100 │ 1 │ 100 │ +/// ├──────────────────┤ ├──────────────────┤ +/// 2 │ 200 │ <- Start here when probing for the 2 │ 200 │ +/// ├──────────────────┤ streamed side row 2. ├──────────────────┤ +/// 3 │ 200 │ 3 │ 500 │ /// ├──────────────────┤ └──────────────────┘ -/// 4 │ 300 │ -/// ├──────────────────┤ +/// 4 │ 300 │ +/// ├──────────────────┤ /// 5 │ 400 │ -/// └──────────────────┘ +/// └──────────────────┘ /// ``` /// /// ## Existence Joins (Semi, Anti, Mark) @@ -202,10 +206,10 @@ use crate::{ /// 1 │ 100 │ 1 │ 500 │ /// ├──────────────────┤ ├──────────────────┤ /// 2 │ 200 │ 2 │ 200 │ -/// ├──────────────────┤ ├──────────────────┤ +/// ├──────────────────┤ ├──────────────────┤ /// 3 │ 200 │ 3 │ 300 │ /// ├──────────────────┤ └──────────────────┘ -/// 4 │ 300 │ ─┐ +/// 4 │ 300 │ ─┐ /// ├──────────────────┤ | We emit matches for row 4 - 5 /// 5 │ 400 │ ─┘ on the buffered side. /// └──────────────────┘ @@ -236,11 +240,11 @@ use crate::{ /// /// # Mark Join: /// Sorts the probe side, then computes the min/max range of the probe keys and scans the buffered side only -/// within that range. +/// within that range. /// Complexity: `O(|S| + scan(R[range]))`. /// /// ## Nested Loop Join -/// Compares every row from `S` with every row from `R`. +/// Compares every row from `S` with every row from `R`. /// Complexity: `O(|S| * |R|)`. /// /// ## Nested Loop Join @@ -273,13 +277,12 @@ pub struct PiecewiseMergeJoinExec { left_child_plan_required_order: LexOrdering, /// The right sort order, descending for `<`, `<=` operations + ascending for `>`, `>=` operations /// Unsorted for mark joins - #[expect(dead_code)] right_batch_required_orders: LexOrdering, /// This determines the sort order of all join columns used in sorting the stream and buffered execution plans. sort_options: SortOptions, /// Cache holding plan properties like equivalences, output partitioning etc. - cache: PlanProperties, + cache: Arc, /// Number of partitions to process num_partitions: usize, } @@ -373,7 +376,7 @@ impl PiecewiseMergeJoinExec { left_child_plan_required_order, right_batch_required_orders, sort_options, - cache, + cache: Arc::new(cache), num_partitions, }) } @@ -466,6 +469,31 @@ impl PiecewiseMergeJoinExec { pub fn swap_inputs(&self) -> Result> { todo!() } + + fn with_new_children_and_same_properties( + &self, + mut children: Vec>, + ) -> Self { + let buffered = children.swap_remove(0); + let streamed = children.swap_remove(0); + Self { + buffered, + streamed, + on: self.on.clone(), + operator: self.operator, + join_type: self.join_type, + schema: Arc::clone(&self.schema), + left_child_plan_required_order: self.left_child_plan_required_order.clone(), + right_batch_required_orders: self.right_batch_required_orders.clone(), + sort_options: self.sort_options, + cache: Arc::clone(&self.cache), + num_partitions: self.num_partitions, + + // Re-set state. + metrics: ExecutionPlanMetricsSet::new(), + buffered_fut: Default::default(), + } + } } impl ExecutionPlan for PiecewiseMergeJoinExec { @@ -477,7 +505,7 @@ impl ExecutionPlan for PiecewiseMergeJoinExec { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.cache } @@ -511,6 +539,7 @@ impl ExecutionPlan for PiecewiseMergeJoinExec { self: Arc, children: Vec>, ) -> Result> { + check_if_same_properties!(self, children); match &children[..] { [left, right] => Ok(Arc::new(PiecewiseMergeJoinExec::try_new( Arc::clone(left), @@ -527,6 +556,13 @@ impl ExecutionPlan for PiecewiseMergeJoinExec { } } + fn reset_state(self: Arc) -> Result> { + Ok(Arc::new(self.with_new_children_and_same_properties(vec![ + Arc::clone(&self.buffered), + Arc::clone(&self.streamed), + ]))) + } + fn execute( &self, partition: usize, diff --git a/datafusion/physical-plan/src/joins/sort_merge_join/exec.rs b/datafusion/physical-plan/src/joins/sort_merge_join/exec.rs index ae7a5fa764bcc..be4c83fc2d35c 100644 --- a/datafusion/physical-plan/src/joins/sort_merge_join/exec.rs +++ b/datafusion/physical-plan/src/joins/sort_merge_join/exec.rs @@ -23,7 +23,9 @@ use std::any::Any; use std::fmt::Formatter; use std::sync::Arc; -use crate::execution_plan::{EmissionType, boundedness_from_children}; +use crate::execution_plan::{ + EmissionType, boundedness_from_children, has_same_children_properties, +}; use crate::expressions::PhysicalSortExpr; use crate::joins::sort_merge_join::metrics::SortMergeJoinMetrics; use crate::joins::sort_merge_join::stream::SortMergeJoinStream; @@ -39,7 +41,7 @@ use crate::projection::{ }; use crate::{ DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, ExecutionPlanProperties, - PlanProperties, SendableRecordBatchStream, Statistics, + PlanProperties, SendableRecordBatchStream, Statistics, check_if_same_properties, }; use arrow::compute::SortOptions; @@ -127,7 +129,7 @@ pub struct SortMergeJoinExec { /// Defines the null equality for the join. pub null_equality: NullEquality, /// Cache holding plan properties like equivalences, output partitioning etc. - cache: PlanProperties, + cache: Arc, } impl SortMergeJoinExec { @@ -198,7 +200,7 @@ impl SortMergeJoinExec { right_sort_exprs, sort_options, null_equality, - cache, + cache: Arc::new(cache), }) } @@ -340,6 +342,20 @@ impl SortMergeJoinExec { reorder_output_after_swap(Arc::new(new_join), &left.schema(), &right.schema()) } } + + fn with_new_children_and_same_properties( + &self, + mut children: Vec>, + ) -> Self { + let left = children.swap_remove(0); + let right = children.swap_remove(0); + Self { + left, + right, + metrics: ExecutionPlanMetricsSet::new(), + ..Self::clone(self) + } + } } impl DisplayAs for SortMergeJoinExec { @@ -405,7 +421,7 @@ impl ExecutionPlan for SortMergeJoinExec { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.cache } @@ -440,6 +456,7 @@ impl ExecutionPlan for SortMergeJoinExec { self: Arc, children: Vec>, ) -> Result> { + check_if_same_properties!(self, children); match &children[..] { [left, right] => Ok(Arc::new(SortMergeJoinExec::try_new( Arc::clone(left), diff --git a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs index 1f6bc703a0300..bfb8df1df8095 100644 --- a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs +++ b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs @@ -32,8 +32,11 @@ use std::sync::Arc; use std::task::{Context, Poll}; use std::vec; +use crate::check_if_same_properties; use crate::common::SharedMemoryReservation; -use crate::execution_plan::{boundedness_from_children, emission_type_from_children}; +use crate::execution_plan::{ + boundedness_from_children, emission_type_from_children, has_same_children_properties, +}; use crate::joins::stream_join_utils::{ PruningJoinHashMap, SortedFilterExpr, StreamJoinMetrics, calculate_filter_expr_intervals, combine_two_batches, @@ -197,7 +200,7 @@ pub struct SymmetricHashJoinExec { /// Partition Mode mode: StreamJoinPartitionMode, /// Cache holding plan properties like equivalences, output partitioning etc. - cache: PlanProperties, + cache: Arc, } impl SymmetricHashJoinExec { @@ -253,7 +256,7 @@ impl SymmetricHashJoinExec { left_sort_exprs, right_sort_exprs, mode, - cache, + cache: Arc::new(cache), }) } @@ -360,6 +363,20 @@ impl SymmetricHashJoinExec { } Ok(false) } + + fn with_new_children_and_same_properties( + &self, + mut children: Vec>, + ) -> Self { + let left = children.swap_remove(0); + let right = children.swap_remove(0); + Self { + left, + right, + metrics: ExecutionPlanMetricsSet::new(), + ..Self::clone(self) + } + } } impl DisplayAs for SymmetricHashJoinExec { @@ -411,7 +428,7 @@ impl ExecutionPlan for SymmetricHashJoinExec { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.cache } @@ -453,6 +470,7 @@ impl ExecutionPlan for SymmetricHashJoinExec { self: Arc, children: Vec>, ) -> Result> { + check_if_same_properties!(self, children); Ok(Arc::new(SymmetricHashJoinExec::try_new( Arc::clone(&children[0]), Arc::clone(&children[1]), diff --git a/datafusion/physical-plan/src/joins/utils.rs b/datafusion/physical-plan/src/joins/utils.rs index a9243fe04e28d..bca55df192e99 100644 --- a/datafusion/physical-plan/src/joins/utils.rs +++ b/datafusion/physical-plan/src/joins/utils.rs @@ -1674,7 +1674,7 @@ fn swap_reverting_projection( pub fn swap_join_projection( left_schema_len: usize, right_schema_len: usize, - projection: Option<&Vec>, + projection: Option<&[usize]>, join_type: &JoinType, ) -> Option> { match join_type { @@ -1685,7 +1685,7 @@ pub fn swap_join_projection( | JoinType::RightAnti | JoinType::RightSemi | JoinType::LeftMark - | JoinType::RightMark => projection.cloned(), + | JoinType::RightMark => projection.map(ToOwned::to_owned), _ => projection.map(|p| { p.iter() .map(|i| { diff --git a/datafusion/physical-plan/src/limit.rs b/datafusion/physical-plan/src/limit.rs index 05d6882821477..9925567a6c827 100644 --- a/datafusion/physical-plan/src/limit.rs +++ b/datafusion/physical-plan/src/limit.rs @@ -27,8 +27,13 @@ use super::{ DisplayAs, ExecutionPlanProperties, PlanProperties, RecordBatchStream, SendableRecordBatchStream, Statistics, }; -use crate::execution_plan::{Boundedness, CardinalityEffect}; -use crate::{DisplayFormatType, Distribution, ExecutionPlan, Partitioning}; +use crate::execution_plan::{ + Boundedness, CardinalityEffect, has_same_children_properties, +}; +use crate::{ + DisplayFormatType, Distribution, ExecutionPlan, Partitioning, + check_if_same_properties, +}; use arrow::datatypes::SchemaRef; use arrow::record_batch::RecordBatch; @@ -50,7 +55,7 @@ pub struct GlobalLimitExec { fetch: Option, /// Execution metrics metrics: ExecutionPlanMetricsSet, - cache: PlanProperties, + cache: Arc, } impl GlobalLimitExec { @@ -62,7 +67,7 @@ impl GlobalLimitExec { skip, fetch, metrics: ExecutionPlanMetricsSet::new(), - cache, + cache: Arc::new(cache), } } @@ -91,6 +96,17 @@ impl GlobalLimitExec { Boundedness::Bounded, ) } + + fn with_new_children_and_same_properties( + &self, + mut children: Vec>, + ) -> Self { + Self { + input: children.swap_remove(0), + metrics: ExecutionPlanMetricsSet::new(), + ..Self::clone(self) + } + } } impl DisplayAs for GlobalLimitExec { @@ -129,7 +145,7 @@ impl ExecutionPlan for GlobalLimitExec { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.cache } @@ -151,10 +167,11 @@ impl ExecutionPlan for GlobalLimitExec { fn with_new_children( self: Arc, - children: Vec>, + mut children: Vec>, ) -> Result> { + check_if_same_properties!(self, children); Ok(Arc::new(GlobalLimitExec::new( - Arc::clone(&children[0]), + children.swap_remove(0), self.skip, self.fetch, ))) @@ -214,7 +231,7 @@ impl ExecutionPlan for GlobalLimitExec { } /// LocalLimitExec applies a limit to a single partition -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct LocalLimitExec { /// Input execution plan input: Arc, @@ -222,7 +239,7 @@ pub struct LocalLimitExec { fetch: usize, /// Execution metrics metrics: ExecutionPlanMetricsSet, - cache: PlanProperties, + cache: Arc, } impl LocalLimitExec { @@ -233,7 +250,7 @@ impl LocalLimitExec { input, fetch, metrics: ExecutionPlanMetricsSet::new(), - cache, + cache: Arc::new(cache), } } @@ -257,6 +274,17 @@ impl LocalLimitExec { Boundedness::Bounded, ) } + + fn with_new_children_and_same_properties( + &self, + mut children: Vec>, + ) -> Self { + Self { + input: children.swap_remove(0), + metrics: ExecutionPlanMetricsSet::new(), + ..Self::clone(self) + } + } } impl DisplayAs for LocalLimitExec { @@ -286,7 +314,7 @@ impl ExecutionPlan for LocalLimitExec { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.cache } @@ -306,6 +334,7 @@ impl ExecutionPlan for LocalLimitExec { self: Arc, children: Vec>, ) -> Result> { + check_if_same_properties!(self, children); match children.len() { 1 => Ok(Arc::new(LocalLimitExec::new( Arc::clone(&children[0]), diff --git a/datafusion/physical-plan/src/memory.rs b/datafusion/physical-plan/src/memory.rs index 4a406ca648d57..5c684bec98f9b 100644 --- a/datafusion/physical-plan/src/memory.rs +++ b/datafusion/physical-plan/src/memory.rs @@ -161,7 +161,7 @@ pub struct LazyMemoryExec { /// Functions to generate batches for each partition batch_generators: Vec>>, /// Plan properties cache storing equivalence properties, partitioning, and execution mode - cache: PlanProperties, + cache: Arc, /// Execution metrics metrics: ExecutionPlanMetricsSet, } @@ -200,7 +200,8 @@ impl LazyMemoryExec { EmissionType::Incremental, boundedness, ) - .with_scheduling_type(SchedulingType::Cooperative); + .with_scheduling_type(SchedulingType::Cooperative) + .into(); Ok(Self { schema, @@ -215,9 +216,9 @@ impl LazyMemoryExec { match projection.as_ref() { Some(columns) => { let projected = Arc::new(self.schema.project(columns).unwrap()); - self.cache = self.cache.with_eq_properties(EquivalenceProperties::new( - Arc::clone(&projected), - )); + Arc::make_mut(&mut self.cache).set_eq_properties( + EquivalenceProperties::new(Arc::clone(&projected)), + ); self.schema = projected; self.projection = projection; self @@ -236,12 +237,12 @@ impl LazyMemoryExec { partition_count, generator_count ); - self.cache.partitioning = partitioning; + Arc::make_mut(&mut self.cache).partitioning = partitioning; Ok(()) } pub fn add_ordering(&mut self, ordering: impl IntoIterator) { - self.cache + Arc::make_mut(&mut self.cache) .eq_properties .add_orderings(std::iter::once(ordering)); } @@ -306,7 +307,7 @@ impl ExecutionPlan for LazyMemoryExec { Arc::clone(&self.schema) } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.cache } @@ -365,7 +366,7 @@ impl ExecutionPlan for LazyMemoryExec { Ok(Arc::new(LazyMemoryExec { schema: Arc::clone(&self.schema), batch_generators: generators, - cache: self.cache.clone(), + cache: Arc::clone(&self.cache), metrics: ExecutionPlanMetricsSet::new(), projection: self.projection.clone(), })) diff --git a/datafusion/physical-plan/src/placeholder_row.rs b/datafusion/physical-plan/src/placeholder_row.rs index 4d00b73cff39c..f95b10771c02f 100644 --- a/datafusion/physical-plan/src/placeholder_row.rs +++ b/datafusion/physical-plan/src/placeholder_row.rs @@ -43,7 +43,7 @@ pub struct PlaceholderRowExec { schema: SchemaRef, /// Number of partitions partitions: usize, - cache: PlanProperties, + cache: Arc, } impl PlaceholderRowExec { @@ -54,7 +54,7 @@ impl PlaceholderRowExec { PlaceholderRowExec { schema, partitions, - cache, + cache: Arc::new(cache), } } @@ -63,7 +63,7 @@ impl PlaceholderRowExec { self.partitions = partitions; // Update output partitioning when updating partitions: let output_partitioning = Self::output_partitioning_helper(self.partitions); - self.cache = self.cache.with_partitioning(output_partitioning); + Arc::make_mut(&mut self.cache).partitioning = output_partitioning; self } @@ -132,7 +132,7 @@ impl ExecutionPlan for PlaceholderRowExec { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.cache } diff --git a/datafusion/physical-plan/src/projection.rs b/datafusion/physical-plan/src/projection.rs index e8608f17a1b20..c377993ee9799 100644 --- a/datafusion/physical-plan/src/projection.rs +++ b/datafusion/physical-plan/src/projection.rs @@ -26,13 +26,13 @@ use super::{ DisplayAs, ExecutionPlanProperties, PlanProperties, RecordBatchStream, SendableRecordBatchStream, SortOrderPushdownResult, Statistics, }; -use crate::execution_plan::CardinalityEffect; +use crate::execution_plan::{CardinalityEffect, has_same_children_properties}; use crate::filter_pushdown::{ ChildPushdownResult, FilterDescription, FilterPushdownPhase, FilterPushdownPropagation, }; use crate::joins::utils::{ColumnIndex, JoinFilter, JoinOn, JoinOnRef}; -use crate::{DisplayFormatType, ExecutionPlan, PhysicalExpr}; +use crate::{DisplayFormatType, ExecutionPlan, PhysicalExpr, check_if_same_properties}; use std::any::Any; use std::collections::HashMap; use std::pin::Pin; @@ -77,7 +77,7 @@ pub struct ProjectionExec { /// Execution metrics metrics: ExecutionPlanMetricsSet, /// Cache holding plan properties like equivalences, output partitioning etc. - cache: PlanProperties, + cache: Arc, } impl ProjectionExec { @@ -152,7 +152,7 @@ impl ProjectionExec { projector, input, metrics: ExecutionPlanMetricsSet::new(), - cache, + cache: Arc::new(cache), }) } @@ -192,6 +192,17 @@ impl ProjectionExec { input.boundedness(), )) } + + fn with_new_children_and_same_properties( + &self, + mut children: Vec>, + ) -> Self { + Self { + input: children.swap_remove(0), + metrics: ExecutionPlanMetricsSet::new(), + ..Self::clone(self) + } + } } impl DisplayAs for ProjectionExec { @@ -245,7 +256,7 @@ impl ExecutionPlan for ProjectionExec { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.cache } @@ -277,8 +288,9 @@ impl ExecutionPlan for ProjectionExec { self: Arc, mut children: Vec>, ) -> Result> { + check_if_same_properties!(self, children); ProjectionExec::try_new( - self.projector.projection().clone(), + self.projector.projection().into_iter().cloned(), children.swap_remove(0), ) .map(|p| Arc::new(p) as _) diff --git a/datafusion/physical-plan/src/recursive_query.rs b/datafusion/physical-plan/src/recursive_query.rs index 936a02581e89c..f8847cbacefb5 100644 --- a/datafusion/physical-plan/src/recursive_query.rs +++ b/datafusion/physical-plan/src/recursive_query.rs @@ -74,7 +74,7 @@ pub struct RecursiveQueryExec { /// Execution metrics metrics: ExecutionPlanMetricsSet, /// Cache holding plan properties like equivalences, output partitioning etc. - cache: PlanProperties, + cache: Arc, } impl RecursiveQueryExec { @@ -97,7 +97,7 @@ impl RecursiveQueryExec { is_distinct, work_table, metrics: ExecutionPlanMetricsSet::new(), - cache, + cache: Arc::new(cache), }) } @@ -143,7 +143,7 @@ impl ExecutionPlan for RecursiveQueryExec { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.cache } diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs index d50404c8fc1e8..7b1c57f66134e 100644 --- a/datafusion/physical-plan/src/repartition/mod.rs +++ b/datafusion/physical-plan/src/repartition/mod.rs @@ -31,7 +31,9 @@ use super::{ DisplayAs, ExecutionPlanProperties, RecordBatchStream, SendableRecordBatchStream, }; use crate::coalesce::LimitedBatchCoalescer; -use crate::execution_plan::{CardinalityEffect, EvaluationType, SchedulingType}; +use crate::execution_plan::{ + CardinalityEffect, EvaluationType, SchedulingType, has_same_children_properties, +}; use crate::hash_utils::create_hashes; use crate::metrics::{BaselineMetrics, SpillMetrics}; use crate::projection::{ProjectionExec, all_columns, make_with_child, update_expr}; @@ -39,7 +41,10 @@ use crate::sorts::streaming_merge::StreamingMergeBuilder; use crate::spill::spill_manager::SpillManager; use crate::spill::spill_pool::{self, SpillPoolWriter}; use crate::stream::RecordBatchStreamAdapter; -use crate::{DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties, Statistics}; +use crate::{ + DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties, Statistics, + check_if_same_properties, +}; use arrow::array::{PrimitiveArray, RecordBatch, RecordBatchOptions}; use arrow::compute::take_arrays; @@ -744,7 +749,7 @@ pub struct RepartitionExec { /// `SortPreservingRepartitionExec`, false means `RepartitionExec`. preserve_order: bool, /// Cache holding plan properties like equivalences, output partitioning etc. - cache: PlanProperties, + cache: Arc, } #[derive(Debug, Clone)] @@ -813,6 +818,18 @@ impl RepartitionExec { pub fn name(&self) -> &str { "RepartitionExec" } + + fn with_new_children_and_same_properties( + &self, + mut children: Vec>, + ) -> Self { + Self { + input: children.swap_remove(0), + metrics: ExecutionPlanMetricsSet::new(), + state: Default::default(), + ..Self::clone(self) + } + } } impl DisplayAs for RepartitionExec { @@ -872,7 +889,7 @@ impl ExecutionPlan for RepartitionExec { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.cache } @@ -884,6 +901,7 @@ impl ExecutionPlan for RepartitionExec { self: Arc, mut children: Vec>, ) -> Result> { + check_if_same_properties!(self, children); let mut repartition = RepartitionExec::try_new( children.swap_remove(0), self.partitioning().clone(), @@ -1185,7 +1203,7 @@ impl ExecutionPlan for RepartitionExec { _config: &ConfigOptions, ) -> Result>> { use Partitioning::*; - let mut new_properties = self.cache.clone(); + let mut new_properties = PlanProperties::clone(&self.cache); new_properties.partitioning = match new_properties.partitioning { RoundRobinBatch(_) => RoundRobinBatch(target_partitions), Hash(hash, _) => Hash(hash, target_partitions), @@ -1196,7 +1214,7 @@ impl ExecutionPlan for RepartitionExec { state: Arc::clone(&self.state), metrics: self.metrics.clone(), preserve_order: self.preserve_order, - cache: new_properties, + cache: new_properties.into(), }))) } } @@ -1216,7 +1234,7 @@ impl RepartitionExec { state: Default::default(), metrics: ExecutionPlanMetricsSet::new(), preserve_order, - cache, + cache: Arc::new(cache), }) } @@ -1277,7 +1295,7 @@ impl RepartitionExec { // to maintain order self.input.output_partitioning().partition_count() > 1; let eq_properties = Self::eq_properties_helper(&self.input, self.preserve_order); - self.cache = self.cache.with_eq_properties(eq_properties); + Arc::make_mut(&mut self.cache).set_eq_properties(eq_properties); self } diff --git a/datafusion/physical-plan/src/sorts/partial_sort.rs b/datafusion/physical-plan/src/sorts/partial_sort.rs index 73ba889c9e40b..8dd498f10368b 100644 --- a/datafusion/physical-plan/src/sorts/partial_sort.rs +++ b/datafusion/physical-plan/src/sorts/partial_sort.rs @@ -57,11 +57,13 @@ use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; +use crate::execution_plan::has_same_children_properties; use crate::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet}; use crate::sorts::sort::sort_batch; use crate::{ DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, ExecutionPlanProperties, Partitioning, PlanProperties, SendableRecordBatchStream, Statistics, + check_if_same_properties, }; use arrow::compute::concat_batches; @@ -93,7 +95,7 @@ pub struct PartialSortExec { /// Fetch highest/lowest n results fetch: Option, /// Cache holding plan properties like equivalences, output partitioning etc. - cache: PlanProperties, + cache: Arc, } impl PartialSortExec { @@ -114,7 +116,7 @@ impl PartialSortExec { metrics_set: ExecutionPlanMetricsSet::new(), preserve_partitioning, fetch: None, - cache, + cache: Arc::new(cache), } } @@ -132,12 +134,8 @@ impl PartialSortExec { /// input partitions producing a single, sorted partition. pub fn with_preserve_partitioning(mut self, preserve_partitioning: bool) -> Self { self.preserve_partitioning = preserve_partitioning; - self.cache = self - .cache - .with_partitioning(Self::output_partitioning_helper( - &self.input, - self.preserve_partitioning, - )); + Arc::make_mut(&mut self.cache).partitioning = + Self::output_partitioning_helper(&self.input, self.preserve_partitioning); self } @@ -207,6 +205,16 @@ impl PartialSortExec { input.boundedness(), )) } + + fn with_new_children_and_same_properties( + &self, + mut children: Vec>, + ) -> Self { + Self { + input: children.swap_remove(0), + ..Self::clone(self) + } + } } impl DisplayAs for PartialSortExec { @@ -255,7 +263,7 @@ impl ExecutionPlan for PartialSortExec { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.cache } @@ -283,6 +291,7 @@ impl ExecutionPlan for PartialSortExec { self: Arc, children: Vec>, ) -> Result> { + check_if_same_properties!(self, children); let new_partial_sort = PartialSortExec::new( self.expr.clone(), Arc::clone(&children[0]), diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs index 3e8fdf1f3ed7e..b071c784951cd 100644 --- a/datafusion/physical-plan/src/sorts/sort.rs +++ b/datafusion/physical-plan/src/sorts/sort.rs @@ -27,7 +27,9 @@ use std::sync::Arc; use parking_lot::RwLock; use crate::common::spawn_buffered; -use crate::execution_plan::{Boundedness, CardinalityEffect, EmissionType}; +use crate::execution_plan::{ + Boundedness, CardinalityEffect, EmissionType, has_same_children_properties, +}; use crate::expressions::PhysicalSortExpr; use crate::filter_pushdown::{ ChildFilterDescription, FilterDescription, FilterPushdownPhase, @@ -951,7 +953,7 @@ pub struct SortExec { /// Normalized common sort prefix between the input and the sort expressions (only used with fetch) common_sort_prefix: Vec, /// Cache holding plan properties like equivalences, output partitioning etc. - cache: PlanProperties, + cache: Arc, /// Filter matching the state of the sort for dynamic filter pushdown. /// If `fetch` is `Some`, this will also be set and a TopK operator may be used. /// If `fetch` is `None`, this will be `None`. @@ -973,7 +975,7 @@ impl SortExec { preserve_partitioning, fetch: None, common_sort_prefix: sort_prefix, - cache, + cache: Arc::new(cache), filter: None, } } @@ -992,12 +994,8 @@ impl SortExec { /// input partitions producing a single, sorted partition. pub fn with_preserve_partitioning(mut self, preserve_partitioning: bool) -> Self { self.preserve_partitioning = preserve_partitioning; - self.cache = self - .cache - .with_partitioning(Self::output_partitioning_helper( - &self.input, - self.preserve_partitioning, - )); + Arc::make_mut(&mut self.cache).partitioning = + Self::output_partitioning_helper(&self.input, self.preserve_partitioning); self } @@ -1021,7 +1019,7 @@ impl SortExec { preserve_partitioning: self.preserve_partitioning, common_sort_prefix: self.common_sort_prefix.clone(), fetch: self.fetch, - cache: self.cache.clone(), + cache: Arc::clone(&self.cache), filter: self.filter.clone(), } } @@ -1034,12 +1032,12 @@ impl SortExec { /// operation since rows that are not going to be included /// can be dropped. pub fn with_fetch(&self, fetch: Option) -> Self { - let mut cache = self.cache.clone(); + let mut cache = PlanProperties::clone(&self.cache); // If the SortExec can emit incrementally (that means the sort requirements // and properties of the input match), the SortExec can generate its result // without scanning the entire input when a fetch value exists. let is_pipeline_friendly = matches!( - self.cache.emission_type, + cache.emission_type, EmissionType::Incremental | EmissionType::Both ); if fetch.is_some() && is_pipeline_friendly { @@ -1051,7 +1049,7 @@ impl SortExec { }); let mut new_sort = self.cloned(); new_sort.fetch = fetch; - new_sort.cache = cache; + new_sort.cache = cache.into(); new_sort.filter = filter; new_sort } @@ -1206,7 +1204,7 @@ impl ExecutionPlan for SortExec { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.cache } @@ -1235,14 +1233,17 @@ impl ExecutionPlan for SortExec { let mut new_sort = self.cloned(); assert_eq!(children.len(), 1, "SortExec should have exactly one child"); new_sort.input = Arc::clone(&children[0]); - // Recompute the properties based on the new input since they may have changed - let (cache, sort_prefix) = Self::compute_properties( - &new_sort.input, - new_sort.expr.clone(), - new_sort.preserve_partitioning, - )?; - new_sort.cache = cache; - new_sort.common_sort_prefix = sort_prefix; + + if !has_same_children_properties(&self, &children)? { + // Recompute the properties based on the new input since they may have changed + let (cache, sort_prefix) = Self::compute_properties( + &new_sort.input, + new_sort.expr.clone(), + new_sort.preserve_partitioning, + )?; + new_sort.cache = Arc::new(cache); + new_sort.common_sort_prefix = sort_prefix; + } Ok(Arc::new(new_sort)) } @@ -1466,7 +1467,7 @@ mod tests { pub struct SortedUnboundedExec { schema: Schema, batch_size: u64, - cache: PlanProperties, + cache: Arc, } impl DisplayAs for SortedUnboundedExec { @@ -1506,7 +1507,7 @@ mod tests { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.cache } @@ -2259,7 +2260,9 @@ mod tests { let source = SortedUnboundedExec { schema: schema.clone(), batch_size: 2, - cache: SortedUnboundedExec::compute_properties(Arc::new(schema.clone())), + cache: Arc::new(SortedUnboundedExec::compute_properties(Arc::new( + schema.clone(), + ))), }; let mut plan = SortExec::new( [PhysicalSortExpr::new_default(Arc::new(Column::new( diff --git a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs index 0ddea90a98bf3..c1d7432126b09 100644 --- a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs +++ b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs @@ -28,6 +28,7 @@ use crate::sorts::streaming_merge::StreamingMergeBuilder; use crate::{ DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, ExecutionPlanProperties, Partitioning, PlanProperties, SendableRecordBatchStream, Statistics, + check_if_same_properties, }; use datafusion_common::{Result, assert_eq_or_internal_err, internal_err}; @@ -35,7 +36,9 @@ use datafusion_execution::TaskContext; use datafusion_execution::memory_pool::MemoryConsumer; use datafusion_physical_expr_common::sort_expr::{LexOrdering, OrderingRequirements}; -use crate::execution_plan::{EvaluationType, SchedulingType}; +use crate::execution_plan::{ + EvaluationType, SchedulingType, has_same_children_properties, +}; use log::{debug, trace}; /// Sort preserving merge execution plan @@ -93,7 +96,7 @@ pub struct SortPreservingMergeExec { /// Optional number of rows to fetch. Stops producing rows after this fetch fetch: Option, /// Cache holding plan properties like equivalences, output partitioning etc. - cache: PlanProperties, + cache: Arc, /// Use round-robin selection of tied winners of loser tree /// /// See [`Self::with_round_robin_repartition`] for more information. @@ -109,7 +112,7 @@ impl SortPreservingMergeExec { expr, metrics: ExecutionPlanMetricsSet::new(), fetch: None, - cache, + cache: Arc::new(cache), enable_round_robin_repartition: true, } } @@ -180,6 +183,16 @@ impl SortPreservingMergeExec { .with_evaluation_type(drive) .with_scheduling_type(scheduling) } + + fn with_new_children_and_same_properties( + &self, + mut children: Vec>, + ) -> Self { + Self { + input: children.swap_remove(0), + ..Self::clone(self) + } + } } impl DisplayAs for SortPreservingMergeExec { @@ -225,7 +238,7 @@ impl ExecutionPlan for SortPreservingMergeExec { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.cache } @@ -240,7 +253,7 @@ impl ExecutionPlan for SortPreservingMergeExec { expr: self.expr.clone(), metrics: self.metrics.clone(), fetch: limit, - cache: self.cache.clone(), + cache: Arc::clone(&self.cache), enable_round_robin_repartition: true, })) } @@ -267,10 +280,11 @@ impl ExecutionPlan for SortPreservingMergeExec { fn with_new_children( self: Arc, - children: Vec>, + mut children: Vec>, ) -> Result> { + check_if_same_properties!(self, children); Ok(Arc::new( - SortPreservingMergeExec::new(self.expr.clone(), Arc::clone(&children[0])) + SortPreservingMergeExec::new(self.expr.clone(), children.swap_remove(0)) .with_fetch(self.fetch), )) } @@ -1350,7 +1364,7 @@ mod tests { #[derive(Debug, Clone)] struct CongestedExec { schema: Schema, - cache: PlanProperties, + cache: Arc, congestion: Arc, } @@ -1386,7 +1400,7 @@ mod tests { fn as_any(&self) -> &dyn Any { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.cache } fn children(&self) -> Vec<&Arc> { @@ -1479,7 +1493,7 @@ mod tests { }; let source = CongestedExec { schema: schema.clone(), - cache: properties, + cache: Arc::new(properties), congestion: Arc::new(Congestion::new(partition_count)), }; let spm = SortPreservingMergeExec::new( diff --git a/datafusion/physical-plan/src/streaming.rs b/datafusion/physical-plan/src/streaming.rs index c8b8d95718cb8..1535482374110 100644 --- a/datafusion/physical-plan/src/streaming.rs +++ b/datafusion/physical-plan/src/streaming.rs @@ -67,7 +67,7 @@ pub struct StreamingTableExec { projected_output_ordering: Vec, infinite: bool, limit: Option, - cache: PlanProperties, + cache: Arc, metrics: ExecutionPlanMetricsSet, } @@ -111,7 +111,7 @@ impl StreamingTableExec { projected_output_ordering, infinite, limit, - cache, + cache: Arc::new(cache), metrics: ExecutionPlanMetricsSet::new(), }) } @@ -236,7 +236,7 @@ impl ExecutionPlan for StreamingTableExec { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.cache } @@ -335,7 +335,7 @@ impl ExecutionPlan for StreamingTableExec { projected_output_ordering: self.projected_output_ordering.clone(), infinite: self.infinite, limit, - cache: self.cache.clone(), + cache: Arc::clone(&self.cache), metrics: self.metrics.clone(), })) } diff --git a/datafusion/physical-plan/src/test.rs b/datafusion/physical-plan/src/test.rs index c6d0940c35480..4df4b14ce26df 100644 --- a/datafusion/physical-plan/src/test.rs +++ b/datafusion/physical-plan/src/test.rs @@ -75,7 +75,7 @@ pub struct TestMemoryExec { /// The maximum number of records to read from this plan. If `None`, /// all records after filtering are returned. fetch: Option, - cache: PlanProperties, + cache: Arc, } impl DisplayAs for TestMemoryExec { @@ -134,7 +134,7 @@ impl ExecutionPlan for TestMemoryExec { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.cache } @@ -235,11 +235,11 @@ impl TestMemoryExec { schema: SchemaRef, projection: Option>, ) -> Result { - let projected_schema = project_schema(&schema, projection.as_ref())?; + let projected_schema = project_schema(&schema, projection.as_deref())?; Ok(Self { partitions: partitions.to_vec(), schema, - cache: PlanProperties::new( + cache: Arc::new(PlanProperties::new( EquivalenceProperties::new_with_orderings( Arc::clone(&projected_schema), Vec::::new(), @@ -247,7 +247,7 @@ impl TestMemoryExec { Partitioning::UnknownPartitioning(partitions.len()), EmissionType::Incremental, Boundedness::Bounded, - ), + )), projected_schema, projection, sort_information: vec![], @@ -265,7 +265,7 @@ impl TestMemoryExec { ) -> Result> { let mut source = Self::try_new(partitions, schema, projection)?; let cache = source.compute_properties(); - source.cache = cache; + source.cache = Arc::new(cache); Ok(Arc::new(source)) } @@ -273,7 +273,7 @@ impl TestMemoryExec { pub fn update_cache(source: &Arc) -> TestMemoryExec { let cache = source.compute_properties(); let mut source = (**source).clone(); - source.cache = cache; + source.cache = Arc::new(cache); source } @@ -342,7 +342,7 @@ impl TestMemoryExec { } self.sort_information = sort_information; - self.cache = self.compute_properties(); + self.cache = Arc::new(self.compute_properties()); Ok(self) } diff --git a/datafusion/physical-plan/src/test/exec.rs b/datafusion/physical-plan/src/test/exec.rs index 4507cccba05a9..a8b21f70f7760 100644 --- a/datafusion/physical-plan/src/test/exec.rs +++ b/datafusion/physical-plan/src/test/exec.rs @@ -125,7 +125,7 @@ pub struct MockExec { /// if true (the default), sends data using a separate task to ensure the /// batches are not available without this stream yielding first use_task: bool, - cache: PlanProperties, + cache: Arc, } impl MockExec { @@ -142,7 +142,7 @@ impl MockExec { data, schema, use_task: true, - cache, + cache: Arc::new(cache), } } @@ -192,7 +192,7 @@ impl ExecutionPlan for MockExec { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.cache } @@ -299,7 +299,7 @@ pub struct BarrierExec { /// all streams wait on this barrier to produce barrier: Arc, - cache: PlanProperties, + cache: Arc, } impl BarrierExec { @@ -312,7 +312,7 @@ impl BarrierExec { data, schema, barrier, - cache, + cache: Arc::new(cache), } } @@ -364,7 +364,7 @@ impl ExecutionPlan for BarrierExec { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.cache } @@ -429,7 +429,7 @@ impl ExecutionPlan for BarrierExec { /// A mock execution plan that errors on a call to execute #[derive(Debug)] pub struct ErrorExec { - cache: PlanProperties, + cache: Arc, } impl Default for ErrorExec { @@ -446,7 +446,9 @@ impl ErrorExec { true, )])); let cache = Self::compute_properties(schema); - Self { cache } + Self { + cache: Arc::new(cache), + } } /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc. @@ -487,7 +489,7 @@ impl ExecutionPlan for ErrorExec { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.cache } @@ -517,7 +519,7 @@ impl ExecutionPlan for ErrorExec { pub struct StatisticsExec { stats: Statistics, schema: Arc, - cache: PlanProperties, + cache: Arc, } impl StatisticsExec { pub fn new(stats: Statistics, schema: Schema) -> Self { @@ -530,7 +532,7 @@ impl StatisticsExec { Self { stats, schema: Arc::new(schema), - cache, + cache: Arc::new(cache), } } @@ -577,7 +579,7 @@ impl ExecutionPlan for StatisticsExec { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.cache } @@ -623,7 +625,7 @@ pub struct BlockingExec { /// Ref-counting helper to check if the plan and the produced stream are still in memory. refs: Arc<()>, - cache: PlanProperties, + cache: Arc, } impl BlockingExec { @@ -633,7 +635,7 @@ impl BlockingExec { Self { schema, refs: Default::default(), - cache, + cache: Arc::new(cache), } } @@ -684,7 +686,7 @@ impl ExecutionPlan for BlockingExec { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.cache } @@ -766,7 +768,7 @@ pub struct PanicExec { /// Number of output partitions. Each partition will produce this /// many empty output record batches prior to panicking batches_until_panics: Vec, - cache: PlanProperties, + cache: Arc, } impl PanicExec { @@ -778,7 +780,7 @@ impl PanicExec { Self { schema, batches_until_panics, - cache, + cache: Arc::new(cache), } } @@ -830,7 +832,7 @@ impl ExecutionPlan for PanicExec { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.cache } diff --git a/datafusion/physical-plan/src/union.rs b/datafusion/physical-plan/src/union.rs index d27c81b968490..6c9fe33bad04b 100644 --- a/datafusion/physical-plan/src/union.rs +++ b/datafusion/physical-plan/src/union.rs @@ -32,9 +32,10 @@ use super::{ SendableRecordBatchStream, Statistics, metrics::{ExecutionPlanMetricsSet, MetricsSet}, }; +use crate::check_if_same_properties; use crate::execution_plan::{ InvariantLevel, boundedness_from_children, check_default_invariants, - emission_type_from_children, + emission_type_from_children, has_same_children_properties, }; use crate::filter_pushdown::{FilterDescription, FilterPushdownPhase}; use crate::metrics::BaselineMetrics; @@ -100,7 +101,7 @@ pub struct UnionExec { /// Execution metrics metrics: ExecutionPlanMetricsSet, /// Cache holding plan properties like equivalences, output partitioning etc. - cache: PlanProperties, + cache: Arc, } impl UnionExec { @@ -118,7 +119,7 @@ impl UnionExec { UnionExec { inputs, metrics: ExecutionPlanMetricsSet::new(), - cache, + cache: Arc::new(cache), } } @@ -147,7 +148,7 @@ impl UnionExec { Ok(Arc::new(UnionExec { inputs, metrics: ExecutionPlanMetricsSet::new(), - cache, + cache: Arc::new(cache), })) } } @@ -183,6 +184,17 @@ impl UnionExec { boundedness_from_children(inputs), )) } + + fn with_new_children_and_same_properties( + &self, + children: Vec>, + ) -> Self { + Self { + inputs: children, + metrics: ExecutionPlanMetricsSet::new(), + ..Self::clone(self) + } + } } impl DisplayAs for UnionExec { @@ -210,7 +222,7 @@ impl ExecutionPlan for UnionExec { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.cache } @@ -259,6 +271,7 @@ impl ExecutionPlan for UnionExec { self: Arc, children: Vec>, ) -> Result> { + check_if_same_properties!(self, children); UnionExec::try_new(children) } @@ -411,7 +424,7 @@ pub struct InterleaveExec { /// Execution metrics metrics: ExecutionPlanMetricsSet, /// Cache holding plan properties like equivalences, output partitioning etc. - cache: PlanProperties, + cache: Arc, } impl InterleaveExec { @@ -425,7 +438,7 @@ impl InterleaveExec { Ok(InterleaveExec { inputs, metrics: ExecutionPlanMetricsSet::new(), - cache, + cache: Arc::new(cache), }) } @@ -447,6 +460,17 @@ impl InterleaveExec { boundedness_from_children(inputs), )) } + + fn with_new_children_and_same_properties( + &self, + children: Vec>, + ) -> Self { + Self { + inputs: children, + metrics: ExecutionPlanMetricsSet::new(), + ..Self::clone(self) + } + } } impl DisplayAs for InterleaveExec { @@ -474,7 +498,7 @@ impl ExecutionPlan for InterleaveExec { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.cache } @@ -495,6 +519,7 @@ impl ExecutionPlan for InterleaveExec { can_interleave(children.iter()), "Can not create InterleaveExec: new children can not be interleaved" ); + check_if_same_properties!(self, children); Ok(Arc::new(InterleaveExec::try_new(children)?)) } diff --git a/datafusion/physical-plan/src/unnest.rs b/datafusion/physical-plan/src/unnest.rs index 5fef754e80780..d8a8937c46b5e 100644 --- a/datafusion/physical-plan/src/unnest.rs +++ b/datafusion/physical-plan/src/unnest.rs @@ -26,9 +26,10 @@ use super::metrics::{ RecordOutput, }; use super::{DisplayAs, ExecutionPlanProperties, PlanProperties}; +use crate::execution_plan::has_same_children_properties; use crate::{ DisplayFormatType, Distribution, ExecutionPlan, RecordBatchStream, - SendableRecordBatchStream, + SendableRecordBatchStream, check_if_same_properties, }; use arrow::array::{ @@ -74,7 +75,7 @@ pub struct UnnestExec { /// Execution metrics metrics: ExecutionPlanMetricsSet, /// Cache holding plan properties like equivalences, output partitioning etc. - cache: PlanProperties, + cache: Arc, } impl UnnestExec { @@ -100,7 +101,7 @@ impl UnnestExec { struct_column_indices, options, metrics: Default::default(), - cache, + cache: Arc::new(cache), }) } @@ -193,6 +194,17 @@ impl UnnestExec { pub fn options(&self) -> &UnnestOptions { &self.options } + + fn with_new_children_and_same_properties( + &self, + mut children: Vec>, + ) -> Self { + Self { + input: children.swap_remove(0), + metrics: ExecutionPlanMetricsSet::new(), + ..Self::clone(self) + } + } } impl DisplayAs for UnnestExec { @@ -221,7 +233,7 @@ impl ExecutionPlan for UnnestExec { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.cache } @@ -231,10 +243,11 @@ impl ExecutionPlan for UnnestExec { fn with_new_children( self: Arc, - children: Vec>, + mut children: Vec>, ) -> Result> { + check_if_same_properties!(self, children); Ok(Arc::new(UnnestExec::new( - Arc::clone(&children[0]), + children.swap_remove(0), self.list_column_indices.clone(), self.struct_column_indices.clone(), Arc::clone(&self.schema), diff --git a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs index 987a400ec369e..9dd61b2170cf6 100644 --- a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs +++ b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs @@ -28,6 +28,7 @@ use std::sync::Arc; use std::task::{Context, Poll}; use super::utils::create_schema; +use crate::execution_plan::has_same_children_properties; use crate::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet}; use crate::windows::{ calc_requirements, get_ordered_partition_by_indices, get_partition_by_sort_exprs, @@ -36,7 +37,7 @@ use crate::windows::{ use crate::{ ColumnStatistics, DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, ExecutionPlanProperties, InputOrderMode, PlanProperties, RecordBatchStream, - SendableRecordBatchStream, Statistics, WindowExpr, + SendableRecordBatchStream, Statistics, WindowExpr, check_if_same_properties, }; use arrow::compute::take_record_batch; @@ -93,7 +94,7 @@ pub struct BoundedWindowAggExec { // See `get_ordered_partition_by_indices` for more details. ordered_partition_by_indices: Vec, /// Cache holding plan properties like equivalences, output partitioning etc. - cache: PlanProperties, + cache: Arc, /// If `can_rerepartition` is false, partition_keys is always empty. can_repartition: bool, } @@ -134,7 +135,7 @@ impl BoundedWindowAggExec { metrics: ExecutionPlanMetricsSet::new(), input_order_mode, ordered_partition_by_indices, - cache, + cache: Arc::new(cache), can_repartition, }) } @@ -248,6 +249,17 @@ impl BoundedWindowAggExec { total_byte_size: Precision::Absent, }) } + + fn with_new_children_and_same_properties( + &self, + mut children: Vec>, + ) -> Self { + Self { + input: children.swap_remove(0), + metrics: ExecutionPlanMetricsSet::new(), + ..Self::clone(self) + } + } } impl DisplayAs for BoundedWindowAggExec { @@ -304,7 +316,7 @@ impl ExecutionPlan for BoundedWindowAggExec { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.cache } @@ -339,6 +351,7 @@ impl ExecutionPlan for BoundedWindowAggExec { self: Arc, children: Vec>, ) -> Result> { + check_if_same_properties!(self, children); Ok(Arc::new(BoundedWindowAggExec::try_new( self.window_expr.clone(), Arc::clone(&children[0]), diff --git a/datafusion/physical-plan/src/windows/window_agg_exec.rs b/datafusion/physical-plan/src/windows/window_agg_exec.rs index aa99f4f49885a..734a20800641b 100644 --- a/datafusion/physical-plan/src/windows/window_agg_exec.rs +++ b/datafusion/physical-plan/src/windows/window_agg_exec.rs @@ -23,7 +23,7 @@ use std::sync::Arc; use std::task::{Context, Poll}; use super::utils::create_schema; -use crate::execution_plan::EmissionType; +use crate::execution_plan::{EmissionType, has_same_children_properties}; use crate::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet}; use crate::windows::{ calc_requirements, get_ordered_partition_by_indices, get_partition_by_sort_exprs, @@ -32,7 +32,7 @@ use crate::windows::{ use crate::{ ColumnStatistics, DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, ExecutionPlanProperties, PhysicalExpr, PlanProperties, RecordBatchStream, - SendableRecordBatchStream, Statistics, WindowExpr, + SendableRecordBatchStream, Statistics, WindowExpr, check_if_same_properties, }; use arrow::array::ArrayRef; @@ -65,7 +65,7 @@ pub struct WindowAggExec { // see `get_ordered_partition_by_indices` for more details. ordered_partition_by_indices: Vec, /// Cache holding plan properties like equivalences, output partitioning etc. - cache: PlanProperties, + cache: Arc, /// If `can_partition` is false, partition_keys is always empty. can_repartition: bool, } @@ -89,7 +89,7 @@ impl WindowAggExec { schema, metrics: ExecutionPlanMetricsSet::new(), ordered_partition_by_indices, - cache, + cache: Arc::new(cache), can_repartition, }) } @@ -158,6 +158,17 @@ impl WindowAggExec { .unwrap_or_else(Vec::new) } } + + fn with_new_children_and_same_properties( + &self, + mut children: Vec>, + ) -> Self { + Self { + input: children.swap_remove(0), + metrics: ExecutionPlanMetricsSet::new(), + ..Self::clone(self) + } + } } impl DisplayAs for WindowAggExec { @@ -206,7 +217,7 @@ impl ExecutionPlan for WindowAggExec { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.cache } @@ -242,11 +253,12 @@ impl ExecutionPlan for WindowAggExec { fn with_new_children( self: Arc, - children: Vec>, + mut children: Vec>, ) -> Result> { + check_if_same_properties!(self, children); Ok(Arc::new(WindowAggExec::try_new( self.window_expr.clone(), - Arc::clone(&children[0]), + children.swap_remove(0), true, )?)) } diff --git a/datafusion/physical-plan/src/work_table.rs b/datafusion/physical-plan/src/work_table.rs index f1b9e3e88d123..67e7bb86a3d81 100644 --- a/datafusion/physical-plan/src/work_table.rs +++ b/datafusion/physical-plan/src/work_table.rs @@ -109,7 +109,7 @@ pub struct WorkTableExec { /// Execution metrics metrics: ExecutionPlanMetricsSet, /// Cache holding plan properties like equivalences, output partitioning etc. - cache: PlanProperties, + cache: Arc, } impl WorkTableExec { @@ -129,7 +129,7 @@ impl WorkTableExec { projection, work_table: Arc::new(WorkTable::new(name)), metrics: ExecutionPlanMetricsSet::new(), - cache, + cache: Arc::new(cache), }) } @@ -181,7 +181,7 @@ impl ExecutionPlan for WorkTableExec { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.cache } @@ -263,7 +263,7 @@ impl ExecutionPlan for WorkTableExec { projection: self.projection.clone(), metrics: ExecutionPlanMetricsSet::new(), work_table, - cache: self.cache.clone(), + cache: Arc::clone(&self.cache), })) } } diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs index 45868df4ced6c..c5a28b5cbb7d4 100644 --- a/datafusion/proto/src/physical_plan/mod.rs +++ b/datafusion/proto/src/physical_plan/mod.rs @@ -588,8 +588,8 @@ impl protobuf::PhysicalPlanNode { None }; - let filter = - FilterExec::try_new(predicate, input)?.with_projection(projection)?; + let filter = FilterExec::try_new(predicate, input)? + .with_projection(projection.map(Into::into))?; match filter_selectivity { Ok(filter_selectivity) => Ok(Arc::new( filter.with_default_selectivity(filter_selectivity)?, diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs index 31878e2e34b3d..39e591fdb7744 100644 --- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs @@ -1825,7 +1825,7 @@ async fn roundtrip_projection_source() -> Result<()> { Arc::new(BinaryExpr::new(col("c", &schema)?, Operator::Eq, lit(1))), DataSourceExec::from_data_source(scan_config), )? - .with_projection(Some(vec![0, 1]))?, + .with_projection(Some(vec![0, 1].into()))?, ); roundtrip_test(filter) diff --git a/docs/source/library-user-guide/custom-table-providers.md b/docs/source/library-user-guide/custom-table-providers.md index 8e1dee9e843ac..b4da91e6e3eb9 100644 --- a/docs/source/library-user-guide/custom-table-providers.md +++ b/docs/source/library-user-guide/custom-table-providers.md @@ -108,7 +108,7 @@ impl ExecutionPlan for CustomExec { } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { unreachable!() } @@ -232,7 +232,7 @@ The `scan` method of the `TableProvider` returns a `Result &PlanProperties { +# fn properties(&self) -> &Arc { # unreachable!() # } # @@ -291,7 +291,7 @@ impl CustomExec { schema: SchemaRef, db: CustomDataSource, ) -> Self { - let projected_schema = project_schema(&schema, projections).unwrap(); + let projected_schema = project_schema(&schema, projections.map(AsRef::as_ref)).unwrap(); Self { db, projected_schema, @@ -424,7 +424,7 @@ This will allow you to use the custom table provider in DataFusion. For example, # } # # -# fn properties(&self) -> &PlanProperties { +# fn properties(&self) -> &Arc { # unreachable!() # } # @@ -483,7 +483,7 @@ This will allow you to use the custom table provider in DataFusion. For example, # schema: SchemaRef, # db: CustomDataSource, # ) -> Self { -# let projected_schema = project_schema(&schema, projections).unwrap(); +# let projected_schema = project_schema(&schema, projections.map(AsRef::as_ref)).unwrap(); # Self { # db, # projected_schema,