Commit 3aa300a

Generate data in a dedicated binary
Signed-off-by: Adam Gutglick <[email protected]>
1 parent e5f38f8 commit 3aa300a

File tree

6 files changed, +187 -146 lines


.github/workflows/sql-benchmarks.yml

Lines changed: 8 additions & 32 deletions
@@ -127,7 +127,7 @@ jobs:
         env:
           RUSTFLAGS: "-C target-cpu=native -C force-frame-pointers=yes"
         run: |
-          packages="-p df-bench -p ddb-bench"
+          packages="-p vortex-bench -p df-bench -p ddb-bench"
           if [ "${{ matrix.build_lance }}" = "true" ]; then
             packages="$packages -p lance-bench"
           fi
@@ -138,43 +138,19 @@ jobs:
         env:
           RUST_BACKTRACE: full
         run: |
-          # Extract formats for each engine from targets like "datafusion:parquet,duckdb:vortex"
-          # Filter out lance from df_formats since it's handled by lance-bench
-          df_formats=$(echo "${{ matrix.targets }}" | tr ',' '\n' | grep '^datafusion:' | grep -v ':lance$' | sed 's/datafusion://' | tr '\n' ',' | sed 's/,$//')
-          ddb_formats=$(echo "${{ matrix.targets }}" | tr ',' '\n' | grep '^duckdb:' | sed 's/duckdb://' | tr '\n' ',' | sed 's/,$//')
-          has_lance=$(echo "${{ matrix.targets }}" | grep -q 'datafusion:lance' && echo "true" || echo "false")
+          # Extract all unique formats from targets (e.g., "datafusion:parquet,duckdb:vortex" -> "parquet,vortex")
+          all_formats=$(echo "${{ matrix.targets }}" | tr ',' '\n' | sed 's/^[^:]*://' | sort -u | tr '\n' ',' | sed 's/,$//')

           # Build options string if scale_factor is set
           opts=""
           if [ -n "${{ matrix.scale_factor }}" ]; then
-            opts="--options scale_factor=${{ matrix.scale_factor }} $opts"
-          fi
-
-          # Generate data with df-bench (runs each query once)
-          if [ -n "$df_formats" ]; then
-            target/release_debug/df-bench ${{ matrix.subcommand }} \
-              --formats "$df_formats" \
-              -i1 \
-              -d table \
-              $opts
-          fi
-
-          # Generate data with ddb-bench (runs each query once)
-          if [ -n "$ddb_formats" ]; then
-            target/release_debug/ddb-bench ${{ matrix.subcommand }} \
-              --formats "$ddb_formats" \
-              -i1 \
-              -d table \
-              $opts
+            opts="--options scale_factor=${{ matrix.scale_factor }}"
           fi

-          # Generate data with lance-bench (runs each query once)
-          if [ "$has_lance" = "true" ] && [ -f "target/release_debug/lance-bench" ]; then
-            target/release_debug/lance-bench ${{ matrix.subcommand }} \
-              -i1 \
-              -d table \
-              $opts
-          fi
+          # Generate all data formats with a single command
+          target/release_debug/bench-data-gen ${{ matrix.subcommand }} \
+            --formats "$all_formats" \
+            $opts

       - name: Setup AWS CLI
         if: inputs.mode != 'pr' || github.event.pull_request.head.repo.fork == false

benchmarks/ddb-bench/src/lib.rs

Lines changed: 11 additions & 43 deletions
@@ -13,26 +13,12 @@ use url::Url;
 use vortex::error::VortexExpect;
 use vortex_bench::Benchmark;
 use vortex_bench::Format;
+use vortex_bench::generate_duckdb_registration_sql;
 use vortex_duckdb::duckdb::Config;
 use vortex_duckdb::duckdb::Connection;
 use vortex_duckdb::duckdb::Database;
 use vortex_duckdb::register_extension_options;

-#[derive(Debug, Clone)]
-enum DuckDBObject {
-    Table,
-    View,
-}
-
-impl DuckDBObject {
-    fn to_str(&self) -> &str {
-        match self {
-            DuckDBObject::Table => "TABLE",
-            DuckDBObject::View => "VIEW",
-        }
-    }
-}
-
 /// DuckDB context for benchmarks.
 pub struct DuckClient {
     pub db: Database,
@@ -166,9 +152,9 @@ impl DuckClient {
         benchmark: &B,
         file_format: Format,
     ) -> Result<()> {
-        let object = match file_format {
-            Format::Parquet | Format::OnDiskVortex | Format::VortexCompact => DuckDBObject::View,
-            Format::OnDiskDuckDB => DuckDBObject::Table,
+        let object_type = match file_format {
+            Format::Parquet | Format::OnDiskVortex | Format::VortexCompact => "VIEW",
+            Format::OnDiskDuckDB => "TABLE",
             Format::Lance => {
                 anyhow::bail!(
                     "Lance format is not supported for DuckDB engine. \
@@ -184,34 +170,16 @@ impl DuckClient {
             f => f,
         };

-        let extension = match load_format {
-            Format::Parquet | Format::OnDiskVortex | Format::VortexCompact => load_format.ext(),
-            other => anyhow::bail!("Format {other} isn't supported for DuckDB"),
-        };
-
         // Get the base URL for the format's data directory
         let format_url = benchmark.format_path(load_format, benchmark.data_url())?;
         let base_dir = format_url.as_str();
-        let base_dir = base_dir.strip_prefix("file://").unwrap_or(base_dir);
-
-        // Generate table registration commands from benchmark's table specs
-        let mut commands = String::new();
-        for table_spec in benchmark.table_specs() {
-            let pattern = benchmark
-                .pattern(table_spec.name, load_format)
-                .map(|p| p.to_string())
-                .unwrap_or_else(|| format!("*.{}", extension));
-
-            let table_path = format!("{base_dir}{pattern}");
-
-            commands.push_str(&format!(
-                "CREATE {} IF NOT EXISTS {} AS SELECT * FROM read_{}('{}');\n",
-                object.to_str(),
-                table_spec.name,
-                extension,
-                table_path
-            ));
-        }
+        let base_dir = base_dir
+            .strip_prefix("file://")
+            .unwrap_or(base_dir)
+            .trim_end_matches('/');
+
+        let commands =
+            generate_duckdb_registration_sql(benchmark, base_dir, load_format, object_type);

         trace!("Executing table registration commands: {commands}");

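The shared helper generate_duckdb_registration_sql now used here (and by the new bench-data-gen binary below) lives in vortex-bench and is not part of this diff. As a rough sketch of what it presumably does, reconstructed from the loop removed above (the exact signature, path joining, and field names are assumptions):

use vortex_bench::Benchmark;
use vortex_bench::Format;

/// Hypothetical sketch only; mirrors the registration loop removed from ddb-bench.
pub fn generate_duckdb_registration_sql(
    benchmark: &dyn Benchmark,
    base_dir: &str,     // data directory for the format, without a trailing '/'
    format: Format,     // decides which read_* function DuckDB should use
    object_type: &str,  // "TABLE" or "VIEW"
) -> String {
    let ext = format.ext();
    let mut commands = String::new();
    for table_spec in benchmark.table_specs() {
        // Prefer the benchmark's per-table glob; otherwise match every file
        // with the format's extension, as the removed code did.
        let pattern = benchmark
            .pattern(table_spec.name, format)
            .map(|p| p.to_string())
            .unwrap_or_else(|| format!("*.{ext}"));
        commands.push_str(&format!(
            "CREATE {object_type} IF NOT EXISTS {} AS SELECT * FROM read_{ext}('{base_dir}/{pattern}');\n",
            table_spec.name
        ));
    }
    commands
}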

vortex-bench/Cargo.toml

Lines changed: 4 additions & 0 deletions
@@ -66,3 +66,7 @@ vortex = { workspace = true, features = [
     "zstd",
     "unstable_encodings",
 ] }
+
+[[bin]]
+name = "bench-data-gen"
+path = "src/bin/data_gen.rs"

vortex-bench/src/bin/data_gen.rs

Lines changed: 128 additions & 0 deletions
@@ -0,0 +1,128 @@
+// SPDX-License-Identifier: Apache-2.0
+// SPDX-FileCopyrightText: Copyright the Vortex contributors
+
+//! Benchmark data generation binary.
+//!
+//! This binary generates benchmark data for all formats needed, consolidating
+//! data generation that was previously duplicated across df-bench and ddb-bench.
+
+use std::path::Path;
+use std::process::Command;
+
+use clap::Parser;
+use clap::value_parser;
+use tracing::info;
+use vortex_bench::Benchmark;
+use vortex_bench::BenchmarkArg;
+use vortex_bench::CompactionStrategy;
+use vortex_bench::Format;
+use vortex_bench::Opt;
+use vortex_bench::Opts;
+use vortex_bench::conversions::convert_parquet_to_vortex;
+use vortex_bench::create_benchmark;
+use vortex_bench::generate_duckdb_registration_sql;
+use vortex_bench::setup_logging_and_tracing;
+
+#[derive(Parser)]
+#[command(name = "bench-data-gen")]
+#[command(about = "Generate benchmark data for all requested formats")]
+struct Args {
+    #[arg(value_enum)]
+    benchmark: BenchmarkArg,
+
+    #[arg(short, long)]
+    verbose: bool,
+
+    #[arg(long)]
+    tracing: bool,
+
+    #[arg(long, value_delimiter = ',', value_parser = value_parser!(Format))]
+    formats: Vec<Format>,
+
+    #[arg(long, value_delimiter = ',', value_parser = value_parser!(Opt))]
+    options: Vec<Opt>,
+}
+
+#[tokio::main]
+async fn main() -> anyhow::Result<()> {
+    let args = Args::parse();
+    let opts = Opts::from(args.options);
+
+    setup_logging_and_tracing(args.verbose, args.tracing)?;
+
+    let benchmark = create_benchmark(args.benchmark, &opts)?;
+
+    // Generate base Parquet data - this is the source for all other formats
+    benchmark.generate_base_data().await?;
+
+    // Convert to other formats as needed (only for local file URLs)
+    if benchmark.data_url().scheme() == "file" {
+        let base_path = benchmark
+            .data_url()
+            .to_file_path()
+            .map_err(|_| anyhow::anyhow!("Invalid file URL: {}", benchmark.data_url()))?;
+
+        if args
+            .formats
+            .iter()
+            .any(|f| matches!(f, Format::OnDiskVortex))
+        {
+            convert_parquet_to_vortex(&base_path, CompactionStrategy::Default).await?;
+        }
+
+        if args
+            .formats
+            .iter()
+            .any(|f| matches!(f, Format::VortexCompact))
+        {
+            convert_parquet_to_vortex(&base_path, CompactionStrategy::Compact).await?;
+        }
+
+        if args
+            .formats
+            .iter()
+            .any(|f| matches!(f, Format::OnDiskDuckDB))
+        {
+            generate_duckdb(&base_path, &*benchmark)?;
+        }
+    }
+
+    Ok(())
+}
+
+/// Generate a DuckDB database from Parquet files using the DuckDB CLI.
+fn generate_duckdb(base_path: &Path, benchmark: &dyn Benchmark) -> anyhow::Result<()> {
+    let duckdb_dir = base_path.join(Format::OnDiskDuckDB.name());
+    std::fs::create_dir_all(&duckdb_dir)?;
+
+    let db_path = duckdb_dir.join("duckdb.db");
+
+    // Skip if database already exists
+    if db_path.exists() {
+        info!("DuckDB database already exists at {}", db_path.display());
+        return Ok(());
+    }
+
+    let parquet_dir = base_path.join(Format::Parquet.name());
+    let sql = generate_duckdb_registration_sql(
+        benchmark,
+        parquet_dir.to_str().unwrap(),
+        Format::Parquet,
+        "TABLE",
+    );
+
+    info!("Creating DuckDB database at {}", db_path.display());
+
+    let output = Command::new("duckdb")
+        .arg(&db_path)
+        .arg("-c")
+        .arg(&sql)
+        .output()?;
+
+    if !output.status.success() {
+        let stderr = String::from_utf8_lossy(&output.stderr);
+        anyhow::bail!("DuckDB CLI failed: {}", stderr);
+    }
+
+    Ok(())
+}
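Run outside CI, the new binary takes the same arguments the workflow step above passes to it; an invocation might look like target/release_debug/bench-data-gen <benchmark> --formats parquet,vortex --options scale_factor=1, where the benchmark subcommand and option value are placeholders and release_debug is the profile directory the workflow builds into. Note that generating the OnDiskDuckDB format shells out to the duckdb CLI, so it must be on PATH.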

vortex-bench/src/conversions.rs

Lines changed: 0 additions & 71 deletions
@@ -7,7 +7,6 @@ use std::path::Path;
 use std::path::PathBuf;

 use arrow_array::RecordBatchReader;
-use arrow_ipc::writer::FileWriter;
 use futures::StreamExt;
 use futures::TryStreamExt;
 use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
@@ -120,73 +119,3 @@ pub async fn convert_parquet_to_vortex(
     .await?;
     Ok(())
 }
-
-/// Convert all Parquet files in a directory to Arrow IPC format.
-///
-/// This function reads Parquet files from `{input_path}/parquet/` and writes
-/// Arrow IPC files to `{input_path}/arrow/`.
-///
-/// The conversion is idempotent - existing Arrow files will not be regenerated.
-pub async fn convert_parquet_to_arrow(input_path: &Path) -> anyhow::Result<()> {
-    let arrow_dir = input_path.join(Format::Arrow.name());
-    let parquet_path = input_path.join(Format::Parquet.name());
-    create_dir_all(&arrow_dir).await?;
-
-    let parquet_inputs = fs::read_dir(&parquet_path)?.collect::<std::io::Result<Vec<_>>>()?;
-
-    trace!(
-        "Found {} parquet files in {}",
-        parquet_inputs.len(),
-        parquet_path.to_str().unwrap()
-    );
-
-    let iter = parquet_inputs
-        .iter()
-        .filter(|entry| entry.path().extension().is_some_and(|e| e == "parquet"));
-
-    futures::stream::iter(iter)
-        .map(|dir_entry| {
-            let filename = {
-                let mut temp = dir_entry.path();
-                temp.set_extension("");
-                temp.file_name().unwrap().to_str().unwrap().to_string()
-            };
-            let parquet_file_path = parquet_path.join(format!("{filename}.parquet"));
-            let output_path = arrow_dir.join(format!("{filename}.{}", Format::Arrow.ext()));
-
-            tokio::spawn(
-                async move {
-                    idempotent_async(output_path.as_path(), move |arrow_file| async move {
-                        info!("Converting '{filename}' from Parquet to Arrow IPC");
-
-                        // Read Parquet file
-                        let parquet_file = File::open(&parquet_file_path)?;
-                        let reader =
-                            ParquetRecordBatchReaderBuilder::try_new(parquet_file)?.build()?;
-                        let schema = reader.schema();
-
-                        // Write Arrow IPC file
-                        let arrow_output = File::create(&arrow_file)?;
-                        let mut writer = FileWriter::try_new(arrow_output, &schema)?;
-
-                        for batch in reader {
-                            let batch = batch?;
-                            writer.write(&batch)?;
-                        }
-
-                        writer.finish()?;
-
-                        anyhow::Ok(())
-                    })
-                    .await
-                    .expect("Failed to write Arrow file")
-                }
-                .in_current_span(),
-            )
-        })
-        .buffer_unordered(16)
-        .try_collect::<Vec<_>>()
-        .await?;
-
-    Ok(())
-}
