Skip to content

Commit 1bce4c0

Browse files
committed
Implement smoke 'benchmark'
Signed-off-by: Abhi Agarwal <[email protected]>
1 parent e6d9a2a commit 1bce4c0

File tree

5 files changed

+402
-219
lines changed

5 files changed

+402
-219
lines changed

crates/benchmarks/benches/smoke.rs

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
use delta_benchmarks::{run_smoke_once, SmokeParams};
2+
use divan::{AllocProfiler, Bencher};
3+
use url::Url;
4+
5+
/// Entry point of the benchmark binary: hands control to `divan::main()`.
fn main() {
6+
divan::main();
7+
}
8+
9+
/// Global allocator for this bench binary, built via `AllocProfiler::system()`.
// NOTE(review): presumably this wraps the system allocator so divan can report
// allocation statistics alongside timings — confirm against the divan docs.
#[global_allocator]
10+
static ALLOC: AllocProfiler = AllocProfiler::system();
11+
12+
/// Short alias for the Tokio runtime type constructed in `bench_smoke`.
type Runtime = tokio::runtime::Runtime;
13+
14+
/// Drives one parameterised smoke benchmark through `bencher`.
///
/// A Tokio runtime is built once up front; each benchmarked call receives a
/// temporary directory produced by the `with_inputs` closure, converts its
/// path into a directory `Url`, and blocks on `run_smoke_once` for that URL
/// with the supplied `params`.
///
/// # Panics
///
/// Panics if the runtime or the temporary directory cannot be created, if the
/// directory path cannot be turned into a URL, or if `run_smoke_once` returns
/// an error (every fallible step uses `expect`).
fn bench_smoke(bencher: Bencher, params: &SmokeParams) {
15+
// Built outside the benchmarked closure so it is constructed only once per call to this function.
let rt = Runtime::new().expect("create tokio runtime");
16+
bencher
17+
.with_inputs(|| tempfile::tempdir().expect("create temp dir"))
18+
.bench_local_values(|tmp_dir| {
19+
let table_url = Url::from_directory_path(tmp_dir.path()).expect("tmp dir url");
20+
// The smoke run is async; block the current thread until it completes.
rt.block_on(async {
21+
run_smoke_once(&table_url, params).await.expect("smoke run");
22+
});
23+
// NOTE(review): this explicit drop runs inside the benchmarked closure, so the
// temp-dir cleanup presumably counts toward the measured time — confirm whether
// returning `tmp_dir` instead would let divan drop it outside the timed region.
drop(tmp_dir);
24+
});
25+
}
26+
27+
/// Smoke benchmark registered with divan, parameterised over four
/// `SmokeParams` values (`rows` = 2, 10, 100 and 1_000).
///
/// Each argument is forwarded unchanged to `bench_smoke`.
#[divan::bench(args = [
28+
SmokeParams { rows: 2 },
29+
SmokeParams { rows: 10 },
30+
SmokeParams { rows: 100 },
31+
SmokeParams { rows: 1_000 },
32+
])]
33+
fn smoke(bencher: Bencher, params: &SmokeParams) {
34+
bench_smoke(bencher, params);
35+
}

crates/benchmarks/src/lib.rs

Lines changed: 5 additions & 182 deletions
Original file line numberDiff line numberDiff line change
@@ -1,184 +1,7 @@
1-
use std::path::Path;
1+
pub mod merge;
2+
pub mod smoke;
23

3-
use deltalake_core::datafusion::functions::expr_fn;
4-
use deltalake_core::kernel::engine::arrow_conversion::TryIntoKernel;
5-
use deltalake_core::kernel::{StructField, StructType};
6-
use deltalake_core::operations::merge::MergeBuilder;
7-
use deltalake_core::{arrow, DeltaResult};
8-
use deltalake_core::{
9-
datafusion::{
10-
logical_expr::{cast, lit},
11-
prelude::{DataFrame, ParquetReadOptions, SessionContext},
12-
},
13-
DeltaOps, DeltaTable, DeltaTableError,
4+
pub use merge::{
5+
merge_delete, merge_insert, merge_upsert, prepare_source_and_table, MergeOp, MergePerfParams,
146
};
15-
use tempfile::TempDir;
16-
use url::Url;
17-
18-
pub type MergeOp = fn(DataFrame, DeltaTable) -> Result<MergeBuilder, DeltaTableError>;
19-
20-
#[derive(Debug, Clone)]
21-
pub struct MergePerfParams {
22-
pub sample_matched_rows: f32,
23-
pub sample_not_matched_rows: f32,
24-
}
25-
26-
pub fn merge_upsert(source: DataFrame, table: DeltaTable) -> Result<MergeBuilder, DeltaTableError> {
27-
deltalake_core::DeltaOps(table)
28-
.merge(source, "source.wr_item_sk = target.wr_item_sk and source.wr_order_number = target.wr_order_number")
29-
.with_source_alias("source")
30-
.with_target_alias("target")
31-
.when_matched_update(|update| {
32-
update
33-
.update("wr_returned_date_sk", "source.wr_returned_date_sk")
34-
.update("wr_returned_time_sk", "source.wr_returned_time_sk")
35-
.update("wr_item_sk", "source.wr_item_sk")
36-
.update("wr_refunded_customer_sk", "source.wr_refunded_customer_sk")
37-
.update("wr_refunded_cdemo_sk", "source.wr_refunded_cdemo_sk")
38-
.update("wr_refunded_hdemo_sk", "source.wr_refunded_hdemo_sk")
39-
.update("wr_refunded_addr_sk", "source.wr_refunded_addr_sk")
40-
.update("wr_returning_customer_sk", "source.wr_returning_customer_sk")
41-
.update("wr_returning_cdemo_sk", "source.wr_returning_cdemo_sk")
42-
.update("wr_returning_hdemo_sk", "source.wr_returning_hdemo_sk")
43-
.update("wr_returning_addr_sk", "source.wr_returning_addr_sk")
44-
.update("wr_web_page_sk", "source.wr_web_page_sk")
45-
.update("wr_reason_sk", "source.wr_reason_sk")
46-
.update("wr_order_number", "source.wr_order_number")
47-
.update("wr_return_quantity", "source.wr_return_quantity")
48-
.update("wr_return_amt", "source.wr_return_amt")
49-
.update("wr_return_tax", "source.wr_return_tax")
50-
.update("wr_return_amt_inc_tax", "source.wr_return_amt_inc_tax")
51-
.update("wr_fee", "source.wr_fee")
52-
.update("wr_return_ship_cost", "source.wr_return_ship_cost")
53-
.update("wr_refunded_cash", "source.wr_refunded_cash")
54-
.update("wr_reversed_charge", "source.wr_reversed_charge")
55-
.update("wr_account_credit", "source.wr_account_credit")
56-
.update("wr_net_loss", "source.wr_net_loss")
57-
})?
58-
.when_not_matched_insert(|insert| {
59-
insert
60-
.set("wr_returned_date_sk", "source.wr_returned_date_sk")
61-
.set("wr_returned_time_sk", "source.wr_returned_time_sk")
62-
.set("wr_item_sk", "source.wr_item_sk")
63-
.set("wr_refunded_customer_sk", "source.wr_refunded_customer_sk")
64-
.set("wr_refunded_cdemo_sk", "source.wr_refunded_cdemo_sk")
65-
.set("wr_refunded_hdemo_sk", "source.wr_refunded_hdemo_sk")
66-
.set("wr_refunded_addr_sk", "source.wr_refunded_addr_sk")
67-
.set("wr_returning_customer_sk", "source.wr_returning_customer_sk")
68-
.set("wr_returning_cdemo_sk", "source.wr_returning_cdemo_sk")
69-
.set("wr_returning_hdemo_sk", "source.wr_returning_hdemo_sk")
70-
.set("wr_returning_addr_sk", "source.wr_returning_addr_sk")
71-
.set("wr_web_page_sk", "source.wr_web_page_sk")
72-
.set("wr_reason_sk", "source.wr_reason_sk")
73-
.set("wr_order_number", "source.wr_order_number")
74-
.set("wr_return_quantity", "source.wr_return_quantity")
75-
.set("wr_return_amt", "source.wr_return_amt")
76-
.set("wr_return_tax", "source.wr_return_tax")
77-
.set("wr_return_amt_inc_tax", "source.wr_return_amt_inc_tax")
78-
.set("wr_fee", "source.wr_fee")
79-
.set("wr_return_ship_cost", "source.wr_return_ship_cost")
80-
.set("wr_refunded_cash", "source.wr_refunded_cash")
81-
.set("wr_reversed_charge", "source.wr_reversed_charge")
82-
.set("wr_account_credit", "source.wr_account_credit")
83-
.set("wr_net_loss", "source.wr_net_loss")
84-
})
85-
}
86-
87-
pub fn merge_insert(source: DataFrame, table: DeltaTable) -> Result<MergeBuilder, DeltaTableError> {
88-
deltalake_core::DeltaOps(table)
89-
.merge(source, "source.wr_item_sk = target.wr_item_sk and source.wr_order_number = target.wr_order_number")
90-
.with_source_alias("source")
91-
.with_target_alias("target")
92-
.when_not_matched_insert(|insert| {
93-
insert
94-
.set("wr_returned_date_sk", "source.wr_returned_date_sk")
95-
.set("wr_returned_time_sk", "source.wr_returned_time_sk")
96-
.set("wr_item_sk", "source.wr_item_sk")
97-
.set("wr_refunded_customer_sk", "source.wr_refunded_customer_sk")
98-
.set("wr_refunded_cdemo_sk", "source.wr_refunded_cdemo_sk")
99-
.set("wr_refunded_hdemo_sk", "source.wr_refunded_hdemo_sk")
100-
.set("wr_refunded_addr_sk", "source.wr_refunded_addr_sk")
101-
.set("wr_returning_customer_sk", "source.wr_returning_customer_sk")
102-
.set("wr_returning_cdemo_sk", "source.wr_returning_cdemo_sk")
103-
.set("wr_returning_hdemo_sk", "source.wr_returning_hdemo_sk")
104-
.set("wr_returning_addr_sk", "source.wr_returning_addr_sk")
105-
.set("wr_web_page_sk", "source.wr_web_page_sk")
106-
.set("wr_reason_sk", "source.wr_reason_sk")
107-
.set("wr_order_number", "source.wr_order_number")
108-
.set("wr_return_quantity", "source.wr_return_quantity")
109-
.set("wr_return_amt", "source.wr_return_amt")
110-
.set("wr_return_tax", "source.wr_return_tax")
111-
.set("wr_return_amt_inc_tax", "source.wr_return_amt_inc_tax")
112-
.set("wr_fee", "source.wr_fee")
113-
.set("wr_return_ship_cost", "source.wr_return_ship_cost")
114-
.set("wr_refunded_cash", "source.wr_refunded_cash")
115-
.set("wr_reversed_charge", "source.wr_reversed_charge")
116-
.set("wr_account_credit", "source.wr_account_credit")
117-
.set("wr_net_loss", "source.wr_net_loss")
118-
})
119-
}
120-
121-
pub fn merge_delete(source: DataFrame, table: DeltaTable) -> Result<MergeBuilder, DeltaTableError> {
122-
deltalake_core::DeltaOps(table)
123-
.merge(source, "source.wr_item_sk = target.wr_item_sk and source.wr_order_number = target.wr_order_number")
124-
.with_source_alias("source")
125-
.with_target_alias("target")
126-
.when_matched_delete(|delete| delete)
127-
}
128-
129-
/// Prepare source DataFrame and target Delta table from DuckDB-generated TPC-DS parquet.
130-
/// Creates a temporary Delta table from web_returns.parquet as the target.
131-
/// Returns (source_df, target_table) for benchmarking.
132-
pub async fn prepare_source_and_table(
133-
params: &MergePerfParams,
134-
tmp_dir: &TempDir,
135-
parquet_dir: &Path,
136-
) -> DeltaResult<(DataFrame, DeltaTable)> {
137-
let ctx = SessionContext::new();
138-
139-
let parquet_path = parquet_dir
140-
.join("web_returns.parquet")
141-
.to_str()
142-
.unwrap()
143-
.to_owned();
144-
145-
let parquet_df = ctx
146-
.read_parquet(&parquet_path, ParquetReadOptions::default())
147-
.await?;
148-
let temp_table_url = Url::from_directory_path(tmp_dir).unwrap();
149-
150-
let schema = parquet_df.schema();
151-
let delta_schema: StructType = schema.as_arrow().try_into_kernel().unwrap();
152-
153-
let batches = parquet_df.collect().await?;
154-
let fields: Vec<StructField> = delta_schema.fields().cloned().collect();
155-
let table = DeltaOps::try_from_uri(temp_table_url)
156-
.await?
157-
.create()
158-
.with_columns(fields)
159-
.await?;
160-
161-
let table = DeltaOps(table).write(batches).await?;
162-
163-
// Now prepare source DataFrame with sampling
164-
let source = ctx
165-
.read_parquet(&parquet_path, ParquetReadOptions::default())
166-
.await?;
167-
168-
// Split matched and not-matched portions
169-
let matched = source
170-
.clone()
171-
.filter(expr_fn::random().lt_eq(lit(params.sample_matched_rows)))?;
172-
173-
let rand = cast(
174-
expr_fn::random() * lit(u32::MAX),
175-
arrow::datatypes::DataType::Int64,
176-
);
177-
let not_matched = source
178-
.filter(expr_fn::random().lt_eq(lit(params.sample_not_matched_rows)))?
179-
.with_column("wr_item_sk", rand.clone())?
180-
.with_column("wr_order_number", rand)?;
181-
182-
let source = matched.union(not_matched)?;
183-
Ok((source, table))
184-
}
7+
pub use smoke::{run_smoke_once, SmokeParams};

crates/benchmarks/src/main.rs

Lines changed: 89 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
11
use std::{path::PathBuf, time::Instant};
22

3-
use clap::{Parser, ValueEnum};
3+
use clap::{Parser, Subcommand, ValueEnum};
44

55
use delta_benchmarks::{
6-
merge_delete, merge_insert, merge_upsert, prepare_source_and_table, MergeOp, MergePerfParams,
6+
merge_delete, merge_insert, merge_upsert, prepare_source_and_table, run_smoke_once, MergeOp,
7+
MergePerfParams, SmokeParams,
78
};
9+
use deltalake_core::ensure_table_uri;
810

911
#[derive(Copy, Clone, Debug, ValueEnum)]
1012
enum OpKind {
@@ -14,53 +16,103 @@ enum OpKind {
1416
}
1517

1618
#[derive(Parser, Debug)]
17-
#[command(about = "Run a merge benchmark with configurable parameters")]
19+
#[command(about = "Run delta-rs benchmarks")]
1820
struct Cli {
19-
/// Operation to benchmark
20-
#[arg(value_enum)]
21-
op: OpKind,
21+
#[command(subcommand)]
22+
command: Command,
23+
}
24+
25+
#[derive(Debug, Subcommand)]
26+
enum Command {
27+
/// Run a merge benchmark with configurable parameters
28+
Merge {
29+
/// Operation to benchmark
30+
#[arg(value_enum)]
31+
op: OpKind,
32+
33+
/// Fraction of rows that match an existing key (0.0-1.0)
34+
#[arg(long, default_value_t = 0.01)]
35+
matched: f32,
36+
37+
/// Fraction of rows that do not match (0.0-1.0)
38+
#[arg(long, default_value_t = 0.10)]
39+
not_matched: f32,
40+
},
2241

23-
/// Fraction of rows that match an existing key (0.0-1.0)
24-
#[arg(long, default_value_t = 0.01)]
25-
matched: f32,
42+
/// Run the smoke workload to validate delta-rs read/write operations
43+
Smoke {
44+
/// Number of rows to write into the smoke table
45+
#[arg(long, default_value_t = 2)]
46+
rows: usize,
2647

27-
/// Fraction of rows that do not match (0.0-1.0)
28-
#[arg(long, default_value_t = 0.10)]
29-
not_matched: f32,
48+
/// Optional table path to reuse for the smoke run (defaults to a temporary directory)
49+
#[arg(long)]
50+
table_path: Option<PathBuf>,
51+
},
3052
}
3153

3254
#[tokio::main]
33-
async fn main() {
55+
async fn main() -> anyhow::Result<()> {
3456
let cli = Cli::parse();
3557

36-
let op_fn: MergeOp = match cli.op {
37-
OpKind::Upsert => merge_upsert,
38-
OpKind::Delete => merge_delete,
39-
OpKind::Insert => merge_insert,
40-
};
58+
match cli.command {
59+
Command::Merge {
60+
op,
61+
matched,
62+
not_matched,
63+
} => {
64+
let op_fn: MergeOp = match op {
65+
OpKind::Upsert => merge_upsert,
66+
OpKind::Delete => merge_delete,
67+
OpKind::Insert => merge_insert,
68+
};
69+
70+
let params = MergePerfParams {
71+
sample_matched_rows: matched,
72+
sample_not_matched_rows: not_matched,
73+
};
74+
75+
let tmp_dir = tempfile::tempdir()?;
76+
77+
let parquet_dir = PathBuf::from(
78+
std::env::var("TPCDS_PARQUET_DIR")
79+
.unwrap_or_else(|_| "crates/benchmarks/data/tpcds_parquet".to_string()),
80+
);
4181

42-
let params = MergePerfParams {
43-
sample_matched_rows: cli.matched,
44-
sample_not_matched_rows: cli.not_matched,
45-
};
82+
let (source, table) = prepare_source_and_table(&params, &tmp_dir, &parquet_dir).await?;
4683

47-
let tmp_dir = tempfile::tempdir().expect("create tmp dir");
84+
let start = Instant::now();
85+
let (_table, metrics) = op_fn(source, table)?.await?;
86+
let elapsed = start.elapsed();
4887

49-
let parquet_dir = PathBuf::from(
50-
std::env::var("TPCDS_PARQUET_DIR")
51-
.unwrap_or_else(|_| "crates/benchmarks/data/tpcds_parquet".to_string()),
52-
);
88+
println!(
89+
"merge_duration_ms={} metrics={:?}",
90+
elapsed.as_millis(),
91+
metrics
92+
);
93+
}
94+
Command::Smoke { rows, table_path } => {
95+
let params = SmokeParams { rows };
96+
let (table_url, _guard) = match table_path {
97+
Some(path) => (ensure_table_uri(path.to_string_lossy().as_ref())?, None),
98+
None => {
99+
let dir = tempfile::tempdir()?;
100+
let url = ensure_table_uri(dir.path().to_string_lossy().as_ref())?;
101+
(url, Some(dir))
102+
}
103+
};
53104

54-
let (source, table) = prepare_source_and_table(&params, &tmp_dir, &parquet_dir)
55-
.await
56-
.expect("prepare inputs");
105+
let start = Instant::now();
106+
run_smoke_once(&table_url, &params).await?;
107+
let elapsed = start.elapsed();
57108

58-
let start = Instant::now();
59-
let (_table, metrics) = op_fn(source, table)
60-
.expect("build merge")
61-
.await
62-
.expect("execute merge");
63-
let elapsed = start.elapsed();
109+
println!(
110+
"smoke_duration_ms={} table_uri={}",
111+
elapsed.as_millis(),
112+
table_url
113+
);
114+
}
115+
}
64116

65-
println!("duration_ms={} metrics={:?}", elapsed.as_millis(), metrics)
117+
Ok(())
66118
}

0 commit comments

Comments
 (0)