Skip to content

Commit 4275e02

Browse files
committed
add physical plan re-use benchmark
1 parent 7e009ff commit 4275e02

File tree

3 files changed

+197
-1
lines changed

3 files changed

+197
-1
lines changed

datafusion/physical-plan/Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,10 @@ tokio = { workspace = true, features = [
9292
harness = false
9393
name = "partial_ordering"
9494

95+
[[bench]]
96+
harness = false
97+
name = "plan_reuse"
98+
9599
[[bench]]
96100
harness = false
97101
name = "spill_io"
Lines changed: 192 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,192 @@
1+
use std::sync::{Arc, LazyLock};
2+
3+
use arrow_schema::{DataType, Field, Fields, Schema, SchemaRef};
4+
use criterion::{Criterion, criterion_group, criterion_main};
5+
use datafusion_common::Result;
6+
use datafusion_common::tree_node::{Transformed, TreeNode};
7+
use datafusion_execution::TaskContext;
8+
use datafusion_expr::Operator;
9+
use datafusion_functions_aggregate::average::avg_udaf;
10+
use datafusion_physical_expr::aggregate::AggregateExprBuilder;
11+
use datafusion_physical_expr::expressions::{self, binary, lit};
12+
use datafusion_physical_expr::{Partitioning, PhysicalExpr};
13+
use datafusion_physical_plan::aggregates::{
14+
AggregateExec, AggregateMode, PhysicalGroupBy,
15+
};
16+
use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec;
17+
use datafusion_physical_plan::repartition::RepartitionExec;
18+
use datafusion_physical_plan::{
19+
ExecutionPlan, execute_stream, filter::FilterExec, test::TestMemoryExec,
20+
};
21+
22+
const NUM_FIELDS: usize = 1000;
23+
24+
static SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
25+
Arc::new(Schema::new(
26+
(0..NUM_FIELDS)
27+
.map(|i| Arc::new(Field::new(format!("x_{i}"), DataType::Int64, false)))
28+
.collect::<Fields>(),
29+
))
30+
});
31+
32+
fn partitioning() -> Partitioning {
33+
Partitioning::RoundRobinBatch(16)
34+
}
35+
36+
/// Name of the `i`-th schema column, i.e. `x_<i>`.
fn col_name(i: usize) -> String {
    let mut name = String::from("x_");
    name.push_str(&i.to_string());
    name
}
39+
40+
fn aggr_name(i: usize) -> String {
41+
format!("aggr({})", col_name(i))
42+
}
43+
44+
fn col(i: usize) -> Arc<dyn PhysicalExpr> {
45+
expressions::col(&col_name(i), &SCHEMA).unwrap()
46+
}
47+
48+
/// Returns a typical plan for the query like:
49+
///
50+
/// ```sql
51+
/// SELECT aggr1(col1) as aggr1, aggr2(col2) as aggr2 FROM t
52+
/// WHERE p1
53+
/// HAVING p2
54+
/// ```
55+
///
56+
/// A plan looks like:
57+
///
58+
/// ```text
59+
/// ProjectionExec
60+
/// FilterExec
61+
/// AggregateExec: mode=Final
62+
/// CoalescePartitionsExec
63+
/// AggregateExec: mode=Partial
64+
/// RepartitionExec
65+
/// FilterExec
66+
/// TestMemoryExec
67+
/// ```
68+
///
69+
fn query1_plan() -> Result<Arc<dyn ExecutionPlan>> {
70+
let schema = Arc::clone(&SCHEMA);
71+
let input = TestMemoryExec::try_new(&[vec![]], Arc::clone(&schema), None)?;
72+
73+
let plan = FilterExec::try_new(
74+
// Some predicate.
75+
binary(
76+
binary(col(0), Operator::Eq, col(1), &schema)?,
77+
Operator::And,
78+
binary(col(2), Operator::Eq, lit(42_i64), &schema)?,
79+
&schema,
80+
)?,
81+
Arc::new(input),
82+
)?;
83+
84+
let plan = RepartitionExec::try_new(Arc::new(plan), partitioning())?;
85+
86+
let plan = {
87+
// Partial aggregation.
88+
let aggr_expr = (0..NUM_FIELDS)
89+
.map(|i| {
90+
AggregateExprBuilder::new(avg_udaf(), vec![col(i)])
91+
.schema(Arc::clone(&schema))
92+
.alias(aggr_name(i))
93+
.build()
94+
.map(Arc::new)
95+
})
96+
.collect::<Result<Vec<_>>>()?;
97+
let filter_expr = (0..aggr_expr.len()).map(|_| None).collect();
98+
99+
AggregateExec::try_new(
100+
AggregateMode::Partial,
101+
PhysicalGroupBy::new(vec![], vec![], vec![], false),
102+
aggr_expr,
103+
filter_expr,
104+
Arc::new(plan),
105+
Arc::clone(&schema),
106+
)?
107+
};
108+
109+
let plan = CoalescePartitionsExec::new(Arc::new(plan));
110+
111+
let schema = plan.schema();
112+
let plan = {
113+
// Final aggregation.
114+
let aggr_expr = (0..NUM_FIELDS)
115+
.map(|i| {
116+
AggregateExprBuilder::new(
117+
avg_udaf(),
118+
vec![Arc::new(expressions::Column::new(&aggr_name(i), i))],
119+
)
120+
.schema(Arc::clone(&schema))
121+
.alias(aggr_name(i))
122+
.build()
123+
.map(Arc::new)
124+
})
125+
.collect::<Result<Vec<_>>>()?;
126+
let filter_expr = (0..aggr_expr.len()).map(|_| None).collect();
127+
128+
AggregateExec::try_new(
129+
AggregateMode::Partial,
130+
PhysicalGroupBy::new(vec![], vec![], vec![], false),
131+
aggr_expr,
132+
filter_expr,
133+
Arc::new(plan),
134+
Arc::clone(&schema),
135+
)?
136+
};
137+
138+
let schema = plan.schema();
139+
let plan = {
140+
let predicate = (0..schema.fields.len()).fold(lit(true), |expr, i| {
141+
binary(
142+
expr,
143+
Operator::And,
144+
binary(
145+
Arc::new(expressions::Column::new(schema.field(i).name(), i)),
146+
Operator::Gt,
147+
lit(i as i64),
148+
&schema,
149+
)
150+
.unwrap(),
151+
&schema,
152+
)
153+
.unwrap()
154+
});
155+
156+
FilterExec::try_new(predicate, Arc::new(plan))?
157+
};
158+
159+
Ok(Arc::new(plan))
160+
}
161+
162+
#[cfg(not(feature = "stateless_plan"))]
163+
fn reset_plan_states(plan: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
164+
plan.transform_up(|plan| {
165+
let new_plan = Arc::clone(&plan).reset_state()?;
166+
Ok(Transformed::yes(new_plan))
167+
})
168+
.unwrap()
169+
.data
170+
}
171+
172+
/// Benchmarks calling `execute_stream` on a plan that is built once, outside
/// the measured closure.
///
/// Without the `stateless_plan` feature each iteration first resets the plan
/// states via `reset_plan_states`; with the feature enabled the same plan is
/// re-used as is. The value returned by `execute_stream` is bound and dropped
/// without being inspected or polled.
fn bench_plan_execute(c: &mut Criterion) {
    let task_ctx = Arc::new(TaskContext::default());
    let base_plan = query1_plan().unwrap();
    let runtime = tokio::runtime::Runtime::new().unwrap();

    c.bench_function("execute", |bencher| {
        bencher.iter(|| {
            #[cfg(not(feature = "stateless_plan"))]
            let fresh_plan = reset_plan_states(Arc::clone(&base_plan));

            #[cfg(feature = "stateless_plan")]
            let fresh_plan = Arc::clone(&base_plan);

            let ctx = Arc::clone(&task_ctx);
            let _stream = runtime.block_on(async { execute_stream(fresh_plan, ctx) });
        });
    });
}
190+
191+
criterion_group!(benches, bench_plan_execute);
192+
criterion_main!(benches);

datafusion/physical-plan/src/test.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,7 @@ impl ExecutionPlan for TestMemoryExec {
148148
self: Arc<Self>,
149149
_: Vec<Arc<dyn ExecutionPlan>>,
150150
) -> Result<Arc<dyn ExecutionPlan>> {
151-
unimplemented!()
151+
Ok(self)
152152
}
153153

154154
fn repartitioned(

0 commit comments

Comments
 (0)