Skip to content

Commit f98a27e

Browse files
more wip
1 parent d48763c commit f98a27e

File tree

7 files changed

+771
-294
lines changed

7 files changed

+771
-294
lines changed

etl-destinations/src/deltalake/core.rs

Lines changed: 88 additions & 94 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use etl::destination::Destination;
66
use etl::error::{ErrorKind, EtlResult};
77
use etl::store::schema::SchemaStore;
88
use etl::store::state::StateStore;
9-
use etl::types::{Event, TableId, TableRow, TableSchema};
9+
use etl::types::{Event, TableId, TableRow as PgTableRow, TableSchema as PgTableSchema};
1010
use etl::{bail, etl_error};
1111
use futures::future::try_join_all;
1212
use std::collections::HashMap;
@@ -15,7 +15,8 @@ use tokio::sync::Mutex;
1515
use tracing::{info, trace};
1616

1717
use crate::deltalake::TableRowEncoder;
18-
use crate::deltalake::operations::append_to_table;
18+
use crate::deltalake::events::{materialize_events, materialize_events_append_only};
19+
use crate::deltalake::operations::{append_to_table, merge_to_table};
1920
use crate::deltalake::schema::postgres_to_delta_schema;
2021
use crate::deltalake::table::DeltaTableConfig;
2122

@@ -214,25 +215,30 @@ where
214215
.config_for_table_name(&table_schema.name.name)
215216
.append_only;
216217

217-
let (delete_predicates, upsert_rows) =
218-
crate::deltalake::events::materialize_events(&events, &table_schema, is_append_only)?;
218+
if is_append_only {
219+
let rows = materialize_events_append_only(&events, &table_schema)?;
220+
self.write_table_rows_internal(&table_id, rows).await?;
221+
} else {
222+
let (delete_predicates, rows) = materialize_events(&events, &table_schema)?;
223+
self.execute_delete_append_transaction_expr(
224+
table_id,
225+
&table_schema,
226+
rows,
227+
delete_predicates,
228+
)
229+
.await?;
230+
}
219231

220-
self.execute_delete_append_transaction_expr(
221-
table_id,
222-
&table_schema,
223-
delete_predicates,
224-
upsert_rows,
225-
)
226-
.await
232+
Ok(())
227233
}
228234

229235
/// Execute delete+append transaction for CDC using DataFusion expressions for keys
230236
async fn execute_delete_append_transaction_expr(
231237
&self,
232238
table_id: TableId,
233-
table_schema: &TableSchema,
239+
table_schema: &PgTableSchema,
240+
upsert_rows: Vec<&PgTableRow>,
234241
delete_predicates: Vec<Expr>,
235-
upsert_rows: Vec<&TableRow>,
236242
) -> EtlResult<()> {
237243
let table = match self.table_cache.entry(table_id) {
238244
Occupied(entry) => entry.into_ref(),
@@ -242,33 +248,16 @@ where
242248
}
243249
};
244250

245-
if !delete_predicates.is_empty() {
246-
let combined_predicate = delete_predicates
247-
.into_iter()
248-
.reduce(|acc, e| acc.or(e))
249-
.expect("non-empty predicates");
250-
251-
trace!(
252-
"Deleting rows from table {} with predicate (Expr)",
253-
table_id.0
254-
);
255-
256-
let table = table.lock().await;
257-
let ops = DeltaOps::from(table.clone());
258-
ops.delete()
259-
.with_predicate(combined_predicate)
260-
.await
261-
.map_err(|e| {
262-
etl_error!(
263-
ErrorKind::DestinationError,
264-
"Failed to delete rows from Delta table",
265-
format!(
266-
"Error deleting from table for table_id {}: {}",
267-
table_id.0, e
268-
)
269-
)
270-
})?;
271-
}
251+
let combined_predicate = if !delete_predicates.is_empty() {
252+
Some(
253+
delete_predicates
254+
.into_iter()
255+
.reduce(|acc, e| acc.or(e))
256+
.expect("non-empty predicates"),
257+
)
258+
} else {
259+
None
260+
};
272261

273262
if !upsert_rows.is_empty() {
274263
trace!(
@@ -277,29 +266,28 @@ where
277266
table_id.0
278267
);
279268

280-
let record_batch = TableRowEncoder::encode_table_rows(table_schema, upsert_rows)
281-
.map_err(|e| {
282-
etl_error!(
283-
ErrorKind::ConversionError,
284-
"Failed to encode table rows for append",
285-
format!("Error converting to Arrow: {}", e)
286-
)
287-
})?;
288-
289269
let config = self.config_for_table_name(&table_schema.name.name);
290270
let mut table = table.lock().await;
291-
append_to_table(&mut table, &config, record_batch)
292-
.await
293-
.map_err(|e| {
294-
etl_error!(
295-
ErrorKind::DestinationError,
296-
"Failed to append rows to Delta table",
297-
format!(
298-
"Error appending to table for table_id {}: {}",
299-
table_id.0, e
300-
)
301-
)
302-
})?;
271+
todo!();
272+
// merge_to_table(
273+
// table,
274+
// &config,
275+
// table_schema,
276+
// primary_keys,
277+
// upsert_rows,
278+
// combined_predicate,
279+
// )
280+
// .await
281+
// .map_err(|e| {
282+
// etl_error!(
283+
// ErrorKind::DestinationError,
284+
// "Failed to append rows to Delta table",
285+
// format!(
286+
// "Error appending to table for table_id {}: {}",
287+
// table_id.0, e
288+
// )
289+
// )
290+
// })?;
303291
}
304292

305293
Ok(())
@@ -313,41 +301,28 @@ where
313301

314302
Ok(())
315303
}
316-
}
317-
318-
impl<S> Destination for DeltaLakeDestination<S>
319-
where
320-
S: StateStore + SchemaStore + Send + Sync,
321-
{
322-
fn name() -> &'static str {
323-
"deltalake"
324-
}
325304

326-
async fn truncate_table(&self, _table_id: TableId) -> EtlResult<()> {
327-
todo!()
328-
}
329-
330-
async fn write_table_rows(
305+
async fn write_table_rows_internal(
331306
&self,
332-
table_id: TableId,
333-
table_rows: Vec<TableRow>,
307+
table_id: &TableId,
308+
table_rows: Vec<&PgTableRow>,
334309
) -> EtlResult<()> {
335310
if table_rows.is_empty() {
336311
return Ok(());
337312
}
338313

339-
let table = match self.table_cache.entry(table_id) {
314+
let table = match self.table_cache.entry(*table_id) {
340315
Occupied(entry) => entry.into_ref(),
341316
Vacant(entry) => {
342-
let table = self.get_or_create_table(&table_id).await?;
317+
let table = self.get_or_create_table(table_id).await?;
343318
entry.insert(Arc::new(Mutex::new(table)))
344319
}
345320
}
346321
.downgrade();
347322

348323
let table_schema = self
349324
.store
350-
.get_table_schema(&table_id)
325+
.get_table_schema(table_id)
351326
.await?
352327
.ok_or_else(|| {
353328
etl_error!(
@@ -357,19 +332,17 @@ where
357332
)
358333
})?;
359334

360-
{}
335+
let row_length = table_rows.len();
336+
trace!("Writing {} rows to Delta table", row_length);
361337

362338
let record_batch =
363-
TableRowEncoder::encode_table_rows(&table_schema, table_rows.iter().collect())
364-
.map_err(|e| {
365-
etl_error!(
366-
ErrorKind::ConversionError,
367-
"Failed to encode table rows",
368-
format!("Error converting to Arrow: {}", e)
369-
)
370-
})?;
371-
372-
trace!("Writing {} rows to Delta table", table_rows.len(),);
339+
TableRowEncoder::encode_table_rows(&table_schema, table_rows).map_err(|e| {
340+
etl_error!(
341+
ErrorKind::ConversionError,
342+
"Failed to encode table rows",
343+
format!("Error converting to Arrow: {}", e)
344+
)
345+
})?;
373346

374347
let config = self.config_for_table_name(&table_schema.name.name);
375348
let mut table = table.lock().await;
@@ -379,18 +352,39 @@ where
379352
etl_error!(
380353
ErrorKind::DestinationError,
381354
"Failed to write to Delta table",
382-
format!("Error writing to table for table_id {}: {}", table_id.0, e)
355+
format!("Error writing to table for table_id {}: {}", table_id, e)
383356
)
384357
})?;
385358

386359
info!(
387360
"Successfully wrote {} rows to Delta table for table_id: {}",
388-
table_rows.len(),
389-
table_id.0
361+
row_length, table_id.0
390362
);
391363

392364
Ok(())
393365
}
366+
}
367+
368+
impl<S> Destination for DeltaLakeDestination<S>
369+
where
370+
S: StateStore + SchemaStore + Send + Sync,
371+
{
372+
fn name() -> &'static str {
373+
"deltalake"
374+
}
375+
376+
async fn truncate_table(&self, _table_id: TableId) -> EtlResult<()> {
377+
todo!()
378+
}
379+
380+
async fn write_table_rows(
381+
&self,
382+
table_id: TableId,
383+
table_rows: Vec<PgTableRow>,
384+
) -> EtlResult<()> {
385+
self.write_table_rows_internal(&table_id, table_rows.iter().collect())
386+
.await
387+
}
394388

395389
async fn write_events(&self, events: Vec<Event>) -> EtlResult<()> {
396390
// todo(abhi): Implement CDC event processing as described in PLAN.md

0 commit comments

Comments
 (0)