Skip to content

Commit 155ca5b

Browse files
authored
Fix Arrow list export (#5804)
Signed-off-by: Nicholas Gates <[email protected]>
1 parent d12f356 commit 155ca5b

File tree

5 files changed

+91
-146
lines changed

5 files changed

+91
-146
lines changed

vortex-array/src/arrays/extension/vtable/rules.rs

Lines changed: 2 additions & 117 deletions
Original file line numberDiff line numberDiff line change
@@ -6,22 +6,16 @@ use vortex_error::VortexResult;
66
use crate::Array;
77
use crate::ArrayRef;
88
use crate::IntoArray;
9-
use crate::arrays::AnyScalarFn;
10-
use crate::arrays::ConstantArray;
11-
use crate::arrays::ConstantVTable;
129
use crate::arrays::ExtensionArray;
1310
use crate::arrays::ExtensionVTable;
1411
use crate::arrays::FilterArray;
1512
use crate::arrays::FilterVTable;
16-
use crate::arrays::ScalarFnArray;
1713
use crate::matchers::Exact;
1814
use crate::optimizer::rules::ArrayParentReduceRule;
1915
use crate::optimizer::rules::ParentRuleSet;
2016

21-
pub(super) const PARENT_RULES: ParentRuleSet<ExtensionVTable> = ParentRuleSet::new(&[
22-
ParentRuleSet::lift(&ExtensionFilterPushDownRule),
23-
ParentRuleSet::lift(&ExtensionScalarFnConstantPushDownRule),
24-
]);
17+
pub(super) const PARENT_RULES: ParentRuleSet<ExtensionVTable> =
18+
ParentRuleSet::new(&[ParentRuleSet::lift(&ExtensionFilterPushDownRule)]);
2519

2620
/// Push filter operations into the storage array of an extension array.
2721
#[derive(Debug)]
@@ -51,68 +45,6 @@ impl ArrayParentReduceRule<ExtensionVTable> for ExtensionFilterPushDownRule {
5145
}
5246
}
5347

54-
/// Push scalar function operations into the storage array when the other operand is a constant
55-
/// with the same extension type.
56-
#[derive(Debug)]
57-
struct ExtensionScalarFnConstantPushDownRule;
58-
59-
impl ArrayParentReduceRule<ExtensionVTable> for ExtensionScalarFnConstantPushDownRule {
60-
type Parent = AnyScalarFn;
61-
62-
fn parent(&self) -> Self::Parent {
63-
AnyScalarFn
64-
}
65-
66-
fn reduce_parent(
67-
&self,
68-
child: &ExtensionArray,
69-
parent: &ScalarFnArray,
70-
child_idx: usize,
71-
) -> VortexResult<Option<ArrayRef>> {
72-
// Check that all other children are constants with matching extension types.
73-
for (idx, sibling) in parent.children().iter().enumerate() {
74-
if idx == child_idx {
75-
continue;
76-
}
77-
78-
// Sibling must be a constant.
79-
let Some(const_array) = sibling.as_opt::<ConstantVTable>() else {
80-
return Ok(None);
81-
};
82-
83-
// Sibling must be an extension scalar with the same extension type.
84-
let Some(ext_scalar) = const_array.scalar().as_extension_opt() else {
85-
return Ok(None);
86-
};
87-
88-
// ExtDType::eq_ignore_nullability checks id, metadata, and storage dtype
89-
if !ext_scalar
90-
.ext_dtype()
91-
.eq_ignore_nullability(child.ext_dtype())
92-
{
93-
return Ok(None);
94-
}
95-
}
96-
97-
// Build new children with storage arrays/scalars.
98-
let mut new_children = Vec::with_capacity(parent.children().len());
99-
for (idx, sibling) in parent.children().iter().enumerate() {
100-
if idx == child_idx {
101-
new_children.push(child.storage().clone());
102-
} else {
103-
let const_array = sibling.as_::<ConstantVTable>();
104-
let storage_scalar = const_array.scalar().as_extension().storage();
105-
new_children.push(ConstantArray::new(storage_scalar, child.len()).into_array());
106-
}
107-
}
108-
109-
Ok(Some(
110-
ScalarFnArray::try_new(parent.scalar_fn().clone(), new_children, child.len())?
111-
.into_array(),
112-
))
113-
}
114-
}
115-
11648
#[cfg(test)]
11749
mod tests {
11850
use std::sync::Arc;
@@ -134,7 +66,6 @@ mod tests {
13466
use crate::arrays::ExtensionVTable;
13567
use crate::arrays::FilterArray;
13668
use crate::arrays::PrimitiveArray;
137-
use crate::arrays::PrimitiveVTable;
13869
use crate::arrays::ScalarFnArrayExt;
13970
use crate::expr::Binary;
14071
use crate::expr::Operator;
@@ -202,52 +133,6 @@ mod tests {
202133
assert_eq!(canonical.len(), 3);
203134
}
204135

205-
#[test]
206-
fn test_scalar_fn_constant_pushdown_comparison() {
207-
let ext_dtype = test_ext_dtype();
208-
let storage = buffer![10i64, 20, 30, 40, 50].into_array();
209-
let ext_array = ExtensionArray::new(ext_dtype.clone(), storage).into_array();
210-
211-
// Create a constant extension scalar with value 25
212-
let const_scalar = Scalar::extension(ext_dtype, Scalar::from(25i64));
213-
let const_array = ConstantArray::new(const_scalar, 5).into_array();
214-
215-
// Create a binary comparison: ext_array < const_array
216-
let scalar_fn_array = Binary
217-
.try_new_array(5, Operator::Lt, [ext_array, const_array])
218-
.unwrap();
219-
220-
// Optimize should push down the comparison to storage
221-
let optimized = scalar_fn_array.optimize().unwrap();
222-
223-
// The result should still be a ScalarFnArray but operating on primitive storage
224-
let scalar_fn = optimized.as_opt::<crate::arrays::ScalarFnVTable>();
225-
assert!(
226-
scalar_fn.is_some(),
227-
"Expected ScalarFnArray after optimization"
228-
);
229-
230-
// The children should now be primitives, not extensions
231-
let children = scalar_fn.unwrap().children();
232-
assert_eq!(children.len(), 2);
233-
234-
// First child should be the primitive storage
235-
assert!(
236-
children[0].as_opt::<PrimitiveVTable>().is_some(),
237-
"Expected first child to be PrimitiveArray, got {}",
238-
children[0].encoding_id()
239-
);
240-
241-
// Second child should be a constant with primitive value
242-
assert!(
243-
children[1]
244-
.as_opt::<crate::arrays::ConstantVTable>()
245-
.is_some(),
246-
"Expected second child to be ConstantArray, got {}",
247-
children[1].encoding_id()
248-
);
249-
}
250-
251136
#[test]
252137
fn test_scalar_fn_no_pushdown_different_ext_types() {
253138
let ext_dtype1 = Arc::new(ExtDType::new(

vortex-array/src/arrays/struct_/vtable/rules.rs

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
// SPDX-FileCopyrightText: Copyright the Vortex contributors
33

44
use vortex_error::VortexResult;
5+
use vortex_error::vortex_err;
56

67
use crate::ArrayRef;
78
use crate::IntoArray;
@@ -49,13 +50,18 @@ impl ArrayParentReduceRule<StructVTable> for StructCastPushDownRule {
4950
new_fields.push(field_array.cast(field_dtype)?)
5051
}
5152

53+
let validity = if parent.options.is_nullable() {
54+
array.validity().clone().into_nullable()
55+
} else {
56+
array
57+
.validity()
58+
.clone()
59+
.into_non_nullable(array.len)
60+
.ok_or_else(|| vortex_err!("Failed to cast nullable struct to non-nullable"))?
61+
};
62+
5263
let new_struct = unsafe {
53-
StructArray::new_unchecked(
54-
new_fields,
55-
target_fields.clone(),
56-
array.len(),
57-
array.validity().clone(),
58-
)
64+
StructArray::new_unchecked(new_fields, target_fields.clone(), array.len(), validity)
5965
};
6066

6167
Ok(Some(new_struct.into_array()))

vortex-array/src/arrow/executor/list.rs

Lines changed: 31 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -4,25 +4,23 @@
44
use std::any::type_name;
55
use std::sync::Arc;
66

7-
use arrow_array::Array;
87
use arrow_array::ArrayRef as ArrowArrayRef;
98
use arrow_array::GenericListArray;
109
use arrow_array::OffsetSizeTrait;
11-
use arrow_schema::DataType;
1210
use arrow_schema::FieldRef;
1311
use vortex_buffer::BufferMut;
14-
use vortex_compute::arrow::IntoArrow;
12+
use vortex_compute::cast::Cast;
1513
use vortex_dtype::DType;
1614
use vortex_dtype::NativePType;
1715
use vortex_dtype::Nullability;
1816
use vortex_dtype::PTypeDowncastExt;
19-
use vortex_error::VortexError;
2017
use vortex_error::VortexExpect;
2118
use vortex_error::VortexResult;
2219
use vortex_error::vortex_ensure;
2320
use vortex_error::vortex_err;
2421
use vortex_session::VortexSession;
2522

23+
use crate::Array;
2624
use crate::ArrayRef;
2725
use crate::IntoArray;
2826
use crate::VectorExecutor;
@@ -35,6 +33,7 @@ use crate::arrow::ArrowArrayExecutor;
3533
use crate::arrow::executor::validity::to_arrow_null_buffer;
3634
use crate::builtins::ArrayBuiltins;
3735
use crate::validity::Validity;
36+
use crate::vectors::VectorIntoArray;
3837
use crate::vtable::ValidityHelper;
3938

4039
/// Convert a Vortex array into an Arrow GenericBinaryArray.
@@ -64,12 +63,32 @@ pub(super) fn to_arrow_list<O: OffsetSizeTrait + NativePType>(
6463
// In other words, check that offsets + sizes are monotonically increasing.
6564

6665
// Otherwise, we execute the array to become a ListViewVector.
67-
let list_view = array.execute_vector(session)?.into_arrow()?;
68-
match O::IS_LARGE {
69-
true => arrow_cast::cast(&list_view, &DataType::LargeList(elements_field.clone())),
70-
false => arrow_cast::cast(&list_view, &DataType::List(elements_field.clone())),
71-
}
72-
.map_err(VortexError::from)
66+
let elements_dtype = array
67+
.dtype()
68+
.as_list_element_opt()
69+
.ok_or_else(|| vortex_err!("Cannot convert non-list array to Arrow ListArray"))?;
70+
let list_view = array.execute_vector(session)?.into_list();
71+
let (elements, offsets, sizes, validity) = list_view.into_parts();
72+
let offset_dtype = DType::Primitive(O::PTYPE, Nullability::NonNullable);
73+
let list_view = unsafe {
74+
ListViewArray::new_unchecked(
75+
(*elements).clone().into_array(elements_dtype),
76+
offsets.cast(&offset_dtype)?.into_array(&offset_dtype),
77+
sizes.cast(&offset_dtype)?.into_array(&offset_dtype),
78+
Validity::from_mask(validity, array.dtype().nullability()),
79+
)
80+
};
81+
82+
list_view_to_list::<O>(list_view, elements_field, session)
83+
84+
// FIXME(ngates): we need this PR from arrow-rs:
85+
// https://github.com/apache/arrow-rs/pull/8735
86+
// let list_view = array.execute_vector(session)?.into_arrow()?;
87+
// match O::IS_LARGE {
88+
// true => arrow_cast::cast(&list_view, &DataType::LargeList(elements_field.clone())),
89+
// false => arrow_cast::cast(&list_view, &DataType::List(elements_field.clone())),
90+
// }
91+
// .map_err(VortexError::from)
7392
}
7493

7594
/// Convert a Vortex VarBinArray into an Arrow GenericBinaryArray.
@@ -203,6 +222,7 @@ fn list_view_to_list<O: OffsetSizeTrait + NativePType>(
203222
}
204223
new_offsets.push(O::usize_as(take_indices.len()));
205224
}
225+
assert_eq!(new_offsets.len(), offsets.len() + 1);
206226

207227
// Now we can "take" the elements using the computed indices.
208228
let elements =
@@ -214,20 +234,11 @@ fn list_view_to_list<O: OffsetSizeTrait + NativePType>(
214234
"Cannot convert to non-nullable Arrow array with null elements"
215235
);
216236

217-
// We need to compute the final offsets from the sizes.
218-
let mut final_offsets = Vec::with_capacity(sizes.len() + 1);
219-
final_offsets.push(O::usize_as(0));
220-
for i in 0..sizes.len() {
221-
let last_offset = final_offsets[i].as_usize();
222-
let size = sizes[i].as_usize();
223-
final_offsets.push(O::usize_as(last_offset + size));
224-
}
225-
226237
let null_buffer = to_arrow_null_buffer(&validity, sizes.len(), session)?;
227238

228239
Ok(Arc::new(GenericListArray::<O>::new(
229240
elements_field.clone(),
230-
offsets.into_arrow_offset_buffer(),
241+
new_offsets.freeze().into_arrow_offset_buffer(),
231242
elements,
232243
null_buffer,
233244
)))

vortex-array/src/arrow/executor/mod.rs

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,13 @@ use arrow_array::cast::AsArray;
2222
use arrow_array::types::*;
2323
use arrow_schema::DataType;
2424
use arrow_schema::Schema;
25+
use itertools::Itertools;
2526
use vortex_error::VortexResult;
2627
use vortex_error::vortex_bail;
28+
use vortex_error::vortex_ensure;
2729
use vortex_session::VortexSession;
2830

31+
use crate::Array;
2932
use crate::ArrayRef;
3033
use crate::arrow::executor::bool::to_arrow_bool;
3134
use crate::arrow::executor::byte::to_arrow_byte_array;
@@ -61,6 +64,13 @@ pub trait ArrowArrayExecutor: Sized {
6164
let array = self.execute_arrow(&DataType::Struct(schema.fields.clone()), session)?;
6265
Ok(RecordBatch::from(array.as_struct()))
6366
}
67+
68+
/// Execute the array to produce Arrow `RecordBatch`'s with the given schema.
69+
fn execute_record_batches(
70+
self,
71+
schema: &Schema,
72+
session: &VortexSession,
73+
) -> VortexResult<Vec<RecordBatch>>;
6474
}
6575

6676
impl ArrowArrayExecutor for ArrayRef {
@@ -69,7 +79,9 @@ impl ArrowArrayExecutor for ArrayRef {
6979
data_type: &DataType,
7080
session: &VortexSession,
7181
) -> VortexResult<ArrowArrayRef> {
72-
match data_type {
82+
let len = self.len();
83+
84+
let arrow = match data_type {
7385
DataType::Null => to_arrow_null(self, session),
7486
DataType::Boolean => to_arrow_bool(self, session),
7587
DataType::Int8 => to_arrow_primitive::<Int8Type>(self, session),
@@ -133,6 +145,24 @@ impl ArrowArrayExecutor for ArrayRef {
133145
| DataType::Union(..) => {
134146
vortex_bail!("Conversion to Arrow type {data_type} is not supported");
135147
}
136-
}
148+
}?;
149+
150+
vortex_ensure!(
151+
arrow.len() == len,
152+
"Arrow array length does not match Vortex array length after conversion to {:?}",
153+
arrow
154+
);
155+
156+
Ok(arrow)
157+
}
158+
159+
fn execute_record_batches(
160+
self,
161+
schema: &Schema,
162+
session: &VortexSession,
163+
) -> VortexResult<Vec<RecordBatch>> {
164+
self.to_array_iterator()
165+
.map(|a| a?.execute_record_batch(schema, session))
166+
.try_collect()
137167
}
138168
}

vortex-array/src/arrow/executor/struct_.rs

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,10 @@ use vortex_session::VortexSession;
1919

2020
use crate::Array;
2121
use crate::ArrayRef;
22+
use crate::IntoArray;
23+
use crate::ToCanonical;
2224
use crate::VectorExecutor;
25+
use crate::arrays::ChunkedVTable;
2326
use crate::arrays::ScalarFnVTable;
2427
use crate::arrays::StructVTable;
2528
use crate::arrow::ArrowArrayExecutor;
@@ -35,7 +38,17 @@ pub(super) fn to_arrow_struct(
3538
) -> VortexResult<ArrowArrayRef> {
3639
let len = array.len();
3740

38-
// First, we attempt to short-circuit if the array is already a StructVTable:
41+
// If the array is chunked, then we invert the chunk-of-struct to struct-of-chunk.
42+
let array = match array.try_into::<ChunkedVTable>() {
43+
Ok(array) => {
44+
// NOTE(ngates): this currently uses the old into_canonical code path, but we should
45+
// just call directly into the swizzle-chunks function.
46+
array.to_struct().into_array()
47+
}
48+
Err(array) => array,
49+
};
50+
51+
// Attempt to short-circuit if the array is already a StructVTable:
3952
let array = match array.try_into::<StructVTable>() {
4053
Ok(array) => {
4154
let validity = to_arrow_null_buffer(array.validity(), array.len(), session)?;

0 commit comments

Comments
 (0)