
Commit 1704d1e

refactor: extract the data generate out of aggregate_topk benchmark (#19523)
## Which issue does this PR close?

None.

## Rationale for this change

When running the `topk_aggregate` benchmark, most of the measured time is spent generating the test data, so the results do not accurately reflect the performance of top-k aggregation itself.

```
cargo bench --bench topk_aggregate
```

### main branch

![main](https://github.com/user-attachments/assets/15bc0368-f870-4ad8-84f8-0b6e23e442cb)

### this pr

![this pr](https://github.com/user-attachments/assets/bf92ec25-217a-40aa-8b0f-2cf89bea8737)

## What changes are included in this PR?

Extract the data generation out of the `topk_aggregate` benchmark's timed loop.

## Are these changes tested?

## Are there any user-facing changes?
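For context, the fix applies the standard Criterion pattern: build expensive inputs once, outside the `b.iter` closure, so only the workload is timed. Below is a minimal, self-contained sketch of that pattern, using hypothetical `expensive_setup`/`workload` stand-ins rather than the benchmark's real data generator:

```rust
use criterion::{Criterion, criterion_group, criterion_main};
use std::hint::black_box;

// Hypothetical stand-in for the costly data generation (make_data in this benchmark).
fn expensive_setup() -> Vec<u64> {
    (0..1_000_000).collect()
}

// Hypothetical stand-in for the measured query.
fn workload(data: &[u64]) -> u64 {
    data.iter().copied().max().unwrap_or(0)
}

fn bench(c: &mut Criterion) {
    // Setup runs once, outside the timed closure; previously the equivalent of
    // expensive_setup() ran on every iteration and dominated the reported numbers.
    let data = expensive_setup();
    c.bench_function("workload only", |b| b.iter(|| black_box(workload(&data))));
}

criterion_group!(benches, bench);
criterion_main!(benches);
```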
1 parent d13d891 commit 1704d1e

File tree

1 file changed: +43, -81 lines changed


datafusion/core/benches/topk_aggregate.rs

Lines changed: 43 additions & 81 deletions
```diff
@@ -20,23 +20,21 @@ mod data_utils;
 use arrow::util::pretty::pretty_format_batches;
 use criterion::{Criterion, criterion_group, criterion_main};
 use data_utils::make_data;
-use datafusion::physical_plan::{ExecutionPlan, collect, displayable};
+use datafusion::physical_plan::{collect, displayable};
 use datafusion::prelude::SessionContext;
 use datafusion::{datasource::MemTable, error::Result};
-use datafusion_execution::TaskContext;
 use datafusion_execution::config::SessionConfig;
 use std::hint::black_box;
 use std::sync::Arc;
 use tokio::runtime::Runtime;

 async fn create_context(
-    limit: usize,
     partition_cnt: i32,
     sample_cnt: i32,
     asc: bool,
     use_topk: bool,
     use_view: bool,
-) -> Result<(Arc<dyn ExecutionPlan>, Arc<TaskContext>)> {
+) -> Result<SessionContext> {
     let (schema, parts) = make_data(partition_cnt, sample_cnt, asc, use_view).unwrap();
     let mem_table = Arc::new(MemTable::try_new(schema, parts).unwrap());

@@ -46,32 +44,32 @@ async fn create_context(
     opts.optimizer.enable_topk_aggregation = use_topk;
     let ctx = SessionContext::new_with_config(cfg);
     let _ = ctx.register_table("traces", mem_table)?;
+
+    Ok(ctx)
+}
+
+fn run(rt: &Runtime, ctx: SessionContext, limit: usize, use_topk: bool, asc: bool) {
+    black_box(rt.block_on(async { aggregate(ctx, limit, use_topk, asc).await })).unwrap();
+}
+
+async fn aggregate(
+    ctx: SessionContext,
+    limit: usize,
+    use_topk: bool,
+    asc: bool,
+) -> Result<()> {
     let sql = format!(
         "select max(timestamp_ms) from traces group by trace_id order by max(timestamp_ms) desc limit {limit};"
     );
     let df = ctx.sql(sql.as_str()).await?;
-    let physical_plan = df.create_physical_plan().await?;
-    let actual_phys_plan = displayable(physical_plan.as_ref()).indent(true).to_string();
+    let plan = df.create_physical_plan().await?;
+    let actual_phys_plan = displayable(plan.as_ref()).indent(true).to_string();
     assert_eq!(
         actual_phys_plan.contains(&format!("lim=[{limit}]")),
         use_topk
     );

-    Ok((physical_plan, ctx.task_ctx()))
-}
-
-#[expect(clippy::needless_pass_by_value)]
-fn run(rt: &Runtime, plan: Arc<dyn ExecutionPlan>, ctx: Arc<TaskContext>, asc: bool) {
-    black_box(rt.block_on(async { aggregate(plan.clone(), ctx.clone(), asc).await }))
-        .unwrap();
-}
-
-async fn aggregate(
-    plan: Arc<dyn ExecutionPlan>,
-    ctx: Arc<TaskContext>,
-    asc: bool,
-) -> Result<()> {
-    let batches = collect(plan, ctx).await?;
+    let batches = collect(plan, ctx.task_ctx()).await?;
     assert_eq!(batches.len(), 1);
     let batch = batches.first().unwrap();
     assert_eq!(batch.num_rows(), 10);
@@ -107,106 +105,70 @@ fn criterion_benchmark(c: &mut Criterion) {
     let partitions = 10;
     let samples = 1_000_000;

+    let ctx = rt
+        .block_on(create_context(partitions, samples, false, false, false))
+        .unwrap();
     c.bench_function(
         format!("aggregate {} time-series rows", partitions * samples).as_str(),
-        |b| {
-            b.iter(|| {
-                let real = rt.block_on(async {
-                    create_context(limit, partitions, samples, false, false, false)
-                        .await
-                        .unwrap()
-                });
-                run(&rt, real.0.clone(), real.1.clone(), false)
-            })
-        },
+        |b| b.iter(|| run(&rt, ctx.clone(), limit, false, false)),
     );

+    let ctx = rt
+        .block_on(create_context(partitions, samples, true, false, false))
+        .unwrap();
     c.bench_function(
         format!("aggregate {} worst-case rows", partitions * samples).as_str(),
-        |b| {
-            b.iter(|| {
-                let asc = rt.block_on(async {
-                    create_context(limit, partitions, samples, true, false, false)
-                        .await
-                        .unwrap()
-                });
-                run(&rt, asc.0.clone(), asc.1.clone(), true)
-            })
-        },
+        |b| b.iter(|| run(&rt, ctx.clone(), limit, false, true)),
     );

+    let ctx = rt
+        .block_on(create_context(partitions, samples, false, true, false))
+        .unwrap();
     c.bench_function(
         format!(
             "top k={limit} aggregate {} time-series rows",
             partitions * samples
         )
         .as_str(),
-        |b| {
-            b.iter(|| {
-                let topk_real = rt.block_on(async {
-                    create_context(limit, partitions, samples, false, true, false)
-                        .await
-                        .unwrap()
-                });
-                run(&rt, topk_real.0.clone(), topk_real.1.clone(), false)
-            })
-        },
+        |b| b.iter(|| run(&rt, ctx.clone(), limit, true, false)),
     );

+    let ctx = rt
+        .block_on(create_context(partitions, samples, true, true, false))
+        .unwrap();
     c.bench_function(
         format!(
             "top k={limit} aggregate {} worst-case rows",
             partitions * samples
         )
         .as_str(),
-        |b| {
-            b.iter(|| {
-                let topk_asc = rt.block_on(async {
-                    create_context(limit, partitions, samples, true, true, false)
-                        .await
-                        .unwrap()
-                });
-                run(&rt, topk_asc.0.clone(), topk_asc.1.clone(), true)
-            })
-        },
+        |b| b.iter(|| run(&rt, ctx.clone(), limit, true, true)),
     );

     // Utf8View schema,time-series rows
+    let ctx = rt
+        .block_on(create_context(partitions, samples, false, true, true))
+        .unwrap();
     c.bench_function(
         format!(
             "top k={limit} aggregate {} time-series rows [Utf8View]",
             partitions * samples
         )
         .as_str(),
-        |b| {
-            b.iter(|| {
-                let topk_real = rt.block_on(async {
-                    create_context(limit, partitions, samples, false, true, true)
-                        .await
-                        .unwrap()
-                });
-                run(&rt, topk_real.0.clone(), topk_real.1.clone(), false)
-            })
-        },
+        |b| b.iter(|| run(&rt, ctx.clone(), limit, true, false)),
     );

     // Utf8View schema,worst-case rows
+    let ctx = rt
+        .block_on(create_context(partitions, samples, true, true, true))
+        .unwrap();
     c.bench_function(
         format!(
             "top k={limit} aggregate {} worst-case rows [Utf8View]",
             partitions * samples
         )
         .as_str(),
-        |b| {
-            b.iter(|| {
-                let topk_asc = rt.block_on(async {
-                    create_context(limit, partitions, samples, true, true, true)
-                        .await
-                        .unwrap()
-                });
-                run(&rt, topk_asc.0.clone(), topk_asc.1.clone(), true)
-            })
-        },
+        |b| b.iter(|| run(&rt, ctx.clone(), limit, true, true)),
     );
 }
```

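One note on the new code: cloning a `SessionContext` per iteration is cheap because the context keeps its session state behind an `Arc`, so `ctx.clone()` only bumps a reference count and the clones share the registered `traces` table. A minimal sketch illustrating this (assuming `datafusion` as a dependency; not part of this commit):

```rust
use datafusion::prelude::SessionContext;

fn main() {
    let ctx = SessionContext::new();
    // Clone shares the same underlying Arc'd session state rather than
    // copying registered tables, so this is essentially free.
    let ctx2 = ctx.clone();
    assert_eq!(ctx.session_id(), ctx2.session_id());
}
```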