Skip to content

Commit f704cf5

Browse files
Add TPC-H benchmark comparing DuckDB-DuckLake vs DataFusion-DuckLake
1 parent 6973bcc commit f704cf5

File tree

10 files changed

+993
-0
lines changed

10 files changed

+993
-0
lines changed

Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
[workspace]
2+
members = [".", "benchmark"]
3+
14
[package]
25
name = "datafusion-ducklake"
36
version = "0.0.3"

benchmark/Cargo.toml

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
[package]
2+
name = "ducklake-benchmark"
3+
version = "0.1.0"
4+
edition = "2024"
5+
description = "Benchmark comparing DuckDB-DuckLake vs DataFusion-DuckLake performance"
6+
publish = false
7+
8+
[[bin]]
9+
name = "ducklake-benchmark"
10+
path = "src/main.rs"
11+
12+
[[bin]]
13+
name = "generate-tpch"
14+
path = "src/bin/generate_tpch.rs"
15+
16+
[dependencies]
17+
# Use the parent crate
18+
datafusion-ducklake = { path = "..", features = ["metadata-duckdb"] }
19+
20+
# Core dependencies
21+
datafusion = "50.1.0"
22+
duckdb = { version = "1.4.1", features = ["bundled"] }
23+
tokio = { version = "1", features = ["full"] }
24+
25+
# CLI and reporting
26+
clap = { version = "4", features = ["derive"] }
27+
serde = { version = "1", features = ["derive"] }
28+
serde_json = "1"
29+
csv = "1.3"
30+
chrono = "0.4"
31+
32+
# Utilities
33+
anyhow = "1"
34+
regex = "1"
35+
walkdir = "2"

benchmark/src/benchmark_parser.rs

Lines changed: 218 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,218 @@
1+
use anyhow::{anyhow, Result};
2+
use std::collections::HashMap;
3+
use std::fs;
4+
use std::path::Path;
5+
6+
/// A single benchmark definition parsed from a DuckDB-style `.benchmark`
/// file (after `# argument:` expansion, one instance per value combination).
#[derive(Debug, Clone)]
pub struct Benchmark {
    /// Benchmark name, from the `# name:` header; falls back to the file path.
    pub name: String,
    /// Optional `# group:` header value.
    pub group: Option<String>,
    /// Optional `# subgroup:` header value.
    pub subgroup: Option<String>,
    /// Optional `# description:` header value.
    pub description: Option<String>,
    /// SQL from the `load` section (setup statements), if present.
    pub load: Option<String>,
    /// SQL from the `run` section — the statement to execute. Required.
    pub run: String,
    /// Expected-result text from the `result` section, if present.
    pub result: Option<String>,
    /// Raw `# argument:` lists: key -> candidate values; expanded into the
    /// cross product of benchmarks by `expand_arguments`.
    pub arguments: HashMap<String, Vec<String>>,
}
17+
18+
pub fn parse_benchmark_file(path: &Path) -> Result<Vec<Benchmark>> {
19+
let content = fs::read_to_string(path)?;
20+
parse_benchmark(&content, path.to_string_lossy().to_string())
21+
}
22+
23+
fn parse_benchmark(content: &str, default_name: String) -> Result<Vec<Benchmark>> {
24+
let mut name = default_name;
25+
let mut group = None;
26+
let mut subgroup = None;
27+
let mut description = None;
28+
let mut load = None;
29+
let mut run = None;
30+
let mut result = None;
31+
let mut arguments: HashMap<String, Vec<String>> = HashMap::new();
32+
33+
let mut current_section: Option<&str> = None;
34+
let mut section_content = String::new();
35+
36+
for line in content.lines() {
37+
let trimmed = line.trim();
38+
39+
// Parse header comments
40+
if trimmed.starts_with("# name:") {
41+
name = trimmed.strip_prefix("# name:").unwrap().trim().to_string();
42+
continue;
43+
}
44+
if trimmed.starts_with("# group:") {
45+
group = Some(trimmed.strip_prefix("# group:").unwrap().trim().to_string());
46+
continue;
47+
}
48+
if trimmed.starts_with("# subgroup:") {
49+
subgroup = Some(trimmed.strip_prefix("# subgroup:").unwrap().trim().to_string());
50+
continue;
51+
}
52+
if trimmed.starts_with("# description:") {
53+
description = Some(trimmed.strip_prefix("# description:").unwrap().trim().to_string());
54+
continue;
55+
}
56+
if trimmed.starts_with("# argument:") {
57+
let arg = trimmed.strip_prefix("# argument:").unwrap().trim();
58+
if let Some((key, values)) = arg.split_once('=') {
59+
let vals: Vec<String> = values.split(',').map(|s| s.trim().to_string()).collect();
60+
arguments.insert(key.trim().to_string(), vals);
61+
}
62+
continue;
63+
}
64+
65+
// Skip other comments
66+
if trimmed.starts_with('#') {
67+
continue;
68+
}
69+
70+
// Section markers
71+
if trimmed == "load" {
72+
save_section(&current_section, &section_content, &mut load, &mut run, &mut result);
73+
current_section = Some("load");
74+
section_content.clear();
75+
continue;
76+
}
77+
if trimmed == "run" {
78+
save_section(&current_section, &section_content, &mut load, &mut run, &mut result);
79+
current_section = Some("run");
80+
section_content.clear();
81+
continue;
82+
}
83+
if trimmed == "result" {
84+
save_section(&current_section, &section_content, &mut load, &mut run, &mut result);
85+
current_section = Some("result");
86+
section_content.clear();
87+
continue;
88+
}
89+
90+
// Accumulate section content
91+
if current_section.is_some() {
92+
if !section_content.is_empty() {
93+
section_content.push('\n');
94+
}
95+
section_content.push_str(line);
96+
}
97+
}
98+
99+
// Save final section
100+
save_section(&current_section, &section_content, &mut load, &mut run, &mut result);
101+
102+
let run_sql = run.ok_or_else(|| anyhow!("No 'run' section found in benchmark"))?;
103+
104+
// Expand arguments into multiple benchmarks
105+
let benchmarks = expand_arguments(Benchmark {
106+
name,
107+
group,
108+
subgroup,
109+
description,
110+
load,
111+
run: run_sql,
112+
result,
113+
arguments,
114+
});
115+
116+
Ok(benchmarks)
117+
}
118+
119+
/// Store accumulated section text into the output slot named by `current`.
///
/// Whitespace-only content is discarded, and an unknown (or absent) section
/// name is silently ignored; otherwise the trimmed text overwrites the slot.
fn save_section(
    current: &Option<&str>,
    content: &str,
    load: &mut Option<String>,
    run: &mut Option<String>,
    result: &mut Option<String>,
) {
    let text = content.trim();
    if text.is_empty() {
        return;
    }

    // Pick the destination slot for this section name, if any.
    let slot = match current {
        Some("load") => load,
        Some("run") => run,
        Some("result") => result,
        _ => return,
    };
    *slot = Some(text.to_string());
}
138+
139+
fn expand_arguments(base: Benchmark) -> Vec<Benchmark> {
140+
if base.arguments.is_empty() {
141+
return vec![base];
142+
}
143+
144+
// Get all argument combinations
145+
let mut combinations: Vec<HashMap<String, String>> = vec![HashMap::new()];
146+
147+
for (key, values) in &base.arguments {
148+
let mut new_combinations = Vec::new();
149+
for combo in &combinations {
150+
for value in values {
151+
let mut new_combo = combo.clone();
152+
new_combo.insert(key.clone(), value.clone());
153+
new_combinations.push(new_combo);
154+
}
155+
}
156+
combinations = new_combinations;
157+
}
158+
159+
// Generate benchmarks for each combination
160+
combinations
161+
.into_iter()
162+
.map(|combo| {
163+
let mut benchmark = base.clone();
164+
165+
// Update name with argument values
166+
let suffix: String = combo.values().cloned().collect::<Vec<_>>().join("_");
167+
if !suffix.is_empty() {
168+
benchmark.name = format!("{}_{}", benchmark.name, suffix);
169+
}
170+
171+
// Substitute variables in SQL
172+
benchmark.run = substitute_vars(&benchmark.run, &combo);
173+
if let Some(ref load) = benchmark.load {
174+
benchmark.load = Some(substitute_vars(load, &combo));
175+
}
176+
177+
benchmark
178+
})
179+
.collect()
180+
}
181+
182+
/// Replace every `${key}` placeholder in `sql` with its value from `vars`.
/// Keys without a placeholder in the SQL are simply ignored.
fn substitute_vars(sql: &str, vars: &HashMap<String, String>) -> String {
    vars.iter().fold(sql.to_string(), |acc, (key, value)| {
        acc.replace(&format!("${{{}}}", key), value)
    })
}
189+
190+
#[cfg(test)]
mod tests {
    use super::*;

    // End-to-end check: a minimal benchmark file with header metadata and
    // load/run/result sections parses into exactly one Benchmark with each
    // section captured and trimmed.
    #[test]
    fn test_parse_benchmark() {
        let content = r#"
# name: test/sum.benchmark
# group: micro
# description: Sum benchmark

load
CREATE TABLE t AS SELECT range i FROM range(1000);

run
SELECT SUM(i) FROM t;

result
499500
"#;

        let benchmarks = parse_benchmark(content, "default".to_string()).unwrap();
        assert_eq!(benchmarks.len(), 1);
        // The `# name:` header overrides the supplied default name.
        assert_eq!(benchmarks[0].name, "test/sum.benchmark");
        assert_eq!(benchmarks[0].group, Some("micro".to_string()));
        assert!(benchmarks[0].load.is_some());
        assert_eq!(benchmarks[0].run, "SELECT SUM(i) FROM t;");
    }
}

benchmark/src/bin/generate_tpch.rs

Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
use anyhow::Result;
2+
use clap::Parser;
3+
use duckdb::Connection;
4+
use std::path::PathBuf;
5+
6+
// CLI arguments for the TPC-H DuckLake data generator. The `///` field docs
// double as clap's --help text, so they are part of the user-facing output.
#[derive(Parser)]
#[command(name = "generate-tpch")]
#[command(about = "Generate TPC-H data in DuckLake format")]
struct Args {
    /// Path for the DuckLake catalog database
    #[arg(short, long, default_value = "benchmark/data/tpch.ducklake")]
    catalog: PathBuf,

    /// Path for data files (Parquet storage)
    #[arg(short, long, default_value = "benchmark/data/tpch_files")]
    data_path: PathBuf,

    /// TPC-H scale factor (1 = 1GB, 10 = 10GB, etc.)
    #[arg(short, long, default_value = "1")]
    scale_factor: f64,
}
22+
23+
/// Generate TPC-H data with DuckDB's `tpch` extension and copy it into a
/// fresh DuckLake catalog (catalog file + Parquet data files on disk).
///
/// Any existing catalog file is deleted first, so each run starts clean.
fn main() -> Result<()> {
    let args = Args::parse();

    println!("TPC-H DuckLake Data Generator");
    println!("=============================");
    println!("Catalog: {:?}", args.catalog);
    println!("Data path: {:?}", args.data_path);
    println!("Scale factor: {} (~{}GB)", args.scale_factor, args.scale_factor);
    println!();

    // Ensure directories exist
    std::fs::create_dir_all(&args.data_path)?;
    if let Some(parent) = args.catalog.parent() {
        std::fs::create_dir_all(parent)?;
    }

    // Remove existing catalog if present (regeneration is destructive).
    if args.catalog.exists() {
        std::fs::remove_file(&args.catalog)?;
    }

    // TPC-H data is first generated into this in-memory database, then
    // copied out to the DuckLake catalog below.
    let conn = Connection::open_in_memory()?;

    // Install and load extensions.
    // NOTE(review): INSTALL fetches extensions on first use — presumably this
    // needs network access unless they are already cached locally; confirm.
    println!("Installing extensions...");
    conn.execute_batch(
        r#"
        INSTALL tpch;
        LOAD tpch;
        INSTALL ducklake;
        LOAD ducklake;
    "#,
    )?;

    // Generate TPC-H data in memory
    println!("Generating TPC-H data (SF={})...", args.scale_factor);
    conn.execute_batch(&format!("CALL dbgen(sf={})", args.scale_factor))?;

    // Create DuckLake catalog
    println!("Creating DuckLake catalog...");
    let attach_sql = format!(
        "ATTACH '{}' AS tpch_lake (TYPE ducklake, DATA_PATH '{}')",
        args.catalog.display(),
        args.data_path.display()
    );
    conn.execute(&attach_sql, [])?;

    // Create schema
    conn.execute("CREATE SCHEMA IF NOT EXISTS tpch_lake.main", [])?;

    // Copy TPC-H tables to DuckLake. Table names are a fixed constant list,
    // so formatting them into SQL here is safe.
    let tables = [
        "customer", "lineitem", "nation", "orders",
        "part", "partsupp", "region", "supplier"
    ];

    for table in &tables {
        println!("  Copying {} to DuckLake...", table);
        conn.execute_batch(&format!(
            "CREATE TABLE tpch_lake.main.{} AS SELECT * FROM {}",
            table, table
        ))?;
    }

    println!("\nData generation complete!");
    println!("Catalog saved to: {:?}", args.catalog);

    // Print table statistics (row counts read back from the lake, not the
    // in-memory source, so this also sanity-checks the copy).
    println!("\nTable Statistics:");
    println!("-----------------");
    for table in &tables {
        let count: i64 = conn.query_row(
            &format!("SELECT COUNT(*) FROM tpch_lake.main.{}", table),
            [],
            |row| row.get(0),
        )?;
        println!("  {}: {} rows", table, count);
    }

    // Print data size
    let total_size = dir_size(&args.data_path)?;
    println!("\nTotal data size: {:.2} MB", total_size as f64 / 1_000_000.0);

    Ok(())
}
108+
109+
/// Recursively sum the sizes, in bytes, of all regular files under `path`.
///
/// Symlinks are not followed (their targets are not counted), matching the
/// previous walkdir-based traversal. Takes `&Path` rather than `&PathBuf` so
/// any path-like borrow works; the existing `&PathBuf` call site coerces.
///
/// # Errors
/// Returns the first I/O error hit while listing entries or reading metadata
/// (converts into `anyhow::Error` at the caller's `?`).
fn dir_size(path: &std::path::Path) -> std::io::Result<u64> {
    let mut size = 0;
    for entry in std::fs::read_dir(path)? {
        let entry = entry?;
        // DirEntry::metadata does not traverse symlinks, so a symlink is
        // neither a file nor a dir here and contributes nothing.
        let meta = entry.metadata()?;
        if meta.is_dir() {
            size += dir_size(&entry.path())?;
        } else if meta.is_file() {
            size += meta.len();
        }
    }
    Ok(size)
}

0 commit comments

Comments
 (0)