Skip to content

Commit 2506a7b

Browse files
committed
physical plan: add plan re-use benchmark
This patch adds a benchmark which is intended to measure overhead of actions, required to perform making an independent instance of the execution plan to re-execute it, avoiding re-planning stage. There are several typical plans that are tested, covering projection, aggregation, filtration, re-partition. Also, the function `reset_plan_states(...)` is publically exported.
1 parent 4c67d02 commit 2506a7b

File tree

5 files changed

+229
-16
lines changed

5 files changed

+229
-16
lines changed

datafusion/core/Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -280,3 +280,7 @@ name = "spm"
280280
harness = false
281281
name = "preserve_file_partitioning"
282282
required-features = ["parquet"]
283+
284+
[[bench]]
285+
harness = false
286+
name = "reset_plan_states"
Lines changed: 198 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,198 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use std::sync::{Arc, LazyLock};
19+
20+
use arrow_schema::{DataType, Field, Fields, Schema, SchemaRef};
21+
use criterion::{Criterion, criterion_group, criterion_main};
22+
use datafusion::prelude::SessionContext;
23+
use datafusion_catalog::MemTable;
24+
use datafusion_physical_plan::ExecutionPlan;
25+
use datafusion_physical_plan::displayable;
26+
use datafusion_physical_plan::execution_plan::reset_plan_states;
27+
use tokio::runtime::Runtime;
28+
29+
const NUM_FIELDS: usize = 1000;
30+
const PREDICATE_LEN: usize = 50;
31+
32+
static SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
33+
Arc::new(Schema::new(
34+
(0..NUM_FIELDS)
35+
.map(|i| Arc::new(Field::new(format!("x_{i}"), DataType::Int64, false)))
36+
.collect::<Fields>(),
37+
))
38+
});
39+
40+
fn col_name(i: usize) -> String {
41+
format!("x_{i}")
42+
}
43+
44+
fn aggr_name(i: usize) -> String {
45+
format!("aggr_{i}")
46+
}
47+
48+
fn physical_plan(
49+
ctx: &SessionContext,
50+
rt: &Runtime,
51+
sql: &str,
52+
) -> Arc<dyn ExecutionPlan> {
53+
rt.block_on(async {
54+
ctx.sql(sql)
55+
.await
56+
.unwrap()
57+
.create_physical_plan()
58+
.await
59+
.unwrap()
60+
})
61+
}
62+
63+
fn predicate(col_name: impl Fn(usize) -> String, len: usize) -> String {
64+
let mut predicate = String::new();
65+
for i in 0..len {
66+
if i > 0 {
67+
predicate.push_str(" AND ");
68+
}
69+
predicate.push_str(&col_name(i));
70+
predicate.push_str(" = ");
71+
predicate.push_str(&i.to_string());
72+
}
73+
predicate
74+
}
75+
76+
/// Returns a typical plan for the query like:
77+
///
78+
/// ```sql
79+
/// SELECT aggr1(col1) as aggr1, aggr2(col2) as aggr2 FROM t
80+
/// WHERE p1
81+
/// HAVING p2
82+
/// ```
83+
///
84+
/// Where `p1` and `p2` some long predicates.
85+
///
86+
fn query1() -> String {
87+
let mut query = String::new();
88+
query.push_str("SELECT ");
89+
for i in 0..NUM_FIELDS {
90+
if i > 0 {
91+
query.push_str(", ");
92+
}
93+
query.push_str("AVG(");
94+
query.push_str(&col_name(i));
95+
query.push_str(") AS ");
96+
query.push_str(&aggr_name(i));
97+
}
98+
query.push_str(" FROM t WHERE ");
99+
query.push_str(&predicate(col_name, PREDICATE_LEN));
100+
query.push_str(" HAVING ");
101+
query.push_str(&predicate(aggr_name, PREDICATE_LEN));
102+
query
103+
}
104+
105+
/// Returns a typical plan for the query like:
106+
///
107+
/// ```sql
108+
/// SELECT projection FROM t JOIN v ON t.a = v.a
109+
/// WHERE p1
110+
/// ```
111+
///
112+
fn query2() -> String {
113+
let mut query = String::new();
114+
query.push_str("SELECT ");
115+
for i in (0..NUM_FIELDS).step_by(2) {
116+
if i > 0 {
117+
query.push_str(", ");
118+
}
119+
if (i / 2) % 2 == 0 {
120+
query.push_str(&format!("t.{}", col_name(i)));
121+
} else {
122+
query.push_str(&format!("v.{}", col_name(i)));
123+
}
124+
}
125+
query.push_str(" FROM t JOIN v ON t.x_0 = v.x_0 WHERE ");
126+
127+
fn qualified_name(i: usize) -> String {
128+
format!("t.{}", col_name(i))
129+
}
130+
131+
query.push_str(&predicate(qualified_name, PREDICATE_LEN));
132+
query
133+
}
134+
135+
/// Returns a typical plan for the query like:
136+
///
137+
/// ```sql
138+
/// SELECT projection FROM t
139+
/// WHERE p
140+
/// ```
141+
///
142+
fn query3() -> String {
143+
let mut query = String::new();
144+
query.push_str("SELECT ");
145+
146+
// Create non-trivial projection.
147+
for i in 0..NUM_FIELDS / 2 {
148+
if i > 0 {
149+
query.push_str(", ");
150+
}
151+
query.push_str(&col_name(i * 2));
152+
query.push_str(" + ");
153+
query.push_str(&col_name(i * 2 + 1));
154+
}
155+
156+
query.push_str(" FROM t WHERE ");
157+
query.push_str(&predicate(col_name, PREDICATE_LEN));
158+
query
159+
}
160+
161+
fn bench_reset_plan_states(b: &mut criterion::Bencher, plan: &Arc<dyn ExecutionPlan>) {
162+
b.iter(|| std::hint::black_box(reset_plan_states(Arc::clone(plan)).unwrap()));
163+
}
164+
165+
/// Benchmark is intended to measure overhead of actions, required to perform
166+
/// making an independent instance of the execution plan to re-execute it, avoiding
167+
/// re-planning stage.
168+
fn bench_plan_reuse(c: &mut Criterion) {
169+
let rt = Runtime::new().unwrap();
170+
let ctx = SessionContext::new();
171+
ctx.register_table(
172+
"t",
173+
Arc::new(MemTable::try_new(Arc::clone(&SCHEMA), vec![vec![], vec![]]).unwrap()),
174+
)
175+
.unwrap();
176+
177+
ctx.register_table(
178+
"v",
179+
Arc::new(MemTable::try_new(Arc::clone(&SCHEMA), vec![vec![], vec![]]).unwrap()),
180+
)
181+
.unwrap();
182+
183+
macro_rules! bench_query {
184+
($query_producer: expr) => {{
185+
let sql = $query_producer();
186+
let plan = physical_plan(&ctx, &rt, &sql);
187+
log::debug!("plan:\n{}", displayable(plan.as_ref()).indent(true));
188+
move |b| bench_reset_plan_states(b, &plan)
189+
}};
190+
}
191+
192+
c.bench_function("query1", bench_query!(query1));
193+
c.bench_function("query2", bench_query!(query2));
194+
c.bench_function("query3", bench_query!(query3));
195+
}
196+
197+
criterion_group!(benches, bench_plan_reuse);
198+
criterion_main!(benches);

datafusion/physical-plan/src/execution_plan.rs

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ use crate::sort_pushdown::SortOrderPushdownResult;
2626
pub use crate::stream::EmptyRecordBatchStream;
2727

2828
pub use datafusion_common::hash_utils;
29+
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
2930
pub use datafusion_common::utils::project_schema;
3031
pub use datafusion_common::{ColumnStatistics, Statistics, internal_err};
3132
pub use datafusion_execution::{RecordBatchStream, SendableRecordBatchStream};
@@ -1384,6 +1385,30 @@ pub fn check_not_null_constraints(
13841385
Ok(batch)
13851386
}
13861387

1388+
/// Make plan ready to be re-executed returning its clone with state reset for all nodes.
1389+
///
1390+
/// Some plans will change their internal states after execution, making them unable to be executed again.
1391+
/// This function uses [`ExecutionPlan::reset_state`] to reset any internal state within the plan.
1392+
///
1393+
/// An example is `CrossJoinExec`, which loads the left table into memory and stores it in the plan.
1394+
/// However, if the data of the left table is derived from the work table, it will become outdated
1395+
/// as the work table changes. When the next iteration executes this plan again, we must clear the left table.
1396+
///
1397+
/// # Limitations
1398+
///
1399+
/// While this function enables plan reuse, it does not allow the same plan to be executed if it (OR):
1400+
///
1401+
/// * uses dynamic filters,
1402+
/// * represents a recursive query.
1403+
///
1404+
pub fn reset_plan_states(plan: Arc<dyn ExecutionPlan>) -> Result<Arc<dyn ExecutionPlan>> {
1405+
plan.transform_up(|plan| {
1406+
let new_plan = Arc::clone(&plan).reset_state()?;
1407+
Ok(Transformed::yes(new_plan))
1408+
})
1409+
.data()
1410+
}
1411+
13871412
/// Utility function yielding a string representation of the given [`ExecutionPlan`].
13881413
pub fn get_plan_string(plan: &Arc<dyn ExecutionPlan>) -> Vec<String> {
13891414
let formatted = displayable(plan.as_ref()).indent(true).to_string();

datafusion/physical-plan/src/recursive_query.rs

Lines changed: 1 addition & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ use std::task::{Context, Poll};
2424
use super::work_table::{ReservedBatches, WorkTable};
2525
use crate::aggregates::group_values::{GroupValues, new_group_values};
2626
use crate::aggregates::order::GroupOrdering;
27-
use crate::execution_plan::{Boundedness, EmissionType};
27+
use crate::execution_plan::{Boundedness, EmissionType, reset_plan_states};
2828
use crate::metrics::{
2929
BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet, RecordOutput,
3030
};
@@ -387,20 +387,6 @@ fn assign_work_table(
387387
.data()
388388
}
389389

390-
/// Some plans will change their internal states after execution, making them unable to be executed again.
391-
/// This function uses [`ExecutionPlan::reset_state`] to reset any internal state within the plan.
392-
///
393-
/// An example is `CrossJoinExec`, which loads the left table into memory and stores it in the plan.
394-
/// However, if the data of the left table is derived from the work table, it will become outdated
395-
/// as the work table changes. When the next iteration executes this plan again, we must clear the left table.
396-
fn reset_plan_states(plan: Arc<dyn ExecutionPlan>) -> Result<Arc<dyn ExecutionPlan>> {
397-
plan.transform_up(|plan| {
398-
let new_plan = Arc::clone(&plan).reset_state()?;
399-
Ok(Transformed::yes(new_plan))
400-
})
401-
.data()
402-
}
403-
404390
impl Stream for RecursiveQueryStream {
405391
type Item = Result<RecordBatch>;
406392

datafusion/physical-plan/src/test.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,7 @@ impl ExecutionPlan for TestMemoryExec {
146146
self: Arc<Self>,
147147
_: Vec<Arc<dyn ExecutionPlan>>,
148148
) -> Result<Arc<dyn ExecutionPlan>> {
149-
unimplemented!()
149+
Ok(self)
150150
}
151151

152152
fn repartitioned(

0 commit comments

Comments
 (0)