Skip to content

Commit b48ae49

Browse files
committed
add fast-path for with_new_children
This patch aims to implement a fast-path for the `ExecutionPlan::with_new_children` function for some plans, moving closer to a physical plan re-use implementation and improving planning performance. If the passed children's properties are the same as in self, we do not actually recompute self's properties (which could be costly if projection mapping is required). Instead, we just replace the children and re-use self's properties as-is. To make it possible to compare two sets of properties, the signature of `ExecutionPlan::properties(...)` is modified and now returns `&Arc<PlanProperties>`. If the `children` properties are unchanged in `with_new_children`, we clone our properties `Arc`, so a parent plan will in turn consider our properties unchanged and do the same. There are also other improvements. All changes: - Return `&Arc<PlanProperties>` from `ExecutionPlan::properties(...)` instead of `&PlanProperties`. - Implement a `with_new_children` fast-path, taken when the children's properties have not changed, for plans: * SortExec * RepartitionExec * ProjectionExec * FilterExec * CoalescePartitionsExec * AggregateExec - Export `reset_plan_states` function. - Store `Arc<[usize]>` instead of a vector within `FilterExec`. - Store `Arc<[Arc<AggregateFunctionExpr>]>` instead of a vec for aggregate expressions and filters. - Store `Arc<[ProjectionExpr]>` instead of a vec in the `ProjectionExprs` struct.
1 parent d103d88 commit b48ae49

File tree

70 files changed

+666
-385
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

70 files changed

+666
-385
lines changed

datafusion-examples/examples/custom_data_source/custom_datasource.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -192,7 +192,7 @@ impl TableProvider for CustomDataSource {
192192
struct CustomExec {
193193
db: CustomDataSource,
194194
projected_schema: SchemaRef,
195-
cache: PlanProperties,
195+
cache: Arc<PlanProperties>,
196196
}
197197

198198
impl CustomExec {
@@ -202,12 +202,13 @@ impl CustomExec {
202202
schema: SchemaRef,
203203
db: CustomDataSource,
204204
) -> Self {
205-
let projected_schema = project_schema(&schema, projections).unwrap();
205+
let projected_schema =
206+
project_schema(&schema, projections.map(AsRef::as_ref)).unwrap();
206207
let cache = Self::compute_properties(projected_schema.clone());
207208
Self {
208209
db,
209210
projected_schema,
210-
cache,
211+
cache: Arc::new(cache),
211212
}
212213
}
213214

@@ -238,7 +239,7 @@ impl ExecutionPlan for CustomExec {
238239
self
239240
}
240241

241-
fn properties(&self) -> &PlanProperties {
242+
fn properties(&self) -> &Arc<PlanProperties> {
242243
&self.cache
243244
}
244245

datafusion-examples/examples/execution_monitoring/memory_pool_execution_plan.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -199,7 +199,7 @@ impl ExternalBatchBufferer {
199199
struct BufferingExecutionPlan {
200200
schema: SchemaRef,
201201
input: Arc<dyn ExecutionPlan>,
202-
properties: PlanProperties,
202+
properties: Arc<PlanProperties>,
203203
}
204204

205205
impl BufferingExecutionPlan {
@@ -233,7 +233,7 @@ impl ExecutionPlan for BufferingExecutionPlan {
233233
self.schema.clone()
234234
}
235235

236-
fn properties(&self) -> &PlanProperties {
236+
fn properties(&self) -> &Arc<PlanProperties> {
237237
&self.properties
238238
}
239239

datafusion-examples/examples/proto/composed_extension_codec.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ impl ExecutionPlan for ParentExec {
106106
self
107107
}
108108

109-
fn properties(&self) -> &datafusion::physical_plan::PlanProperties {
109+
fn properties(&self) -> &Arc<datafusion::physical_plan::PlanProperties> {
110110
unreachable!()
111111
}
112112

@@ -182,7 +182,7 @@ impl ExecutionPlan for ChildExec {
182182
self
183183
}
184184

185-
fn properties(&self) -> &datafusion::physical_plan::PlanProperties {
185+
fn properties(&self) -> &Arc<datafusion::physical_plan::PlanProperties> {
186186
unreachable!()
187187
}
188188

datafusion-examples/examples/relation_planner/table_sample.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -610,7 +610,7 @@ pub struct SampleExec {
610610
upper_bound: f64,
611611
seed: u64,
612612
metrics: ExecutionPlanMetricsSet,
613-
cache: PlanProperties,
613+
cache: Arc<PlanProperties>,
614614
}
615615

616616
impl SampleExec {
@@ -648,7 +648,7 @@ impl SampleExec {
648648
upper_bound,
649649
seed,
650650
metrics: ExecutionPlanMetricsSet::new(),
651-
cache,
651+
cache: Arc::new(cache),
652652
})
653653
}
654654

@@ -678,7 +678,7 @@ impl ExecutionPlan for SampleExec {
678678
self
679679
}
680680

681-
fn properties(&self) -> &PlanProperties {
681+
fn properties(&self) -> &Arc<PlanProperties> {
682682
&self.cache
683683
}
684684

datafusion/catalog-listing/src/table.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -522,7 +522,7 @@ impl TableProvider for ListingTable {
522522

523523
// if no files need to be read, return an `EmptyExec`
524524
if partitioned_file_lists.is_empty() {
525-
let projected_schema = project_schema(&self.schema(), projection.as_ref())?;
525+
let projected_schema = project_schema(&self.schema(), projection.as_deref())?;
526526
return Ok(ScanResult::new(Arc::new(EmptyExec::new(projected_schema))));
527527
}
528528

datafusion/catalog/src/memory/table.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -549,7 +549,7 @@ fn evaluate_filters_to_mask(
549549
struct DmlResultExec {
550550
rows_affected: u64,
551551
schema: SchemaRef,
552-
properties: PlanProperties,
552+
properties: Arc<PlanProperties>,
553553
}
554554

555555
impl DmlResultExec {
@@ -570,7 +570,7 @@ impl DmlResultExec {
570570
Self {
571571
rows_affected,
572572
schema,
573-
properties,
573+
properties: Arc::new(properties),
574574
}
575575
}
576576
}
@@ -604,7 +604,7 @@ impl ExecutionPlan for DmlResultExec {
604604
Arc::clone(&self.schema)
605605
}
606606

607-
fn properties(&self) -> &PlanProperties {
607+
fn properties(&self) -> &Arc<PlanProperties> {
608608
&self.properties
609609
}
610610

datafusion/common/src/stats.rs

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -391,7 +391,7 @@ impl Statistics {
391391
/// For example, if we had statistics for columns `{"a", "b", "c"}`,
392392
/// projecting to `vec![2, 1]` would return statistics for columns `{"c",
393393
/// "b"}`.
394-
pub fn project(mut self, projection: Option<&Vec<usize>>) -> Self {
394+
pub fn project(mut self, projection: Option<&[usize]>) -> Self {
395395
let Some(projection) = projection else {
396396
return self;
397397
};
@@ -1066,29 +1066,28 @@ mod tests {
10661066

10671067
#[test]
10681068
fn test_project_none() {
1069-
let projection = None;
1070-
let stats = make_stats(vec![10, 20, 30]).project(projection.as_ref());
1069+
let stats = make_stats(vec![10, 20, 30]).project(None);
10711070
assert_eq!(stats, make_stats(vec![10, 20, 30]));
10721071
}
10731072

10741073
#[test]
10751074
fn test_project_empty() {
1076-
let projection = Some(vec![]);
1077-
let stats = make_stats(vec![10, 20, 30]).project(projection.as_ref());
1075+
let projection: Option<&[_]> = Some(&[]);
1076+
let stats = make_stats(vec![10, 20, 30]).project(projection);
10781077
assert_eq!(stats, make_stats(vec![]));
10791078
}
10801079

10811080
#[test]
10821081
fn test_project_swap() {
1083-
let projection = Some(vec![2, 1]);
1084-
let stats = make_stats(vec![10, 20, 30]).project(projection.as_ref());
1082+
let projection: Option<&[_]> = Some(&[2, 1]);
1083+
let stats = make_stats(vec![10, 20, 30]).project(projection);
10851084
assert_eq!(stats, make_stats(vec![30, 20]));
10861085
}
10871086

10881087
#[test]
10891088
fn test_project_repeated() {
1090-
let projection = Some(vec![1, 2, 1, 1, 0, 2]);
1091-
let stats = make_stats(vec![10, 20, 30]).project(projection.as_ref());
1089+
let projection: Option<&[_]> = Some(&[1, 2, 1, 1, 0, 2]);
1090+
let stats = make_stats(vec![10, 20, 30]).project(projection);
10921091
assert_eq!(stats, make_stats(vec![20, 30, 20, 20, 10, 30]));
10931092
}
10941093

datafusion/common/src/utils/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ use std::thread::available_parallelism;
7070
/// ```
7171
pub fn project_schema(
7272
schema: &SchemaRef,
73-
projection: Option<&Vec<usize>>,
73+
projection: Option<&[usize]>,
7474
) -> Result<SchemaRef> {
7575
let schema = match projection {
7676
Some(columns) => Arc::new(schema.project(columns)?),

datafusion/core/src/datasource/empty.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,8 @@ impl TableProvider for EmptyTable {
7777
_limit: Option<usize>,
7878
) -> Result<Arc<dyn ExecutionPlan>> {
7979
// even though there is no data, projections apply
80-
let projected_schema = project_schema(&self.schema, projection)?;
80+
let projected_schema =
81+
project_schema(&self.schema, projection.map(AsRef::as_ref))?;
8182
Ok(Arc::new(
8283
EmptyExec::new(projected_schema).with_partitions(self.partitions),
8384
))

datafusion/core/src/physical_planner.rs

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3656,13 +3656,15 @@ mod tests {
36563656

36573657
#[derive(Debug)]
36583658
struct NoOpExecutionPlan {
3659-
cache: PlanProperties,
3659+
cache: Arc<PlanProperties>,
36603660
}
36613661

36623662
impl NoOpExecutionPlan {
36633663
fn new(schema: SchemaRef) -> Self {
36643664
let cache = Self::compute_properties(schema);
3665-
Self { cache }
3665+
Self {
3666+
cache: Arc::new(cache),
3667+
}
36663668
}
36673669

36683670
/// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
@@ -3700,7 +3702,7 @@ mod tests {
37003702
self
37013703
}
37023704

3703-
fn properties(&self) -> &PlanProperties {
3705+
fn properties(&self) -> &Arc<PlanProperties> {
37043706
&self.cache
37053707
}
37063708

@@ -3854,7 +3856,7 @@ digraph {
38543856
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
38553857
self.0.iter().collect::<Vec<_>>()
38563858
}
3857-
fn properties(&self) -> &PlanProperties {
3859+
fn properties(&self) -> &Arc<PlanProperties> {
38583860
unimplemented!()
38593861
}
38603862
fn execute(
@@ -3903,7 +3905,7 @@ digraph {
39033905
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
39043906
unimplemented!()
39053907
}
3906-
fn properties(&self) -> &PlanProperties {
3908+
fn properties(&self) -> &Arc<PlanProperties> {
39073909
unimplemented!()
39083910
}
39093911
fn execute(
@@ -4024,7 +4026,7 @@ digraph {
40244026
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
40254027
vec![]
40264028
}
4027-
fn properties(&self) -> &PlanProperties {
4029+
fn properties(&self) -> &Arc<PlanProperties> {
40284030
unimplemented!()
40294031
}
40304032
fn execute(

0 commit comments

Comments
 (0)