Skip to content

Commit 21b8a3f

Browse files
committed
add fast-path for with_new_children
This patch aims to implement a fast-path for the ExecutionPlan::with_new_children function for some plans, moving closer to a physical plan re-use implementation and improving planning performance. If the passed children properties are the same as in self, we do not actually recompute self's properties (which could be costly if projection mapping is required). Instead, we just replace the children and re-use self's properties as-is. To be able to compare two different properties -- ExecutionPlan::properties(...) signature is modified and now returns `&Arc<PlanProperties>`. If `children` properties are the same in `with_new_children` -- we clone our properties arc and then a parent plan will consider our properties as unchanged, doing the same. Also, there are other improvements. The patch includes the following changes: - Return `&Arc<PlanProperties>` from `ExecutionPlan::properties(...)` instead of a reference. - Implement `with_new_children` fast-path if there is no children properties changes for all major plans. - Store `Arc<[usize]>` instead of vector within `FilterExec`. - Store `Arc<[usize]>` instead of vector within projection of `HashJoinExec`. - Store `Arc<[Arc<AggregateFunctionExpr>]>` instead of vec for aggr expr and filters. - Store `Arc<[ProjectionExpr]> instead of vec in `ProjectionExprs` struct. - Get `Option<&[usize]>` instead of option vec ref in `project_schema` -- it makes API more flexible. Note: currently, `reset_plan_states` does not allow to re-use plan in general: it is not supported for dynamic filters and recursive queries features, as in this case state reset should update pointers in the children plans. Closes #19796
1 parent 094e7ee commit 21b8a3f

File tree

74 files changed

+882
-458
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

74 files changed

+882
-458
lines changed

datafusion-examples/examples/custom_data_source/custom_datasource.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -192,7 +192,7 @@ impl TableProvider for CustomDataSource {
192192
struct CustomExec {
193193
db: CustomDataSource,
194194
projected_schema: SchemaRef,
195-
cache: PlanProperties,
195+
cache: Arc<PlanProperties>,
196196
}
197197

198198
impl CustomExec {
@@ -202,12 +202,13 @@ impl CustomExec {
202202
schema: SchemaRef,
203203
db: CustomDataSource,
204204
) -> Self {
205-
let projected_schema = project_schema(&schema, projections).unwrap();
205+
let projected_schema =
206+
project_schema(&schema, projections.map(AsRef::as_ref)).unwrap();
206207
let cache = Self::compute_properties(projected_schema.clone());
207208
Self {
208209
db,
209210
projected_schema,
210-
cache,
211+
cache: Arc::new(cache),
211212
}
212213
}
213214

@@ -238,7 +239,7 @@ impl ExecutionPlan for CustomExec {
238239
self
239240
}
240241

241-
fn properties(&self) -> &PlanProperties {
242+
fn properties(&self) -> &Arc<PlanProperties> {
242243
&self.cache
243244
}
244245

datafusion-examples/examples/execution_monitoring/memory_pool_execution_plan.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -199,7 +199,7 @@ impl ExternalBatchBufferer {
199199
struct BufferingExecutionPlan {
200200
schema: SchemaRef,
201201
input: Arc<dyn ExecutionPlan>,
202-
properties: PlanProperties,
202+
properties: Arc<PlanProperties>,
203203
}
204204

205205
impl BufferingExecutionPlan {
@@ -233,7 +233,7 @@ impl ExecutionPlan for BufferingExecutionPlan {
233233
self.schema.clone()
234234
}
235235

236-
fn properties(&self) -> &PlanProperties {
236+
fn properties(&self) -> &Arc<PlanProperties> {
237237
&self.properties
238238
}
239239

datafusion-examples/examples/proto/composed_extension_codec.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ impl ExecutionPlan for ParentExec {
106106
self
107107
}
108108

109-
fn properties(&self) -> &datafusion::physical_plan::PlanProperties {
109+
fn properties(&self) -> &Arc<datafusion::physical_plan::PlanProperties> {
110110
unreachable!()
111111
}
112112

@@ -182,7 +182,7 @@ impl ExecutionPlan for ChildExec {
182182
self
183183
}
184184

185-
fn properties(&self) -> &datafusion::physical_plan::PlanProperties {
185+
fn properties(&self) -> &Arc<datafusion::physical_plan::PlanProperties> {
186186
unreachable!()
187187
}
188188

datafusion-examples/examples/relation_planner/table_sample.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -618,7 +618,7 @@ pub struct SampleExec {
618618
upper_bound: f64,
619619
seed: u64,
620620
metrics: ExecutionPlanMetricsSet,
621-
cache: PlanProperties,
621+
cache: Arc<PlanProperties>,
622622
}
623623

624624
impl SampleExec {
@@ -656,7 +656,7 @@ impl SampleExec {
656656
upper_bound,
657657
seed,
658658
metrics: ExecutionPlanMetricsSet::new(),
659-
cache,
659+
cache: Arc::new(cache),
660660
})
661661
}
662662

@@ -686,7 +686,7 @@ impl ExecutionPlan for SampleExec {
686686
self
687687
}
688688

689-
fn properties(&self) -> &PlanProperties {
689+
fn properties(&self) -> &Arc<PlanProperties> {
690690
&self.cache
691691
}
692692

datafusion/catalog-listing/src/table.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -522,7 +522,7 @@ impl TableProvider for ListingTable {
522522

523523
// if no files need to be read, return an `EmptyExec`
524524
if partitioned_file_lists.is_empty() {
525-
let projected_schema = project_schema(&self.schema(), projection.as_ref())?;
525+
let projected_schema = project_schema(&self.schema(), projection.as_deref())?;
526526
return Ok(ScanResult::new(Arc::new(EmptyExec::new(projected_schema))));
527527
}
528528

datafusion/catalog/src/memory/table.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -549,7 +549,7 @@ fn evaluate_filters_to_mask(
549549
struct DmlResultExec {
550550
rows_affected: u64,
551551
schema: SchemaRef,
552-
properties: PlanProperties,
552+
properties: Arc<PlanProperties>,
553553
}
554554

555555
impl DmlResultExec {
@@ -570,7 +570,7 @@ impl DmlResultExec {
570570
Self {
571571
rows_affected,
572572
schema,
573-
properties,
573+
properties: Arc::new(properties),
574574
}
575575
}
576576
}
@@ -604,7 +604,7 @@ impl ExecutionPlan for DmlResultExec {
604604
Arc::clone(&self.schema)
605605
}
606606

607-
fn properties(&self) -> &PlanProperties {
607+
fn properties(&self) -> &Arc<PlanProperties> {
608608
&self.properties
609609
}
610610

datafusion/common/src/stats.rs

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -391,7 +391,7 @@ impl Statistics {
391391
/// For example, if we had statistics for columns `{"a", "b", "c"}`,
392392
/// projecting to `vec![2, 1]` would return statistics for columns `{"c",
393393
/// "b"}`.
394-
pub fn project(mut self, projection: Option<&Vec<usize>>) -> Self {
394+
pub fn project(mut self, projection: Option<&[usize]>) -> Self {
395395
let Some(projection) = projection else {
396396
return self;
397397
};
@@ -1066,29 +1066,28 @@ mod tests {
10661066

10671067
#[test]
10681068
fn test_project_none() {
1069-
let projection = None;
1070-
let stats = make_stats(vec![10, 20, 30]).project(projection.as_ref());
1069+
let stats = make_stats(vec![10, 20, 30]).project(None);
10711070
assert_eq!(stats, make_stats(vec![10, 20, 30]));
10721071
}
10731072

10741073
#[test]
10751074
fn test_project_empty() {
1076-
let projection = Some(vec![]);
1077-
let stats = make_stats(vec![10, 20, 30]).project(projection.as_ref());
1075+
let projection: Option<&[_]> = Some(&[]);
1076+
let stats = make_stats(vec![10, 20, 30]).project(projection);
10781077
assert_eq!(stats, make_stats(vec![]));
10791078
}
10801079

10811080
#[test]
10821081
fn test_project_swap() {
1083-
let projection = Some(vec![2, 1]);
1084-
let stats = make_stats(vec![10, 20, 30]).project(projection.as_ref());
1082+
let projection: Option<&[_]> = Some(&[2, 1]);
1083+
let stats = make_stats(vec![10, 20, 30]).project(projection);
10851084
assert_eq!(stats, make_stats(vec![30, 20]));
10861085
}
10871086

10881087
#[test]
10891088
fn test_project_repeated() {
1090-
let projection = Some(vec![1, 2, 1, 1, 0, 2]);
1091-
let stats = make_stats(vec![10, 20, 30]).project(projection.as_ref());
1089+
let projection: Option<&[_]> = Some(&[1, 2, 1, 1, 0, 2]);
1090+
let stats = make_stats(vec![10, 20, 30]).project(projection);
10921091
assert_eq!(stats, make_stats(vec![20, 30, 20, 20, 10, 30]));
10931092
}
10941093

datafusion/common/src/utils/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ use std::thread::available_parallelism;
5959
///
6060
/// // Pick columns 'c' and 'b'
6161
/// let projection = Some(vec![2, 1]);
62-
/// let projected_schema = project_schema(&schema, projection.as_ref()).unwrap();
62+
/// let projected_schema = project_schema(&schema, projection.as_deref()).unwrap();
6363
///
6464
/// let expected_schema = SchemaRef::new(Schema::new(vec![
6565
/// Field::new("c", DataType::Utf8, true),
@@ -70,7 +70,7 @@ use std::thread::available_parallelism;
7070
/// ```
7171
pub fn project_schema(
7272
schema: &SchemaRef,
73-
projection: Option<&Vec<usize>>,
73+
projection: Option<&[usize]>,
7474
) -> Result<SchemaRef> {
7575
let schema = match projection {
7676
Some(columns) => Arc::new(schema.project(columns)?),

datafusion/core/benches/reset_plan_states.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,8 @@ fn run_reset_states(b: &mut criterion::Bencher, plan: &Arc<dyn ExecutionPlan>) {
166166
/// making an independent instance of the execution plan to re-execute it, avoiding
167167
/// re-planning stage.
168168
fn bench_reset_plan_states(c: &mut Criterion) {
169+
env_logger::init();
170+
169171
let rt = Runtime::new().unwrap();
170172
let ctx = SessionContext::new();
171173
ctx.register_table(

datafusion/core/src/datasource/empty.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,8 @@ impl TableProvider for EmptyTable {
7777
_limit: Option<usize>,
7878
) -> Result<Arc<dyn ExecutionPlan>> {
7979
// even though there is no data, projections apply
80-
let projected_schema = project_schema(&self.schema, projection)?;
80+
let projected_schema =
81+
project_schema(&self.schema, projection.map(AsRef::as_ref))?;
8182
Ok(Arc::new(
8283
EmptyExec::new(projected_schema).with_partitions(self.partitions),
8384
))

0 commit comments

Comments
 (0)