@@ -3,8 +3,8 @@ use std::{path::PathBuf, time::Instant};
33use clap:: { Parser , Subcommand , ValueEnum } ;
44
55use delta_benchmarks:: {
6- merge_delete , merge_insert , merge_upsert , prepare_source_and_table , run_smoke_once , MergeOp ,
7- MergePerfParams , SmokeParams ,
6+ merge_case_by_name , merge_case_names , merge_delete , merge_insert , merge_upsert ,
7+ prepare_source_and_table , run_smoke_once , MergeOp , MergePerfParams , MergeTestCase , SmokeParams ,
88} ;
99use deltalake_core:: ensure_table_uri;
1010
@@ -28,7 +28,7 @@ enum Command {
2828 Merge {
2929 /// Operation to benchmark
3030 #[ arg( value_enum) ]
31- op : OpKind ,
31+ op : Option < OpKind > ,
3232
3333 /// Fraction of rows that match an existing key (0.0-1.0)
3434 #[ arg( long, default_value_t = 0.01 ) ]
@@ -37,6 +37,10 @@ enum Command {
3737 /// Fraction of rows that do not match (0.0-1.0)
3838 #[ arg( long, default_value_t = 0.10 ) ]
3939 not_matched : f32 ,
40+
41+ /// Named test case to run (overrides manual parameters)
42+ #[ arg( long) ]
43+ case : Option < String > ,
4044 } ,
4145
4246 /// Run the smoke workload to validate delta-rs read/write operations
@@ -60,36 +64,36 @@ async fn main() -> anyhow::Result<()> {
6064 op,
6165 matched,
6266 not_matched,
67+ case,
6368 } => {
64- let op_fn: MergeOp = match op {
65- OpKind :: Upsert => merge_upsert,
66- OpKind :: Delete => merge_delete,
67- OpKind :: Insert => merge_insert,
68- } ;
69-
70- let params = MergePerfParams {
71- sample_matched_rows : matched,
72- sample_not_matched_rows : not_matched,
73- } ;
74-
75- let tmp_dir = tempfile:: tempdir ( ) ?;
76-
77- let parquet_dir = PathBuf :: from (
78- std:: env:: var ( "TPCDS_PARQUET_DIR" )
79- . unwrap_or_else ( |_| "crates/benchmarks/data/tpcds_parquet" . to_string ( ) ) ,
80- ) ;
81-
82- let ( source, table) = prepare_source_and_table ( & params, & tmp_dir, & parquet_dir) . await ?;
83-
84- let start = Instant :: now ( ) ;
85- let ( _table, metrics) = op_fn ( source, table) ?. await ?;
86- let elapsed = start. elapsed ( ) ;
87-
88- println ! (
89- "merge_duration_ms={} metrics={:?}" ,
90- elapsed. as_millis( ) ,
91- metrics
92- ) ;
69+ if let Some ( case_name) = case. as_deref ( ) {
70+ let merge_case = merge_case_by_name ( case_name) . ok_or_else ( || {
71+ anyhow:: anyhow!(
72+ "unknown merge case '{}'. Available cases: {}" ,
73+ case_name,
74+ merge_case_names( ) . join( ", " )
75+ )
76+ } ) ?;
77+
78+ run_merge_case ( merge_case) . await ?;
79+ } else {
80+ let op = op. ok_or_else ( || {
81+ anyhow:: anyhow!( "specify an operation (upsert/delete/insert) or provide --case" )
82+ } ) ?;
83+
84+ let op_fn: MergeOp = match op {
85+ OpKind :: Upsert => merge_upsert,
86+ OpKind :: Delete => merge_delete,
87+ OpKind :: Insert => merge_insert,
88+ } ;
89+
90+ let params = MergePerfParams {
91+ sample_matched_rows : matched,
92+ sample_not_matched_rows : not_matched,
93+ } ;
94+
95+ run_merge_with_params ( op_fn, & params) . await ?;
96+ }
9397 }
9498 Command :: Smoke { rows, table_path } => {
9599 let params = SmokeParams { rows } ;
@@ -116,3 +120,49 @@ async fn main() -> anyhow::Result<()> {
116120
117121 Ok ( ( ) )
118122}
123+
124+ async fn run_merge_with_params ( op_fn : MergeOp , params : & MergePerfParams ) -> anyhow:: Result < ( ) > {
125+ let tmp_dir = tempfile:: tempdir ( ) ?;
126+ let parquet_dir = PathBuf :: from (
127+ std:: env:: var ( "TPCDS_PARQUET_DIR" )
128+ . unwrap_or_else ( |_| "crates/benchmarks/data/tpcds_parquet" . to_string ( ) ) ,
129+ ) ;
130+
131+ let ( source, table) = prepare_source_and_table ( params, & tmp_dir, & parquet_dir) . await ?;
132+
133+ let start = Instant :: now ( ) ;
134+ let ( _table, metrics) = op_fn ( source, table) ?. await ?;
135+ let elapsed = start. elapsed ( ) ;
136+
137+ println ! (
138+ "merge_duration_ms={} metrics={:?}" ,
139+ elapsed. as_millis( ) ,
140+ metrics
141+ ) ;
142+
143+ Ok ( ( ) )
144+ }
145+
146+ async fn run_merge_case ( case : & MergeTestCase ) -> anyhow:: Result < ( ) > {
147+ let tmp_dir = tempfile:: tempdir ( ) ?;
148+ let parquet_dir = PathBuf :: from (
149+ std:: env:: var ( "TPCDS_PARQUET_DIR" )
150+ . unwrap_or_else ( |_| "crates/benchmarks/data/tpcds_parquet" . to_string ( ) ) ,
151+ ) ;
152+
153+ let ( source, table) = prepare_source_and_table ( & case. params , & tmp_dir, & parquet_dir) . await ?;
154+
155+ let start = Instant :: now ( ) ;
156+ let ( _table, metrics) = case. execute ( source, table) . await ?;
157+ case. validate ( & metrics) ?;
158+ let elapsed = start. elapsed ( ) ;
159+
160+ println ! (
161+ "merge_case={} merge_duration_ms={} metrics={:?}" ,
162+ case. name,
163+ elapsed. as_millis( ) ,
164+ metrics
165+ ) ;
166+
167+ Ok ( ( ) )
168+ }
0 commit comments