Skip to content

Commit 48ed3e0

Browse files
Fix Decimal128 mapping
Signed-off-by: Abhi Agarwal <[email protected]>
1 parent 4aeecc4 commit 48ed3e0

File tree

5 files changed

+345
-130
lines changed

5 files changed

+345
-130
lines changed

.gitignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,3 +33,5 @@ pyvenv.cfg
3333

3434
# Log files
3535
*.log
36+
37+
lcov.info

etl-destinations/src/arrow/encoding.rs

Lines changed: 256 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,9 @@ use std::sync::Arc;
22

33
use arrow::{
44
array::{
5-
ArrayRef, ArrowPrimitiveType, BooleanBuilder, Decimal128Array, FixedSizeBinaryBuilder,
6-
LargeBinaryBuilder, ListBuilder, PrimitiveBuilder, RecordBatch, StringBuilder,
7-
TimestampMicrosecondBuilder,
5+
ArrayRef, ArrowPrimitiveType, BinaryBuilder, BooleanBuilder, Decimal128Array,
6+
Decimal128Builder, FixedSizeBinaryBuilder, LargeBinaryBuilder, ListBuilder,
7+
PrimitiveBuilder, RecordBatch, StringBuilder, TimestampMicrosecondBuilder,
88
},
99
datatypes::{
1010
DataType, Date32Type, FieldRef, Float32Type, Float64Type, Int16Type, Int32Type, Int64Type,
@@ -13,7 +13,10 @@ use arrow::{
1313
error::ArrowError,
1414
};
1515
use chrono::{NaiveDate, NaiveTime};
16-
use etl::types::{ArrayCell, Cell, DATE_FORMAT, TIME_FORMAT, TIMESTAMP_FORMAT, TableRow};
16+
use etl::{
17+
conversions::numeric::Sign,
18+
types::{ArrayCell, Cell, DATE_FORMAT, TIME_FORMAT, TIMESTAMP_FORMAT, TableRow},
19+
};
1720

1821
pub const UNIX_EPOCH: NaiveDate =
1922
NaiveDate::from_ymd_opt(1970, 1, 1).expect("unix epoch is a valid date");
@@ -81,15 +84,13 @@ pub fn rows_to_record_batch(rows: &[TableRow], schema: Schema) -> Result<RecordB
8184
fn build_array_for_field(rows: &[TableRow], field_idx: usize, data_type: &DataType) -> ArrayRef {
8285
match data_type {
8386
DataType::Boolean => build_boolean_array(rows, field_idx),
84-
DataType::Int16 => build_primitive_array::<Int16Type, _>(rows, field_idx, cell_to_i16),
8587
DataType::Int32 => build_primitive_array::<Int32Type, _>(rows, field_idx, cell_to_i32),
8688
DataType::Int64 => build_primitive_array::<Int64Type, _>(rows, field_idx, cell_to_i64),
87-
DataType::UInt32 => build_primitive_array::<UInt32Type, _>(rows, field_idx, cell_to_u32),
8889
DataType::Float32 => build_primitive_array::<Float32Type, _>(rows, field_idx, cell_to_f32),
8990
DataType::Float64 => build_primitive_array::<Float64Type, _>(rows, field_idx, cell_to_f64),
9091
DataType::Utf8 => build_string_array(rows, field_idx),
9192
DataType::Binary => build_binary_array(rows, field_idx),
92-
DataType::LargeBinary => build_binary_array(rows, field_idx),
93+
DataType::LargeBinary => build_large_binary_array(rows, field_idx),
9394
DataType::Date32 => build_primitive_array::<Date32Type, _>(rows, field_idx, cell_to_date32),
9495
DataType::Time64(TimeUnit::Microsecond) => {
9596
build_primitive_array::<Time64MicrosecondType, _>(rows, field_idx, cell_to_time64)
@@ -155,7 +156,8 @@ macro_rules! impl_array_builder {
155156

156157
impl_array_builder!(build_boolean_array, BooleanBuilder, cell_to_bool);
157158
impl_array_builder!(build_string_array, StringBuilder, cell_to_string);
158-
impl_array_builder!(build_binary_array, LargeBinaryBuilder, cell_to_bytes);
159+
impl_array_builder!(build_binary_array, BinaryBuilder, cell_to_bytes);
160+
impl_array_builder!(build_large_binary_array, LargeBinaryBuilder, cell_to_bytes);
159161

160162
/// Builds a decimal128 array from [`TableRow`]s for a specific field.
161163
fn build_decimal128_array(
@@ -263,22 +265,6 @@ fn cell_to_i64(cell: &Cell) -> Option<i64> {
263265
}
264266
}
265267

266-
/// Converts a [`Cell`] to a 16-bit signed integer.
267-
fn cell_to_i16(cell: &Cell) -> Option<i16> {
268-
match cell {
269-
Cell::I16(v) => Some(*v),
270-
_ => None,
271-
}
272-
}
273-
274-
/// Converts a [`Cell`] to a 32-bit unsigned integer.
275-
fn cell_to_u32(cell: &Cell) -> Option<u32> {
276-
match cell {
277-
Cell::U32(v) => Some(*v),
278-
_ => None,
279-
}
280-
}
281-
282268
/// Converts a [`Cell`] to a 32-bit floating-point number.
283269
///
284270
/// Extracts 32-bit float values from [`Cell::F32`] variants, returning
@@ -302,19 +288,97 @@ fn cell_to_f64(cell: &Cell) -> Option<f64> {
302288
}
303289

304290
/// Converts a [`Cell`] to a decimal128 value.
305-
fn cell_to_decimal128(cell: &Cell, _precision: u8, scale: i8) -> Option<i128> {
291+
fn cell_to_decimal128(cell: &Cell, precision: u8, scale: i8) -> Option<i128> {
306292
match cell {
307-
Cell::Numeric(n) => {
308-
// This is a simplified conversion - ideally we'd preserve the exact decimal representation
309-
if let Ok(string_val) = n.to_string().parse::<f64>() {
310-
// Scale up by the scale factor and convert to i128
311-
let scaled = (string_val * 10_f64.powi(scale as i32)) as i128;
312-
Some(scaled)
293+
Cell::Numeric(n) => pg_numeric_to_decimal_i128(n, precision as i32, scale as i32),
294+
_ => None,
295+
}
296+
}
297+
298+
/// Convert PgNumeric to a scaled i128 matching Decimal128(precision, scale) exactly using string math.
299+
fn pg_numeric_to_decimal_i128(
300+
n: &etl::types::PgNumeric,
301+
precision: i32,
302+
scale: i32,
303+
) -> Option<i128> {
304+
if precision <= 0 || scale < 0 || scale > precision {
305+
return None;
306+
}
307+
308+
match n {
309+
etl::types::PgNumeric::NaN
310+
| etl::types::PgNumeric::PositiveInfinity
311+
| etl::types::PgNumeric::NegativeInfinity => None,
312+
etl::types::PgNumeric::Value {
313+
sign,
314+
weight,
315+
scale: _,
316+
digits,
317+
} => {
318+
if digits.is_empty() {
319+
return Some(0);
320+
}
321+
322+
// Compose base-10000 groups into an integer accumulator.
323+
let mut acc: i128 = 0;
324+
for &g in digits.iter() {
325+
let gi = g as i128;
326+
acc = acc.checked_mul(10_000)?.checked_add(gi)?;
327+
}
328+
329+
// Decimal 10^ exponent to align composed base-10000 integer with actual value,
330+
// then apply desired target scale. Do NOT use pg_scale here; value is fully
331+
// described by digits and weight.
332+
let shift_groups = *weight as i32 - (digits.len() as i32 - 1);
333+
let exp10 = shift_groups * 4 + scale;
334+
335+
// Apply 10^exp10 scaling with checked math.
336+
fn pow10_i128(mut e: i32) -> Option<i128> {
337+
if e < 0 {
338+
return None;
339+
}
340+
let mut r: i128 = 1;
341+
while e > 0 {
342+
r = r.checked_mul(10)?;
343+
e -= 1;
344+
}
345+
Some(r)
346+
}
347+
348+
if exp10 >= 0 {
349+
acc = acc.checked_mul(pow10_i128(exp10)?)?;
313350
} else {
314-
None
351+
let div = pow10_i128(-exp10)?;
352+
acc /= div; // truncate toward zero
353+
}
354+
355+
// Apply sign
356+
let is_negative = matches!(sign, Sign::Negative);
357+
if is_negative {
358+
acc = -acc;
359+
}
360+
361+
// Enforce precision limit
362+
fn count_digits(mut v: i128) -> i32 {
363+
if v == 0 {
364+
return 1;
365+
}
366+
if v < 0 {
367+
v = -v;
368+
}
369+
let mut c = 0;
370+
while v > 0 {
371+
v /= 10;
372+
c += 1;
373+
}
374+
c
375+
}
376+
if count_digits(acc) > precision {
377+
return None;
315378
}
379+
380+
Some(acc)
316381
}
317-
_ => None,
318382
}
319383
}
320384

@@ -910,12 +974,39 @@ fn build_decimal128_list_array(
910974
rows: &[TableRow],
911975
field_idx: usize,
912976
field: FieldRef,
913-
_precision: u8,
914-
_scale: i8,
977+
precision: u8,
978+
scale: i8,
915979
) -> ArrayRef {
916-
// For now, fall back to string representation for decimal arrays
917-
// This is a simplified implementation that avoids complex Arrow data type manipulation
918-
build_list_array_for_strings(rows, field_idx, field)
980+
let mut list_builder = ListBuilder::new(
981+
Decimal128Builder::new().with_data_type(DataType::Decimal128(precision, scale)),
982+
)
983+
.with_field(field.clone());
984+
985+
for row in rows {
986+
if let Some(array_cell) = cell_to_array_cell(&row.values[field_idx]) {
987+
match array_cell {
988+
ArrayCell::Numeric(vec) => {
989+
for item in vec {
990+
let val = item.as_ref().and_then(|n| {
991+
pg_numeric_to_decimal_i128(n, precision as i32, scale as i32)
992+
});
993+
match val {
994+
Some(v) => list_builder.values().append_value(v),
995+
None => list_builder.values().append_null(),
996+
}
997+
}
998+
list_builder.append(true);
999+
}
1000+
_ => {
1001+
return build_list_array_for_strings(rows, field_idx, field);
1002+
}
1003+
}
1004+
} else {
1005+
list_builder.append_null();
1006+
}
1007+
}
1008+
1009+
Arc::new(list_builder.finish())
9191010
}
9201011

9211012
/// Builds a list array for string elements.
@@ -1486,19 +1577,49 @@ mod tests {
14861577
},
14871578
];
14881579

1489-
let array_ref = build_array_for_field(&rows, 0, &DataType::LargeBinary);
1580+
let array_ref = build_array_for_field(&rows, 0, &DataType::Binary);
14901581
let binary_array = array_ref
14911582
.as_any()
1492-
.downcast_ref::<arrow::array::LargeBinaryArray>()
1583+
.downcast_ref::<arrow::array::BinaryArray>()
14931584
.unwrap();
1494-
14951585
assert_eq!(binary_array.len(), 4);
14961586
assert_eq!(binary_array.value(0), test_bytes);
14971587
assert_eq!(binary_array.value(1), Vec::<u8>::new());
14981588
assert!(binary_array.is_null(2));
14991589
assert!(binary_array.is_null(3));
15001590
}
15011591

1592+
/// `DataType::LargeBinary` fields are built as `LargeBinaryArray`: byte cells are
/// copied through, while `Cell::Null` and non-byte cells become nulls.
#[test]
fn test_build_large_binary_array() {
    let payload: Vec<u8> = vec![1, 2, 3, 4, 5];

    // One single-column row per scenario: bytes, empty bytes, null, wrong type.
    let rows: Vec<TableRow> = [
        Cell::Bytes(payload.clone()),
        Cell::Bytes(Vec::new()),
        Cell::Null,
        Cell::String("not bytes".to_string()),
    ]
    .into_iter()
    .map(|cell| TableRow { values: vec![cell] })
    .collect();

    let built = build_array_for_field(&rows, 0, &DataType::LargeBinary);
    let large_binary = built
        .as_any()
        .downcast_ref::<arrow::array::LargeBinaryArray>()
        .unwrap();

    assert_eq!(large_binary.len(), 4);
    assert_eq!(large_binary.value(0), payload);
    assert_eq!(large_binary.value(1), Vec::<u8>::new());
    assert!(large_binary.is_null(2));
    assert!(large_binary.is_null(3));
}
1622+
15021623
#[test]
15031624
fn test_build_date32_array() {
15041625
use chrono::NaiveDate;
@@ -1667,6 +1788,47 @@ mod tests {
16671788
assert!(uuid_array.is_null(2));
16681789
}
16691790

1791+
/// Numeric cells in a `Decimal128(10, 2)` column are stored as exact unscaled
/// integers, and `Cell::Null` becomes a null entry.
#[test]
fn test_build_decimal128_array() {
    use arrow::datatypes::{Field, Schema};
    use etl::types::PgNumeric;

    // Wraps a decimal literal in a single-column row.
    let numeric_row = |text: &str| TableRow {
        values: vec![Cell::Numeric(text.parse::<PgNumeric>().unwrap())],
    };

    let rows = vec![
        numeric_row("123.45"),
        numeric_row("-0.01"),
        TableRow {
            values: vec![Cell::Null],
        },
        numeric_row("0"),
    ];

    let decimal_type = DataType::Decimal128(10, 2);
    let schema = Schema::new(vec![Field::new("amount", decimal_type.clone(), true)]);

    let batch = rows_to_record_batch(&rows, schema).unwrap();
    let amounts = batch
        .column(0)
        .as_any()
        .downcast_ref::<arrow::array::Decimal128Array>()
        .unwrap();

    assert_eq!(amounts.len(), 4);
    assert_eq!(amounts.data_type(), &decimal_type);
    assert_eq!(amounts.value(0), 12_345); // 123.45 at scale 2
    assert_eq!(amounts.value(1), -1); // -0.01 at scale 2
    assert!(amounts.is_null(2));
    assert_eq!(amounts.value(3), 0);
}
1831+
16701832
#[test]
16711833
fn test_rows_to_record_batch_simple() {
16721834
use arrow::datatypes::{Field, Schema};
@@ -2888,6 +3050,59 @@ mod tests {
28883050
assert!(list_array.is_null(3));
28893051
}
28903052

3053+
/// Numeric array cells are built as a list of `Decimal128` values: elements keep
/// their nullability, an empty array stays an empty (non-null) list, and
/// `Cell::Null` becomes a null list.
#[test]
fn test_build_decimal128_list_array() {
    use arrow::array::{Decimal128Array, ListArray};
    use arrow::datatypes::Field;
    use etl::types::PgNumeric;

    let precision: u8 = 10;
    let scale: i8 = 2;
    let item_field = Arc::new(Field::new(
        "item",
        DataType::Decimal128(precision, scale),
        true,
    ));

    let numeric = |text: &str| Some(text.parse::<PgNumeric>().unwrap());
    let rows: Vec<TableRow> = [
        // Mixed values with a null element in the middle.
        Cell::Array(ArrayCell::Numeric(vec![
            numeric("123.45"),
            None,
            numeric("-0.01"),
        ])),
        // Empty list.
        Cell::Array(ArrayCell::Numeric(Vec::new())),
        // Null list.
        Cell::Null,
    ]
    .into_iter()
    .map(|cell| TableRow { values: vec![cell] })
    .collect();

    let built = build_decimal128_list_array(&rows, 0, item_field, precision, scale);
    let lists = built.as_any().downcast_ref::<ListArray>().unwrap();

    assert_eq!(lists.len(), 3);

    // Row 0: three elements, the middle one null.
    assert!(!lists.is_null(0));
    let first = lists.value(0);
    let decimals = first.as_any().downcast_ref::<Decimal128Array>().unwrap();
    assert_eq!(decimals.len(), 3);
    assert_eq!(decimals.value(0), 12_345); // 123.45
    assert!(decimals.is_null(1));
    assert_eq!(decimals.value(2), -1); // -0.01

    // Row 1: empty list.
    assert!(!lists.is_null(1));
    assert_eq!(lists.value(1).len(), 0);

    // Row 2: null list.
    assert!(lists.is_null(2));
}
3105+
28913106
#[test]
28923107
fn test_build_list_array_for_strings() {
28933108
use arrow::array::ListArray;

0 commit comments

Comments
 (0)