Commit 41ecb8f

Mostly finish the refactor
Signed-off-by: Abhi Agarwal <[email protected]>
1 parent 012b568 commit 41ecb8f

File tree

14 files changed: +543, -330 lines


Cargo.toml

Lines changed: 1 addition & 1 deletion
@@ -55,7 +55,7 @@ k8s-openapi = { version = "0.25.0", default-features = false }
 kube = { version = "1.1.0", default-features = false }
 metrics = { version = "0.24.2", default-features = false }
 metrics-exporter-prometheus = { version = "0.17.2", default-features = false }
-parquet = { version = "56.2.0", default-features = false }
+parquet = { version = "55.0.0", default-features = false }
 pg_escape = { version = "0.1.1", default-features = false }
 pin-project-lite = { version = "0.2.16", default-features = false }
 postgres-replication = { git = "https://github.com/MaterializeInc/rust-postgres", default-features = false, rev = "c4b473b478b3adfbf8667d2fbe895d8423f1290b" }

etl-destinations/Cargo.toml

Lines changed: 2 additions & 3 deletions
@@ -8,7 +8,6 @@ repository.workspace = true
 homepage.workspace = true

 [features]
-arrow = ["dep:arrow"]
 bigquery = [
     "dep:gcp-bigquery-client",
     "dep:prost",
@@ -21,15 +20,15 @@ iceberg = [
     "dep:iceberg-catalog-rest",
     "dep:parquet",
     "dep:uuid",
-    "arrow",
+    "dep:arrow",
 ]
 deltalake = [
     "dep:dashmap",
     "dep:deltalake",
     "dep:futures",
     "dep:tokio",
     "dep:tracing",
-    "arrow",
+    "dep:arrow",
 ]

 [dependencies]
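This change removes the standalone `arrow` feature and has `iceberg` and `deltalake` pull in the optional `arrow` dependency directly via `dep:arrow`. With the `dep:` syntax, enabling a destination feature activates the dependency without exposing a public crate-level `arrow` feature. A minimal sketch of how the shared Arrow code might then be gated (the module layout here is an assumption, not taken from this commit):

// Hypothetical lib.rs gating for the shared Arrow helpers, assuming the
// arrow module is only needed by the deltalake and iceberg destinations.
#[cfg(any(feature = "deltalake", feature = "iceberg"))]
pub(crate) mod arrow;

#[cfg(feature = "deltalake")]
pub mod deltalake;

#[cfg(feature = "iceberg")]
pub mod iceberg;

Downstream users then enable only the destination they need (e.g. `features = ["deltalake"]`) and get Arrow transitively, rather than toggling a separate `arrow` feature by hand.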

etl-destinations/src/arrow/encoding.rs

Lines changed: 3 additions & 5 deletions
@@ -7,15 +7,13 @@ use arrow::{
         TimestampMicrosecondBuilder,
     },
     datatypes::{
-        DataType, Date32Type, FieldRef, Float32Type, Float64Type, Int16Type, Int32Type,
-        Int64Type, Schema, Time64MicrosecondType, TimeUnit, TimestampMicrosecondType, UInt32Type,
+        DataType, Date32Type, FieldRef, Float32Type, Float64Type, Int16Type, Int32Type, Int64Type,
+        Schema, Time64MicrosecondType, TimeUnit, TimestampMicrosecondType, UInt32Type,
     },
     error::ArrowError,
 };
 use chrono::{NaiveDate, NaiveTime};
-use etl::types::{
-    ArrayCell, Cell, DATE_FORMAT, TIME_FORMAT, TIMESTAMP_FORMAT, TableRow,
-};
+use etl::types::{ArrayCell, Cell, DATE_FORMAT, TIME_FORMAT, TIMESTAMP_FORMAT, TableRow};

 pub const UNIX_EPOCH: NaiveDate =
     NaiveDate::from_ymd_opt(1970, 1, 1).expect("unix epoch is a valid date");
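For context on the `UNIX_EPOCH` constant kept at the bottom of this hunk: Arrow's `Date32` type stores whole days since 1970-01-01, and `Time64(Microsecond)` stores microseconds since midnight, so the encoder needs a fixed epoch to diff against. A minimal sketch of that convention, with hypothetical helper names (the real conversions live elsewhere in this file):

use chrono::{NaiveDate, NaiveTime};

const UNIX_EPOCH: NaiveDate =
    NaiveDate::from_ymd_opt(1970, 1, 1).expect("unix epoch is a valid date");

// Date32 value: whole days since the Unix epoch.
fn date32_days(date: NaiveDate) -> i32 {
    date.signed_duration_since(UNIX_EPOCH).num_days() as i32
}

// Time64(Microsecond) value: microseconds since midnight.
fn time64_micros(time: NaiveTime) -> i64 {
    let midnight = NaiveTime::from_hms_opt(0, 0, 0).expect("midnight is a valid time");
    time.signed_duration_since(midnight)
        .num_microseconds()
        .expect("a time of day always fits in i64 microseconds")
}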

etl-destinations/src/deltalake/core.rs

Lines changed: 45 additions & 24 deletions
@@ -20,7 +20,7 @@ use crate::deltalake::config::DeltaTableConfig;
 use crate::deltalake::events::{materialize_events, materialize_events_append_only};
 use crate::deltalake::maintenance::TableMaintenanceState;
 use crate::deltalake::operations::{append_to_table, delete_from_table, merge_to_table};
-use crate::deltalake::schema::postgres_to_delta_schema;
+use crate::deltalake::schema::{postgres_to_arrow_schema, postgres_to_delta_schema};

 /// Configuration for Delta Lake destination
 #[derive(Debug, Clone)]
@@ -111,10 +111,12 @@ where
         })?;

         let table_name = &table_schema.name.name;
-        let table_path = parse_table_uri(format!("{}/{}", self.config.base_uri, table_name))
-            .map_err(|e| {
-                etl_error!(ErrorKind::DestinationError, "Failed to parse table path", e)
-            })?;
+        let pg_table_schema = &table_schema.name.schema;
+        let table_path = parse_table_uri(format!(
+            "{}/{}/{}",
+            self.config.base_uri, pg_table_schema, table_name
+        ))
+        .map_err(|e| etl_error!(ErrorKind::DestinationError, "Failed to parse table path", e))?;

         let mut table_builder = DeltaTableBuilder::from_uri(table_path).map_err(|e| {
             etl_error!(
@@ -245,14 +247,16 @@ where
             .append_only;

         if is_append_only {
-            let rows = materialize_events_append_only(&events, &table_schema)?;
-            self.write_table_rows_internal(&table_id, rows).await?;
+            let row_refs = materialize_events_append_only(&events, &table_schema)?;
+            let rows: Vec<PgTableRow> = row_refs.into_iter().cloned().collect();
+            self.write_table_rows_internal(&table_id, &rows).await?;
         } else {
-            let (delete_predicates, rows) = materialize_events(&events, &table_schema)?;
+            let (delete_predicates, row_refs) = materialize_events(&events, &table_schema)?;
+            let rows: Vec<PgTableRow> = row_refs.into_iter().cloned().collect();
             self.execute_delete_append_transaction_expr(
                 table_id,
                 &table_schema,
-                rows,
+                &rows,
                 delete_predicates,
             )
             .await?;
@@ -266,7 +270,7 @@ where
         &self,
         table_id: TableId,
         table_schema: &PgTableSchema,
-        upsert_rows: Vec<&PgTableRow>,
+        upsert_rows: &[PgTableRow],
         delete_predicates: Vec<Expr>,
     ) -> EtlResult<()> {
         let combined_predicate = delete_predicates.into_iter().reduce(|acc, e| acc.or(e));
@@ -347,29 +351,47 @@ where
     async fn write_table_rows_internal(
         &self,
         table_id: &TableId,
-        table_rows: Vec<&PgTableRow>,
+        table_rows: &[PgTableRow],
     ) -> EtlResult<()> {
         if table_rows.is_empty() {
             return Ok(());
         }

         let table = self.table_handle(table_id).await?;

+        let table_schema = self
+            .store
+            .get_table_schema(table_id)
+            .await?
+            .ok_or_else(|| {
+                etl_error!(
+                    ErrorKind::MissingTableSchema,
+                    "Table schema not found",
+                    format!("Schema for table {} not found in store", table_id.0)
+                )
+            })?;
+
         let row_length = table_rows.len();
         trace!("Writing {} rows to Delta table", row_length);
-
-        let config = self.config_for_table_name(&table_schema.name.name);
+
         let mut table_guard = table.lock().await;
-        let schema = table_guard.snapshot().schema();
+        let arrow_schema = postgres_to_arrow_schema(&table_schema).map_err(|e| {
+            etl_error!(
+                ErrorKind::ConversionError,
+                "Failed to convert table schema to Arrow schema",
+                e
+            )
+        })?;

-        let record_batch =
-            rows_to_record_batch(table_rows.iter(), table_schema.clone()).map_err(|e| {
-                etl_error!(
-                    ErrorKind::ConversionError,
-                    "Failed to encode table rows",
-                    format!("Error converting to Arrow: {}", e)
-                )
-            })?;
+        let config = self.config_for_table_name(&table_schema.name.name);
+
+        let record_batch = rows_to_record_batch(table_rows, arrow_schema.clone()).map_err(|e| {
+            etl_error!(
+                ErrorKind::ConversionError,
+                "Failed to encode table rows",
+                format!("Error converting to Arrow: {}", e)
+            )
+        })?;
         append_to_table(&mut table_guard, config.as_ref(), record_batch)
             .await
             .map_err(|e| {
@@ -450,8 +472,7 @@ where
         table_id: TableId,
         table_rows: Vec<PgTableRow>,
     ) -> EtlResult<()> {
-        self.write_table_rows_internal(&table_id, table_rows.iter().collect())
-            .await
+        self.write_table_rows_internal(&table_id, &table_rows).await
     }

     async fn write_events(&self, events: Vec<Event>) -> EtlResult<()> {
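The net effect of the path change in the second hunk: tables are no longer laid out flat under `base_uri`, but namespaced by their Postgres schema, so two tables named `users` in different schemas no longer collide. Roughly (illustrative only; the real code goes through `parse_table_uri`, which also validates the URI):

// Illustrative sketch of the new layout; helper name is hypothetical.
fn delta_table_path(base_uri: &str, pg_schema: &str, table_name: &str) -> String {
    format!("{}/{}/{}", base_uri, pg_schema, table_name)
}

fn main() {
    // `public.users` under an S3-backed warehouse (example values):
    assert_eq!(
        delta_table_path("s3://bucket/warehouse", "public", "users"),
        "s3://bucket/warehouse/public/users"
    );
}

The write path changes in the same spirit: instead of reading the Arrow schema off the Delta table snapshot, it fetches the stored Postgres schema and converts it with `postgres_to_arrow_schema`, so the record batch is shaped by what replication knows about the table.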

etl-destinations/src/deltalake/events.rs

Lines changed: 108 additions & 12 deletions
@@ -24,7 +24,7 @@ pub fn materialize_events_append_only<'a>(
         match event {
             Event::Insert(e) => {
                 let marker = (e.commit_lsn, e.start_lsn);
-                let pk_expr = build_pk_expr(table_schema, &e.table_row);
+                let pk_expr = build_pk_expr(table_schema, &e.table_row)?;
                 let entry = crdt_by_key.entry(pk_expr).or_insert_with(|| LWWReg {
                     val: RowOp::Upsert(&e.table_row),
                     marker,
@@ -68,7 +68,7 @@ pub fn materialize_events<'a>(
         match event {
             Event::Insert(e) => {
                 let marker = (e.commit_lsn, e.start_lsn);
-                let pk_expr = build_pk_expr(table_schema, &e.table_row);
+                let pk_expr = build_pk_expr(table_schema, &e.table_row)?;
                 let entry = crdt_by_key.entry(pk_expr).or_insert_with(|| LWWReg {
                     val: RowOp::Upsert(&e.table_row),
                     marker,
@@ -77,7 +77,7 @@ pub fn materialize_events<'a>(
             }
             Event::Update(e) => {
                 let marker = (e.commit_lsn, e.start_lsn);
-                let pk_expr = build_pk_expr(table_schema, &e.table_row);
+                let pk_expr = build_pk_expr(table_schema, &e.table_row)?;
                 let entry = crdt_by_key.entry(pk_expr).or_insert_with(|| LWWReg {
                     val: RowOp::Upsert(&e.table_row),
                     marker,
@@ -87,7 +87,7 @@ pub fn materialize_events<'a>(
             Event::Delete(e) => {
                 if let Some((_, ref old_row)) = e.old_table_row {
                     let marker = (e.commit_lsn, e.start_lsn);
-                    let pk_expr = build_pk_expr(table_schema, old_row);
+                    let pk_expr = build_pk_expr(table_schema, old_row)?;
                     let entry = crdt_by_key.entry(pk_expr).or_insert_with(|| LWWReg {
                         val: RowOp::Delete,
                         marker,
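Both materializers fold the event stream into a map of last-write-wins registers keyed by the primary-key predicate, with `(commit_lsn, start_lsn)` as the monotonically increasing marker; the change above makes `build_pk_expr` fallible, so the `?` propagates an error instead of panicking when a key expression cannot be built. A minimal sketch of the register semantics, reusing the `LWWReg` name from the diff (the merge rule shown is one plausible reading, not lifted from this commit):

// Last-write-wins register: keep the value carrying the largest marker.
struct LWWReg<V, M: Ord> {
    val: V,
    marker: M,
}

impl<V, M: Ord> LWWReg<V, M> {
    fn update(&mut self, val: V, marker: M) {
        // Later LSN pairs win; equal or older markers leave the value alone.
        if marker > self.marker {
            self.val = val;
            self.marker = marker;
        }
    }
}

Because the map is keyed by the row's primary-key expression, a burst of inserts, updates, and deletes against the same row collapses to a single terminal upsert or delete before anything is written to Delta.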
@@ -127,6 +127,7 @@ mod tests {
         Cell as PgCell, ColumnSchema as PgColumnSchema, DeleteEvent, InsertEvent, TableId,
         TableName, Type as PgType, UpdateEvent,
     };
+    use insta::assert_debug_snapshot;

     fn schema_single_pk(table_id: TableId) -> PgTableSchema {
         PgTableSchema::new(
@@ -221,8 +222,20 @@ mod tests {

         let (deletes, upserts) = materialize_events(&events, &schema).unwrap();
         assert!(deletes.is_empty());
-        assert_eq!(upserts.len(), 1);
-        assert_eq!(upserts[0].values[1], PgCell::String("b".to_string()));
+        assert_debug_snapshot!(upserts, @r#"
+        [
+            TableRow {
+                values: [
+                    I64(
+                        1,
+                    ),
+                    String(
+                        "b",
+                    ),
+                ],
+            },
+        ]
+        "#);
     }

     #[test]
@@ -246,8 +259,26 @@ mod tests {
         let events = vec![ins, del];

         let (deletes, upserts) = materialize_events(&events, &schema).unwrap();
+        assert_debug_snapshot!(deletes, @r#"
+        [
+            BinaryExpr(
+                BinaryExpr {
+                    left: Column(
+                        Column {
+                            relation: None,
+                            name: "id",
+                        },
+                    ),
+                    op: Eq,
+                    right: Literal(
+                        Int64(1),
+                        None,
+                    ),
+                },
+            ),
+        ]
+        "#);
         assert!(upserts.is_empty());
-        assert_eq!(deletes.len(), 1);
     }

     #[test]
@@ -271,8 +302,20 @@ mod tests {
         let events = vec![ins, upd];

         let upserts = materialize_events_append_only(&events, &schema).unwrap();
-        assert_eq!(upserts.len(), 1);
-        assert_eq!(upserts[0].values[1], PgCell::String("a".to_string()));
+        assert_debug_snapshot!(upserts, @r#"
+        [
+            TableRow {
+                values: [
+                    I64(
+                        1,
+                    ),
+                    String(
+                        "a",
+                    ),
+                ],
+            },
+        ]
+        "#);
     }

     #[test]
@@ -317,8 +360,61 @@ mod tests {

         // We expect one delete predicate (for tenant_id=10 AND user_id=101)
         // and one upsert (tenant_id=10 AND user_id=100 with name=a2)
-        assert_eq!(deletes.len(), 1);
-        assert_eq!(upserts.len(), 1);
-        assert_eq!(upserts[0].values[2], PgCell::String("a2".to_string()));
+        assert_debug_snapshot!(deletes, @r#"
+        [
+            BinaryExpr(
+                BinaryExpr {
+                    left: BinaryExpr(
+                        BinaryExpr {
+                            left: Column(
+                                Column {
+                                    relation: None,
+                                    name: "tenant_id",
+                                },
+                            ),
+                            op: Eq,
+                            right: Literal(
+                                Int32(10),
+                                None,
+                            ),
+                        },
+                    ),
+                    op: And,
+                    right: BinaryExpr(
+                        BinaryExpr {
+                            left: Column(
+                                Column {
+                                    relation: None,
+                                    name: "user_id",
+                                },
+                            ),
+                            op: Eq,
+                            right: Literal(
+                                Int64(101),
+                                None,
+                            ),
+                        },
+                    ),
+                },
+            ),
+        ]
+        "#);
+        assert_debug_snapshot!(upserts, @r#"
+        [
+            TableRow {
+                values: [
+                    I32(
+                        10,
+                    ),
+                    I64(
+                        100,
+                    ),
+                    String(
+                        "a2",
+                    ),
+                ],
+            },
+        ]
+        "#);
     }
 }
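The test changes swap hand-written `assert_eq!` checks for insta inline snapshots: `assert_debug_snapshot!` renders the value with pretty `Debug` formatting and compares it against the `@r#"..."#` literal, and `cargo insta review` (from the cargo-insta tool) can rewrite the literal in place when output changes intentionally. A small, self-contained example of the pattern:

#[cfg(test)]
mod snapshot_example {
    use insta::assert_debug_snapshot;

    #[test]
    fn inline_debug_snapshot() {
        let pair = ("id", 1);
        // The inline snapshot is the pretty Debug rendering of `pair`.
        assert_debug_snapshot!(pair, @r#"
        (
            "id",
            1,
        )
        "#);
    }
}

The upshot for these tests is that the full predicate and row structure gets pinned, not just lengths and a single cell.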
