Skip to content

Commit 99cf634

Browse files
committed
add fast-path for with_new_children
This patch implements a fast path for the `ExecutionPlan::with_new_children` function for some plans, moving closer to a physical plan re-use implementation and improving planning performance. If the properties of the passed children are the same as those of the children in self, we do not recompute self's properties (which could be costly if projection mapping is required). Instead, we just replace the children and re-use self's properties as-is. To make it possible to compare two sets of properties, the `ExecutionPlan::properties(...)` signature is modified and now returns `&Arc<PlanProperties>`. If the `children` properties are unchanged in `with_new_children`, we clone our properties `Arc`; a parent plan will then see our properties as unchanged and do the same. There are also other improvements. All changes: - Return `&Arc<PlanProperties>` from `ExecutionPlan::properties(...)` instead of a plain reference. - Implement a `with_new_children` fast path, taken when the children's properties have not changed, for the following plans: * SortExec * RepartitionExec * ProjectionExec * FilterExec * CoalescePartitionsExec * AggregateExec - Store `Arc<[usize]>` instead of a vector within `FilterExec`. - Store `Arc<[Arc<AggregateFunctionExpr>]>` instead of a vec for aggregate expressions and filters. - Store `Arc<[ProjectionExpr]>` instead of a vec in the `ProjectionExprs` struct. Note: currently, `reset_plan_states` does not allow re-using a plan in general: it is not supported for the dynamic filters and recursive queries features, because in those cases a state reset would also have to update pointers in the children plans.
1 parent daadb0c commit 99cf634

File tree

70 files changed

+503
-393
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

70 files changed

+503
-393
lines changed

datafusion-examples/examples/custom_data_source/custom_datasource.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -192,7 +192,7 @@ impl TableProvider for CustomDataSource {
192192
struct CustomExec {
193193
db: CustomDataSource,
194194
projected_schema: SchemaRef,
195-
cache: PlanProperties,
195+
cache: Arc<PlanProperties>,
196196
}
197197

198198
impl CustomExec {
@@ -202,12 +202,13 @@ impl CustomExec {
202202
schema: SchemaRef,
203203
db: CustomDataSource,
204204
) -> Self {
205-
let projected_schema = project_schema(&schema, projections).unwrap();
205+
let projected_schema =
206+
project_schema(&schema, projections.map(AsRef::as_ref)).unwrap();
206207
let cache = Self::compute_properties(projected_schema.clone());
207208
Self {
208209
db,
209210
projected_schema,
210-
cache,
211+
cache: Arc::new(cache),
211212
}
212213
}
213214

@@ -238,7 +239,7 @@ impl ExecutionPlan for CustomExec {
238239
self
239240
}
240241

241-
fn properties(&self) -> &PlanProperties {
242+
fn properties(&self) -> &Arc<PlanProperties> {
242243
&self.cache
243244
}
244245

datafusion-examples/examples/execution_monitoring/memory_pool_execution_plan.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -199,7 +199,7 @@ impl ExternalBatchBufferer {
199199
struct BufferingExecutionPlan {
200200
schema: SchemaRef,
201201
input: Arc<dyn ExecutionPlan>,
202-
properties: PlanProperties,
202+
properties: Arc<PlanProperties>,
203203
}
204204

205205
impl BufferingExecutionPlan {
@@ -233,7 +233,7 @@ impl ExecutionPlan for BufferingExecutionPlan {
233233
self.schema.clone()
234234
}
235235

236-
fn properties(&self) -> &PlanProperties {
236+
fn properties(&self) -> &Arc<PlanProperties> {
237237
&self.properties
238238
}
239239

datafusion-examples/examples/proto/composed_extension_codec.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ impl ExecutionPlan for ParentExec {
106106
self
107107
}
108108

109-
fn properties(&self) -> &datafusion::physical_plan::PlanProperties {
109+
fn properties(&self) -> &Arc<datafusion::physical_plan::PlanProperties> {
110110
unreachable!()
111111
}
112112

@@ -182,7 +182,7 @@ impl ExecutionPlan for ChildExec {
182182
self
183183
}
184184

185-
fn properties(&self) -> &datafusion::physical_plan::PlanProperties {
185+
fn properties(&self) -> &Arc<datafusion::physical_plan::PlanProperties> {
186186
unreachable!()
187187
}
188188

datafusion-examples/examples/relation_planner/table_sample.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -618,7 +618,7 @@ pub struct SampleExec {
618618
upper_bound: f64,
619619
seed: u64,
620620
metrics: ExecutionPlanMetricsSet,
621-
cache: PlanProperties,
621+
cache: Arc<PlanProperties>,
622622
}
623623

624624
impl SampleExec {
@@ -656,7 +656,7 @@ impl SampleExec {
656656
upper_bound,
657657
seed,
658658
metrics: ExecutionPlanMetricsSet::new(),
659-
cache,
659+
cache: Arc::new(cache),
660660
})
661661
}
662662

@@ -686,7 +686,7 @@ impl ExecutionPlan for SampleExec {
686686
self
687687
}
688688

689-
fn properties(&self) -> &PlanProperties {
689+
fn properties(&self) -> &Arc<PlanProperties> {
690690
&self.cache
691691
}
692692

datafusion/catalog-listing/src/table.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -522,7 +522,7 @@ impl TableProvider for ListingTable {
522522

523523
// if no files need to be read, return an `EmptyExec`
524524
if partitioned_file_lists.is_empty() {
525-
let projected_schema = project_schema(&self.schema(), projection.as_ref())?;
525+
let projected_schema = project_schema(&self.schema(), projection.as_deref())?;
526526
return Ok(ScanResult::new(Arc::new(EmptyExec::new(projected_schema))));
527527
}
528528

datafusion/catalog/src/memory/table.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -549,7 +549,7 @@ fn evaluate_filters_to_mask(
549549
struct DmlResultExec {
550550
rows_affected: u64,
551551
schema: SchemaRef,
552-
properties: PlanProperties,
552+
properties: Arc<PlanProperties>,
553553
}
554554

555555
impl DmlResultExec {
@@ -570,7 +570,7 @@ impl DmlResultExec {
570570
Self {
571571
rows_affected,
572572
schema,
573-
properties,
573+
properties: Arc::new(properties),
574574
}
575575
}
576576
}
@@ -604,7 +604,7 @@ impl ExecutionPlan for DmlResultExec {
604604
Arc::clone(&self.schema)
605605
}
606606

607-
fn properties(&self) -> &PlanProperties {
607+
fn properties(&self) -> &Arc<PlanProperties> {
608608
&self.properties
609609
}
610610

datafusion/common/src/stats.rs

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -391,7 +391,7 @@ impl Statistics {
391391
/// For example, if we had statistics for columns `{"a", "b", "c"}`,
392392
/// projecting to `vec![2, 1]` would return statistics for columns `{"c",
393393
/// "b"}`.
394-
pub fn project(mut self, projection: Option<&Vec<usize>>) -> Self {
394+
pub fn project(mut self, projection: Option<&[usize]>) -> Self {
395395
let Some(projection) = projection else {
396396
return self;
397397
};
@@ -1066,29 +1066,28 @@ mod tests {
10661066

10671067
#[test]
10681068
fn test_project_none() {
1069-
let projection = None;
1070-
let stats = make_stats(vec![10, 20, 30]).project(projection.as_ref());
1069+
let stats = make_stats(vec![10, 20, 30]).project(None);
10711070
assert_eq!(stats, make_stats(vec![10, 20, 30]));
10721071
}
10731072

10741073
#[test]
10751074
fn test_project_empty() {
1076-
let projection = Some(vec![]);
1077-
let stats = make_stats(vec![10, 20, 30]).project(projection.as_ref());
1075+
let projection: Option<&[_]> = Some(&[]);
1076+
let stats = make_stats(vec![10, 20, 30]).project(projection);
10781077
assert_eq!(stats, make_stats(vec![]));
10791078
}
10801079

10811080
#[test]
10821081
fn test_project_swap() {
1083-
let projection = Some(vec![2, 1]);
1084-
let stats = make_stats(vec![10, 20, 30]).project(projection.as_ref());
1082+
let projection: Option<&[_]> = Some(&[2, 1]);
1083+
let stats = make_stats(vec![10, 20, 30]).project(projection);
10851084
assert_eq!(stats, make_stats(vec![30, 20]));
10861085
}
10871086

10881087
#[test]
10891088
fn test_project_repeated() {
1090-
let projection = Some(vec![1, 2, 1, 1, 0, 2]);
1091-
let stats = make_stats(vec![10, 20, 30]).project(projection.as_ref());
1089+
let projection: Option<&[_]> = Some(&[1, 2, 1, 1, 0, 2]);
1090+
let stats = make_stats(vec![10, 20, 30]).project(projection);
10921091
assert_eq!(stats, make_stats(vec![20, 30, 20, 20, 10, 30]));
10931092
}
10941093

datafusion/common/src/utils/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ use std::thread::available_parallelism;
7070
/// ```
7171
pub fn project_schema(
7272
schema: &SchemaRef,
73-
projection: Option<&Vec<usize>>,
73+
projection: Option<&[usize]>,
7474
) -> Result<SchemaRef> {
7575
let schema = match projection {
7676
Some(columns) => Arc::new(schema.project(columns)?),

datafusion/core/src/datasource/empty.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,8 @@ impl TableProvider for EmptyTable {
7777
_limit: Option<usize>,
7878
) -> Result<Arc<dyn ExecutionPlan>> {
7979
// even though there is no data, projections apply
80-
let projected_schema = project_schema(&self.schema, projection)?;
80+
let projected_schema =
81+
project_schema(&self.schema, projection.map(AsRef::as_ref))?;
8182
Ok(Arc::new(
8283
EmptyExec::new(projected_schema).with_partitions(self.partitions),
8384
))

datafusion/core/src/physical_planner.rs

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3661,13 +3661,15 @@ mod tests {
36613661

36623662
#[derive(Debug)]
36633663
struct NoOpExecutionPlan {
3664-
cache: PlanProperties,
3664+
cache: Arc<PlanProperties>,
36653665
}
36663666

36673667
impl NoOpExecutionPlan {
36683668
fn new(schema: SchemaRef) -> Self {
36693669
let cache = Self::compute_properties(schema);
3670-
Self { cache }
3670+
Self {
3671+
cache: Arc::new(cache),
3672+
}
36713673
}
36723674

36733675
/// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
@@ -3705,7 +3707,7 @@ mod tests {
37053707
self
37063708
}
37073709

3708-
fn properties(&self) -> &PlanProperties {
3710+
fn properties(&self) -> &Arc<PlanProperties> {
37093711
&self.cache
37103712
}
37113713

@@ -3859,7 +3861,7 @@ digraph {
38593861
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
38603862
self.0.iter().collect::<Vec<_>>()
38613863
}
3862-
fn properties(&self) -> &PlanProperties {
3864+
fn properties(&self) -> &Arc<PlanProperties> {
38633865
unimplemented!()
38643866
}
38653867
fn execute(
@@ -3908,7 +3910,7 @@ digraph {
39083910
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
39093911
unimplemented!()
39103912
}
3911-
fn properties(&self) -> &PlanProperties {
3913+
fn properties(&self) -> &Arc<PlanProperties> {
39123914
unimplemented!()
39133915
}
39143916
fn execute(
@@ -4029,7 +4031,7 @@ digraph {
40294031
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
40304032
vec![]
40314033
}
4032-
fn properties(&self) -> &PlanProperties {
4034+
fn properties(&self) -> &Arc<PlanProperties> {
40334035
unimplemented!()
40344036
}
40354037
fn execute(

0 commit comments

Comments
 (0)