Skip to content

Commit 2cca8c9

Browse files
committed
Extract merge benchmark to CLI
Signed-off-by: Abhi Agarwal <[email protected]>
1 parent 1bce4c0 commit 2cca8c9

File tree

6 files changed

+506
-179
lines changed

6 files changed

+506
-179
lines changed

crates/benchmarks/Cargo.toml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ clap = { version = "4", features = ["derive"] }
1414
tokio = { workspace = true, features = ["fs", "macros", "rt", "io-util"] }
1515
url = { workspace = true }
1616
tempfile = { workspace = true }
17+
anyhow = "1"
1718

1819
[dependencies.deltalake-core]
1920
path = "../core"
@@ -26,3 +27,7 @@ divan = "0.1"
2627
[[bench]]
2728
name = "merge"
2829
harness = false
30+
31+
[[bench]]
32+
name = "smoke"
33+
harness = false

crates/benchmarks/README.md

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -48,13 +48,34 @@ A simple CLI is available to run a single merge with configurable parameters (us
4848

4949
Run (from repo root):
5050
```bash
51-
cargo run --profile profiling -p delta-benchmarks -- upsert --matched 0.01 --not-matched 0.10
51+
cargo run --profile profiling -p delta-benchmarks -- merge upsert --matched 0.01 --not-matched 0.10
5252
```
5353

5454
Options:
55-
- `upsert | delete | insert`: operation to benchmark
55+
- `<upsert|delete|insert>`: operation to benchmark (positional argument; required unless `--case` is given)
5656
- `--matched <fraction>`: fraction of rows that match existing keys (default 0.01)
5757
- `--not-matched <fraction>`: fraction of rows that do not match (default 0.10)
58+
- `--case <name>`: run one of the predefined merge scenarios mirrored from the Delta Spark suite
59+
60+
Run a predefined case by name (passing an unknown name makes the CLI print the available case names):
61+
```bash
62+
cargo run --release -p delta-benchmarks -- merge --case single_insert_only_filesMatchedFraction_0.05_rowsNotMatchedFraction_0.05
63+
```
64+
65+
## TPC-DS query helper
66+
67+
All 99 TPC-DS SQL statements (matching the Spark benchmark suite) are stored under `queries/tpcds`. The CLI can list or print them:
68+
69+
```bash
70+
cargo run --release -p delta-benchmarks -- tpcds --list
71+
cargo run --release -p delta-benchmarks -- tpcds --case q1
72+
```
73+
74+
There is also a micro-benchmark that iterates over every query string to ensure the include paths stay wired correctly:
75+
76+
```bash
77+
cargo bench -p delta-benchmarks --bench tpcds
78+
```
5879

5980
### Flamegraphs using `samply`
6081

@@ -66,4 +87,4 @@ To start,
6687
cargo install samply --locked
6788
cargo build --profile profiling -p delta-benchmarks
6889
samply record ./target/profiling/delta-benchmarks merge upsert
69-
```
90+
```

crates/benchmarks/benches/merge.rs

Lines changed: 21 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use std::path::PathBuf;
22

33
use delta_benchmarks::{
4-
merge_delete, merge_insert, merge_upsert, prepare_source_and_table, MergeOp, MergePerfParams,
4+
delete_only_cases, insert_only_cases, prepare_source_and_table, upsert_cases, MergeTestCase,
55
};
66

77
use divan::{AllocProfiler, Bencher};
@@ -13,8 +13,11 @@ fn main() {
1313
#[global_allocator]
1414
static ALLOC: AllocProfiler = AllocProfiler::system();
1515

16-
fn bench_merge(bencher: Bencher, op: MergeOp, params: &MergePerfParams) {
16+
fn bench_merge_case(bencher: Bencher, case: &MergeTestCase) {
1717
let rt = tokio::runtime::Runtime::new().unwrap();
18+
let case_copy = *case;
19+
let params = case_copy.params;
20+
1821
bencher
1922
.with_inputs(|| {
2023
let tmp_dir = tempfile::tempdir().unwrap();
@@ -23,74 +26,33 @@ fn bench_merge(bencher: Bencher, op: MergeOp, params: &MergePerfParams) {
2326
.unwrap_or_else(|_| "data/tpcds_parquet".to_string()),
2427
);
2528
rt.block_on(async move {
26-
let (source, table) = prepare_source_and_table(params, &tmp_dir, &parquet_dir)
29+
let (source, table) = prepare_source_and_table(&params, &tmp_dir, &parquet_dir)
2730
.await
28-
.unwrap();
29-
(source, table, tmp_dir)
31+
.expect("prepare inputs");
32+
(case, source, table, tmp_dir)
3033
})
3134
})
32-
.bench_local_values(|(source, table, tmp_dir)| {
35+
.bench_local_values(|(case, source, table, tmp_dir)| {
3336
rt.block_on(async move {
34-
let _ = divan::black_box(op(source, table).unwrap().await.unwrap());
37+
let (_, metrics) = case.execute(source, table).await.expect("execute merge");
38+
case.validate(&metrics).expect("validate merge");
39+
divan::black_box(metrics.num_target_rows_inserted);
3540
});
3641
drop(tmp_dir);
3742
});
3843
}
3944

40-
#[divan::bench(args = [
41-
MergePerfParams {
42-
sample_matched_rows: 0.05,
43-
sample_not_matched_rows: 0.0,
44-
}
45-
])]
46-
fn delete_only(bencher: Bencher, params: &MergePerfParams) {
47-
bench_merge(bencher, merge_delete, params);
45+
#[divan::bench(args = insert_only_cases())]
46+
fn insert_only(bencher: Bencher, case: &MergeTestCase) {
47+
bench_merge_case(bencher, case);
4848
}
4949

50-
#[divan::bench(args = [
51-
MergePerfParams {
52-
sample_matched_rows: 0.00,
53-
sample_not_matched_rows: 0.05,
54-
},
55-
MergePerfParams {
56-
sample_matched_rows: 0.00,
57-
sample_not_matched_rows: 0.50,
58-
},
59-
MergePerfParams {
60-
sample_matched_rows: 0.00,
61-
sample_not_matched_rows: 1.0,
62-
},
63-
])]
64-
fn multiple_insert_only(bencher: Bencher, params: &MergePerfParams) {
65-
bench_merge(bencher, merge_insert, params);
50+
#[divan::bench(args = delete_only_cases())]
51+
fn delete_only(bencher: Bencher, case: &MergeTestCase) {
52+
bench_merge_case(bencher, case);
6653
}
6754

68-
#[divan::bench(args = [
69-
MergePerfParams {
70-
sample_matched_rows: 0.01,
71-
sample_not_matched_rows: 0.1,
72-
},
73-
MergePerfParams {
74-
sample_matched_rows: 0.1,
75-
sample_not_matched_rows: 0.0,
76-
},
77-
MergePerfParams {
78-
sample_matched_rows: 0.1,
79-
sample_not_matched_rows: 0.01,
80-
},
81-
MergePerfParams {
82-
sample_matched_rows: 0.5,
83-
sample_not_matched_rows: 0.001,
84-
},
85-
MergePerfParams {
86-
sample_matched_rows: 0.99,
87-
sample_not_matched_rows: 0.001,
88-
},
89-
MergePerfParams {
90-
sample_matched_rows: 0.001,
91-
sample_not_matched_rows: 0.001,
92-
},
93-
])]
94-
fn upsert_file_matched(bencher: Bencher, params: &MergePerfParams) {
95-
bench_merge(bencher, merge_upsert, params);
55+
#[divan::bench(args = upsert_cases())]
56+
fn upsert(bencher: Bencher, case: &MergeTestCase) {
57+
bench_merge_case(bencher, case);
9658
}

crates/benchmarks/src/lib.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ pub mod merge;
22
pub mod smoke;
33

44
pub use merge::{
5-
merge_delete, merge_insert, merge_upsert, prepare_source_and_table, MergeOp, MergePerfParams,
5+
delete_only_cases, insert_only_cases, merge_case_by_name, merge_case_names, merge_delete,
6+
merge_insert, merge_test_cases, merge_upsert, prepare_source_and_table, upsert_cases, MergeOp,
7+
MergePerfParams, MergeScenario, MergeTestCase,
68
};
79
pub use smoke::{run_smoke_once, SmokeParams};

crates/benchmarks/src/main.rs

Lines changed: 82 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,8 @@ use std::{path::PathBuf, time::Instant};
33
use clap::{Parser, Subcommand, ValueEnum};
44

55
use delta_benchmarks::{
6-
merge_delete, merge_insert, merge_upsert, prepare_source_and_table, run_smoke_once, MergeOp,
7-
MergePerfParams, SmokeParams,
6+
merge_case_by_name, merge_case_names, merge_delete, merge_insert, merge_upsert,
7+
prepare_source_and_table, run_smoke_once, MergeOp, MergePerfParams, MergeTestCase, SmokeParams,
88
};
99
use deltalake_core::ensure_table_uri;
1010

@@ -28,7 +28,7 @@ enum Command {
2828
Merge {
2929
/// Operation to benchmark
3030
#[arg(value_enum)]
31-
op: OpKind,
31+
op: Option<OpKind>,
3232

3333
/// Fraction of rows that match an existing key (0.0-1.0)
3434
#[arg(long, default_value_t = 0.01)]
@@ -37,6 +37,10 @@ enum Command {
3737
/// Fraction of rows that do not match (0.0-1.0)
3838
#[arg(long, default_value_t = 0.10)]
3939
not_matched: f32,
40+
41+
/// Named test case to run (overrides manual parameters)
42+
#[arg(long)]
43+
case: Option<String>,
4044
},
4145

4246
/// Run the smoke workload to validate delta-rs read/write operations
@@ -60,36 +64,36 @@ async fn main() -> anyhow::Result<()> {
6064
op,
6165
matched,
6266
not_matched,
67+
case,
6368
} => {
64-
let op_fn: MergeOp = match op {
65-
OpKind::Upsert => merge_upsert,
66-
OpKind::Delete => merge_delete,
67-
OpKind::Insert => merge_insert,
68-
};
69-
70-
let params = MergePerfParams {
71-
sample_matched_rows: matched,
72-
sample_not_matched_rows: not_matched,
73-
};
74-
75-
let tmp_dir = tempfile::tempdir()?;
76-
77-
let parquet_dir = PathBuf::from(
78-
std::env::var("TPCDS_PARQUET_DIR")
79-
.unwrap_or_else(|_| "crates/benchmarks/data/tpcds_parquet".to_string()),
80-
);
81-
82-
let (source, table) = prepare_source_and_table(&params, &tmp_dir, &parquet_dir).await?;
83-
84-
let start = Instant::now();
85-
let (_table, metrics) = op_fn(source, table)?.await?;
86-
let elapsed = start.elapsed();
87-
88-
println!(
89-
"merge_duration_ms={} metrics={:?}",
90-
elapsed.as_millis(),
91-
metrics
92-
);
69+
if let Some(case_name) = case.as_deref() {
70+
let merge_case = merge_case_by_name(case_name).ok_or_else(|| {
71+
anyhow::anyhow!(
72+
"unknown merge case '{}'. Available cases: {}",
73+
case_name,
74+
merge_case_names().join(", ")
75+
)
76+
})?;
77+
78+
run_merge_case(merge_case).await?;
79+
} else {
80+
let op = op.ok_or_else(|| {
81+
anyhow::anyhow!("specify an operation (upsert/delete/insert) or provide --case")
82+
})?;
83+
84+
let op_fn: MergeOp = match op {
85+
OpKind::Upsert => merge_upsert,
86+
OpKind::Delete => merge_delete,
87+
OpKind::Insert => merge_insert,
88+
};
89+
90+
let params = MergePerfParams {
91+
sample_matched_rows: matched,
92+
sample_not_matched_rows: not_matched,
93+
};
94+
95+
run_merge_with_params(op_fn, &params).await?;
96+
}
9397
}
9498
Command::Smoke { rows, table_path } => {
9599
let params = SmokeParams { rows };
@@ -116,3 +120,49 @@ async fn main() -> anyhow::Result<()> {
116120

117121
Ok(())
118122
}
123+
124+
async fn run_merge_with_params(op_fn: MergeOp, params: &MergePerfParams) -> anyhow::Result<()> {
125+
let tmp_dir = tempfile::tempdir()?;
126+
let parquet_dir = PathBuf::from(
127+
std::env::var("TPCDS_PARQUET_DIR")
128+
.unwrap_or_else(|_| "crates/benchmarks/data/tpcds_parquet".to_string()),
129+
);
130+
131+
let (source, table) = prepare_source_and_table(params, &tmp_dir, &parquet_dir).await?;
132+
133+
let start = Instant::now();
134+
let (_table, metrics) = op_fn(source, table)?.await?;
135+
let elapsed = start.elapsed();
136+
137+
println!(
138+
"merge_duration_ms={} metrics={:?}",
139+
elapsed.as_millis(),
140+
metrics
141+
);
142+
143+
Ok(())
144+
}
145+
146+
async fn run_merge_case(case: &MergeTestCase) -> anyhow::Result<()> {
147+
let tmp_dir = tempfile::tempdir()?;
148+
let parquet_dir = PathBuf::from(
149+
std::env::var("TPCDS_PARQUET_DIR")
150+
.unwrap_or_else(|_| "crates/benchmarks/data/tpcds_parquet".to_string()),
151+
);
152+
153+
let (source, table) = prepare_source_and_table(&case.params, &tmp_dir, &parquet_dir).await?;
154+
155+
let start = Instant::now();
156+
let (_table, metrics) = case.execute(source, table).await?;
157+
case.validate(&metrics)?;
158+
let elapsed = start.elapsed();
159+
160+
println!(
161+
"merge_case={} merge_duration_ms={} metrics={:?}",
162+
case.name,
163+
elapsed.as_millis(),
164+
metrics
165+
);
166+
167+
Ok(())
168+
}

0 commit comments

Comments
 (0)