Skip to content

Commit 049b06e

Browse files
committed
Add benchmark for string aggregate functions with topk optimization
1 parent 0491977 commit 049b06e

File tree

1 file changed

+81
-0
lines changed

1 file changed

+81
-0
lines changed

datafusion/core/benches/topk_aggregate.rs

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,11 @@ fn run(rt: &Runtime, ctx: SessionContext, limit: usize, use_topk: bool, asc: boo
5252
black_box(rt.block_on(async { aggregate(ctx, limit, use_topk, asc).await })).unwrap();
5353
}
5454

55+
fn run_string(rt: &Runtime, ctx: SessionContext, limit: usize, use_topk: bool) {
56+
black_box(rt.block_on(async { aggregate_string(ctx, limit, use_topk).await }))
57+
.unwrap();
58+
}
59+
5560
async fn aggregate(
5661
ctx: SessionContext,
5762
limit: usize,
@@ -99,6 +104,33 @@ async fn aggregate(
99104
Ok(())
100105
}
101106

107+
/// Benchmark for string aggregate functions with topk optimization.
108+
/// This tests grouping by a numeric column (timestamp_ms) and aggregating
109+
/// a string column (trace_id) with Utf8 or Utf8View data types.
110+
async fn aggregate_string(
111+
ctx: SessionContext,
112+
limit: usize,
113+
use_topk: bool,
114+
) -> Result<()> {
115+
let sql = format!(
116+
"select max(trace_id) from traces group by timestamp_ms order by max(trace_id) desc limit {limit};"
117+
);
118+
let df = ctx.sql(sql.as_str()).await?;
119+
let plan = df.create_physical_plan().await?;
120+
let actual_phys_plan = displayable(plan.as_ref()).indent(true).to_string();
121+
assert_eq!(
122+
actual_phys_plan.contains(&format!("lim=[{limit}]")),
123+
use_topk
124+
);
125+
126+
let batches = collect(plan, ctx.task_ctx()).await?;
127+
assert_eq!(batches.len(), 1);
128+
let batch = batches.first().unwrap();
129+
assert_eq!(batch.num_rows(), 10);
130+
131+
Ok(())
132+
}
133+
102134
fn criterion_benchmark(c: &mut Criterion) {
103135
let rt = Runtime::new().unwrap();
104136
let limit = 10;
@@ -170,6 +202,55 @@ fn criterion_benchmark(c: &mut Criterion) {
170202
.as_str(),
171203
|b| b.iter(|| run(&rt, ctx.clone(), limit, true, true)),
172204
);
205+
206+
// String aggregate benchmarks - grouping by timestamp, aggregating string column
207+
let ctx = rt
208+
.block_on(create_context(partitions, samples, false, true, false))
209+
.unwrap();
210+
c.bench_function(
211+
format!(
212+
"top k={limit} string aggregate {} time-series rows [Utf8]",
213+
partitions * samples
214+
)
215+
.as_str(),
216+
|b| b.iter(|| run_string(&rt, ctx.clone(), limit, true)),
217+
);
218+
219+
let ctx = rt
220+
.block_on(create_context(partitions, samples, true, true, false))
221+
.unwrap();
222+
c.bench_function(
223+
format!(
224+
"top k={limit} string aggregate {} worst-case rows [Utf8]",
225+
partitions * samples
226+
)
227+
.as_str(),
228+
|b| b.iter(|| run_string(&rt, ctx.clone(), limit, true)),
229+
);
230+
231+
let ctx = rt
232+
.block_on(create_context(partitions, samples, false, true, true))
233+
.unwrap();
234+
c.bench_function(
235+
format!(
236+
"top k={limit} string aggregate {} time-series rows [Utf8View]",
237+
partitions * samples
238+
)
239+
.as_str(),
240+
|b| b.iter(|| run_string(&rt, ctx.clone(), limit, true)),
241+
);
242+
243+
let ctx = rt
244+
.block_on(create_context(partitions, samples, true, true, true))
245+
.unwrap();
246+
c.bench_function(
247+
format!(
248+
"top k={limit} string aggregate {} worst-case rows [Utf8View]",
249+
partitions * samples
250+
)
251+
.as_str(),
252+
|b| b.iter(|| run_string(&rt, ctx.clone(), limit, true)),
253+
);
173254
}
174255

175256
criterion_group!(benches, criterion_benchmark);

0 commit comments

Comments
 (0)