Skip to content

Commit 0ee1d01

Browse files
committed
physical plan: add plan re-use benchmark
This patch adds a benchmark that measures the overhead of producing an independent instance of an execution plan so that it can be re-executed without repeating the planning stage. Several typical plans are tested, covering projection, aggregation, filtering, and repartitioning. In addition, the function `reset_plan_states(...)` is now publicly exported.
1 parent 4c67d02 commit 0ee1d01

File tree

5 files changed

+355
-16
lines changed

5 files changed

+355
-16
lines changed

datafusion/physical-plan/Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,10 @@ tokio = { workspace = true, features = [
9090
harness = false
9191
name = "partial_ordering"
9292

93+
[[bench]]
94+
harness = false
95+
name = "plan_reuse"
96+
9397
[[bench]]
9498
harness = false
9599
name = "spill_io"
Lines changed: 326 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,326 @@
1+
use std::sync::{Arc, LazyLock};
2+
3+
use arrow_schema::{DataType, Field, Fields, Schema, SchemaRef};
4+
use criterion::{Criterion, criterion_group, criterion_main};
5+
use datafusion_common::{NullEquality, Result};
6+
use datafusion_execution::TaskContext;
7+
use datafusion_expr::{JoinType, Operator};
8+
use datafusion_functions_aggregate::average::avg_udaf;
9+
use datafusion_physical_expr::aggregate::AggregateExprBuilder;
10+
use datafusion_physical_expr::expressions::{self, binary, lit};
11+
use datafusion_physical_expr::projection::ProjectionExpr;
12+
use datafusion_physical_expr::{Partitioning, PhysicalExpr};
13+
use datafusion_physical_plan::aggregates::{
14+
AggregateExec, AggregateMode, PhysicalGroupBy,
15+
};
16+
use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec;
17+
use datafusion_physical_plan::execution_plan::reset_plan_states;
18+
use datafusion_physical_plan::joins::{HashJoinExec, PartitionMode};
19+
use datafusion_physical_plan::projection::ProjectionExec;
20+
use datafusion_physical_plan::repartition::RepartitionExec;
21+
use datafusion_physical_plan::{
22+
ExecutionPlan, execute_stream, filter::FilterExec, test::TestMemoryExec,
23+
};
24+
25+
const NUM_FIELDS: usize = 1000;
26+
27+
static SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
28+
Arc::new(Schema::new(
29+
(0..NUM_FIELDS)
30+
.map(|i| Arc::new(Field::new(format!("x_{i}"), DataType::Int64, false)))
31+
.collect::<Fields>(),
32+
))
33+
});
34+
35+
fn partitioning() -> Partitioning {
36+
Partitioning::RoundRobinBatch(16)
37+
}
38+
39+
/// Returns the name of the `i`-th benchmark column, e.g. `x_0` for `i == 0`.
fn col_name(i: usize) -> String {
    let mut name = String::from("x_");
    name.push_str(&i.to_string());
    name
}
42+
43+
fn aggr_name(i: usize) -> String {
44+
format!("aggr({})", col_name(i))
45+
}
46+
47+
fn col(i: usize) -> Arc<dyn PhysicalExpr> {
48+
expressions::col(&col_name(i), &SCHEMA).unwrap()
49+
}
50+
51+
/// Returns a typical plan for the query like:
52+
///
53+
/// ```sql
54+
/// SELECT aggr1(col1) as aggr1, aggr2(col2) as aggr2 FROM t
55+
/// WHERE p1
56+
/// HAVING p2
57+
/// ```
58+
///
59+
/// A plan looks like:
60+
///
61+
/// ```text
62+
/// FilterExec
63+
/// AggregateExec: mode=Final
64+
/// CoalescePartitionsExec
65+
/// AggregateExec: mode=Partial
66+
/// RepartitionExec
67+
/// FilterExec
68+
/// TestMemoryExec
69+
/// ```
70+
///
71+
fn query1_plan() -> Result<Arc<dyn ExecutionPlan>> {
72+
let schema = Arc::clone(&SCHEMA);
73+
let input = TestMemoryExec::try_new(&[vec![]], Arc::clone(&schema), None)?;
74+
75+
let plan = FilterExec::try_new(
76+
// Some predicate.
77+
binary(
78+
binary(col(0), Operator::Eq, col(1), &schema)?,
79+
Operator::And,
80+
binary(col(2), Operator::Eq, lit(42_i64), &schema)?,
81+
&schema,
82+
)?,
83+
Arc::new(input),
84+
)?;
85+
86+
let plan = RepartitionExec::try_new(Arc::new(plan), partitioning())?;
87+
88+
let plan = {
89+
// Partial aggregation.
90+
let aggr_expr = (0..NUM_FIELDS)
91+
.map(|i| {
92+
AggregateExprBuilder::new(avg_udaf(), vec![col(i)])
93+
.schema(Arc::clone(&schema))
94+
.alias(aggr_name(i))
95+
.build()
96+
.map(Arc::new)
97+
})
98+
.collect::<Result<Vec<_>>>()?;
99+
let filter_expr = (0..aggr_expr.len()).map(|_| None).collect();
100+
101+
AggregateExec::try_new(
102+
AggregateMode::Partial,
103+
PhysicalGroupBy::new(vec![], vec![], vec![], false),
104+
aggr_expr,
105+
filter_expr,
106+
Arc::new(plan),
107+
Arc::clone(&schema),
108+
)?
109+
};
110+
111+
let plan = CoalescePartitionsExec::new(Arc::new(plan));
112+
113+
let schema = plan.schema();
114+
let plan = {
115+
// Final aggregation.
116+
let aggr_expr = (0..NUM_FIELDS)
117+
.map(|i| {
118+
AggregateExprBuilder::new(
119+
avg_udaf(),
120+
vec![Arc::new(expressions::Column::new(&aggr_name(i), i))],
121+
)
122+
.schema(Arc::clone(&schema))
123+
.alias(aggr_name(i))
124+
.build()
125+
.map(Arc::new)
126+
})
127+
.collect::<Result<Vec<_>>>()?;
128+
let filter_expr = (0..aggr_expr.len()).map(|_| None).collect();
129+
130+
AggregateExec::try_new(
131+
AggregateMode::Partial,
132+
PhysicalGroupBy::new(vec![], vec![], vec![], false),
133+
aggr_expr,
134+
filter_expr,
135+
Arc::new(plan),
136+
Arc::clone(&schema),
137+
)?
138+
};
139+
140+
let schema = plan.schema();
141+
let plan = {
142+
let predicate = (0..schema.fields.len()).fold(lit(true), |expr, i| {
143+
binary(
144+
expr,
145+
Operator::And,
146+
binary(
147+
Arc::new(expressions::Column::new(schema.field(i).name(), i)),
148+
Operator::Gt,
149+
lit(i as i64),
150+
&schema,
151+
)
152+
.unwrap(),
153+
&schema,
154+
)
155+
.unwrap()
156+
});
157+
158+
FilterExec::try_new(predicate, Arc::new(plan))?
159+
};
160+
161+
Ok(Arc::new(plan))
162+
}
163+
164+
/// Returns a typical plan for the query like:
165+
///
166+
/// ```sql
167+
/// SELECT projection FROM t1 JOIN t2 ON t1.a = t2.a
168+
/// WHERE p1
169+
/// ```
170+
///
171+
/// A plan looks like:
172+
///
173+
/// ```text
174+
/// HashJoinExec
175+
/// RepartitionExec
176+
/// FilterExec
177+
/// TestMemoryExec
178+
/// RepartitionExec
179+
/// FilterExec
180+
/// TestMemoryExec
181+
/// ```
182+
///
183+
fn query2_plan() -> Result<Arc<dyn ExecutionPlan>> {
184+
let schema = Arc::clone(&SCHEMA);
185+
let t: Arc<dyn ExecutionPlan> = Arc::new(TestMemoryExec::try_new(
186+
&[vec![]],
187+
Arc::clone(&schema),
188+
None,
189+
)?);
190+
191+
// Some predicate.
192+
let predicate = (0..schema.fields.len().min(15)).fold(lit(true), |expr, i| {
193+
binary(
194+
expr,
195+
Operator::And,
196+
binary(
197+
Arc::new(expressions::Column::new(schema.field(i).name(), i)),
198+
Operator::Eq,
199+
lit(i as i64),
200+
&schema,
201+
)
202+
.unwrap(),
203+
&schema,
204+
)
205+
.unwrap()
206+
});
207+
208+
let left = RepartitionExec::try_new(
209+
Arc::new(FilterExec::try_new(Arc::clone(&predicate), Arc::clone(&t))?),
210+
partitioning(),
211+
)?;
212+
213+
let right = RepartitionExec::try_new(
214+
Arc::new(FilterExec::try_new(Arc::clone(&predicate), Arc::clone(&t))?),
215+
partitioning(),
216+
)?;
217+
218+
let projection = Some((0..schema.fields.len()).step_by(15).collect());
219+
let join = HashJoinExec::try_new(
220+
Arc::new(left),
221+
Arc::new(right),
222+
vec![(col(0), col(0))],
223+
None,
224+
&JoinType::Left,
225+
projection,
226+
PartitionMode::Partitioned,
227+
NullEquality::NullEqualsNothing,
228+
false,
229+
)?;
230+
231+
Ok(Arc::new(join))
232+
}
233+
234+
/// Returns a typical plan for the query like:
235+
///
236+
/// ```sql
237+
/// SELECT non_trivial_projection FROM t
238+
/// WHERE p
239+
/// ```
240+
///
241+
/// A plan looks like:
242+
///
243+
/// ```text
244+
/// ProjectionExec
245+
/// RepartitionExec
246+
/// FilterExec
247+
/// TestMemoryExec
248+
/// ```
249+
///
250+
fn query3_plan() -> Result<Arc<dyn ExecutionPlan>> {
251+
let schema = Arc::clone(&SCHEMA);
252+
let t: Arc<dyn ExecutionPlan> = Arc::new(TestMemoryExec::try_new(
253+
&[vec![]],
254+
Arc::clone(&schema),
255+
None,
256+
)?);
257+
258+
// Some predicate.
259+
let predicate = (0..schema.fields.len().min(15)).fold(lit(true), |expr, i| {
260+
binary(
261+
expr,
262+
Operator::And,
263+
binary(
264+
Arc::new(expressions::Column::new(schema.field(i).name(), i)),
265+
Operator::Eq,
266+
lit(i as i64),
267+
&schema,
268+
)
269+
.unwrap(),
270+
&schema,
271+
)
272+
.unwrap()
273+
});
274+
275+
let plan = FilterExec::try_new(predicate, t)?;
276+
let plan = RepartitionExec::try_new(Arc::new(plan), partitioning())?;
277+
let plan = ProjectionExec::try_new(
278+
(0..schema.fields.len() / 2).map(|i| {
279+
ProjectionExpr::new(
280+
binary(col(i * 2), Operator::Plus, col(i * 2 + 1), &schema).unwrap(),
281+
col_name(i),
282+
)
283+
}),
284+
Arc::new(plan),
285+
)?;
286+
287+
Ok(Arc::new(plan))
288+
}
289+
290+
fn reuse_plan(
291+
b: &mut criterion::Bencher,
292+
rt_handle: &tokio::runtime::Handle,
293+
task_ctx: &Arc<TaskContext>,
294+
plan: &Arc<dyn ExecutionPlan>,
295+
) {
296+
b.iter(|| {
297+
let plan = reset_plan_states(Arc::clone(plan)).unwrap();
298+
std::hint::black_box(
299+
rt_handle.block_on(async { execute_stream(plan, Arc::clone(task_ctx)) }),
300+
)
301+
});
302+
}
303+
304+
/// Benchmark is intended to measure overhead of actions, required to perform
305+
/// making an independent instance of the execution plan to re-execute it, avoiding
306+
/// re-planning stage.
307+
fn bench_plan_reuse(c: &mut Criterion) {
308+
let task_ctx = Arc::new(TaskContext::default());
309+
let rt = tokio::runtime::Runtime::new().unwrap();
310+
311+
macro_rules! bench_query {
312+
($query_producer: expr) => {{
313+
let plan = $query_producer().unwrap();
314+
let task_ctx = Arc::clone(&task_ctx);
315+
let h = rt.handle().clone();
316+
move |b| reuse_plan(b, &h, &task_ctx, &plan)
317+
}};
318+
}
319+
320+
c.bench_function("query1", bench_query!(query1_plan));
321+
c.bench_function("query2", bench_query!(query2_plan));
322+
c.bench_function("query3", bench_query!(query3_plan));
323+
}
324+
325+
criterion_group!(benches, bench_plan_reuse);
326+
criterion_main!(benches);

datafusion/physical-plan/src/execution_plan.rs

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ use crate::sort_pushdown::SortOrderPushdownResult;
2626
pub use crate::stream::EmptyRecordBatchStream;
2727

2828
pub use datafusion_common::hash_utils;
29+
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
2930
pub use datafusion_common::utils::project_schema;
3031
pub use datafusion_common::{ColumnStatistics, Statistics, internal_err};
3132
pub use datafusion_execution::{RecordBatchStream, SendableRecordBatchStream};
@@ -1384,6 +1385,28 @@ pub fn check_not_null_constraints(
13841385
Ok(batch)
13851386
}
13861387

1388+
/// Make plan ready to be re-executed returning its clone with state reset for all nodes.
1389+
///
1390+
/// Some plans will change their internal states after execution, making them unable to be executed again.
1391+
/// This function uses [`ExecutionPlan::reset_state`] to reset any internal state within the plan.
1392+
///
1393+
/// An example is `CrossJoinExec`, which loads the left table into memory and stores it in the plan.
1394+
/// However, if the data of the left table is derived from the work table, it will become outdated
1395+
/// as the work table changes. When the next iteration executes this plan again, we must clear the left table.
1396+
///
1397+
/// # Note
1398+
///
1399+
/// While this function enables plan reuse, it does not allow the same plan to be executed concurrently if
1400+
/// it uses dynamic filters or represents a recursive query.
1401+
///
1402+
pub fn reset_plan_states(plan: Arc<dyn ExecutionPlan>) -> Result<Arc<dyn ExecutionPlan>> {
1403+
plan.transform_up(|plan| {
1404+
let new_plan = Arc::clone(&plan).reset_state()?;
1405+
Ok(Transformed::yes(new_plan))
1406+
})
1407+
.data()
1408+
}
1409+
13871410
/// Utility function yielding a string representation of the given [`ExecutionPlan`].
13881411
pub fn get_plan_string(plan: &Arc<dyn ExecutionPlan>) -> Vec<String> {
13891412
let formatted = displayable(plan.as_ref()).indent(true).to_string();

0 commit comments

Comments
 (0)