Skip to content

Commit a485f5e

Browse files
authored
Feature: Support importing nested vectors as well as timestamp ns (#5178)
fix: #4848 for real Signed-off-by: Robert Kruszewski <[email protected]> --------- Signed-off-by: Robert Kruszewski <[email protected]>
1 parent dacbb25 commit a485f5e

File tree

3 files changed

+235
-12
lines changed

3 files changed

+235
-12
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

vortex-duckdb/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ crate-type = ["staticlib", "cdylib", "rlib"]
2323
anyhow = { workspace = true }
2424
arrow-array = { workspace = true }
2525
arrow-buffer = { workspace = true }
26+
arrow-schema = { workspace = true }
2627
async-compat = { workspace = true }
2728
async-fs = { workspace = true }
2829
bitvec = { workspace = true }

vortex-duckdb/src/convert/vector.rs

Lines changed: 233 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -10,22 +10,28 @@ use arrow_array::types::{
1010
UInt32Type, UInt64Type,
1111
};
1212
use arrow_array::{
13-
Array, BooleanArray, Date32Array, Decimal128Array, PrimitiveArray, StringArray,
14-
Time64MicrosecondArray, TimestampMicrosecondArray, TimestampMillisecondArray,
15-
TimestampNanosecondArray, TimestampSecondArray,
13+
Array, BooleanArray, Date32Array, Decimal128Array, FixedSizeListArray, GenericListViewArray,
14+
PrimitiveArray, StringArray, Time64MicrosecondArray, Time64NanosecondArray,
15+
TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray,
16+
TimestampSecondArray,
1617
};
1718
use arrow_buffer::buffer::BooleanBuffer;
19+
use arrow_schema::Field;
1820
use num_traits::AsPrimitive;
1921
use vortex::ArrayRef;
2022
use vortex::arrays::StructArray;
2123
use vortex::arrow::FromArrowArray;
22-
use vortex::dtype::{DecimalDType, FieldNames};
23-
use vortex::error::{VortexResult, vortex_err};
24+
use vortex::buffer::BufferMut;
25+
use vortex::dtype::{DType, DecimalDType, FieldNames, Nullability};
26+
use vortex::error::{VortexExpect, VortexResult, vortex_err};
2427
use vortex::scalar::DecimalType;
28+
use vortex::validity::Validity;
2529

30+
use crate::convert::dtype::FromLogicalType;
2631
use crate::cpp::{
27-
DUCKDB_TYPE, duckdb_date, duckdb_string_t, duckdb_string_t_data, duckdb_string_t_length,
28-
duckdb_time, duckdb_timestamp, duckdb_timestamp_ms, duckdb_timestamp_s,
32+
DUCKDB_TYPE, duckdb_date, duckdb_list_entry, duckdb_string_t, duckdb_string_t_data,
33+
duckdb_string_t_length, duckdb_time, duckdb_time_ns, duckdb_timestamp, duckdb_timestamp_ms,
34+
duckdb_timestamp_s,
2935
};
3036
use crate::duckdb::{DataChunk, Vector};
3137
use crate::exporter::precision_to_duckdb_storage_size;
@@ -187,6 +193,16 @@ pub fn flat_vector_to_arrow_array(
187193
),
188194
))
189195
}
196+
DUCKDB_TYPE::DUCKDB_TYPE_TIME_NS => {
197+
let data = vector.as_slice_with_len::<duckdb_time_ns>(len);
198+
199+
Ok(Arc::new(
200+
Time64NanosecondArray::from_iter_values_with_nulls(
201+
data.iter().map(|duckdb_time_ns { nanos }| *nanos),
202+
vector.validity_ref(data.len()).to_null_buffer(),
203+
),
204+
))
205+
}
190206
DUCKDB_TYPE::DUCKDB_TYPE_SMALLINT => {
191207
let data = vector.as_slice_with_len::<i16>(len);
192208

@@ -311,6 +327,84 @@ pub fn flat_vector_to_arrow_array(
311327

312328
Ok(Arc::new(decimal_array))
313329
}
330+
DUCKDB_TYPE::DUCKDB_TYPE_ARRAY => {
331+
let array_elem_size = vector.logical_type().array_type_array_size();
332+
let array_child_type = vector.logical_type().array_child_type();
333+
let data_arrow = flat_vector_to_arrow_array(
334+
&mut vector.array_vector_get_child(),
335+
len * array_elem_size as usize,
336+
)?;
337+
Ok(Arc::new(FixedSizeListArray::try_new(
338+
Arc::new(Field::new(
339+
"element",
340+
DType::from_logical_type(array_child_type, Nullability::Nullable)?
341+
.to_arrow_dtype()?,
342+
true,
343+
)),
344+
array_elem_size as i32,
345+
data_arrow,
346+
vector.validity_ref(len).to_null_buffer(),
347+
)?))
348+
}
349+
DUCKDB_TYPE::DUCKDB_TYPE_LIST => {
350+
let array_child_type = vector.logical_type().list_child_type();
351+
352+
let mut offsets = BufferMut::with_capacity(len);
353+
let mut lengths = BufferMut::with_capacity(len);
354+
for duckdb_list_entry { offset, length } in
355+
vector.as_slice_with_len::<duckdb_list_entry>(len)
356+
{
357+
unsafe {
358+
offsets.push_unchecked(
359+
i64::try_from(*offset).vortex_expect("offset must fit i64"),
360+
);
361+
lengths.push_unchecked(
362+
i64::try_from(*length).vortex_expect("length must fit i64"),
363+
);
364+
}
365+
}
366+
let offsets = offsets.freeze();
367+
let lengths = lengths.freeze();
368+
let arrow_child = flat_vector_to_arrow_array(
369+
&mut vector.list_vector_get_child(),
370+
usize::try_from(offsets[len - 1] + lengths[len - 1])
371+
.vortex_expect("last offset and length sum must fit in usize "),
372+
)?;
373+
374+
Ok(Arc::new(GenericListViewArray::try_new(
375+
Arc::new(Field::new(
376+
"element",
377+
DType::from_logical_type(array_child_type, Nullability::Nullable)?
378+
.to_arrow_dtype()?,
379+
true,
380+
)),
381+
offsets.into_arrow_scalar_buffer(),
382+
lengths.into_arrow_scalar_buffer(),
383+
arrow_child,
384+
vector.validity_ref(len).to_null_buffer(),
385+
)?))
386+
}
387+
DUCKDB_TYPE::DUCKDB_TYPE_STRUCT => {
388+
let children = (0..vector.logical_type().struct_type_child_count())
389+
.map(|idx| {
390+
flat_vector_to_arrow_array(&mut vector.struct_vector_get_child(idx), len)
391+
})
392+
.collect::<Result<Vec<_>, _>>()?;
393+
if children.is_empty() {
394+
Ok(Arc::new(arrow_array::StructArray::new_empty_fields(
395+
len,
396+
vector.validity_ref(len).to_null_buffer(),
397+
)))
398+
} else {
399+
Ok(Arc::new(arrow_array::StructArray::try_new(
400+
DType::from_logical_type(vector.logical_type(), Nullability::NonNullable)?
401+
.to_arrow_schema()?
402+
.fields,
403+
children,
404+
vector.validity_ref(len).to_null_buffer(),
405+
)?))
406+
}
407+
}
314408
_ => todo!("missing impl for {type_id:?}"),
315409
}
316410
}
@@ -319,28 +413,37 @@ pub fn data_chunk_to_arrow(field_names: &FieldNames, chunk: &DataChunk) -> Vorte
319413
let len = chunk.len();
320414

321415
let columns = (0..chunk.column_count())
322-
.zip(field_names.iter())
323-
.map(|(i, name)| {
416+
.map(|i| {
324417
let mut vector = chunk.get_vector(i);
325418
vector.flatten(len);
326419
flat_vector_to_arrow_array(&mut vector, len.as_())
327420
.map(|array_data| {
328421
let chunk_len: usize = chunk.len().as_();
329422
assert_eq!(array_data.len(), chunk_len);
330-
(name, ArrayRef::from_arrow(array_data.as_ref(), true))
423+
ArrayRef::from_arrow(array_data.as_ref(), true)
331424
})
332425
.map_err(|e| vortex_err!("duckdb to arrow conversion failure {e}"))
333426
})
334-
.collect::<VortexResult<Vec<_>>>()?;
335-
StructArray::try_from_iter(columns).map(|a| a.to_array())
427+
.collect::<VortexResult<Arc<_>>>()?;
428+
StructArray::try_new(
429+
field_names.clone(),
430+
columns,
431+
len.as_(),
432+
Validity::NonNullable,
433+
)
434+
.map(|a| a.to_array())
336435
}
337436

338437
#[cfg(test)]
339438
mod tests {
439+
use std::ffi::CString;
440+
441+
use arrow_array::cast::AsArray;
340442
use arrow_array::{
341443
BooleanArray, Int32Array, TimestampMicrosecondArray, TimestampMillisecondArray,
342444
TimestampSecondArray,
343445
};
446+
use vortex::error::VortexUnwrap;
344447

345448
use super::*;
346449
use crate::cpp::DUCKDB_TYPE;
@@ -598,4 +701,122 @@ mod tests {
598701
assert_eq!(arrow_array.value(0), 1);
599702
assert_eq!(arrow_array.value(2), 3);
600703
}
704+
705+
#[test]
706+
fn test_list() {
707+
let values = vec![1i32, 2, 3, 4];
708+
let len = 1;
709+
710+
let logical_type =
711+
LogicalType::list_type(LogicalType::new(DUCKDB_TYPE::DUCKDB_TYPE_INTEGER))
712+
.vortex_unwrap();
713+
let mut vector = Vector::with_capacity(logical_type, len);
714+
715+
// Populate with data
716+
unsafe {
717+
let entries = vector.as_slice_mut::<duckdb_list_entry>(len);
718+
entries[0] = duckdb_list_entry {
719+
offset: 0,
720+
length: values.len() as u64,
721+
};
722+
let mut child = vector.list_vector_get_child();
723+
let slice = child.as_slice_mut::<i32>(values.len());
724+
slice.copy_from_slice(&values);
725+
}
726+
727+
// Test conversion
728+
let result = flat_vector_to_arrow_array(&mut vector, len).unwrap();
729+
let arrow_array = result.as_list_view::<i64>();
730+
731+
assert_eq!(arrow_array.len(), len);
732+
assert_eq!(
733+
arrow_array.value(0).as_primitive::<Int32Type>(),
734+
&Int32Array::from_iter([1, 2, 3, 4])
735+
);
736+
}
737+
738+
#[test]
739+
fn test_fixed_sized_list() {
740+
let values = vec![1i32, 2, 3, 4];
741+
let len = 1;
742+
743+
let logical_type =
744+
LogicalType::array_type(LogicalType::new(DUCKDB_TYPE::DUCKDB_TYPE_INTEGER), 4)
745+
.vortex_unwrap();
746+
let mut vector = Vector::with_capacity(logical_type, len);
747+
748+
// Populate with data
749+
unsafe {
750+
let mut child = vector.array_vector_get_child();
751+
let slice = child.as_slice_mut::<i32>(values.len());
752+
slice.copy_from_slice(&values);
753+
}
754+
755+
// Test conversion
756+
let result = flat_vector_to_arrow_array(&mut vector, len).unwrap();
757+
let arrow_array = result.as_fixed_size_list();
758+
759+
assert_eq!(arrow_array.len(), len);
760+
assert_eq!(
761+
arrow_array.value(0).as_primitive::<Int32Type>(),
762+
&Int32Array::from_iter([1, 2, 3, 4])
763+
);
764+
}
765+
766+
#[test]
767+
fn test_empty_struct() {
768+
let len = 4;
769+
let logical_type = LogicalType::struct_type([], []).vortex_unwrap();
770+
let mut vector = Vector::with_capacity(logical_type, len);
771+
772+
// Test conversion
773+
let result = flat_vector_to_arrow_array(&mut vector, len).unwrap();
774+
let arrow_array = result.as_struct();
775+
776+
assert_eq!(arrow_array.len(), len);
777+
assert_eq!(arrow_array.fields().len(), 0);
778+
}
779+
780+
#[test]
781+
fn test_struct() {
782+
let values1 = vec![1i32, 2, 3, 4];
783+
let values2 = vec![5i32, 6, 7, 8];
784+
let len = values1.len();
785+
786+
let logical_type = LogicalType::struct_type(
787+
[
788+
LogicalType::new(DUCKDB_TYPE::DUCKDB_TYPE_INTEGER),
789+
LogicalType::new(DUCKDB_TYPE::DUCKDB_TYPE_INTEGER),
790+
],
791+
[CString::new("a").unwrap(), CString::new("b").unwrap()],
792+
)
793+
.vortex_unwrap();
794+
let mut vector = Vector::with_capacity(logical_type, len);
795+
796+
// Populate with data
797+
for (i, values) in
798+
(0..vector.logical_type().struct_type_child_count()).zip([values1, values2])
799+
{
800+
unsafe {
801+
let mut child = vector.struct_vector_get_child(i);
802+
let slice = child.as_slice_mut::<i32>(len);
803+
slice.copy_from_slice(&values);
804+
}
805+
}
806+
807+
// Test conversion
808+
let result = flat_vector_to_arrow_array(&mut vector, len).unwrap();
809+
let arrow_array = result.as_struct();
810+
811+
assert_eq!(arrow_array.len(), len);
812+
assert_eq!(arrow_array.fields().len(), 2);
813+
assert_eq!(
814+
arrow_array.column(0).as_primitive::<Int32Type>(),
815+
&Int32Array::from_iter([1, 2, 3, 4])
816+
);
817+
assert_eq!(
818+
arrow_array.column(1).as_primitive::<Int32Type>(),
819+
&Int32Array::from_iter([5, 6, 7, 8])
820+
);
821+
}
601822
}

0 commit comments

Comments
 (0)