Commit e28f292

physical plan: add plan re-use benchmark
This patch adds a benchmark that measures the overhead of making an independent instance of an execution plan so it can be re-executed without a re-planning stage. Several typical plans are covered, exercising projection, aggregation, filtering, and repartitioning. The function `reset_plan_states(...)` is also publicly exported.
1 parent 4c67d02 commit e28f292

File tree

5 files changed: +374 −16 lines


datafusion/physical-plan/Cargo.toml

Lines changed: 4 additions & 0 deletions
@@ -90,6 +90,10 @@ tokio = { workspace = true, features = [
 harness = false
 name = "partial_ordering"
 
+[[bench]]
+harness = false
+name = "plan_reuse"
+
 [[bench]]
 harness = false
 name = "spill_io"
datafusion/physical-plan/benches/plan_reuse.rs

Lines changed: 343 additions & 0 deletions
@@ -0,0 +1,343 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use std::sync::{Arc, LazyLock};

use arrow_schema::{DataType, Field, Fields, Schema, SchemaRef};
use criterion::{Criterion, criterion_group, criterion_main};
use datafusion_common::{NullEquality, Result};
use datafusion_execution::TaskContext;
use datafusion_expr::{JoinType, Operator};
use datafusion_functions_aggregate::average::avg_udaf;
use datafusion_physical_expr::aggregate::AggregateExprBuilder;
use datafusion_physical_expr::expressions::{self, binary, lit};
use datafusion_physical_expr::projection::ProjectionExpr;
use datafusion_physical_expr::{Partitioning, PhysicalExpr};
use datafusion_physical_plan::aggregates::{
    AggregateExec, AggregateMode, PhysicalGroupBy,
};
use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec;
use datafusion_physical_plan::execution_plan::reset_plan_states;
use datafusion_physical_plan::joins::{HashJoinExec, PartitionMode};
use datafusion_physical_plan::projection::ProjectionExec;
use datafusion_physical_plan::repartition::RepartitionExec;
use datafusion_physical_plan::{
    ExecutionPlan, execute_stream, filter::FilterExec, test::TestMemoryExec,
};

const NUM_FIELDS: usize = 1000;

static SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
    Arc::new(Schema::new(
        (0..NUM_FIELDS)
            .map(|i| Arc::new(Field::new(format!("x_{i}"), DataType::Int64, false)))
            .collect::<Fields>(),
    ))
});

fn partitioning() -> Partitioning {
    Partitioning::RoundRobinBatch(16)
}

fn col_name(i: usize) -> String {
    format!("x_{i}")
}

fn aggr_name(i: usize) -> String {
    format!("aggr({})", col_name(i))
}

fn col(i: usize) -> Arc<dyn PhysicalExpr> {
    expressions::col(&col_name(i), &SCHEMA).unwrap()
}

/// Returns a typical plan for the query like:
///
/// ```sql
/// SELECT aggr1(col1) as aggr1, aggr2(col2) as aggr2 FROM t
/// WHERE p1
/// HAVING p2
/// ```
///
/// A plan looks like:
///
/// ```text
/// FilterExec
///   AggregateExec: mode=Final
///     CoalescePartitionsExec
///       AggregateExec: mode=Partial
///         RepartitionExec
///           FilterExec
///             TestMemoryExec
/// ```
///
fn query1_plan() -> Result<Arc<dyn ExecutionPlan>> {
    let schema = Arc::clone(&SCHEMA);
    let input = TestMemoryExec::try_new(&[vec![]], Arc::clone(&schema), None)?;

    let plan = FilterExec::try_new(
        // Some predicate.
        binary(
            binary(col(0), Operator::Eq, col(1), &schema)?,
            Operator::And,
            binary(col(2), Operator::Eq, lit(42_i64), &schema)?,
            &schema,
        )?,
        Arc::new(input),
    )?;

    let plan = RepartitionExec::try_new(Arc::new(plan), partitioning())?;

    let plan = {
        // Partial aggregation.
        let aggr_expr = (0..NUM_FIELDS)
            .map(|i| {
                AggregateExprBuilder::new(avg_udaf(), vec![col(i)])
                    .schema(Arc::clone(&schema))
                    .alias(aggr_name(i))
                    .build()
                    .map(Arc::new)
            })
            .collect::<Result<Vec<_>>>()?;
        let filter_expr = (0..aggr_expr.len()).map(|_| None).collect();

        AggregateExec::try_new(
            AggregateMode::Partial,
            PhysicalGroupBy::new(vec![], vec![], vec![], false),
            aggr_expr,
            filter_expr,
            Arc::new(plan),
            Arc::clone(&schema),
        )?
    };

    let plan = CoalescePartitionsExec::new(Arc::new(plan));

    let schema = plan.schema();
    let plan = {
        // Final aggregation.
        let aggr_expr = (0..NUM_FIELDS)
            .map(|i| {
                AggregateExprBuilder::new(
                    avg_udaf(),
                    vec![Arc::new(expressions::Column::new(&aggr_name(i), i))],
                )
                .schema(Arc::clone(&schema))
                .alias(aggr_name(i))
                .build()
                .map(Arc::new)
            })
            .collect::<Result<Vec<_>>>()?;
        let filter_expr = (0..aggr_expr.len()).map(|_| None).collect();

        AggregateExec::try_new(
            AggregateMode::Partial,
            PhysicalGroupBy::new(vec![], vec![], vec![], false),
            aggr_expr,
            filter_expr,
            Arc::new(plan),
            Arc::clone(&schema),
        )?
    };

    let schema = plan.schema();
    let plan = {
        let predicate = (0..schema.fields.len()).fold(lit(true), |expr, i| {
            binary(
                expr,
                Operator::And,
                binary(
                    Arc::new(expressions::Column::new(schema.field(i).name(), i)),
                    Operator::Gt,
                    lit(i as i64),
                    &schema,
                )
                .unwrap(),
                &schema,
            )
            .unwrap()
        });

        FilterExec::try_new(predicate, Arc::new(plan))?
    };

    Ok(Arc::new(plan))
}

/// Returns a typical plan for the query like:
///
/// ```sql
/// SELECT projection FROM t1 JOIN t2 ON t1.a = t2.a
/// WHERE p1
/// ```
///
/// A plan looks like:
///
/// ```text
/// HashJoinExec
///   RepartitionExec
///     FilterExec
///       TestMemoryExec
///   RepartitionExec
///     FilterExec
///       TestMemoryExec
/// ```
///
fn query2_plan() -> Result<Arc<dyn ExecutionPlan>> {
    let schema = Arc::clone(&SCHEMA);
    let t: Arc<dyn ExecutionPlan> = Arc::new(TestMemoryExec::try_new(
        &[vec![]],
        Arc::clone(&schema),
        None,
    )?);

    // Some predicate.
    let predicate = (0..schema.fields.len().min(15)).fold(lit(true), |expr, i| {
        binary(
            expr,
            Operator::And,
            binary(
                Arc::new(expressions::Column::new(schema.field(i).name(), i)),
                Operator::Eq,
                lit(i as i64),
                &schema,
            )
            .unwrap(),
            &schema,
        )
        .unwrap()
    });

    let left = RepartitionExec::try_new(
        Arc::new(FilterExec::try_new(Arc::clone(&predicate), Arc::clone(&t))?),
        partitioning(),
    )?;

    let right = RepartitionExec::try_new(
        Arc::new(FilterExec::try_new(Arc::clone(&predicate), Arc::clone(&t))?),
        partitioning(),
    )?;

    let projection = Some((0..schema.fields.len()).step_by(15).collect());
    let join = HashJoinExec::try_new(
        Arc::new(left),
        Arc::new(right),
        vec![(col(0), col(0))],
        None,
        &JoinType::Left,
        projection,
        PartitionMode::Partitioned,
        NullEquality::NullEqualsNothing,
        false,
    )?;

    Ok(Arc::new(join))
}

/// Returns a typical plan for the query like:
///
/// ```sql
/// SELECT non_trivial_projection FROM t
/// WHERE p
/// ```
///
/// A plan looks like:
///
/// ```text
/// ProjectionExec
///   RepartitionExec
///     FilterExec
///       TestMemoryExec
/// ```
///
fn query3_plan() -> Result<Arc<dyn ExecutionPlan>> {
    let schema = Arc::clone(&SCHEMA);
    let t: Arc<dyn ExecutionPlan> = Arc::new(TestMemoryExec::try_new(
        &[vec![]],
        Arc::clone(&schema),
        None,
    )?);

    // Some predicate.
    let predicate = (0..schema.fields.len().min(15)).fold(lit(true), |expr, i| {
        binary(
            expr,
            Operator::And,
            binary(
                Arc::new(expressions::Column::new(schema.field(i).name(), i)),
                Operator::Eq,
                lit(i as i64),
                &schema,
            )
            .unwrap(),
            &schema,
        )
        .unwrap()
    });

    let plan = FilterExec::try_new(predicate, t)?;
    let plan = RepartitionExec::try_new(Arc::new(plan), partitioning())?;
    let plan = ProjectionExec::try_new(
        (0..schema.fields.len() / 2).map(|i| {
            ProjectionExpr::new(
                binary(col(i * 2), Operator::Plus, col(i * 2 + 1), &schema).unwrap(),
                col_name(i),
            )
        }),
        Arc::new(plan),
    )?;

    Ok(Arc::new(plan))
}

fn reuse_plan(
    b: &mut criterion::Bencher,
    rt_handle: &tokio::runtime::Handle,
    task_ctx: &Arc<TaskContext>,
    plan: &Arc<dyn ExecutionPlan>,
) {
    b.iter(|| {
        let plan = reset_plan_states(Arc::clone(plan)).unwrap();
        std::hint::black_box(
            rt_handle.block_on(async { execute_stream(plan, Arc::clone(task_ctx)) }),
        )
    });
}

/// Measures the overhead of the work needed to make an independent instance of
/// an execution plan so it can be re-executed without a re-planning stage.
fn bench_plan_reuse(c: &mut Criterion) {
    let task_ctx = Arc::new(TaskContext::default());
    let rt = tokio::runtime::Runtime::new().unwrap();

    macro_rules! bench_query {
        ($query_producer: expr) => {{
            let plan = $query_producer().unwrap();
            let task_ctx = Arc::clone(&task_ctx);
            let h = rt.handle().clone();
            move |b| reuse_plan(b, &h, &task_ctx, &plan)
        }};
    }

    c.bench_function("query1", bench_query!(query1_plan));
    c.bench_function("query2", bench_query!(query2_plan));
    c.bench_function("query3", bench_query!(query3_plan));
}

criterion_group!(benches, bench_plan_reuse);
criterion_main!(benches);
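Note that the `TestMemoryExec` sources are empty and `reuse_plan` only constructs the output stream via `execute_stream` without draining it, so the measured cost is dominated by `reset_plan_states` plus stream setup rather than by processing data — which matches the stated goal of isolating the plan re-use overhead.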

datafusion/physical-plan/src/execution_plan.rs

Lines changed: 25 additions & 0 deletions
@@ -26,6 +26,7 @@ use crate::sort_pushdown::SortOrderPushdownResult;
 pub use crate::stream::EmptyRecordBatchStream;
 
 pub use datafusion_common::hash_utils;
+use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
 pub use datafusion_common::utils::project_schema;
 pub use datafusion_common::{ColumnStatistics, Statistics, internal_err};
 pub use datafusion_execution::{RecordBatchStream, SendableRecordBatchStream};
@@ -1384,6 +1385,30 @@ pub fn check_not_null_constraints(
     Ok(batch)
 }
 
+/// Make a plan ready to be re-executed by returning a clone with the state of every node reset.
+///
+/// Some plans change their internal state after execution, making them unable to be executed again.
+/// This function uses [`ExecutionPlan::reset_state`] to reset any internal state within the plan.
+///
+/// An example is `CrossJoinExec`, which loads the left table into memory and stores it in the plan.
+/// However, if the data of the left table is derived from the work table, it will become outdated
+/// as the work table changes. When the next iteration executes this plan again, we must clear the left table.
+///
+/// # Limitations
+///
+/// While this function enables plan reuse, it does not allow re-execution of a plan that either:
+///
+/// * uses dynamic filters, or
+/// * represents a recursive query.
+///
+pub fn reset_plan_states(plan: Arc<dyn ExecutionPlan>) -> Result<Arc<dyn ExecutionPlan>> {
+    plan.transform_up(|plan| {
+        let new_plan = Arc::clone(&plan).reset_state()?;
+        Ok(Transformed::yes(new_plan))
+    })
+    .data()
+}
+
 /// Utility function yielding a string representation of the given [`ExecutionPlan`].
 pub fn get_plan_string(plan: &Arc<dyn ExecutionPlan>) -> Vec<String> {
     let formatted = displayable(plan.as_ref()).indent(true).to_string();
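For context, here is a minimal sketch of how a caller might use the newly exported helper to re-run an already-built plan. The `run_twice` wrapper and its drain loops are illustrative only (not part of this patch); the plan and `TaskContext` are assumed to come from elsewhere.

use std::sync::Arc;

use datafusion_common::Result;
use datafusion_execution::TaskContext;
use datafusion_physical_plan::execution_plan::reset_plan_states;
use datafusion_physical_plan::{ExecutionPlan, execute_stream};
use futures::StreamExt;

/// Executes `plan` twice, resetting per-node state between the runs so the
/// second execution does not have to go through planning again.
async fn run_twice(plan: Arc<dyn ExecutionPlan>, ctx: Arc<TaskContext>) -> Result<()> {
    // First execution: drain the stream as usual.
    let mut stream = execute_stream(Arc::clone(&plan), Arc::clone(&ctx))?;
    while let Some(batch) = stream.next().await {
        let _ = batch?;
    }

    // Reset any state accumulated during the first run (e.g. a cached join
    // build side), then execute the same plan instance again.
    let plan = reset_plan_states(plan)?;
    let mut stream = execute_stream(plan, ctx)?;
    while let Some(batch) = stream.next().await {
        let _ = batch?;
    }
    Ok(())
}

This mirrors what the benchmark's `reuse_plan` does inside `b.iter(...)`, except that the benchmark only constructs the stream rather than draining it.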
