Skip to content

Commit a0d535e

Browse files
Fix merges and switch to snapshot testing
Signed-off-by: Abhi Agarwal <[email protected]>
1 parent 9fd42f7 commit a0d535e

20 files changed

+547
-654
lines changed

Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,3 +84,7 @@ x509-cert = { version = "0.2.2", default-features = false }
8484

8585
[profile.bench]
8686
debug = true
87+
88+
[profile.dev.package]
89+
insta.opt-level = 3
90+
similar.opt-level = 3

etl-destinations/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ tracing = { workspace = true, optional = true, default-features = true }
4949
etl = { workspace = true, features = ["test-utils"] }
5050
etl-telemetry = { workspace = true }
5151
deltalake = { workspace = true, default-features = false, features = ["rustls", "datafusion", "s3"] }
52+
insta = { workspace = true }
5253

5354
base64 = { workspace = true }
5455
chrono = { workspace = true }

etl-destinations/src/deltalake/operations/merge.rs

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use deltalake::DeltaTableError;
22
use deltalake::datafusion::common::Column;
3-
use deltalake::datafusion::prelude::SessionContext;
3+
use deltalake::datafusion::prelude::{SessionContext, col};
44
use deltalake::operations::merge::MergeBuilder;
55
use deltalake::{DeltaResult, DeltaTable, datafusion::prelude::Expr};
66
use etl::types::{TableRow as PgTableRow, TableSchema as PgTableSchema};
@@ -19,7 +19,7 @@ pub async fn merge_to_table(
1919
let rows = TableRowEncoder::encode_table_rows(table_schema, upsert_rows)?;
2020

2121
let ctx = SessionContext::new();
22-
let batch = ctx.read_batch(rows)?;
22+
let batch = ctx.read_batch(rows.clone())?;
2323

2424
// TODO(abhi): We should proabbly be passing this information in
2525
let primary_keys = table_schema
@@ -40,12 +40,27 @@ pub async fn merge_to_table(
4040
batch,
4141
);
4242

43+
// TODO(abhi): Clean up this mess
44+
let all_columns: Vec<String> = table_schema
45+
.column_schemas
46+
.iter()
47+
.map(|col| col.name.clone())
48+
.collect();
49+
4350
let mut merge_builder = merge_builder
4451
.with_writer_properties(config.clone().into())
4552
.with_source_alias("source")
4653
.with_target_alias("target")
47-
.when_not_matched_insert(|insert| insert)?
48-
.when_matched_update(|update| update)?;
54+
.when_not_matched_insert(|insert| {
55+
all_columns.iter().fold(insert, |insert, column| {
56+
insert.set(column.clone(), col(format!("source.{}", column.clone())))
57+
})
58+
})?
59+
.when_matched_update(|update| {
60+
all_columns.iter().fold(update, |update, column| {
61+
update.update(column.clone(), col(format!("source.{}", column.clone())))
62+
})
63+
})?;
4964

5065
if let Some(delete_predicate) = delete_predicate {
5166
merge_builder = merge_builder

0 commit comments

Comments
 (0)