Skip to content

Commit 012b568

Browse files
Continue refactor
Signed-off-by: Abhi Agarwal <[email protected]>
1 parent 13a74ab commit 012b568

File tree

6 files changed

+73
-237
lines changed

6 files changed

+73
-237
lines changed

etl-destinations/src/arrow/encoding.rs

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,15 +7,14 @@ use arrow::{
77
TimestampMicrosecondBuilder,
88
},
99
datatypes::{
10-
DataType, Date32Type, Field, FieldRef, Float32Type, Float64Type, Int16Type, Int32Type,
10+
DataType, Date32Type, FieldRef, Float32Type, Float64Type, Int16Type, Int32Type,
1111
Int64Type, Schema, Time64MicrosecondType, TimeUnit, TimestampMicrosecondType, UInt32Type,
1212
},
1313
error::ArrowError,
1414
};
1515
use chrono::{NaiveDate, NaiveTime};
1616
use etl::types::{
1717
ArrayCell, Cell, DATE_FORMAT, TIME_FORMAT, TIMESTAMP_FORMAT, TableRow,
18-
TableSchema as PgTableSchema, Type as PgType,
1918
};
2019

2120
pub const UNIX_EPOCH: NaiveDate =
@@ -27,7 +26,7 @@ const UUID_BYTE_WIDTH: i32 = 16;
2726

2827
/// Extract numeric precision from Postgres atttypmod
2928
/// Based on: https://stackoverflow.com/questions/72725508/how-to-calculate-numeric-precision-and-other-vals-from-atttypmod
30-
fn extract_numeric_precision(atttypmod: i32) -> u8 {
29+
pub fn extract_numeric_precision(atttypmod: i32) -> u8 {
3130
if atttypmod == -1 {
3231
// No limit specified, use maximum precision
3332
38
@@ -39,13 +38,13 @@ fn extract_numeric_precision(atttypmod: i32) -> u8 {
3938

4039
/// Extract numeric scale from Postgres atttypmod
4140
/// Based on: https://stackoverflow.com/questions/72725508/how-to-calculate-numeric-precision-and-other-vals-from-atttypmod
42-
fn extract_numeric_scale(atttypmod: i32) -> i8 {
41+
pub fn extract_numeric_scale(atttypmod: i32) -> u8 {
4342
if atttypmod == -1 {
4443
// No limit specified, use reasonable default scale
4544
18
4645
} else {
4746
let scale = (atttypmod - 4) & 65535;
48-
std::cmp::min(scale as i8, 38) // Cap at reasonable scale
47+
std::cmp::min(scale as u8, 38) // Cap at reasonable scale
4948
}
5049
}
5150

etl-destinations/src/deltalake/core.rs

Lines changed: 6 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ use std::sync::Arc;
1515
use tokio::sync::Mutex;
1616
use tracing::{info, trace};
1717

18-
use crate::deltalake::TableRowEncoder;
18+
use crate::arrow::rows_to_record_batch;
1919
use crate::deltalake::config::DeltaTableConfig;
2020
use crate::deltalake::events::{materialize_events, materialize_events_append_only};
2121
use crate::deltalake::maintenance::TableMaintenanceState;
@@ -355,32 +355,21 @@ where
355355

356356
let table = self.table_handle(table_id).await?;
357357

358-
let table_schema = self
359-
.store
360-
.get_table_schema(table_id)
361-
.await?
362-
.ok_or_else(|| {
363-
etl_error!(
364-
ErrorKind::MissingTableSchema,
365-
"Table schema not found",
366-
format!("Schema for table {} not found in store", table_id.0)
367-
)
368-
})?;
369-
370358
let row_length = table_rows.len();
371359
trace!("Writing {} rows to Delta table", row_length);
360+
361+
let config = self.config_for_table_name(&table_schema.name.name);
362+
let mut table_guard = table.lock().await;
363+
let schema = table_guard.snapshot().schema();
372364

373365
let record_batch =
374-
TableRowEncoder::encode_table_rows(&table_schema, table_rows).map_err(|e| {
366+
rows_to_record_batch(table_rows.iter(), table_schema.clone()).map_err(|e| {
375367
etl_error!(
376368
ErrorKind::ConversionError,
377369
"Failed to encode table rows",
378370
format!("Error converting to Arrow: {}", e)
379371
)
380372
})?;
381-
382-
let config = self.config_for_table_name(&table_schema.name.name);
383-
let mut table_guard = table.lock().await;
384373
append_to_table(&mut table_guard, config.as_ref(), record_batch)
385374
.await
386375
.map_err(|e| {

etl-destinations/src/deltalake/expr.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,5 @@
11
// Utilities related to constructing DataFusion expressions
22

3-
use crate::deltalake::schema::TableRowEncoder;
4-
use crate::deltalake::schema::cell_to_scalar_value_for_arrow;
53
use deltalake::datafusion::common::Column;
64
use deltalake::datafusion::prelude::{Expr, lit};
75
use etl::error::EtlResult;

etl-destinations/src/deltalake/mod.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,4 +9,3 @@ pub(crate) mod util;
99

1010
pub use config::DeltaTableConfig;
1111
pub use core::{DeltaDestinationConfig, DeltaLakeDestination};
12-
pub use schema::TableRowEncoder;

etl-destinations/src/deltalake/operations/merge.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ use deltalake::operations::merge::MergeBuilder;
55
use deltalake::{DeltaResult, DeltaTable, datafusion::prelude::Expr};
66
use etl::types::{TableRow as PgTableRow, TableSchema as PgTableSchema};
77

8-
use crate::deltalake::TableRowEncoder;
98
use crate::deltalake::config::DeltaTableConfig;
109
use crate::deltalake::expr::qualify_primary_keys;
1110

0 commit comments

Comments (0)