Skip to content

Commit 91405d8

Browse files
abhiaagarwalrtyler
authored andcommitted
Cleanup
Signed-off-by: Abhi Agarwal <[email protected]>
1 parent 9c94ae9 commit 91405d8

File tree

2 files changed

+31
-31
lines changed

2 files changed

+31
-31
lines changed

crates/benchmarks/benches/merge.rs

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,6 @@ static ALLOC: AllocProfiler = AllocProfiler::system();
1515

1616
fn bench_merge_case(bencher: Bencher, case: &MergeTestCase) {
1717
let rt = tokio::runtime::Runtime::new().unwrap();
18-
let case_copy = *case;
19-
let params = case_copy.params;
20-
2118
bencher
2219
.with_inputs(|| {
2320
let tmp_dir = tempfile::tempdir().unwrap();
@@ -26,17 +23,16 @@ fn bench_merge_case(bencher: Bencher, case: &MergeTestCase) {
2623
.unwrap_or_else(|_| "data/tpcds_parquet".to_string()),
2724
);
2825
rt.block_on(async move {
29-
let (source, table) = prepare_source_and_table(&params, &tmp_dir, &parquet_dir)
30-
.await
31-
.expect("prepare inputs");
26+
let (source, table) =
27+
prepare_source_and_table(&case.params, &tmp_dir, &parquet_dir)
28+
.await
29+
.expect("prepare inputs");
3230
(case, source, table, tmp_dir)
3331
})
3432
})
3533
.bench_local_values(|(case, source, table, tmp_dir)| {
3634
rt.block_on(async move {
37-
let (_, metrics) = case.execute(source, table).await.expect("execute merge");
38-
case.validate(&metrics).expect("validate merge");
39-
divan::black_box(metrics.num_target_rows_inserted);
35+
divan::black_box(case.execute(source, table).await.expect("execute merge"));
4036
});
4137
drop(tmp_dir);
4238
});

crates/benchmarks/src/main.rs

Lines changed: 26 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,7 @@
1-
use std::{path::PathBuf, time::Instant};
1+
use std::{
2+
path::{Path, PathBuf},
3+
time::Instant,
4+
};
25

36
use clap::{Parser, Subcommand, ValueEnum};
47

@@ -21,6 +24,14 @@ enum OpKind {
2124
struct Cli {
2225
#[command(subcommand)]
2326
command: Command,
27+
28+
/// Path to the parquet directory
29+
#[arg(
30+
long,
31+
env = "TPCDS_PARQUET_DIR",
32+
default_value = "crates/benchmarks/data/tpcds_parquet"
33+
)]
34+
parquet_dir: PathBuf,
2435
}
2536

2637
#[derive(Debug, Subcommand)]
@@ -75,6 +86,8 @@ enum Command {
7586
async fn main() -> anyhow::Result<()> {
7687
let cli = Cli::parse();
7788

89+
let parquet_dir = cli.parquet_dir;
90+
7891
match cli.command {
7992
Command::Merge {
8093
op,
@@ -91,7 +104,7 @@ async fn main() -> anyhow::Result<()> {
91104
)
92105
})?;
93106

94-
run_merge_case(merge_case).await?;
107+
run_merge_case(merge_case, &parquet_dir).await?;
95108
} else {
96109
let op = op.ok_or_else(|| {
97110
anyhow::anyhow!("specify an operation (upsert/delete/insert) or provide --case")
@@ -108,7 +121,7 @@ async fn main() -> anyhow::Result<()> {
108121
sample_not_matched_rows: not_matched,
109122
};
110123

111-
run_merge_with_params(op_fn, &params).await?;
124+
run_merge_with_params(op_fn, &params, &parquet_dir).await?;
112125
}
113126
}
114127
Command::Smoke { rows, table_path } => {
@@ -147,7 +160,7 @@ async fn main() -> anyhow::Result<()> {
147160
};
148161

149162
if run {
150-
run_tpcds_query(&name).await?;
163+
run_tpcds_query(&name, &parquet_dir).await?;
151164
} else {
152165
println!("-- {name}\n{}", sql.trim());
153166
}
@@ -160,14 +173,14 @@ async fn main() -> anyhow::Result<()> {
160173
Ok(())
161174
}
162175

163-
async fn run_merge_with_params(op_fn: MergeOp, params: &MergePerfParams) -> anyhow::Result<()> {
176+
async fn run_merge_with_params(
177+
op_fn: MergeOp,
178+
params: &MergePerfParams,
179+
parquet_dir: &Path,
180+
) -> anyhow::Result<()> {
164181
let tmp_dir = tempfile::tempdir()?;
165-
let parquet_dir = PathBuf::from(
166-
std::env::var("TPCDS_PARQUET_DIR")
167-
.unwrap_or_else(|_| "crates/benchmarks/data/tpcds_parquet".to_string()),
168-
);
169182

170-
let (source, table) = prepare_source_and_table(params, &tmp_dir, &parquet_dir).await?;
183+
let (source, table) = prepare_source_and_table(params, &tmp_dir, parquet_dir).await?;
171184

172185
let start = Instant::now();
173186
let (_table, metrics) = op_fn(source, table)?.await?;
@@ -182,13 +195,8 @@ async fn run_merge_with_params(op_fn: MergeOp, params: &MergePerfParams) -> anyh
182195
Ok(())
183196
}
184197

185-
async fn run_merge_case(case: &MergeTestCase) -> anyhow::Result<()> {
198+
async fn run_merge_case(case: &MergeTestCase, parquet_dir: &Path) -> anyhow::Result<()> {
186199
let tmp_dir = tempfile::tempdir()?;
187-
let parquet_dir = PathBuf::from(
188-
std::env::var("TPCDS_PARQUET_DIR")
189-
.unwrap_or_else(|_| "crates/benchmarks/data/tpcds_parquet".to_string()),
190-
);
191-
192200
let (source, table) = prepare_source_and_table(&case.params, &tmp_dir, &parquet_dir).await?;
193201

194202
let start = Instant::now();
@@ -206,16 +214,12 @@ async fn run_merge_case(case: &MergeTestCase) -> anyhow::Result<()> {
206214
Ok(())
207215
}
208216

209-
async fn run_tpcds_query(query_name: &str) -> anyhow::Result<()> {
217+
async fn run_tpcds_query(query_name: &str, parquet_dir: &Path) -> anyhow::Result<()> {
210218
let tmp_dir = tempfile::tempdir()?;
211-
let parquet_dir = PathBuf::from(
212-
std::env::var("TPCDS_PARQUET_DIR")
213-
.unwrap_or_else(|_| "crates/benchmarks/data/tpcds_parquet".to_string()),
214-
);
215219

216220
println!("Loading TPC-DS tables from {}...", parquet_dir.display());
217221
let setup_start = Instant::now();
218-
let ctx = register_tpcds_tables(&tmp_dir, &parquet_dir).await?;
222+
let ctx = register_tpcds_tables(&tmp_dir, parquet_dir).await?;
219223
let setup_elapsed = setup_start.elapsed();
220224
println!("Setup completed in {} ms", setup_elapsed.as_millis());
221225

0 commit comments

Comments
 (0)