Skip to content

Commit a5e4d34

Browse files
andygroveclaude
andcommitted
feat: make query parameter optional in tpch benchmark
When running the tpch benchmark, the --query parameter is now optional. If not specified, all 22 TPC-H queries will be run sequentially. Changes: - Make --query optional for both datafusion and ballista benchmarks - Run all 22 queries when --query is not specified - Only print SQL queries when --debug flag is enabled - Write a single JSON output file for the entire benchmark run - Fix parquet file path resolution for datafusion benchmarks - Simplify output when iterations=1 (no iteration number, no average) Co-Authored-By: Claude Opus 4.5 <[email protected]>
1 parent fce60b7 commit a5e4d34

File tree

1 file changed

+156
-97
lines changed

1 file changed

+156
-97
lines changed

benchmarks/src/bin/tpch.rs

Lines changed: 156 additions & 97 deletions
Original file line numberDiff line numberDiff line change
@@ -72,9 +72,9 @@ static ALLOC: mimalloc::MiMalloc = mimalloc::MiMalloc;
7272

7373
#[derive(Debug, StructOpt, Clone)]
7474
struct BallistaBenchmarkOpt {
75-
/// Query number
75+
/// Query number (1-22). If not specified, runs all queries.
7676
#[structopt(short, long)]
77-
query: usize,
77+
query: Option<usize>,
7878

7979
/// Activate debug mode to see query results
8080
#[structopt(short, long)]
@@ -122,9 +122,9 @@ struct BallistaBenchmarkOpt {
122122

123123
#[derive(Debug, StructOpt, Clone)]
124124
struct DataFusionBenchmarkOpt {
125-
/// Query number
125+
/// Query number (1-22). If not specified, runs all queries.
126126
#[structopt(short, long)]
127-
query: usize,
127+
query: Option<usize>,
128128

129129
/// Activate debug mode to see query results
130130
#[structopt(short, long)]
@@ -283,7 +283,6 @@ async fn main() -> Result<()> {
283283
#[allow(clippy::await_holding_lock)]
284284
async fn benchmark_datafusion(opt: DataFusionBenchmarkOpt) -> Result<Vec<RecordBatch>> {
285285
println!("Running benchmarks with the following options: {opt:?}");
286-
let mut benchmark_run = BenchmarkRun::new(opt.query);
287286
let config = SessionConfig::new()
288287
.with_target_partitions(opt.partitions)
289288
.with_batch_size(opt.batch_size);
@@ -319,124 +318,166 @@ async fn benchmark_datafusion(opt: DataFusionBenchmarkOpt) -> Result<Vec<RecordB
319318
}
320319
}
321320

322-
let mut millis = vec![];
323-
// run benchmark
321+
// Determine which queries to run
322+
let query_numbers: Vec<usize> = opt
323+
.query
324+
.map(|q| vec![q])
325+
.unwrap_or_else(|| (1..=22).collect());
326+
327+
let mut benchmark_run = BenchmarkRun::new();
324328
let mut result: Vec<RecordBatch> = Vec::with_capacity(1);
325-
for i in 0..opt.iterations {
326-
let start = Instant::now();
327-
let plans = create_logical_plans(&ctx, opt.query).await?;
328-
for plan in plans {
329-
result = execute_query(&ctx, &plan, opt.debug).await?;
329+
330+
for query in query_numbers {
331+
let mut query_run = QueryRun::new(query);
332+
let mut millis = vec![];
333+
334+
// run benchmark
335+
for i in 0..opt.iterations {
336+
let start = Instant::now();
337+
let plans = create_logical_plans(&ctx, query).await?;
338+
for plan in plans {
339+
result = execute_query(&ctx, &plan, opt.debug).await?;
340+
}
341+
let elapsed = start.elapsed().as_secs_f64() * 1000.0;
342+
millis.push(elapsed);
343+
let row_count = result.iter().map(|b| b.num_rows()).sum();
344+
if opt.iterations == 1 {
345+
println!(
346+
"Query {} took {:.1} ms and returned {} rows",
347+
query, elapsed, row_count
348+
);
349+
} else {
350+
println!(
351+
"Query {} iteration {} took {:.1} ms and returned {} rows",
352+
query, i, elapsed, row_count
353+
);
354+
}
355+
query_run.add_result(elapsed, row_count);
356+
}
357+
358+
if opt.iterations > 1 {
359+
let avg = millis.iter().sum::<f64>() / millis.len() as f64;
360+
println!("Query {} avg time: {:.2} ms", query, avg);
330361
}
331-
let elapsed = start.elapsed().as_secs_f64() * 1000.0;
332-
millis.push(elapsed);
333-
let row_count = result.iter().map(|b| b.num_rows()).sum();
334-
println!(
335-
"Query {} iteration {} took {:.1} ms and returned {} rows",
336-
opt.query, i, elapsed, row_count
337-
);
338-
benchmark_run.add_result(elapsed, row_count);
339-
}
340362

341-
let avg = millis.iter().sum::<f64>() / millis.len() as f64;
342-
println!("Query {} avg time: {:.2} ms", opt.query, avg);
363+
benchmark_run.add_query_run(query_run);
364+
}
343365

344366
if let Some(path) = &opt.output_path {
345-
write_summary_json(&mut benchmark_run, path)?;
367+
write_summary_json(&benchmark_run, path)?;
346368
}
347369

348370
Ok(result)
349371
}
350372

351373
async fn benchmark_ballista(opt: BallistaBenchmarkOpt) -> Result<()> {
352374
println!("Running benchmarks with the following options: {opt:?}");
353-
let mut benchmark_run = BenchmarkRun::new(opt.query);
354-
355-
let config = SessionConfig::new_with_ballista()
356-
.with_target_partitions(opt.partitions)
357-
.with_ballista_job_name(&format!("Query derived from TPC-H q{}", opt.query))
358-
.with_batch_size(opt.batch_size)
359-
.with_collect_statistics(true);
360375

361-
let state = SessionStateBuilder::new()
362-
.with_default_features()
363-
.with_config(config)
364-
.build();
365376
let address = format!(
366377
"df://{}:{}",
367378
opt.host.clone().unwrap().as_str(),
368379
opt.port.unwrap()
369380
);
370-
let ctx = SessionContext::remote_with_state(&address, state).await?;
371381

372-
// register tables with Ballista context
373-
let path = opt.path.to_str().unwrap();
374-
let file_format = opt.file_format.as_str();
382+
// Determine which queries to run
383+
let query_numbers: Vec<usize> = opt
384+
.query
385+
.map(|q| vec![q])
386+
.unwrap_or_else(|| (1..=22).collect());
375387

376-
register_tables(path, file_format, &ctx, opt.debug).await?;
388+
let mut benchmark_run = BenchmarkRun::new();
377389

378-
let mut millis = vec![];
390+
for query in query_numbers {
391+
let mut query_run = QueryRun::new(query);
379392

380-
// run benchmark
381-
let queries = get_query_sql(opt.query)?;
382-
println!(
383-
"Running benchmark with queries {}:\n {:?}",
384-
opt.query, queries
385-
);
386-
let mut batches = vec![];
387-
for i in 0..opt.iterations {
388-
let start = Instant::now();
389-
for sql in &queries {
390-
let df = ctx
391-
.sql(sql)
392-
.await
393-
.map_err(|e| DataFusionError::Plan(format!("{e:?}")))
394-
.unwrap();
395-
let plan = df.clone().into_optimized_plan()?;
393+
let config = SessionConfig::new_with_ballista()
394+
.with_target_partitions(opt.partitions)
395+
.with_ballista_job_name(&format!("Query derived from TPC-H q{}", query))
396+
.with_batch_size(opt.batch_size)
397+
.with_collect_statistics(true);
398+
399+
let state = SessionStateBuilder::new()
400+
.with_default_features()
401+
.with_config(config)
402+
.build();
403+
let ctx = SessionContext::remote_with_state(&address, state).await?;
404+
405+
// register tables with Ballista context
406+
let path = opt.path.to_str().unwrap();
407+
let file_format = opt.file_format.as_str();
408+
409+
register_tables(path, file_format, &ctx, opt.debug).await?;
410+
411+
let mut millis = vec![];
412+
413+
// run benchmark
414+
let sqls = get_query_sql(query)?;
415+
if opt.debug {
416+
println!("Running benchmark with query {}:\n {:?}", query, sqls);
417+
}
418+
let mut batches = vec![];
419+
for i in 0..opt.iterations {
420+
let start = Instant::now();
421+
for sql in &sqls {
422+
let df = ctx
423+
.sql(sql)
424+
.await
425+
.map_err(|e| DataFusionError::Plan(format!("{e:?}")))
426+
.unwrap();
427+
let plan = df.clone().into_optimized_plan()?;
428+
if opt.debug {
429+
println!("=== Optimized logical plan ===\n{plan:?}\n");
430+
}
431+
batches = df
432+
.collect()
433+
.await
434+
.map_err(|e| DataFusionError::Plan(format!("{e:?}")))
435+
.unwrap();
436+
}
437+
let elapsed = start.elapsed().as_secs_f64() * 1000.0;
438+
millis.push(elapsed);
439+
let row_count = batches.iter().map(|b| b.num_rows()).sum();
440+
if opt.iterations == 1 {
441+
println!(
442+
"Query {} took {:.1} ms and returned {} rows",
443+
query, elapsed, row_count
444+
);
445+
} else {
446+
println!(
447+
"Query {} iteration {} took {:.1} ms and returned {} rows",
448+
query, i, elapsed, row_count
449+
);
450+
}
451+
query_run.add_result(elapsed, row_count);
396452
if opt.debug {
397-
println!("=== Optimized logical plan ===\n{plan:?}\n");
453+
pretty::print_batches(&batches)?;
454+
}
455+
456+
if let Some(expected_results_path) = opt.expected_results.as_ref() {
457+
let expected = get_expected_results(query, expected_results_path).await?;
458+
assert_expected_results(&expected, &batches)
398459
}
399-
batches = df
400-
.collect()
401-
.await
402-
.map_err(|e| DataFusionError::Plan(format!("{e:?}")))
403-
.unwrap();
404-
}
405-
let elapsed = start.elapsed().as_secs_f64() * 1000.0;
406-
millis.push(elapsed);
407-
let row_count = batches.iter().map(|b| b.num_rows()).sum();
408-
println!(
409-
"Query {} iteration {} took {:.1} ms and returned {} rows",
410-
opt.query, i, elapsed, row_count
411-
);
412-
benchmark_run.add_result(elapsed, row_count);
413-
if opt.debug {
414-
pretty::print_batches(&batches)?;
415460
}
416461

417-
if let Some(expected_results_path) = opt.expected_results.as_ref() {
418-
let expected = get_expected_results(opt.query, expected_results_path).await?;
419-
assert_expected_results(&expected, &batches)
462+
if opt.iterations > 1 {
463+
let avg = millis.iter().sum::<f64>() / millis.len() as f64;
464+
println!("Query {} avg time: {:.2} ms", query, avg);
420465
}
421-
}
422466

423-
let avg = millis.iter().sum::<f64>() / millis.len() as f64;
424-
println!("Query {} avg time: {:.2} ms", opt.query, avg);
467+
benchmark_run.add_query_run(query_run);
468+
}
425469

426470
if let Some(path) = &opt.output_path {
427-
write_summary_json(&mut benchmark_run, path)?;
471+
write_summary_json(&benchmark_run, path)?;
428472
}
429473

430474
Ok(())
431475
}
432476

433-
fn write_summary_json(benchmark_run: &mut BenchmarkRun, path: &Path) -> Result<()> {
477+
fn write_summary_json(benchmark_run: &BenchmarkRun, path: &Path) -> Result<()> {
434478
let json =
435479
serde_json::to_string_pretty(&benchmark_run).expect("summary is serializable");
436-
let filename = format!(
437-
"tpch-q{}-{}.json",
438-
benchmark_run.query, benchmark_run.start_time
439-
);
480+
let filename = format!("tpch-{}.json", benchmark_run.start_time);
440481
let path = path.join(filename);
441482
println!(
442483
"Writing summary file to {}",
@@ -833,7 +874,7 @@ async fn get_table(
833874
)
834875
}
835876
"parquet" => {
836-
let path = format!("{path}/{table}");
877+
let path = find_path(path, table, "parquet")?;
837878
let format = ParquetFormat::default().with_enable_pruning(true);
838879

839880
(
@@ -971,6 +1012,27 @@ pub fn get_tbl_tpch_table_schema(table: &str) -> Schema {
9711012
schema.finish()
9721013
}
9731014

1015+
#[derive(Debug, Serialize)]
1016+
struct QueryRun {
1017+
/// query number
1018+
query: usize,
1019+
/// list of individual run times and row counts
1020+
iterations: Vec<QueryResult>,
1021+
}
1022+
1023+
impl QueryRun {
1024+
fn new(query: usize) -> Self {
1025+
Self {
1026+
query,
1027+
iterations: vec![],
1028+
}
1029+
}
1030+
1031+
fn add_result(&mut self, elapsed: f64, row_count: usize) {
1032+
self.iterations.push(QueryResult { elapsed, row_count })
1033+
}
1034+
}
1035+
9741036
#[derive(Debug, Serialize)]
9751037
struct BenchmarkRun {
9761038
/// Benchmark crate version
@@ -983,14 +1045,12 @@ struct BenchmarkRun {
9831045
start_time: u64,
9841046
/// CLI arguments
9851047
arguments: Vec<String>,
986-
/// query number
987-
query: usize,
988-
/// list of individual run times and row counts
989-
iterations: Vec<QueryResult>,
1048+
/// Results for each query
1049+
queries: Vec<QueryRun>,
9901050
}
9911051

9921052
impl BenchmarkRun {
993-
fn new(query: usize) -> Self {
1053+
fn new() -> Self {
9941054
Self {
9951055
benchmark_version: env!("CARGO_PKG_VERSION").to_owned(),
9961056
datafusion_version: DATAFUSION_VERSION.to_owned(),
@@ -1000,13 +1060,12 @@ impl BenchmarkRun {
10001060
.expect("current time is later than UNIX_EPOCH")
10011061
.as_secs(),
10021062
arguments: std::env::args().skip(1).collect::<Vec<String>>(),
1003-
query,
1004-
iterations: vec![],
1063+
queries: vec![],
10051064
}
10061065
}
10071066

1008-
fn add_result(&mut self, elapsed: f64, row_count: usize) {
1009-
self.iterations.push(QueryResult { elapsed, row_count })
1067+
fn add_query_run(&mut self, query_run: QueryRun) {
1068+
self.queries.push(query_run)
10101069
}
10111070
}
10121071

@@ -1637,7 +1696,7 @@ mod tests {
16371696

16381697
// run the query to compute actual results of the query
16391698
let opt = DataFusionBenchmarkOpt {
1640-
query: n,
1699+
query: Some(n),
16411700
debug: false,
16421701
iterations: 1,
16431702
partitions: 2,

0 commit comments

Comments
 (0)