@@ -2,9 +2,9 @@ use std::sync::Arc;
2
2
3
3
use arrow:: {
4
4
array:: {
5
- ArrayRef , ArrowPrimitiveType , BooleanBuilder , Decimal128Array , FixedSizeBinaryBuilder ,
6
- LargeBinaryBuilder , ListBuilder , PrimitiveBuilder , RecordBatch , StringBuilder ,
7
- TimestampMicrosecondBuilder ,
5
+ ArrayRef , ArrowPrimitiveType , BinaryBuilder , BooleanBuilder , Decimal128Array ,
6
+ Decimal128Builder , FixedSizeBinaryBuilder , LargeBinaryBuilder , ListBuilder ,
7
+ PrimitiveBuilder , RecordBatch , StringBuilder , TimestampMicrosecondBuilder ,
8
8
} ,
9
9
datatypes:: {
10
10
DataType , Date32Type , FieldRef , Float32Type , Float64Type , Int16Type , Int32Type , Int64Type ,
@@ -13,7 +13,10 @@ use arrow::{
13
13
error:: ArrowError ,
14
14
} ;
15
15
use chrono:: { NaiveDate , NaiveTime } ;
16
- use etl:: types:: { ArrayCell , Cell , DATE_FORMAT , TIME_FORMAT , TIMESTAMP_FORMAT , TableRow } ;
16
+ use etl:: {
17
+ conversions:: numeric:: Sign ,
18
+ types:: { ArrayCell , Cell , DATE_FORMAT , TIME_FORMAT , TIMESTAMP_FORMAT , TableRow } ,
19
+ } ;
17
20
18
21
pub const UNIX_EPOCH : NaiveDate =
19
22
NaiveDate :: from_ymd_opt ( 1970 , 1 , 1 ) . expect ( "unix epoch is a valid date" ) ;
@@ -81,15 +84,13 @@ pub fn rows_to_record_batch(rows: &[TableRow], schema: Schema) -> Result<RecordB
81
84
fn build_array_for_field ( rows : & [ TableRow ] , field_idx : usize , data_type : & DataType ) -> ArrayRef {
82
85
match data_type {
83
86
DataType :: Boolean => build_boolean_array ( rows, field_idx) ,
84
- DataType :: Int16 => build_primitive_array :: < Int16Type , _ > ( rows, field_idx, cell_to_i16) ,
85
87
DataType :: Int32 => build_primitive_array :: < Int32Type , _ > ( rows, field_idx, cell_to_i32) ,
86
88
DataType :: Int64 => build_primitive_array :: < Int64Type , _ > ( rows, field_idx, cell_to_i64) ,
87
- DataType :: UInt32 => build_primitive_array :: < UInt32Type , _ > ( rows, field_idx, cell_to_u32) ,
88
89
DataType :: Float32 => build_primitive_array :: < Float32Type , _ > ( rows, field_idx, cell_to_f32) ,
89
90
DataType :: Float64 => build_primitive_array :: < Float64Type , _ > ( rows, field_idx, cell_to_f64) ,
90
91
DataType :: Utf8 => build_string_array ( rows, field_idx) ,
91
92
DataType :: Binary => build_binary_array ( rows, field_idx) ,
92
- DataType :: LargeBinary => build_binary_array ( rows, field_idx) ,
93
+ DataType :: LargeBinary => build_large_binary_array ( rows, field_idx) ,
93
94
DataType :: Date32 => build_primitive_array :: < Date32Type , _ > ( rows, field_idx, cell_to_date32) ,
94
95
DataType :: Time64 ( TimeUnit :: Microsecond ) => {
95
96
build_primitive_array :: < Time64MicrosecondType , _ > ( rows, field_idx, cell_to_time64)
@@ -155,7 +156,8 @@ macro_rules! impl_array_builder {
155
156
156
157
impl_array_builder ! ( build_boolean_array, BooleanBuilder , cell_to_bool) ;
157
158
impl_array_builder ! ( build_string_array, StringBuilder , cell_to_string) ;
158
- impl_array_builder ! ( build_binary_array, LargeBinaryBuilder , cell_to_bytes) ;
159
+ impl_array_builder ! ( build_binary_array, BinaryBuilder , cell_to_bytes) ;
160
+ impl_array_builder ! ( build_large_binary_array, LargeBinaryBuilder , cell_to_bytes) ;
159
161
160
162
/// Builds a decimal128 array from [`TableRow`]s for a specific field.
161
163
fn build_decimal128_array (
@@ -263,22 +265,6 @@ fn cell_to_i64(cell: &Cell) -> Option<i64> {
263
265
}
264
266
}
265
267
266
- /// Converts a [`Cell`] to a 16-bit signed integer.
267
- fn cell_to_i16 ( cell : & Cell ) -> Option < i16 > {
268
- match cell {
269
- Cell :: I16 ( v) => Some ( * v) ,
270
- _ => None ,
271
- }
272
- }
273
-
274
- /// Converts a [`Cell`] to a 32-bit unsigned integer.
275
- fn cell_to_u32 ( cell : & Cell ) -> Option < u32 > {
276
- match cell {
277
- Cell :: U32 ( v) => Some ( * v) ,
278
- _ => None ,
279
- }
280
- }
281
-
282
268
/// Converts a [`Cell`] to a 32-bit floating-point number.
283
269
///
284
270
/// Extracts 32-bit float values from [`Cell::F32`] variants, returning
@@ -302,19 +288,97 @@ fn cell_to_f64(cell: &Cell) -> Option<f64> {
302
288
}
303
289
304
290
/// Converts a [`Cell`] to a decimal128 value.
305
- fn cell_to_decimal128 ( cell : & Cell , _precision : u8 , scale : i8 ) -> Option < i128 > {
291
+ fn cell_to_decimal128 ( cell : & Cell , precision : u8 , scale : i8 ) -> Option < i128 > {
306
292
match cell {
307
- Cell :: Numeric ( n) => {
308
- // This is a simplified conversion - ideally we'd preserve the exact decimal representation
309
- if let Ok ( string_val) = n. to_string ( ) . parse :: < f64 > ( ) {
310
- // Scale up by the scale factor and convert to i128
311
- let scaled = ( string_val * 10_f64 . powi ( scale as i32 ) ) as i128 ;
312
- Some ( scaled)
293
+ Cell :: Numeric ( n) => pg_numeric_to_decimal_i128 ( n, precision as i32 , scale as i32 ) ,
294
+ _ => None ,
295
+ }
296
+ }
297
+
298
+ /// Convert PgNumeric to a scaled i128 matching Decimal128(precision, scale) exactly using string math.
299
+ fn pg_numeric_to_decimal_i128 (
300
+ n : & etl:: types:: PgNumeric ,
301
+ precision : i32 ,
302
+ scale : i32 ,
303
+ ) -> Option < i128 > {
304
+ if precision <= 0 || scale < 0 || scale > precision {
305
+ return None ;
306
+ }
307
+
308
+ match n {
309
+ etl:: types:: PgNumeric :: NaN
310
+ | etl:: types:: PgNumeric :: PositiveInfinity
311
+ | etl:: types:: PgNumeric :: NegativeInfinity => None ,
312
+ etl:: types:: PgNumeric :: Value {
313
+ sign,
314
+ weight,
315
+ scale : _,
316
+ digits,
317
+ } => {
318
+ if digits. is_empty ( ) {
319
+ return Some ( 0 ) ;
320
+ }
321
+
322
+ // Compose base-10000 groups into an integer accumulator.
323
+ let mut acc: i128 = 0 ;
324
+ for & g in digits. iter ( ) {
325
+ let gi = g as i128 ;
326
+ acc = acc. checked_mul ( 10_000 ) ?. checked_add ( gi) ?;
327
+ }
328
+
329
+ // Decimal 10^ exponent to align composed base-10000 integer with actual value,
330
+ // then apply desired target scale. Do NOT use pg_scale here; value is fully
331
+ // described by digits and weight.
332
+ let shift_groups = * weight as i32 - ( digits. len ( ) as i32 - 1 ) ;
333
+ let exp10 = shift_groups * 4 + scale;
334
+
335
+ // Apply 10^exp10 scaling with checked math.
336
+ fn pow10_i128 ( mut e : i32 ) -> Option < i128 > {
337
+ if e < 0 {
338
+ return None ;
339
+ }
340
+ let mut r: i128 = 1 ;
341
+ while e > 0 {
342
+ r = r. checked_mul ( 10 ) ?;
343
+ e -= 1 ;
344
+ }
345
+ Some ( r)
346
+ }
347
+
348
+ if exp10 >= 0 {
349
+ acc = acc. checked_mul ( pow10_i128 ( exp10) ?) ?;
313
350
} else {
314
- None
351
+ let div = pow10_i128 ( -exp10) ?;
352
+ acc /= div; // truncate toward zero
353
+ }
354
+
355
+ // Apply sign
356
+ let is_negative = matches ! ( sign, Sign :: Negative ) ;
357
+ if is_negative {
358
+ acc = -acc;
359
+ }
360
+
361
+ // Enforce precision limit
362
+ fn count_digits ( mut v : i128 ) -> i32 {
363
+ if v == 0 {
364
+ return 1 ;
365
+ }
366
+ if v < 0 {
367
+ v = -v;
368
+ }
369
+ let mut c = 0 ;
370
+ while v > 0 {
371
+ v /= 10 ;
372
+ c += 1 ;
373
+ }
374
+ c
375
+ }
376
+ if count_digits ( acc) > precision {
377
+ return None ;
315
378
}
379
+
380
+ Some ( acc)
316
381
}
317
- _ => None ,
318
382
}
319
383
}
320
384
@@ -910,12 +974,39 @@ fn build_decimal128_list_array(
910
974
rows : & [ TableRow ] ,
911
975
field_idx : usize ,
912
976
field : FieldRef ,
913
- _precision : u8 ,
914
- _scale : i8 ,
977
+ precision : u8 ,
978
+ scale : i8 ,
915
979
) -> ArrayRef {
916
- // For now, fall back to string representation for decimal arrays
917
- // This is a simplified implementation that avoids complex Arrow data type manipulation
918
- build_list_array_for_strings ( rows, field_idx, field)
980
+ let mut list_builder = ListBuilder :: new (
981
+ Decimal128Builder :: new ( ) . with_data_type ( DataType :: Decimal128 ( precision, scale) ) ,
982
+ )
983
+ . with_field ( field. clone ( ) ) ;
984
+
985
+ for row in rows {
986
+ if let Some ( array_cell) = cell_to_array_cell ( & row. values [ field_idx] ) {
987
+ match array_cell {
988
+ ArrayCell :: Numeric ( vec) => {
989
+ for item in vec {
990
+ let val = item. as_ref ( ) . and_then ( |n| {
991
+ pg_numeric_to_decimal_i128 ( n, precision as i32 , scale as i32 )
992
+ } ) ;
993
+ match val {
994
+ Some ( v) => list_builder. values ( ) . append_value ( v) ,
995
+ None => list_builder. values ( ) . append_null ( ) ,
996
+ }
997
+ }
998
+ list_builder. append ( true ) ;
999
+ }
1000
+ _ => {
1001
+ return build_list_array_for_strings ( rows, field_idx, field) ;
1002
+ }
1003
+ }
1004
+ } else {
1005
+ list_builder. append_null ( ) ;
1006
+ }
1007
+ }
1008
+
1009
+ Arc :: new ( list_builder. finish ( ) )
919
1010
}
920
1011
921
1012
/// Builds a list array for string elements.
@@ -1486,19 +1577,49 @@ mod tests {
1486
1577
} ,
1487
1578
] ;
1488
1579
1489
- let array_ref = build_array_for_field ( & rows, 0 , & DataType :: LargeBinary ) ;
1580
+ let array_ref = build_array_for_field ( & rows, 0 , & DataType :: Binary ) ;
1490
1581
let binary_array = array_ref
1491
1582
. as_any ( )
1492
- . downcast_ref :: < arrow:: array:: LargeBinaryArray > ( )
1583
+ . downcast_ref :: < arrow:: array:: BinaryArray > ( )
1493
1584
. unwrap ( ) ;
1494
-
1495
1585
assert_eq ! ( binary_array. len( ) , 4 ) ;
1496
1586
assert_eq ! ( binary_array. value( 0 ) , test_bytes) ;
1497
1587
assert_eq ! ( binary_array. value( 1 ) , Vec :: <u8 >:: new( ) ) ;
1498
1588
assert ! ( binary_array. is_null( 2 ) ) ;
1499
1589
assert ! ( binary_array. is_null( 3 ) ) ;
1500
1590
}
1501
1591
1592
/// A `DataType::LargeBinary` field must produce a `LargeBinaryArray`; null cells and
/// cells that are not `Cell::Bytes` become null entries.
#[test]
fn test_build_large_binary_array() {
    let payload: Vec<u8> = vec![1, 2, 3, 4, 5];
    let cells = vec![
        Cell::Bytes(payload.clone()),
        Cell::Bytes(Vec::new()),
        Cell::Null,
        Cell::String(String::from("not bytes")),
    ];
    let rows: Vec<TableRow> = cells
        .into_iter()
        .map(|cell| TableRow { values: vec![cell] })
        .collect();

    let column = build_array_for_field(&rows, 0, &DataType::LargeBinary);
    let large_binary = column
        .as_any()
        .downcast_ref::<arrow::array::LargeBinaryArray>()
        .unwrap();

    assert_eq!(large_binary.len(), 4);
    // Row 0: the payload round-trips; row 1: empty bytes stay empty (not null).
    assert_eq!(large_binary.value(0), payload.as_slice());
    assert!(large_binary.value(1).is_empty());
    // Rows 2 and 3: explicit null and a non-bytes cell both map to null.
    for null_row in [2, 3] {
        assert!(large_binary.is_null(null_row));
    }
}
1622
+
1502
1623
#[ test]
1503
1624
fn test_build_date32_array ( ) {
1504
1625
use chrono:: NaiveDate ;
@@ -1667,6 +1788,47 @@ mod tests {
1667
1788
assert ! ( uuid_array. is_null( 2 ) ) ;
1668
1789
}
1669
1790
1791
/// Numeric cells written through a `Decimal128(10, 2)` schema must come back as
/// exact unscaled integers, with null cells preserved as nulls.
#[test]
fn test_build_decimal128_array() {
    use arrow::datatypes::{Field, Schema};
    use etl::types::PgNumeric;

    let numeric = |text: &str| Cell::Numeric(text.parse::<PgNumeric>().unwrap());
    let rows: Vec<TableRow> = vec![
        numeric("123.45"),
        numeric("-0.01"),
        Cell::Null,
        numeric("0"),
    ]
    .into_iter()
    .map(|cell| TableRow { values: vec![cell] })
    .collect();

    let decimal_type = DataType::Decimal128(10, 2);
    let schema = Schema::new(vec![Field::new("amount", decimal_type.clone(), true)]);

    let batch = rows_to_record_batch(&rows, schema).unwrap();
    let decimals = batch
        .column(0)
        .as_any()
        .downcast_ref::<arrow::array::Decimal128Array>()
        .unwrap();

    assert_eq!(decimals.len(), 4);
    assert_eq!(decimals.data_type(), &decimal_type);
    assert_eq!(decimals.value(0), 12_345); // 123.45 at scale 2
    assert_eq!(decimals.value(1), -1); // -0.01 at scale 2
    assert!(decimals.is_null(2));
    assert_eq!(decimals.value(3), 0);
}
1831
+
1670
1832
#[ test]
1671
1833
fn test_rows_to_record_batch_simple ( ) {
1672
1834
use arrow:: datatypes:: { Field , Schema } ;
@@ -2888,6 +3050,59 @@ mod tests {
2888
3050
assert ! ( list_array. is_null( 3 ) ) ;
2889
3051
}
2890
3052
3053
/// Numeric arrays must be encoded as `List<Decimal128>`: element nulls, empty lists
/// and null lists are all kept distinct.
#[test]
fn test_build_decimal128_list_array() {
    use arrow::array::ListArray;
    use arrow::datatypes::Field;
    use etl::types::PgNumeric;

    let (precision, scale): (u8, i8) = (10, 2);
    let item_field = Arc::new(Field::new(
        "item",
        DataType::Decimal128(precision, scale),
        true,
    ));
    let numeric = |text: &str| Some(text.parse::<PgNumeric>().unwrap());

    let rows = vec![
        // Populated list with a null element in the middle.
        TableRow {
            values: vec![Cell::Array(ArrayCell::Numeric(vec![
                numeric("123.45"),
                None,
                numeric("-0.01"),
            ]))],
        },
        // Empty list.
        TableRow {
            values: vec![Cell::Array(ArrayCell::Numeric(Vec::new()))],
        },
        // Null list.
        TableRow {
            values: vec![Cell::Null],
        },
    ];

    let column = build_decimal128_list_array(&rows, 0, item_field, precision, scale);
    let lists = column.as_any().downcast_ref::<ListArray>().unwrap();
    assert_eq!(lists.len(), 3);

    // Row 0: three elements, the middle one null.
    assert!(!lists.is_null(0));
    let row0 = lists.value(0);
    let decimals = row0
        .as_any()
        .downcast_ref::<arrow::array::Decimal128Array>()
        .unwrap();
    assert_eq!(decimals.len(), 3);
    assert_eq!(decimals.value(0), 12_345); // 123.45
    assert!(decimals.is_null(1));
    assert_eq!(decimals.value(2), -1); // -0.01

    // Row 1: present but empty.
    assert!(!lists.is_null(1));
    assert_eq!(lists.value(1).len(), 0);

    // Row 2: the list itself is null.
    assert!(lists.is_null(2));
}
3105
+
2891
3106
#[ test]
2892
3107
fn test_build_list_array_for_strings ( ) {
2893
3108
use arrow:: array:: ListArray ;
0 commit comments