Skip to content

Commit a7d501f

Browse files
committed
temp
1 parent 2830e80 commit a7d501f

File tree

13 files changed

+576
-10
lines changed

13 files changed

+576
-10
lines changed

Cargo.lock

Lines changed: 12 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
[package]
2+
name = "bench-orchestrator"
3+
description = "Multi-engine benchmark orchestrator"
4+
authors = { workspace = true }
5+
categories = { workspace = true }
6+
edition = { workspace = true }
7+
homepage = { workspace = true }
8+
include = { workspace = true }
9+
keywords = { workspace = true }
10+
license = { workspace = true }
11+
readme = { workspace = true }
12+
repository = { workspace = true }
13+
rust-version = { workspace = true }
14+
version = { workspace = true }
15+
publish = false
16+
17+
[dependencies]
18+
anyhow = { workspace = true }
19+
clap = { workspace = true, features = ["derive"] }
20+
serde = { workspace = true }
21+
serde_json = { workspace = true }
22+
tracing.workspace = true
23+
vortex-bench = { workspace = true }
24+
25+
[lints]
26+
workspace = true
Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
// SPDX-License-Identifier: Apache-2.0
2+
// SPDX-FileCopyrightText: Copyright the Vortex contributors
3+
4+
use std::fs::File;
5+
use std::io::Write;
6+
use std::io::stdout;
7+
use std::path::PathBuf;
8+
use std::time::Duration;
9+
10+
use vortex_bench::Target;
11+
use vortex_bench::display::DisplayFormat;
12+
use vortex_bench::display::print_measurements_json;
13+
use vortex_bench::display::render_table;
14+
use vortex_bench::measurements::QueryMeasurement;
15+
use vortex_bench::measurements::QueryMeasurementJson;
16+
17+
/// Aggregates measurements from multiple benchmark runs.
18+
pub struct MeasurementAggregator {
19+
query_measurements: Vec<QueryMeasurement>,
20+
}
21+
22+
impl MeasurementAggregator {
23+
pub fn new() -> Self {
24+
Self {
25+
query_measurements: Vec::new(),
26+
}
27+
}
28+
29+
/// Parse newline-delimited JSON output and add measurements.
30+
pub fn add_json_output(&mut self, json_output: &str) -> anyhow::Result<()> {
31+
for line in json_output.lines() {
32+
let line = line.trim();
33+
if line.is_empty() {
34+
continue;
35+
}
36+
37+
let json: QueryMeasurementJson = serde_json::from_str(line)?;
38+
let measurement = json_to_query_measurement(json);
39+
self.query_measurements.push(measurement);
40+
}
41+
42+
Ok(())
43+
}
44+
45+
/// Export results to the specified output.
46+
pub fn export(
47+
&self,
48+
display_format: &DisplayFormat,
49+
output_path: Option<&PathBuf>,
50+
) -> anyhow::Result<()> {
51+
// Collect unique targets
52+
let mut targets: Vec<Target> = self.query_measurements.iter().map(|m| m.target).collect();
53+
targets.sort_by_key(|t| (format!("{:?}", t.engine), format!("{}", t.format)));
54+
targets.dedup();
55+
56+
match output_path {
57+
Some(path) => {
58+
let file = File::create(path)?;
59+
self.write_to(display_format, &targets, file)
60+
}
61+
None => self.write_to(display_format, &targets, stdout().lock()),
62+
}
63+
}
64+
65+
fn write_to<W: Write>(
66+
&self,
67+
display_format: &DisplayFormat,
68+
targets: &[Target],
69+
mut output: W,
70+
) -> anyhow::Result<()> {
71+
match display_format {
72+
DisplayFormat::Table => {
73+
render_table(&mut output, self.query_measurements.clone(), targets)?;
74+
}
75+
DisplayFormat::GhJson => {
76+
print_measurements_json(&mut output, self.query_measurements.clone())?;
77+
}
78+
}
79+
Ok(())
80+
}
81+
}
82+
83+
impl Default for MeasurementAggregator {
84+
fn default() -> Self {
85+
Self::new()
86+
}
87+
}
88+
89+
/// Convert a QueryMeasurementJson back to a QueryMeasurement.
90+
fn json_to_query_measurement(json: QueryMeasurementJson) -> QueryMeasurement {
91+
// Parse query index from name (format: "dataset_qNN/engine:format")
92+
let query_idx = json
93+
.name
94+
.split("_q")
95+
.nth(1)
96+
.and_then(|s| s.split('/').next())
97+
.and_then(|s| s.parse().ok())
98+
.unwrap_or(0);
99+
100+
// Convert nanosecond runtimes back to Duration
101+
let runs: Vec<Duration> = json
102+
.all_runtimes
103+
.iter()
104+
.map(|nanos| Duration::from_nanos(*nanos as u64))
105+
.collect();
106+
107+
QueryMeasurement {
108+
query_idx,
109+
target: json.target,
110+
benchmark_dataset: json.dataset,
111+
storage: json.storage,
112+
runs,
113+
}
114+
}
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
// SPDX-License-Identifier: Apache-2.0
2+
// SPDX-FileCopyrightText: Copyright the Vortex contributors
3+
4+
use std::path::PathBuf;
5+
6+
use anyhow::bail;
7+
use vortex_bench::Engine;
8+
use vortex_bench::Format;
9+
use vortex_bench::workspace_root;
10+
11+
/// Returns the binary name for the given engine and format combination.
12+
/// The format can override the binary (e.g., Lance format always uses lance-bench).
13+
pub fn binary_name(engine: Engine, format: Option<Format>) -> Option<&'static str> {
14+
match (engine, format) {
15+
// Lance format always uses lance-bench, regardless of engine
16+
(_, Some(Format::Lance)) => Some("lance-bench"),
17+
(Engine::DataFusion, _) => Some("df-bench"),
18+
(Engine::DuckDB, _) => Some("ddb-bench"),
19+
// Arrow is a display label, not a real engine with a binary
20+
(Engine::Arrow, _) => None,
21+
// Vortex engine uses the DataFusion binary
22+
(Engine::Vortex, _) => Some("df-bench"),
23+
}
24+
}
25+
26+
/// Find the benchmark binary for the given engine and format.
27+
/// Looks in target/release first, then target/debug.
28+
pub fn find_benchmark_binary(engine: Engine, format: Option<Format>) -> anyhow::Result<PathBuf> {
29+
let name = match binary_name(engine, format) {
30+
Some(name) => name,
31+
None => bail!(
32+
"Engine {:?} does not have a dedicated benchmark binary",
33+
engine
34+
),
35+
};
36+
37+
let workspace = workspace_root();
38+
39+
// Check release first, then debug
40+
let release_path = workspace.join("target/release").join(name);
41+
if release_path.exists() {
42+
return Ok(release_path);
43+
}
44+
45+
bail!(
46+
"Could not find {} binary.\n\
47+
Expected locations:\n\
48+
- {}\n\
49+
Build the benchmark first:\n\
50+
cargo build --release -p {}",
51+
name,
52+
release_path.display(),
53+
name
54+
);
55+
}
Lines changed: 153 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,153 @@
1+
// SPDX-License-Identifier: Apache-2.0
2+
// SPDX-FileCopyrightText: Copyright the Vortex contributors
3+
4+
mod aggregator;
5+
mod binary;
6+
mod subprocess;
7+
mod validation;
8+
9+
use std::path::PathBuf;
10+
11+
use clap::Parser;
12+
use clap::value_parser;
13+
use vortex_bench::BenchmarkArg;
14+
use vortex_bench::Engine;
15+
use vortex_bench::Format;
16+
use vortex_bench::Opt;
17+
use vortex_bench::display::DisplayFormat;
18+
19+
use crate::aggregator::MeasurementAggregator;
20+
use crate::subprocess::run_benchmark;
21+
use crate::validation::filter_formats_for_engine;
22+
23+
#[derive(Parser)]
24+
#[command(name = "bench-orchestrator")]
25+
#[command(about = "Multi-engine benchmark orchestrator")]
26+
struct Args {
27+
/// Benchmark to run
28+
#[arg(value_enum)]
29+
benchmark: BenchmarkArg,
30+
31+
/// Engines to run (comma-separated)
32+
#[arg(long, value_delimiter = ',', default_values_t = vec![Engine::DataFusion, Engine::DuckDB])]
33+
engines: Vec<Engine>,
34+
35+
/// Formats to benchmark (comma-separated, auto-filtered per engine)
36+
#[arg(long, value_delimiter = ',', default_values_t = vec![Format::Parquet, Format::OnDiskVortex])]
37+
formats: Vec<Format>,
38+
39+
/// Number of iterations per query
40+
#[arg(short, long, default_value_t = 5)]
41+
iterations: usize,
42+
43+
/// Specific queries to run (comma-separated)
44+
#[arg(short, long, value_delimiter = ',')]
45+
queries: Option<Vec<usize>>,
46+
47+
/// Queries to exclude (comma-separated)
48+
#[arg(short, long, value_delimiter = ',')]
49+
exclude_queries: Option<Vec<usize>>,
50+
51+
/// Display format for output
52+
#[arg(short, long, default_value_t, value_enum)]
53+
display_format: DisplayFormat,
54+
55+
/// Output file path (stdout if not specified)
56+
#[arg(short, long)]
57+
output_path: Option<PathBuf>,
58+
59+
/// Benchmark options (passed through to benchmark binaries)
60+
#[arg(long, value_delimiter = ',', value_parser = value_parser!(Opt))]
61+
options: Vec<Opt>,
62+
63+
/// Verbose output
64+
#[arg(short, long)]
65+
verbose: bool,
66+
67+
/// Track memory usage
68+
#[arg(long, default_value_t = false)]
69+
track_memory: bool,
70+
71+
/// Error on unsupported engine/format combinations (default: warn and skip)
72+
#[arg(long, default_value_t = false)]
73+
strict: bool,
74+
}
75+
76+
fn main() -> anyhow::Result<()> {
77+
let args = Args::parse();
78+
79+
let mut aggregator = MeasurementAggregator::new();
80+
81+
for engine in &args.engines {
82+
let supported_formats =
83+
match filter_formats_for_engine(*engine, &args.formats, args.strict)? {
84+
Some(formats) => formats,
85+
None => {
86+
if args.verbose {
87+
eprintln!("Skipping engine {engine}: no benchmark binary available");
88+
}
89+
continue;
90+
}
91+
};
92+
93+
if supported_formats.is_empty() {
94+
if args.verbose {
95+
eprintln!(
96+
"Skipping engine {engine}: no supported formats from {:?}",
97+
args.formats
98+
);
99+
}
100+
continue;
101+
}
102+
103+
// Split Lance from other formats (lance-bench is a separate binary)
104+
let has_lance = supported_formats.contains(&Format::Lance);
105+
let other_formats: Vec<_> = supported_formats
106+
.iter()
107+
.filter(|f| **f != Format::Lance)
108+
.copied()
109+
.collect();
110+
111+
// Run lance-bench for Lance format
112+
if has_lance {
113+
if args.verbose {
114+
eprintln!("Running lance-bench for {engine}",);
115+
}
116+
117+
let json_output = run_benchmark(
118+
*engine,
119+
None, // lance-bench doesn't accept --formats
120+
args.benchmark,
121+
args.iterations,
122+
args.queries.as_ref(),
123+
args.exclude_queries.as_ref(),
124+
&args.options,
125+
args.track_memory,
126+
args.verbose,
127+
)?;
128+
129+
aggregator.add_json_output(&json_output)?;
130+
}
131+
132+
// Run engine's binary for other formats
133+
if !other_formats.is_empty() {
134+
let json_output = run_benchmark(
135+
*engine,
136+
Some(&other_formats),
137+
args.benchmark,
138+
args.iterations,
139+
args.queries.as_ref(),
140+
args.exclude_queries.as_ref(),
141+
&args.options,
142+
args.track_memory,
143+
args.verbose,
144+
)?;
145+
146+
aggregator.add_json_output(&json_output)?;
147+
}
148+
}
149+
150+
aggregator.export(&args.display_format, args.output_path.as_ref())?;
151+
152+
Ok(())
153+
}

0 commit comments

Comments
 (0)