Skip to content

Commit 6564035

Browse files
chore[bench]: move tpcds into new benchmark format (#3870)
Signed-off-by: Joe Isaacs <[email protected]> --------- Signed-off-by: Joe Isaacs <[email protected]>
1 parent 0b091c3 commit 6564035

File tree

14 files changed

+305
-298
lines changed

14 files changed

+305
-298
lines changed

bench-vortex/src/benchmark_driver.rs

Lines changed: 33 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,12 @@ use std::path::PathBuf;
88
use anyhow::Result;
99
use indicatif::ProgressBar;
1010
use log::warn;
11+
use vortex::error::VortexExpect;
1112
use vortex_datafusion::metrics::VortexMetricsFinder;
1213

1314
use crate::benchmark_trait::Benchmark;
1415
use crate::display::DisplayFormat;
15-
use crate::engines::{EngineCtx, benchmark_datafusion_query, benchmark_duckdb_query};
16+
use crate::engines::{EngineCtx, benchmark_datafusion_query};
1617
use crate::measurements::{MemoryMeasurement, QueryMeasurement};
1718
use crate::memory::BenchmarkMemoryTracker;
1819
use crate::metrics::{MetricsSetExt, export_plan_spans};
@@ -143,13 +144,12 @@ fn execute_queries<B: Benchmark>(
143144
tracker.start_query();
144145
}
145146

146-
match engine_ctx {
147+
let row_count = match engine_ctx {
147148
EngineCtx::DataFusion(ctx) => {
148149
let (runs, (row_count, execution_plan)) = runtime.block_on(async {
149150
benchmark_datafusion_query(iterations, || async {
150-
let (batches, plan) = df::execute_query(&ctx.session, query_string)
151-
.await
152-
.unwrap_or_else(|err| {
151+
let (batches, plan) =
152+
ctx.execute_query(query_string).await.unwrap_or_else(|err| {
153153
vortex_panic!("query: {query_idx} failed with: {err}")
154154
});
155155
let row_count: usize = batches.iter().map(|batch| batch.num_rows()).sum();
@@ -158,16 +158,6 @@ fn execute_queries<B: Benchmark>(
158158
.await
159159
});
160160

161-
// Validate row count if expected counts are provided
162-
if let Some(expected_counts) = expected_row_counts {
163-
if query_idx < expected_counts.len() {
164-
assert_eq!(
165-
row_count, expected_counts[query_idx],
166-
"Row count mismatch for query {query_idx} - datafusion:{format}",
167-
);
168-
}
169-
}
170-
171161
ctx.execution_plans
172162
.push((query_idx, execution_plan.clone()));
173163

@@ -193,19 +183,24 @@ fn execute_queries<B: Benchmark>(
193183
storage: url_scheme_to_storage(benchmark.data_url())?,
194184
runs,
195185
});
186+
187+
row_count
196188
}
197189
EngineCtx::DuckDB(ctx) => {
198-
let (runs, row_count) =
199-
benchmark_duckdb_query(query_idx, query_string, iterations, ctx);
200-
201-
// Validate row count if expected counts are provided
202-
if let Some(expected_counts) = expected_row_counts {
203-
if query_idx < expected_counts.len() {
204-
assert_eq!(
205-
row_count, expected_counts[query_idx],
206-
"Row count mismatch for query {query_idx} - duckdb:{format}",
207-
);
208-
}
190+
let mut runs = Vec::with_capacity(iterations);
191+
let mut row_count = None;
192+
193+
for _ in 0..iterations {
194+
let (duration, current_row_count) =
195+
ctx.execute_query(query_string).unwrap_or_else(|err| {
196+
vortex_panic!("query: {query_idx} failed with: {err}")
197+
});
198+
199+
runs.push(duration);
200+
row_count.inspect(|rc| {
201+
assert_eq!(*rc, current_row_count, "each row count must match")
202+
});
203+
row_count = Some(current_row_count);
209204
}
210205

211206
query_measurements.push(QueryMeasurement {
@@ -215,6 +210,18 @@ fn execute_queries<B: Benchmark>(
215210
storage: url_scheme_to_storage(benchmark.data_url())?,
216211
runs,
217212
});
213+
214+
row_count.vortex_expect("cannot have zero runs")
215+
}
216+
};
217+
218+
// Validate row count if expected counts are provided
219+
if let Some(expected_counts) = expected_row_counts {
220+
if query_idx < expected_counts.len() {
221+
assert_eq!(
222+
row_count, expected_counts[query_idx],
223+
"Row count mismatch for query {query_idx} - duckdb:{format}",
224+
);
218225
}
219226
}
220227

bench-vortex/src/bin/query_bench.rs

Lines changed: 56 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,10 @@ use std::path::PathBuf;
55

66
use bench_vortex::Target;
77
use bench_vortex::benchmark_driver::{DriverConfig, run_benchmark};
8-
use bench_vortex::clickbench::Flavor;
9-
use bench_vortex::clickbench_benchmark::ClickBenchBenchmark;
8+
use bench_vortex::clickbench::{ClickBenchBenchmark, Flavor};
109
use bench_vortex::display::DisplayFormat;
11-
use bench_vortex::tpch_benchmark::TpcHBenchmark;
10+
use bench_vortex::tpcds::TpcDsBenchmark;
11+
use bench_vortex::tpch::tpch_benchmark::TpcHBenchmark;
1212
use clap::{Parser, Subcommand, value_parser};
1313

1414
#[derive(Parser, Debug)]
@@ -27,6 +27,10 @@ enum Commands {
2727
/// Run TPC-H queries
2828
#[command(name = "tpch")]
2929
TpcH(TpcHArgs),
30+
31+
/// Run TPC-DS queries
32+
#[command(name = "tpcds")]
33+
TpcDS(TpcDSArgs),
3034
}
3135

3236
/// Common arguments shared across benchmarks
@@ -123,6 +127,26 @@ struct TpcHArgs {
123127
scale_factor: String,
124128
}
125129

130+
// Command-line arguments accepted by the `tpcds` subcommand.
// (Plain `//` comments are used on purpose: clap's derive macros turn `///`
// doc comments into help text, which would change the program's output.)
#[derive(Parser, Debug)]
131+
struct TpcDSArgs {
132+
// Options shared across benchmarks, flattened so they appear as flags of
// this subcommand rather than under a nested group.
#[command(flatten)]
133+
common: CommonArgs,
134+
135+
// `--targets`: comma-delimited list, each element parsed as a `Target`.
// When the flag is omitted, the five values listed below are used.
#[arg(long, value_delimiter = ',', value_parser = value_parser!(Target),
136+
default_values = vec![
137+
"datafusion:parquet",
138+
"datafusion:vortex",
139+
"duckdb:parquet",
140+
"duckdb:vortex",
141+
"duckdb:duckdb"
142+
]
143+
)]
144+
targets: Vec<Target>,
145+
146+
// `--scale-factor`: defaults to "1.0" and is checked by
// `validate_scale_factor`; the value is kept as a `String`.
// NOTE(review): only part of the validator is visible here — it appears to
// accept a fixed set of values (0.01, 0.1, 1, 10, 100, 1000); confirm.
#[arg(long, default_value = "1.0", value_parser=validate_scale_factor)]
147+
scale_factor: String,
148+
}
149+
126150
fn validate_scale_factor(val: &str) -> Result<String, String> {
127151
match val.parse::<f32>() {
128152
Ok(n) if [0.01, 0.1, 1., 10., 100., 1000.].contains(&n) => {
@@ -150,6 +174,7 @@ fn main() -> anyhow::Result<()> {
150174
match args.command {
151175
Commands::ClickBench(clickbench_args) => run_clickbench(clickbench_args),
152176
Commands::TpcH(tpch_args) => run_tpch(tpch_args),
177+
Commands::TpcDS(tpcds_args) => run_tpcds(tpcds_args),
153178
}
154179
}
155180

@@ -212,3 +237,31 @@ fn run_tpch(args: TpcHArgs) -> anyhow::Result<()> {
212237

213238
Ok(())
214239
}
240+
241+
/// Runs the TPC-DS benchmark for the parsed `tpcds` subcommand arguments.
///
/// Builds a `TpcDsBenchmark` from the requested scale factor and the
/// `use_remote_data_dir` flag, copies the remaining CLI options into a
/// `DriverConfig`, and passes both to `run_benchmark`.
///
/// # Errors
///
/// Propagates (via `?`) any error returned by `TpcDsBenchmark::new` or by
/// `run_benchmark`.
fn run_tpcds(args: TpcDSArgs) -> anyhow::Result<()> {
242+
// Create benchmark instance
243+
let benchmark = TpcDsBenchmark::new(args.scale_factor, args.common.use_remote_data_dir)?;
244+
245+
// Configure driver: `targets` comes from the TPC-DS-specific arguments,
// every other field is taken unchanged from the shared `common` arguments.
246+
let config = DriverConfig {
247+
targets: args.targets,
248+
iterations: args.common.iterations,
249+
threads: args.common.threads,
250+
verbose: args.common.verbose,
251+
display_format: args.common.display_format,
252+
disable_datafusion_cache: args.common.disable_datafusion_cache,
253+
queries: args.common.queries,
254+
exclude_queries: args.common.exclude_queries,
255+
output_path: args.common.output_path,
256+
emit_plan: args.common.emit_plan,
257+
export_spans: args.common.export_spans,
258+
show_metrics: args.common.show_metrics,
259+
hide_progress_bar: args.common.hide_progress_bar,
260+
track_memory: args.common.track_memory,
261+
};
262+
263+
// Run benchmark using the trait system
264+
run_benchmark(benchmark, config)?;
265+
266+
// The value returned by `run_benchmark` is discarded; only success is reported.
Ok(())
267+
}

0 commit comments

Comments
 (0)