Skip to content

Commit 13a74ab

Browse files
wip refactor arrow schema
Signed-off-by: Abhi Agarwal <[email protected]>
1 parent 4a0829b commit 13a74ab

File tree

5 files changed

+340
-1126
lines changed

5 files changed

+340
-1126
lines changed

Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ actix-web = { version = "4.11.0", default-features = false }
3232
actix-web-httpauth = { version = "0.8.2", default-features = false }
3333
actix-web-metrics = { version = "0.3.0", default-features = false }
3434
anyhow = { version = "1.0.98", default-features = false }
35-
arrow = { version = "55.0", default-features = false }
35+
arrow = { version = "56.2.0", default-features = false }
3636
async-trait = { version = "0.1.88" }
3737
aws-lc-rs = { version = "1.13.3", default-features = false }
3838
base64 = { version = "0.22.1", default-features = false }
@@ -55,7 +55,7 @@ k8s-openapi = { version = "0.25.0", default-features = false }
5555
kube = { version = "1.1.0", default-features = false }
5656
metrics = { version = "0.24.2", default-features = false }
5757
metrics-exporter-prometheus = { version = "0.17.2", default-features = false }
58-
parquet = { version = "55.0", default-features = false }
58+
parquet = { version = "56.2.0", default-features = false }
5959
pg_escape = { version = "0.1.1", default-features = false }
6060
pin-project-lite = { version = "0.2.16", default-features = false }
6161
postgres-replication = { git = "https://github.com/MaterializeInc/rust-postgres", default-features = false, rev = "c4b473b478b3adfbf8667d2fbe895d8423f1290b" }

etl-destinations/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ deltalake = [
2929
"dep:futures",
3030
"dep:tokio",
3131
"dep:tracing",
32+
"arrow",
3233
]
3334

3435
[dependencies]

etl-destinations/src/arrow/encoding.rs

Lines changed: 163 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,17 +2,21 @@ use std::sync::Arc;
22

33
use arrow::{
44
array::{
5-
ArrayRef, ArrowPrimitiveType, BooleanBuilder, FixedSizeBinaryBuilder, LargeBinaryBuilder,
6-
ListBuilder, PrimitiveBuilder, RecordBatch, StringBuilder, TimestampMicrosecondBuilder,
5+
ArrayRef, ArrowPrimitiveType, BooleanBuilder, Decimal128Array, FixedSizeBinaryBuilder,
6+
LargeBinaryBuilder, ListBuilder, PrimitiveBuilder, RecordBatch, StringBuilder,
7+
TimestampMicrosecondBuilder,
78
},
89
datatypes::{
9-
DataType, Date32Type, FieldRef, Float32Type, Float64Type, Int32Type, Int64Type, Schema,
10-
Time64MicrosecondType, TimeUnit, TimestampMicrosecondType,
10+
DataType, Date32Type, Field, FieldRef, Float32Type, Float64Type, Int16Type, Int32Type,
11+
Int64Type, Schema, Time64MicrosecondType, TimeUnit, TimestampMicrosecondType, UInt32Type,
1112
},
1213
error::ArrowError,
1314
};
1415
use chrono::{NaiveDate, NaiveTime};
15-
use etl::types::{ArrayCell, Cell, DATE_FORMAT, TIME_FORMAT, TIMESTAMP_FORMAT, TableRow};
16+
use etl::types::{
17+
ArrayCell, Cell, DATE_FORMAT, TIME_FORMAT, TIMESTAMP_FORMAT, TableRow,
18+
TableSchema as PgTableSchema, Type as PgType,
19+
};
1620

1721
pub const UNIX_EPOCH: NaiveDate =
1822
NaiveDate::from_ymd_opt(1970, 1, 1).expect("unix epoch is a valid date");
@@ -21,6 +25,30 @@ const MIDNIGHT: NaiveTime = NaiveTime::from_hms_opt(0, 0, 0).expect("midnight is
2125

2226
const UUID_BYTE_WIDTH: i32 = 16;
2327

28+
/// Extracts the declared precision of a Postgres `numeric` column from its `atttypmod`.
///
/// Postgres stores the precision in the upper 16 bits of `atttypmod - 4`.
/// An `atttypmod` of `-1` means no precision was declared; in that case the
/// maximum precision supported by Arrow's `Decimal128` (38) is returned.
/// Declared precisions larger than 38 are capped to 38.
///
/// Based on: https://stackoverflow.com/questions/72725508/how-to-calculate-numeric-precision-and-other-vals-from-atttypmod
fn extract_numeric_precision(atttypmod: i32) -> u8 {
    /// Largest precision a 128-bit Arrow decimal can represent.
    const MAX_DECIMAL128_PRECISION: i32 = 38;

    if atttypmod == -1 {
        // No limit specified, use maximum precision.
        return MAX_DECIMAL128_PRECISION as u8;
    }

    let precision = (atttypmod.wrapping_sub(4) >> 16) & 0xFFFF;
    // Cap *before* narrowing: casting first would wrap values >= 256
    // (e.g. 256 -> 0, 260 -> 4) and slip past the cap as a bogus small precision.
    precision.min(MAX_DECIMAL128_PRECISION) as u8
}
39+
40+
/// Extracts the declared scale of a Postgres `numeric` column from its `atttypmod`.
///
/// Postgres stores the scale in the low bits of `atttypmod - 4` as an 11-bit
/// two's-complement value (scales may be negative since PostgreSQL 15).
/// An `atttypmod` of `-1` means no scale was declared; a default of 18 is
/// returned in that case. Scales above 38 are capped to 38, and scales below
/// `i8::MIN` are clamped so the result always fits the `i8` Arrow expects.
///
/// Based on: https://stackoverflow.com/questions/72725508/how-to-calculate-numeric-precision-and-other-vals-from-atttypmod
fn extract_numeric_scale(atttypmod: i32) -> i8 {
    /// Scale used when the column does not declare one.
    const DEFAULT_SCALE: i8 = 18;
    /// Upper bound kept from the original implementation's cap.
    const MAX_SCALE: i32 = 38;

    if atttypmod == -1 {
        // No limit specified, use reasonable default scale.
        return DEFAULT_SCALE;
    }

    // Sign-extend the 11-bit scale field: flipping bit 10 and subtracting
    // 1024 maps 0..=1023 to itself and 1024..=2047 to -1024..=-1.
    let raw = atttypmod.wrapping_sub(4) & 0x7FF;
    let scale = (raw ^ 0x400) - 0x400;

    // Clamp *before* narrowing: casting first would wrap large scales
    // (e.g. 200 -> -56) and slip past the cap with the wrong sign.
    scale.clamp(i32::from(i8::MIN), MAX_SCALE) as i8
}
51+
2452
/// Converts a slice of [`TableRow`]s to an Arrow [`RecordBatch`].
2553
///
2654
/// This function transforms tabular data from the ETL pipeline's internal format
@@ -56,22 +84,31 @@ pub fn rows_to_record_batch(rows: &[TableRow], schema: Schema) -> Result<RecordB
5684
fn build_array_for_field(rows: &[TableRow], field_idx: usize, data_type: &DataType) -> ArrayRef {
5785
match data_type {
5886
DataType::Boolean => build_boolean_array(rows, field_idx),
87+
DataType::Int16 => build_primitive_array::<Int16Type, _>(rows, field_idx, cell_to_i16),
5988
DataType::Int32 => build_primitive_array::<Int32Type, _>(rows, field_idx, cell_to_i32),
6089
DataType::Int64 => build_primitive_array::<Int64Type, _>(rows, field_idx, cell_to_i64),
90+
DataType::UInt32 => build_primitive_array::<UInt32Type, _>(rows, field_idx, cell_to_u32),
6191
DataType::Float32 => build_primitive_array::<Float32Type, _>(rows, field_idx, cell_to_f32),
6292
DataType::Float64 => build_primitive_array::<Float64Type, _>(rows, field_idx, cell_to_f64),
6393
DataType::Utf8 => build_string_array(rows, field_idx),
94+
DataType::Binary => build_binary_array(rows, field_idx),
6495
DataType::LargeBinary => build_binary_array(rows, field_idx),
6596
DataType::Date32 => build_primitive_array::<Date32Type, _>(rows, field_idx, cell_to_date32),
6697
DataType::Time64(TimeUnit::Microsecond) => {
6798
build_primitive_array::<Time64MicrosecondType, _>(rows, field_idx, cell_to_time64)
6899
}
100+
DataType::Time64(TimeUnit::Nanosecond) => {
101+
build_primitive_array::<Time64MicrosecondType, _>(rows, field_idx, cell_to_time64)
102+
}
69103
DataType::Timestamp(TimeUnit::Microsecond, Some(tz)) => {
70104
build_timestamptz_array(rows, field_idx, tz)
71105
}
72106
DataType::Timestamp(TimeUnit::Microsecond, None) => {
73107
build_primitive_array::<TimestampMicrosecondType, _>(rows, field_idx, cell_to_timestamp)
74108
}
109+
DataType::Decimal128(precision, scale) => {
110+
build_decimal128_array(rows, field_idx, *precision, *scale)
111+
}
75112
DataType::FixedSizeBinary(UUID_BYTE_WIDTH) => build_uuid_array(rows, field_idx),
76113
DataType::List(field) => build_list_array(rows, field_idx, field.clone()),
77114
_ => build_string_array(rows, field_idx),
@@ -123,6 +160,22 @@ impl_array_builder!(build_boolean_array, BooleanBuilder, cell_to_bool);
123160
impl_array_builder!(build_string_array, StringBuilder, cell_to_string);
124161
impl_array_builder!(build_binary_array, LargeBinaryBuilder, cell_to_bytes);
125162

163+
/// Builds a decimal128 array from [`TableRow`]s for a specific field.
164+
fn build_decimal128_array(
165+
rows: &[TableRow],
166+
field_idx: usize,
167+
precision: u8,
168+
scale: i8,
169+
) -> ArrayRef {
170+
let values: Vec<Option<i128>> = rows
171+
.iter()
172+
.map(|row| cell_to_decimal128(&row.values[field_idx], precision, scale))
173+
.collect();
174+
175+
let decimal_type = DataType::Decimal128(precision, scale);
176+
Arc::new(Decimal128Array::from(values).with_data_type(decimal_type))
177+
}
178+
126179
/// Builds a timezone-aware timestamp array from [`TableRow`]s.
127180
///
128181
/// This function creates an Arrow timestamp array with microsecond precision
@@ -213,6 +266,22 @@ fn cell_to_i64(cell: &Cell) -> Option<i64> {
213266
}
214267
}
215268

269+
/// Converts a [`Cell`] to a 16-bit signed integer.
270+
fn cell_to_i16(cell: &Cell) -> Option<i16> {
271+
match cell {
272+
Cell::I16(v) => Some(*v),
273+
_ => None,
274+
}
275+
}
276+
277+
/// Converts a [`Cell`] to a 32-bit unsigned integer.
278+
fn cell_to_u32(cell: &Cell) -> Option<u32> {
279+
match cell {
280+
Cell::U32(v) => Some(*v),
281+
_ => None,
282+
}
283+
}
284+
216285
/// Converts a [`Cell`] to a 32-bit floating-point number.
217286
///
218287
/// Extracts 32-bit float values from [`Cell::F32`] variants, returning
@@ -235,6 +304,23 @@ fn cell_to_f64(cell: &Cell) -> Option<f64> {
235304
}
236305
}
237306

307+
/// Converts a [`Cell`] to a decimal128 value.
308+
fn cell_to_decimal128(cell: &Cell, _precision: u8, scale: i8) -> Option<i128> {
309+
match cell {
310+
Cell::Numeric(n) => {
311+
// This is a simplified conversion - ideally we'd preserve the exact decimal representation
312+
if let Ok(string_val) = n.to_string().parse::<f64>() {
313+
// Scale up by the scale factor and convert to i128
314+
let scaled = (string_val * 10_f64.powi(scale as i32)) as i128;
315+
Some(scaled)
316+
} else {
317+
None
318+
}
319+
}
320+
_ => None,
321+
}
322+
}
323+
238324
/// Converts a [`Cell`] to a byte vector.
239325
///
240326
/// Extracts binary data from [`Cell::Bytes`] variants by cloning the
@@ -375,20 +461,27 @@ fn cell_to_array_cell(cell: &Cell) -> Option<&ArrayCell> {
375461
fn build_list_array(rows: &[TableRow], field_idx: usize, field: FieldRef) -> ArrayRef {
376462
match field.data_type() {
377463
DataType::Boolean => build_boolean_list_array(rows, field_idx, field),
464+
DataType::Int16 => build_int16_list_array(rows, field_idx, field),
378465
DataType::Int32 => build_int32_list_array(rows, field_idx, field),
379466
DataType::Int64 => build_int64_list_array(rows, field_idx, field),
467+
DataType::UInt32 => build_uint32_list_array(rows, field_idx, field),
380468
DataType::Float32 => build_float32_list_array(rows, field_idx, field),
381469
DataType::Float64 => build_float64_list_array(rows, field_idx, field),
382470
DataType::Utf8 => build_string_list_array(rows, field_idx, field),
471+
DataType::Binary => build_binary_list_array(rows, field_idx, field),
383472
DataType::LargeBinary => build_binary_list_array(rows, field_idx, field),
384473
DataType::Date32 => build_date32_list_array(rows, field_idx, field),
385474
DataType::Time64(TimeUnit::Microsecond) => build_time64_list_array(rows, field_idx, field),
475+
DataType::Time64(TimeUnit::Nanosecond) => build_time64_list_array(rows, field_idx, field),
386476
DataType::Timestamp(TimeUnit::Microsecond, None) => {
387477
build_timestamp_list_array(rows, field_idx, field)
388478
}
389479
DataType::Timestamp(TimeUnit::Microsecond, Some(_)) => {
390480
build_timestamptz_list_array(rows, field_idx, field)
391481
}
482+
DataType::Decimal128(precision, scale) => {
483+
build_decimal128_list_array(rows, field_idx, field.clone(), *precision, *scale)
484+
}
392485
DataType::FixedSizeBinary(UUID_BYTE_WIDTH) => build_uuid_list_array(rows, field_idx, field),
393486
// For unsupported element types, fall back to string representation
394487
_ => build_list_array_for_strings(rows, field_idx, field),
@@ -421,6 +514,32 @@ fn build_boolean_list_array(rows: &[TableRow], field_idx: usize, field: FieldRef
421514
Arc::new(list_builder.finish())
422515
}
423516

517+
/// Builds a list array for 16-bit integer elements.
518+
fn build_int16_list_array(rows: &[TableRow], field_idx: usize, field: FieldRef) -> ArrayRef {
519+
let mut list_builder =
520+
ListBuilder::new(PrimitiveBuilder::<Int16Type>::new()).with_field(field.clone());
521+
522+
for row in rows {
523+
if let Some(array_cell) = cell_to_array_cell(&row.values[field_idx]) {
524+
match array_cell {
525+
ArrayCell::I16(vec) => {
526+
for item in vec {
527+
list_builder.values().append_option(*item);
528+
}
529+
list_builder.append(true);
530+
}
531+
_ => {
532+
return build_list_array_for_strings(rows, field_idx, field);
533+
}
534+
}
535+
} else {
536+
list_builder.append_null();
537+
}
538+
}
539+
540+
Arc::new(list_builder.finish())
541+
}
542+
424543
/// Builds a list array for 32-bit integer elements.
425544
fn build_int32_list_array(rows: &[TableRow], field_idx: usize, field: FieldRef) -> ArrayRef {
426545
let mut list_builder =
@@ -485,6 +604,32 @@ fn build_int64_list_array(rows: &[TableRow], field_idx: usize, field: FieldRef)
485604
Arc::new(list_builder.finish())
486605
}
487606

607+
/// Builds a list array for 32-bit unsigned integer elements.
608+
fn build_uint32_list_array(rows: &[TableRow], field_idx: usize, field: FieldRef) -> ArrayRef {
609+
let mut list_builder =
610+
ListBuilder::new(PrimitiveBuilder::<UInt32Type>::new()).with_field(field.clone());
611+
612+
for row in rows {
613+
if let Some(array_cell) = cell_to_array_cell(&row.values[field_idx]) {
614+
match array_cell {
615+
ArrayCell::U32(vec) => {
616+
for item in vec {
617+
list_builder.values().append_option(*item);
618+
}
619+
list_builder.append(true);
620+
}
621+
_ => {
622+
return build_list_array_for_strings(rows, field_idx, field);
623+
}
624+
}
625+
} else {
626+
list_builder.append_null();
627+
}
628+
}
629+
630+
Arc::new(list_builder.finish())
631+
}
632+
488633
/// Builds a list array for 32-bit float elements.
489634
fn build_float32_list_array(rows: &[TableRow], field_idx: usize, field: FieldRef) -> ArrayRef {
490635
let mut list_builder =
@@ -763,6 +908,19 @@ fn build_uuid_list_array(rows: &[TableRow], field_idx: usize, field: FieldRef) -
763908
Arc::new(list_builder.finish())
764909
}
765910

911+
/// Builds a list array for Decimal128 elements.
912+
fn build_decimal128_list_array(
913+
rows: &[TableRow],
914+
field_idx: usize,
915+
field: FieldRef,
916+
_precision: u8,
917+
_scale: i8,
918+
) -> ArrayRef {
919+
// For now, fall back to string representation for decimal arrays
920+
// This is a simplified implementation that avoids complex Arrow data type manipulation
921+
build_list_array_for_strings(rows, field_idx, field)
922+
}
923+
766924
/// Builds a list array for string elements.
767925
///
768926
/// This function creates an Arrow list array with string elements by processing

etl-destinations/src/deltalake/expr.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
// Utilities related to constructing DataFusion expressions
22

33
use crate::deltalake::schema::TableRowEncoder;
4+
use crate::deltalake::schema::cell_to_scalar_value_for_arrow;
45
use deltalake::datafusion::common::Column;
56
use deltalake::datafusion::prelude::{Expr, lit};
67
use etl::error::EtlResult;
@@ -16,7 +17,7 @@ pub fn cell_to_scalar_expr(
1617
&schema.column_schemas[col_idx].typ,
1718
schema.column_schemas[col_idx].modifier,
1819
);
19-
let sv = TableRowEncoder::cell_to_scalar_value_for_arrow(cell, &arrow_type)?;
20+
let sv = cell_to_scalar_value_for_arrow(cell, &arrow_type)?;
2021
Ok(lit(sv))
2122
}
2223

0 commit comments

Comments
 (0)