|
1 | | -use std::path::Path; |
| 1 | +pub mod merge; |
| 2 | +pub mod smoke; |
2 | 3 |
|
3 | | -use deltalake_core::datafusion::functions::expr_fn; |
4 | | -use deltalake_core::kernel::engine::arrow_conversion::TryIntoKernel; |
5 | | -use deltalake_core::kernel::{StructField, StructType}; |
6 | | -use deltalake_core::operations::merge::MergeBuilder; |
7 | | -use deltalake_core::{arrow, DeltaResult}; |
8 | | -use deltalake_core::{ |
9 | | - datafusion::{ |
10 | | - logical_expr::{cast, lit}, |
11 | | - prelude::{DataFrame, ParquetReadOptions, SessionContext}, |
12 | | - }, |
13 | | - DeltaOps, DeltaTable, DeltaTableError, |
| 4 | +pub use merge::{ |
| 5 | + merge_delete, merge_insert, merge_upsert, prepare_source_and_table, MergeOp, MergePerfParams, |
14 | 6 | }; |
15 | | -use tempfile::TempDir; |
16 | | -use url::Url; |
17 | | - |
18 | | -pub type MergeOp = fn(DataFrame, DeltaTable) -> Result<MergeBuilder, DeltaTableError>; |
19 | | - |
20 | | -#[derive(Debug, Clone)] |
21 | | -pub struct MergePerfParams { |
22 | | - pub sample_matched_rows: f32, |
23 | | - pub sample_not_matched_rows: f32, |
24 | | -} |
25 | | - |
26 | | -pub fn merge_upsert(source: DataFrame, table: DeltaTable) -> Result<MergeBuilder, DeltaTableError> { |
27 | | - deltalake_core::DeltaOps(table) |
28 | | - .merge(source, "source.wr_item_sk = target.wr_item_sk and source.wr_order_number = target.wr_order_number") |
29 | | - .with_source_alias("source") |
30 | | - .with_target_alias("target") |
31 | | - .when_matched_update(|update| { |
32 | | - update |
33 | | - .update("wr_returned_date_sk", "source.wr_returned_date_sk") |
34 | | - .update("wr_returned_time_sk", "source.wr_returned_time_sk") |
35 | | - .update("wr_item_sk", "source.wr_item_sk") |
36 | | - .update("wr_refunded_customer_sk", "source.wr_refunded_customer_sk") |
37 | | - .update("wr_refunded_cdemo_sk", "source.wr_refunded_cdemo_sk") |
38 | | - .update("wr_refunded_hdemo_sk", "source.wr_refunded_hdemo_sk") |
39 | | - .update("wr_refunded_addr_sk", "source.wr_refunded_addr_sk") |
40 | | - .update("wr_returning_customer_sk", "source.wr_returning_customer_sk") |
41 | | - .update("wr_returning_cdemo_sk", "source.wr_returning_cdemo_sk") |
42 | | - .update("wr_returning_hdemo_sk", "source.wr_returning_hdemo_sk") |
43 | | - .update("wr_returning_addr_sk", "source.wr_returning_addr_sk") |
44 | | - .update("wr_web_page_sk", "source.wr_web_page_sk") |
45 | | - .update("wr_reason_sk", "source.wr_reason_sk") |
46 | | - .update("wr_order_number", "source.wr_order_number") |
47 | | - .update("wr_return_quantity", "source.wr_return_quantity") |
48 | | - .update("wr_return_amt", "source.wr_return_amt") |
49 | | - .update("wr_return_tax", "source.wr_return_tax") |
50 | | - .update("wr_return_amt_inc_tax", "source.wr_return_amt_inc_tax") |
51 | | - .update("wr_fee", "source.wr_fee") |
52 | | - .update("wr_return_ship_cost", "source.wr_return_ship_cost") |
53 | | - .update("wr_refunded_cash", "source.wr_refunded_cash") |
54 | | - .update("wr_reversed_charge", "source.wr_reversed_charge") |
55 | | - .update("wr_account_credit", "source.wr_account_credit") |
56 | | - .update("wr_net_loss", "source.wr_net_loss") |
57 | | - })? |
58 | | - .when_not_matched_insert(|insert| { |
59 | | - insert |
60 | | - .set("wr_returned_date_sk", "source.wr_returned_date_sk") |
61 | | - .set("wr_returned_time_sk", "source.wr_returned_time_sk") |
62 | | - .set("wr_item_sk", "source.wr_item_sk") |
63 | | - .set("wr_refunded_customer_sk", "source.wr_refunded_customer_sk") |
64 | | - .set("wr_refunded_cdemo_sk", "source.wr_refunded_cdemo_sk") |
65 | | - .set("wr_refunded_hdemo_sk", "source.wr_refunded_hdemo_sk") |
66 | | - .set("wr_refunded_addr_sk", "source.wr_refunded_addr_sk") |
67 | | - .set("wr_returning_customer_sk", "source.wr_returning_customer_sk") |
68 | | - .set("wr_returning_cdemo_sk", "source.wr_returning_cdemo_sk") |
69 | | - .set("wr_returning_hdemo_sk", "source.wr_returning_hdemo_sk") |
70 | | - .set("wr_returning_addr_sk", "source.wr_returning_addr_sk") |
71 | | - .set("wr_web_page_sk", "source.wr_web_page_sk") |
72 | | - .set("wr_reason_sk", "source.wr_reason_sk") |
73 | | - .set("wr_order_number", "source.wr_order_number") |
74 | | - .set("wr_return_quantity", "source.wr_return_quantity") |
75 | | - .set("wr_return_amt", "source.wr_return_amt") |
76 | | - .set("wr_return_tax", "source.wr_return_tax") |
77 | | - .set("wr_return_amt_inc_tax", "source.wr_return_amt_inc_tax") |
78 | | - .set("wr_fee", "source.wr_fee") |
79 | | - .set("wr_return_ship_cost", "source.wr_return_ship_cost") |
80 | | - .set("wr_refunded_cash", "source.wr_refunded_cash") |
81 | | - .set("wr_reversed_charge", "source.wr_reversed_charge") |
82 | | - .set("wr_account_credit", "source.wr_account_credit") |
83 | | - .set("wr_net_loss", "source.wr_net_loss") |
84 | | - }) |
85 | | -} |
86 | | - |
87 | | -pub fn merge_insert(source: DataFrame, table: DeltaTable) -> Result<MergeBuilder, DeltaTableError> { |
88 | | - deltalake_core::DeltaOps(table) |
89 | | - .merge(source, "source.wr_item_sk = target.wr_item_sk and source.wr_order_number = target.wr_order_number") |
90 | | - .with_source_alias("source") |
91 | | - .with_target_alias("target") |
92 | | - .when_not_matched_insert(|insert| { |
93 | | - insert |
94 | | - .set("wr_returned_date_sk", "source.wr_returned_date_sk") |
95 | | - .set("wr_returned_time_sk", "source.wr_returned_time_sk") |
96 | | - .set("wr_item_sk", "source.wr_item_sk") |
97 | | - .set("wr_refunded_customer_sk", "source.wr_refunded_customer_sk") |
98 | | - .set("wr_refunded_cdemo_sk", "source.wr_refunded_cdemo_sk") |
99 | | - .set("wr_refunded_hdemo_sk", "source.wr_refunded_hdemo_sk") |
100 | | - .set("wr_refunded_addr_sk", "source.wr_refunded_addr_sk") |
101 | | - .set("wr_returning_customer_sk", "source.wr_returning_customer_sk") |
102 | | - .set("wr_returning_cdemo_sk", "source.wr_returning_cdemo_sk") |
103 | | - .set("wr_returning_hdemo_sk", "source.wr_returning_hdemo_sk") |
104 | | - .set("wr_returning_addr_sk", "source.wr_returning_addr_sk") |
105 | | - .set("wr_web_page_sk", "source.wr_web_page_sk") |
106 | | - .set("wr_reason_sk", "source.wr_reason_sk") |
107 | | - .set("wr_order_number", "source.wr_order_number") |
108 | | - .set("wr_return_quantity", "source.wr_return_quantity") |
109 | | - .set("wr_return_amt", "source.wr_return_amt") |
110 | | - .set("wr_return_tax", "source.wr_return_tax") |
111 | | - .set("wr_return_amt_inc_tax", "source.wr_return_amt_inc_tax") |
112 | | - .set("wr_fee", "source.wr_fee") |
113 | | - .set("wr_return_ship_cost", "source.wr_return_ship_cost") |
114 | | - .set("wr_refunded_cash", "source.wr_refunded_cash") |
115 | | - .set("wr_reversed_charge", "source.wr_reversed_charge") |
116 | | - .set("wr_account_credit", "source.wr_account_credit") |
117 | | - .set("wr_net_loss", "source.wr_net_loss") |
118 | | - }) |
119 | | -} |
120 | | - |
121 | | -pub fn merge_delete(source: DataFrame, table: DeltaTable) -> Result<MergeBuilder, DeltaTableError> { |
122 | | - deltalake_core::DeltaOps(table) |
123 | | - .merge(source, "source.wr_item_sk = target.wr_item_sk and source.wr_order_number = target.wr_order_number") |
124 | | - .with_source_alias("source") |
125 | | - .with_target_alias("target") |
126 | | - .when_matched_delete(|delete| delete) |
127 | | -} |
128 | | - |
129 | | -/// Prepare source DataFrame and target Delta table from DuckDB-generated TPC-DS parquet. |
130 | | -/// Creates a temporary Delta table from web_returns.parquet as the target. |
131 | | -/// Returns (source_df, target_table) for benchmarking. |
132 | | -pub async fn prepare_source_and_table( |
133 | | - params: &MergePerfParams, |
134 | | - tmp_dir: &TempDir, |
135 | | - parquet_dir: &Path, |
136 | | -) -> DeltaResult<(DataFrame, DeltaTable)> { |
137 | | - let ctx = SessionContext::new(); |
138 | | - |
139 | | - let parquet_path = parquet_dir |
140 | | - .join("web_returns.parquet") |
141 | | - .to_str() |
142 | | - .unwrap() |
143 | | - .to_owned(); |
144 | | - |
145 | | - let parquet_df = ctx |
146 | | - .read_parquet(&parquet_path, ParquetReadOptions::default()) |
147 | | - .await?; |
148 | | - let temp_table_url = Url::from_directory_path(tmp_dir).unwrap(); |
149 | | - |
150 | | - let schema = parquet_df.schema(); |
151 | | - let delta_schema: StructType = schema.as_arrow().try_into_kernel().unwrap(); |
152 | | - |
153 | | - let batches = parquet_df.collect().await?; |
154 | | - let fields: Vec<StructField> = delta_schema.fields().cloned().collect(); |
155 | | - let table = DeltaOps::try_from_uri(temp_table_url) |
156 | | - .await? |
157 | | - .create() |
158 | | - .with_columns(fields) |
159 | | - .await?; |
160 | | - |
161 | | - let table = DeltaOps(table).write(batches).await?; |
162 | | - |
163 | | - // Now prepare source DataFrame with sampling |
164 | | - let source = ctx |
165 | | - .read_parquet(&parquet_path, ParquetReadOptions::default()) |
166 | | - .await?; |
167 | | - |
168 | | - // Split matched and not-matched portions |
169 | | - let matched = source |
170 | | - .clone() |
171 | | - .filter(expr_fn::random().lt_eq(lit(params.sample_matched_rows)))?; |
172 | | - |
173 | | - let rand = cast( |
174 | | - expr_fn::random() * lit(u32::MAX), |
175 | | - arrow::datatypes::DataType::Int64, |
176 | | - ); |
177 | | - let not_matched = source |
178 | | - .filter(expr_fn::random().lt_eq(lit(params.sample_not_matched_rows)))? |
179 | | - .with_column("wr_item_sk", rand.clone())? |
180 | | - .with_column("wr_order_number", rand)?; |
181 | | - |
182 | | - let source = matched.union(not_matched)?; |
183 | | - Ok((source, table)) |
184 | | -} |
| 7 | +pub use smoke::{run_smoke_once, SmokeParams}; |
0 commit comments