Skip to content

Commit 094e7ee

Browse files
authored
physical plan: add reset_plan_states , plan re-use benchmark (#19806)
## Which issue does this PR close? <!-- We generally require a GitHub issue to be filed for all bug fixes and enhancements and this helps us generate change logs for our releases. You can link an issue to this PR using the GitHub syntax. For example `Closes #123` indicates that this PR will close issue #123. --> - Part of #19796 ## What changes are included in this PR? This patch adds a benchmark which is intended to measure overhead of actions, required to perform making an independent instance of the execution plan to re-execute it, avoiding re-planning stage. There are several typical plans that are tested, covering projection, aggregation, filtration, re-partition. ## Are there any user-facing changes? <!-- If there are user-facing changes then we may require documentation to be updated before approving the PR. --> The function `reset_plan_states(...)` is publically exported.
1 parent c173de1 commit 094e7ee

File tree

5 files changed

+229
-16
lines changed

5 files changed

+229
-16
lines changed

datafusion/core/Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -281,3 +281,7 @@ name = "spm"
281281
harness = false
282282
name = "preserve_file_partitioning"
283283
required-features = ["parquet"]
284+
285+
[[bench]]
286+
harness = false
287+
name = "reset_plan_states"
Lines changed: 198 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,198 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use std::sync::{Arc, LazyLock};
19+
20+
use arrow_schema::{DataType, Field, Fields, Schema, SchemaRef};
21+
use criterion::{Criterion, criterion_group, criterion_main};
22+
use datafusion::prelude::SessionContext;
23+
use datafusion_catalog::MemTable;
24+
use datafusion_physical_plan::ExecutionPlan;
25+
use datafusion_physical_plan::displayable;
26+
use datafusion_physical_plan::execution_plan::reset_plan_states;
27+
use tokio::runtime::Runtime;
28+
29+
const NUM_FIELDS: usize = 1000;
30+
const PREDICATE_LEN: usize = 50;
31+
32+
static SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
33+
Arc::new(Schema::new(
34+
(0..NUM_FIELDS)
35+
.map(|i| Arc::new(Field::new(format!("x_{i}"), DataType::Int64, false)))
36+
.collect::<Fields>(),
37+
))
38+
});
39+
40+
fn col_name(i: usize) -> String {
41+
format!("x_{i}")
42+
}
43+
44+
fn aggr_name(i: usize) -> String {
45+
format!("aggr_{i}")
46+
}
47+
48+
fn physical_plan(
49+
ctx: &SessionContext,
50+
rt: &Runtime,
51+
sql: &str,
52+
) -> Arc<dyn ExecutionPlan> {
53+
rt.block_on(async {
54+
ctx.sql(sql)
55+
.await
56+
.unwrap()
57+
.create_physical_plan()
58+
.await
59+
.unwrap()
60+
})
61+
}
62+
63+
fn predicate(col_name: impl Fn(usize) -> String, len: usize) -> String {
64+
let mut predicate = String::new();
65+
for i in 0..len {
66+
if i > 0 {
67+
predicate.push_str(" AND ");
68+
}
69+
predicate.push_str(&col_name(i));
70+
predicate.push_str(" = ");
71+
predicate.push_str(&i.to_string());
72+
}
73+
predicate
74+
}
75+
76+
/// Returns a typical plan for the query like:
77+
///
78+
/// ```sql
79+
/// SELECT aggr1(col1) as aggr1, aggr2(col2) as aggr2 FROM t
80+
/// WHERE p1
81+
/// HAVING p2
82+
/// ```
83+
///
84+
/// Where `p1` and `p2` some long predicates.
85+
///
86+
fn query1() -> String {
87+
let mut query = String::new();
88+
query.push_str("SELECT ");
89+
for i in 0..NUM_FIELDS {
90+
if i > 0 {
91+
query.push_str(", ");
92+
}
93+
query.push_str("AVG(");
94+
query.push_str(&col_name(i));
95+
query.push_str(") AS ");
96+
query.push_str(&aggr_name(i));
97+
}
98+
query.push_str(" FROM t WHERE ");
99+
query.push_str(&predicate(col_name, PREDICATE_LEN));
100+
query.push_str(" HAVING ");
101+
query.push_str(&predicate(aggr_name, PREDICATE_LEN));
102+
query
103+
}
104+
105+
/// Returns a typical plan for the query like:
106+
///
107+
/// ```sql
108+
/// SELECT projection FROM t JOIN v ON t.a = v.a
109+
/// WHERE p1
110+
/// ```
111+
///
112+
fn query2() -> String {
113+
let mut query = String::new();
114+
query.push_str("SELECT ");
115+
for i in (0..NUM_FIELDS).step_by(2) {
116+
if i > 0 {
117+
query.push_str(", ");
118+
}
119+
if (i / 2) % 2 == 0 {
120+
query.push_str(&format!("t.{}", col_name(i)));
121+
} else {
122+
query.push_str(&format!("v.{}", col_name(i)));
123+
}
124+
}
125+
query.push_str(" FROM t JOIN v ON t.x_0 = v.x_0 WHERE ");
126+
127+
fn qualified_name(i: usize) -> String {
128+
format!("t.{}", col_name(i))
129+
}
130+
131+
query.push_str(&predicate(qualified_name, PREDICATE_LEN));
132+
query
133+
}
134+
135+
/// Returns a typical plan for the query like:
136+
///
137+
/// ```sql
138+
/// SELECT projection FROM t
139+
/// WHERE p
140+
/// ```
141+
///
142+
fn query3() -> String {
143+
let mut query = String::new();
144+
query.push_str("SELECT ");
145+
146+
// Create non-trivial projection.
147+
for i in 0..NUM_FIELDS / 2 {
148+
if i > 0 {
149+
query.push_str(", ");
150+
}
151+
query.push_str(&col_name(i * 2));
152+
query.push_str(" + ");
153+
query.push_str(&col_name(i * 2 + 1));
154+
}
155+
156+
query.push_str(" FROM t WHERE ");
157+
query.push_str(&predicate(col_name, PREDICATE_LEN));
158+
query
159+
}
160+
161+
fn run_reset_states(b: &mut criterion::Bencher, plan: &Arc<dyn ExecutionPlan>) {
162+
b.iter(|| std::hint::black_box(reset_plan_states(Arc::clone(plan)).unwrap()));
163+
}
164+
165+
/// Benchmark is intended to measure overhead of actions, required to perform
166+
/// making an independent instance of the execution plan to re-execute it, avoiding
167+
/// re-planning stage.
168+
fn bench_reset_plan_states(c: &mut Criterion) {
169+
let rt = Runtime::new().unwrap();
170+
let ctx = SessionContext::new();
171+
ctx.register_table(
172+
"t",
173+
Arc::new(MemTable::try_new(Arc::clone(&SCHEMA), vec![vec![], vec![]]).unwrap()),
174+
)
175+
.unwrap();
176+
177+
ctx.register_table(
178+
"v",
179+
Arc::new(MemTable::try_new(Arc::clone(&SCHEMA), vec![vec![], vec![]]).unwrap()),
180+
)
181+
.unwrap();
182+
183+
macro_rules! bench_query {
184+
($query_producer: expr) => {{
185+
let sql = $query_producer();
186+
let plan = physical_plan(&ctx, &rt, &sql);
187+
log::debug!("plan:\n{}", displayable(plan.as_ref()).indent(true));
188+
move |b| run_reset_states(b, &plan)
189+
}};
190+
}
191+
192+
c.bench_function("query1", bench_query!(query1));
193+
c.bench_function("query2", bench_query!(query2));
194+
c.bench_function("query3", bench_query!(query3));
195+
}
196+
197+
criterion_group!(benches, bench_reset_plan_states);
198+
criterion_main!(benches);

datafusion/physical-plan/src/execution_plan.rs

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ use crate::sort_pushdown::SortOrderPushdownResult;
2626
pub use crate::stream::EmptyRecordBatchStream;
2727

2828
pub use datafusion_common::hash_utils;
29+
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
2930
pub use datafusion_common::utils::project_schema;
3031
pub use datafusion_common::{ColumnStatistics, Statistics, internal_err};
3132
pub use datafusion_execution::{RecordBatchStream, SendableRecordBatchStream};
@@ -1384,6 +1385,30 @@ pub fn check_not_null_constraints(
13841385
Ok(batch)
13851386
}
13861387

1388+
/// Make plan ready to be re-executed returning its clone with state reset for all nodes.
1389+
///
1390+
/// Some plans will change their internal states after execution, making them unable to be executed again.
1391+
/// This function uses [`ExecutionPlan::reset_state`] to reset any internal state within the plan.
1392+
///
1393+
/// An example is `CrossJoinExec`, which loads the left table into memory and stores it in the plan.
1394+
/// However, if the data of the left table is derived from the work table, it will become outdated
1395+
/// as the work table changes. When the next iteration executes this plan again, we must clear the left table.
1396+
///
1397+
/// # Limitations
1398+
///
1399+
/// While this function enables plan reuse, it does not allow the same plan to be executed if it (OR):
1400+
///
1401+
/// * uses dynamic filters,
1402+
/// * represents a recursive query.
1403+
///
1404+
pub fn reset_plan_states(plan: Arc<dyn ExecutionPlan>) -> Result<Arc<dyn ExecutionPlan>> {
1405+
plan.transform_up(|plan| {
1406+
let new_plan = Arc::clone(&plan).reset_state()?;
1407+
Ok(Transformed::yes(new_plan))
1408+
})
1409+
.data()
1410+
}
1411+
13871412
/// Utility function yielding a string representation of the given [`ExecutionPlan`].
13881413
pub fn get_plan_string(plan: &Arc<dyn ExecutionPlan>) -> Vec<String> {
13891414
let formatted = displayable(plan.as_ref()).indent(true).to_string();

datafusion/physical-plan/src/recursive_query.rs

Lines changed: 1 addition & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ use std::task::{Context, Poll};
2424
use super::work_table::{ReservedBatches, WorkTable};
2525
use crate::aggregates::group_values::{GroupValues, new_group_values};
2626
use crate::aggregates::order::GroupOrdering;
27-
use crate::execution_plan::{Boundedness, EmissionType};
27+
use crate::execution_plan::{Boundedness, EmissionType, reset_plan_states};
2828
use crate::metrics::{
2929
BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet, RecordOutput,
3030
};
@@ -387,20 +387,6 @@ fn assign_work_table(
387387
.data()
388388
}
389389

390-
/// Some plans will change their internal states after execution, making them unable to be executed again.
391-
/// This function uses [`ExecutionPlan::reset_state`] to reset any internal state within the plan.
392-
///
393-
/// An example is `CrossJoinExec`, which loads the left table into memory and stores it in the plan.
394-
/// However, if the data of the left table is derived from the work table, it will become outdated
395-
/// as the work table changes. When the next iteration executes this plan again, we must clear the left table.
396-
fn reset_plan_states(plan: Arc<dyn ExecutionPlan>) -> Result<Arc<dyn ExecutionPlan>> {
397-
plan.transform_up(|plan| {
398-
let new_plan = Arc::clone(&plan).reset_state()?;
399-
Ok(Transformed::yes(new_plan))
400-
})
401-
.data()
402-
}
403-
404390
impl Stream for RecursiveQueryStream {
405391
type Item = Result<RecordBatch>;
406392

datafusion/physical-plan/src/test.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,7 @@ impl ExecutionPlan for TestMemoryExec {
146146
self: Arc<Self>,
147147
_: Vec<Arc<dyn ExecutionPlan>>,
148148
) -> Result<Arc<dyn ExecutionPlan>> {
149-
unimplemented!()
149+
Ok(self)
150150
}
151151

152152
fn repartitioned(

0 commit comments

Comments
 (0)