Skip to content

Commit e3c3383

Browse files
Improve data mapping integration test
Signed-off-by: Abhi Agarwal <[email protected]>
1 parent 9de4f17 commit e3c3383

File tree

3 files changed

+290
-67
lines changed

3 files changed

+290
-67
lines changed

etl-destinations/src/arrow/encoding.rs

Lines changed: 84 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,12 @@ fn build_array_for_field(rows: &[TableRow], field_idx: usize, data_type: &DataTy
102102
build_timestamptz_array(rows, field_idx, tz)
103103
}
104104
DataType::Timestamp(TimeUnit::Microsecond, None) => {
105-
build_primitive_array::<TimestampMicrosecondType, _>(rows, field_idx, cell_to_timestamp)
105+
// Support both naive timestamps and time-of-day encoded as timestamp-from-epoch.
106+
build_primitive_array::<TimestampMicrosecondType, _>(
107+
rows,
108+
field_idx,
109+
cell_to_timestamp_or_time,
110+
)
106111
}
107112
DataType::Decimal128(precision, scale) => {
108113
build_decimal128_array(rows, field_idx, *precision, *scale)
@@ -188,7 +193,12 @@ fn build_timestamptz_array(rows: &[TableRow], field_idx: usize, tz: &str) -> Arr
188193
let mut builder = TimestampMicrosecondBuilder::new().with_timezone(tz);
189194

190195
for row in rows {
191-
let arrow_value = cell_to_timestamptz(&row.values[field_idx]);
196+
// Accept either timestamptz values or time-of-day (encoded from epoch date).
197+
let arrow_value = match &row.values[field_idx] {
198+
Cell::TimestampTz(_) => cell_to_timestamptz(&row.values[field_idx]),
199+
Cell::Time(_) => cell_to_time_as_timestamp(&row.values[field_idx]),
200+
_ => None,
201+
};
192202
builder.append_option(arrow_value);
193203
}
194204

@@ -429,6 +439,20 @@ fn cell_to_timestamp(cell: &Cell) -> Option<i64> {
429439
}
430440
}
431441

442+
/// Converts a [`Cell`] to a 64-bit timestamp value (microseconds since Unix epoch),
443+
/// accepting either a naive timestamp or a time-of-day.
444+
///
445+
/// - [`Cell::Timestamp`] is converted like [`cell_to_timestamp`].
446+
/// - [`Cell::Time`] is converted to microseconds since Unix epoch by treating it as
447+
/// a time on the Unix epoch date (1970-01-01T00:00:00).
448+
fn cell_to_timestamp_or_time(cell: &Cell) -> Option<i64> {
449+
match cell {
450+
Cell::Timestamp(_) => cell_to_timestamp(cell),
451+
Cell::Time(time) => time.signed_duration_since(MIDNIGHT).num_microseconds(),
452+
_ => None,
453+
}
454+
}
455+
432456
/// Converts a [`Cell`] to a timezone-aware timestamp value (microseconds since Unix epoch).
433457
///
434458
/// Transforms timezone-aware [`Cell::TimestampTz`] values into microseconds
@@ -530,7 +554,7 @@ fn build_list_array(rows: &[TableRow], field_idx: usize, field: FieldRef) -> Arr
530554
DataType::Float64 => build_float64_list_array(rows, field_idx, field),
531555
DataType::Utf8 => build_string_list_array(rows, field_idx, field),
532556
DataType::Binary => build_binary_list_array(rows, field_idx, field),
533-
DataType::LargeBinary => build_binary_list_array(rows, field_idx, field),
557+
DataType::LargeBinary => build_large_binary_list_array(rows, field_idx, field),
534558
DataType::Date32 => build_date32_list_array(rows, field_idx, field),
535559
DataType::Time64(TimeUnit::Microsecond) => build_time64_list_array(rows, field_idx, field),
536560
DataType::Time64(TimeUnit::Nanosecond) => build_time64_list_array(rows, field_idx, field),
@@ -791,6 +815,34 @@ fn build_string_list_array(rows: &[TableRow], field_idx: usize, field: FieldRef)
791815

792816
/// Builds a list array for binary elements.
793817
fn build_binary_list_array(rows: &[TableRow], field_idx: usize, field: FieldRef) -> ArrayRef {
818+
let mut list_builder = ListBuilder::new(BinaryBuilder::new()).with_field(field.clone());
819+
820+
for row in rows {
821+
if let Some(array_cell) = cell_to_array_cell(&row.values[field_idx]) {
822+
match array_cell {
823+
ArrayCell::Bytes(vec) => {
824+
for item in vec {
825+
match item {
826+
Some(bytes) => list_builder.values().append_value(bytes),
827+
None => list_builder.values().append_null(),
828+
}
829+
}
830+
list_builder.append(true);
831+
}
832+
_ => {
833+
return build_list_array_for_strings(rows, field_idx, field);
834+
}
835+
}
836+
} else {
837+
list_builder.append_null();
838+
}
839+
}
840+
841+
Arc::new(list_builder.finish())
842+
}
843+
844+
/// Builds a list array for large binary elements.
845+
fn build_large_binary_list_array(rows: &[TableRow], field_idx: usize, field: FieldRef) -> ArrayRef {
794846
let mut list_builder = ListBuilder::new(LargeBinaryBuilder::new()).with_field(field.clone());
795847

796848
for row in rows {
@@ -889,6 +941,15 @@ fn build_timestamp_list_array(rows: &[TableRow], field_idx: usize, field: FieldR
889941
}
890942
list_builder.append(true);
891943
}
944+
ArrayCell::Time(vec) => {
945+
for item in vec {
946+
let arrow_value = item.and_then(|time| {
947+
time.signed_duration_since(MIDNIGHT).num_microseconds()
948+
});
949+
list_builder.values().append_option(arrow_value);
950+
}
951+
list_builder.append(true);
952+
}
892953
_ => {
893954
return build_list_array_for_strings(rows, field_idx, field);
894955
}
@@ -923,6 +984,15 @@ fn build_timestamptz_list_array(rows: &[TableRow], field_idx: usize, field: Fiel
923984
}
924985
list_builder.append(true);
925986
}
987+
ArrayCell::Time(vec) => {
988+
for item in vec {
989+
let arrow_value = item.and_then(|time| {
990+
time.signed_duration_since(MIDNIGHT).num_microseconds()
991+
});
992+
list_builder.values().append_option(arrow_value);
993+
}
994+
list_builder.append(true);
995+
}
926996
_ => {
927997
return build_list_array_for_strings(rows, field_idx, field);
928998
}
@@ -935,6 +1005,14 @@ fn build_timestamptz_list_array(rows: &[TableRow], field_idx: usize, field: Fiel
9351005
Arc::new(list_builder.finish())
9361006
}
9371007

1008+
/// Converts a [`Cell::Time`] to a timestamp-from-epoch in microseconds.
1009+
fn cell_to_time_as_timestamp(cell: &Cell) -> Option<i64> {
1010+
match cell {
1011+
Cell::Time(time) => time.signed_duration_since(MIDNIGHT).num_microseconds(),
1012+
_ => None,
1013+
}
1014+
}
1015+
9381016
/// Builds a list array for UUID elements.
9391017
fn build_uuid_list_array(rows: &[TableRow], field_idx: usize, field: FieldRef) -> ArrayRef {
9401018
let mut list_builder =
@@ -2608,7 +2686,7 @@ mod tests {
26082686
use arrow::array::ListArray;
26092687
use arrow::datatypes::Field;
26102688

2611-
let field = Field::new("items", DataType::LargeBinary, true);
2689+
let field = Field::new("items", DataType::Binary, true);
26122690
let field_ref = Arc::new(field);
26132691

26142692
let test_bytes_1 = vec![1, 2, 3, 4, 5];
@@ -2646,7 +2724,7 @@ mod tests {
26462724
let first_list = list_array.value(0);
26472725
let binary_array = first_list
26482726
.as_any()
2649-
.downcast_ref::<arrow::array::LargeBinaryArray>()
2727+
.downcast_ref::<arrow::array::BinaryArray>()
26502728
.unwrap();
26512729
assert_eq!(binary_array.len(), 3);
26522730
assert_eq!(binary_array.value(0), test_bytes_1);
@@ -2658,7 +2736,7 @@ mod tests {
26582736
let second_list = list_array.value(1);
26592737
let binary_array = second_list
26602738
.as_any()
2661-
.downcast_ref::<arrow::array::LargeBinaryArray>()
2739+
.downcast_ref::<arrow::array::BinaryArray>()
26622740
.unwrap();
26632741
assert_eq!(binary_array.len(), 1);
26642742
assert_eq!(binary_array.value(0), empty_bytes);

0 commit comments

Comments
 (0)