Skip to content

Commit fb13e41

Browse files
committed
add fast-path for with_new_children
This patch aims to implement a fast-path for the ExecutionPlan::with_new_children function for some plans, moving closer to a physical plan re-use implementation and improving planning performance. If the passed children properties are the same as in self, we do not actually recompute self's properties (which could be costly if projection mapping is required). Instead, we just replace the children and re-use self's properties as-is. To be able to compare two different properties -- ExecutionPlan::properties(...) signature is modified and now returns `&Arc<PlanProperties>`. If `children` properties are the same in `with_new_children` -- we clone our properties arc and then a parent plan will consider our properties as unchanged, doing the same. - Return `&Arc<PlanProperties>` from `ExecutionPlan::properties(...)` instead of a reference. - Implement `with_new_children` fast-path if there is no children properties changes for all major plans. Note: currently, `reset_plan_states` does not allow to re-use plan in general: it is not supported for dynamic filters and recursive queries features, as in this case state reset should update pointers in the children plans. Closes apache#19796
1 parent ca904b3 commit fb13e41

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

59 files changed

+854
-369
lines changed

datafusion-examples/examples/custom_data_source/custom_datasource.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -192,7 +192,7 @@ impl TableProvider for CustomDataSource {
192192
struct CustomExec {
193193
db: CustomDataSource,
194194
projected_schema: SchemaRef,
195-
cache: PlanProperties,
195+
cache: Arc<PlanProperties>,
196196
}
197197

198198
impl CustomExec {
@@ -207,7 +207,7 @@ impl CustomExec {
207207
Self {
208208
db,
209209
projected_schema,
210-
cache,
210+
cache: Arc::new(cache),
211211
}
212212
}
213213

@@ -238,7 +238,7 @@ impl ExecutionPlan for CustomExec {
238238
self
239239
}
240240

241-
fn properties(&self) -> &PlanProperties {
241+
fn properties(&self) -> &Arc<PlanProperties> {
242242
&self.cache
243243
}
244244

datafusion-examples/examples/execution_monitoring/memory_pool_execution_plan.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -199,7 +199,7 @@ impl ExternalBatchBufferer {
199199
struct BufferingExecutionPlan {
200200
schema: SchemaRef,
201201
input: Arc<dyn ExecutionPlan>,
202-
properties: PlanProperties,
202+
properties: Arc<PlanProperties>,
203203
}
204204

205205
impl BufferingExecutionPlan {
@@ -233,7 +233,7 @@ impl ExecutionPlan for BufferingExecutionPlan {
233233
self.schema.clone()
234234
}
235235

236-
fn properties(&self) -> &PlanProperties {
236+
fn properties(&self) -> &Arc<PlanProperties> {
237237
&self.properties
238238
}
239239

datafusion-examples/examples/proto/composed_extension_codec.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ impl ExecutionPlan for ParentExec {
106106
self
107107
}
108108

109-
fn properties(&self) -> &datafusion::physical_plan::PlanProperties {
109+
fn properties(&self) -> &Arc<datafusion::physical_plan::PlanProperties> {
110110
unreachable!()
111111
}
112112

@@ -182,7 +182,7 @@ impl ExecutionPlan for ChildExec {
182182
self
183183
}
184184

185-
fn properties(&self) -> &datafusion::physical_plan::PlanProperties {
185+
fn properties(&self) -> &Arc<datafusion::physical_plan::PlanProperties> {
186186
unreachable!()
187187
}
188188

datafusion-examples/examples/relation_planner/table_sample.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -618,7 +618,7 @@ pub struct SampleExec {
618618
upper_bound: f64,
619619
seed: u64,
620620
metrics: ExecutionPlanMetricsSet,
621-
cache: PlanProperties,
621+
cache: Arc<PlanProperties>,
622622
}
623623

624624
impl SampleExec {
@@ -656,7 +656,7 @@ impl SampleExec {
656656
upper_bound,
657657
seed,
658658
metrics: ExecutionPlanMetricsSet::new(),
659-
cache,
659+
cache: Arc::new(cache),
660660
})
661661
}
662662

@@ -686,7 +686,7 @@ impl ExecutionPlan for SampleExec {
686686
self
687687
}
688688

689-
fn properties(&self) -> &PlanProperties {
689+
fn properties(&self) -> &Arc<PlanProperties> {
690690
&self.cache
691691
}
692692

datafusion/catalog/src/memory/table.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -549,7 +549,7 @@ fn evaluate_filters_to_mask(
549549
struct DmlResultExec {
550550
rows_affected: u64,
551551
schema: SchemaRef,
552-
properties: PlanProperties,
552+
properties: Arc<PlanProperties>,
553553
}
554554

555555
impl DmlResultExec {
@@ -570,7 +570,7 @@ impl DmlResultExec {
570570
Self {
571571
rows_affected,
572572
schema,
573-
properties,
573+
properties: Arc::new(properties),
574574
}
575575
}
576576
}
@@ -604,7 +604,7 @@ impl ExecutionPlan for DmlResultExec {
604604
Arc::clone(&self.schema)
605605
}
606606

607-
fn properties(&self) -> &PlanProperties {
607+
fn properties(&self) -> &Arc<PlanProperties> {
608608
&self.properties
609609
}
610610

datafusion/core/benches/reset_plan_states.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,8 @@ fn run_reset_states(b: &mut criterion::Bencher, plan: &Arc<dyn ExecutionPlan>) {
166166
/// making an independent instance of the execution plan to re-execute it, avoiding
167167
/// re-planning stage.
168168
fn bench_reset_plan_states(c: &mut Criterion) {
169+
env_logger::init();
170+
169171
let rt = Runtime::new().unwrap();
170172
let ctx = SessionContext::new();
171173
ctx.register_table(

datafusion/core/src/physical_planner.rs

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3661,13 +3661,15 @@ mod tests {
36613661

36623662
#[derive(Debug)]
36633663
struct NoOpExecutionPlan {
3664-
cache: PlanProperties,
3664+
cache: Arc<PlanProperties>,
36653665
}
36663666

36673667
impl NoOpExecutionPlan {
36683668
fn new(schema: SchemaRef) -> Self {
36693669
let cache = Self::compute_properties(schema);
3670-
Self { cache }
3670+
Self {
3671+
cache: Arc::new(cache),
3672+
}
36713673
}
36723674

36733675
/// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
@@ -3705,7 +3707,7 @@ mod tests {
37053707
self
37063708
}
37073709

3708-
fn properties(&self) -> &PlanProperties {
3710+
fn properties(&self) -> &Arc<PlanProperties> {
37093711
&self.cache
37103712
}
37113713

@@ -3859,7 +3861,7 @@ digraph {
38593861
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
38603862
self.0.iter().collect::<Vec<_>>()
38613863
}
3862-
fn properties(&self) -> &PlanProperties {
3864+
fn properties(&self) -> &Arc<PlanProperties> {
38633865
unimplemented!()
38643866
}
38653867
fn execute(
@@ -3908,7 +3910,7 @@ digraph {
39083910
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
39093911
unimplemented!()
39103912
}
3911-
fn properties(&self) -> &PlanProperties {
3913+
fn properties(&self) -> &Arc<PlanProperties> {
39123914
unimplemented!()
39133915
}
39143916
fn execute(
@@ -4029,7 +4031,7 @@ digraph {
40294031
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
40304032
vec![]
40314033
}
4032-
fn properties(&self) -> &PlanProperties {
4034+
fn properties(&self) -> &Arc<PlanProperties> {
40334035
unimplemented!()
40344036
}
40354037
fn execute(

datafusion/core/tests/custom_sources_cases/mod.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ struct CustomTableProvider;
7979
#[derive(Debug, Clone)]
8080
struct CustomExecutionPlan {
8181
projection: Option<Vec<usize>>,
82-
cache: PlanProperties,
82+
cache: Arc<PlanProperties>,
8383
}
8484

8585
impl CustomExecutionPlan {
@@ -88,7 +88,10 @@ impl CustomExecutionPlan {
8888
let schema =
8989
project_schema(&schema, projection.as_ref()).expect("projected schema");
9090
let cache = Self::compute_properties(schema);
91-
Self { projection, cache }
91+
Self {
92+
projection,
93+
cache: Arc::new(cache),
94+
}
9295
}
9396

9497
/// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
@@ -157,7 +160,7 @@ impl ExecutionPlan for CustomExecutionPlan {
157160
self
158161
}
159162

160-
fn properties(&self) -> &PlanProperties {
163+
fn properties(&self) -> &Arc<PlanProperties> {
161164
&self.cache
162165
}
163166

datafusion/core/tests/custom_sources_cases/provider_filter_pushdown.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -62,13 +62,16 @@ fn create_batch(value: i32, num_rows: usize) -> Result<RecordBatch> {
6262
#[derive(Debug)]
6363
struct CustomPlan {
6464
batches: Vec<RecordBatch>,
65-
cache: PlanProperties,
65+
cache: Arc<PlanProperties>,
6666
}
6767

6868
impl CustomPlan {
6969
fn new(schema: SchemaRef, batches: Vec<RecordBatch>) -> Self {
7070
let cache = Self::compute_properties(schema);
71-
Self { batches, cache }
71+
Self {
72+
batches,
73+
cache: Arc::new(cache),
74+
}
7275
}
7376

7477
/// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
@@ -109,7 +112,7 @@ impl ExecutionPlan for CustomPlan {
109112
self
110113
}
111114

112-
fn properties(&self) -> &PlanProperties {
115+
fn properties(&self) -> &Arc<PlanProperties> {
113116
&self.cache
114117
}
115118

datafusion/core/tests/custom_sources_cases/statistics.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ use async_trait::async_trait;
4545
struct StatisticsValidation {
4646
stats: Statistics,
4747
schema: Arc<Schema>,
48-
cache: PlanProperties,
48+
cache: Arc<PlanProperties>,
4949
}
5050

5151
impl StatisticsValidation {
@@ -59,7 +59,7 @@ impl StatisticsValidation {
5959
Self {
6060
stats,
6161
schema,
62-
cache,
62+
cache: Arc::new(cache),
6363
}
6464
}
6565

@@ -158,7 +158,7 @@ impl ExecutionPlan for StatisticsValidation {
158158
self
159159
}
160160

161-
fn properties(&self) -> &PlanProperties {
161+
fn properties(&self) -> &Arc<PlanProperties> {
162162
&self.cache
163163
}
164164

0 commit comments

Comments
 (0)