Skip to content

Commit fb96a1d

Browse files
fix[array]: correct the handling of to_arrow execution and struct casting (#5867)
Expand nullable fields missing in the struct --------- Signed-off-by: Joe Isaacs <[email protected]>
1 parent eb5c300 commit fb96a1d

File tree

6 files changed

+91
-26
lines changed

6 files changed

+91
-26
lines changed

vortex-array/src/arrays/scalar_fn/rules.rs

Lines changed: 34 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,9 @@ use crate::arrays::FilterArray;
2424
use crate::arrays::FilterVTable;
2525
use crate::arrays::ScalarFnArray;
2626
use crate::arrays::ScalarFnVTable;
27+
use crate::arrays::StructArray;
2728
use crate::expr::ExecutionArgs;
29+
use crate::expr::Pack;
2830
use crate::expr::ReduceCtx;
2931
use crate::expr::ReduceNode;
3032
use crate::expr::ReduceNodeRef;
@@ -34,13 +36,43 @@ use crate::optimizer::rules::ArrayParentReduceRule;
3436
use crate::optimizer::rules::ArrayReduceRule;
3537
use crate::optimizer::rules::ParentRuleSet;
3638
use crate::optimizer::rules::ReduceRuleSet;
39+
use crate::validity::Validity;
3740

38-
pub(super) const RULES: ReduceRuleSet<ScalarFnVTable> =
39-
ReduceRuleSet::new(&[&ScalarFnConstantRule, &ScalarFnAbstractReduceRule]);
41+
pub(super) const RULES: ReduceRuleSet<ScalarFnVTable> = ReduceRuleSet::new(&[
42+
&ScalarFnPackToStructRule,
43+
&ScalarFnConstantRule,
44+
&ScalarFnAbstractReduceRule,
45+
]);
4046

4147
pub(super) const PARENT_RULES: ParentRuleSet<ScalarFnVTable> =
4248
ParentRuleSet::new(&[ParentRuleSet::lift(&ScalarFnUnaryFilterPushDownRule)]);
4349

50+
/// Converts a ScalarFnArray with Pack into a StructArray directly.
51+
#[derive(Debug)]
52+
struct ScalarFnPackToStructRule;
53+
impl ArrayReduceRule<ScalarFnVTable> for ScalarFnPackToStructRule {
54+
fn reduce(&self, array: &ScalarFnArray) -> VortexResult<Option<ArrayRef>> {
55+
let Some(pack_options) = array.scalar_fn.as_opt::<Pack>() else {
56+
return Ok(None);
57+
};
58+
59+
let validity = match pack_options.nullability {
60+
vortex_dtype::Nullability::NonNullable => Validity::NonNullable,
61+
vortex_dtype::Nullability::Nullable => Validity::AllValid,
62+
};
63+
64+
Ok(Some(
65+
StructArray::try_new(
66+
pack_options.names.clone(),
67+
array.children.clone(),
68+
array.len,
69+
validity,
70+
)?
71+
.into_array(),
72+
))
73+
}
74+
}
75+
4476
#[derive(Debug)]
4577
struct ScalarFnConstantRule;
4678
impl ArrayReduceRule<ScalarFnVTable> for ScalarFnConstantRule {

vortex-array/src/arrays/struct_/vtable/rules.rs

Lines changed: 38 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
// SPDX-FileCopyrightText: Copyright the Vortex contributors
33

44
use vortex_error::VortexResult;
5+
use vortex_error::vortex_ensure;
56
use vortex_error::vortex_err;
67

78
use crate::ArrayRef;
@@ -27,7 +28,12 @@ pub(super) const PARENT_RULES: ParentRuleSet<StructVTable> = ParentRuleSet::new(
2728
ParentRuleSet::lift(&StructGetItemRule),
2829
]);
2930

30-
/// Rule to push down cast into struct fields
31+
/// Rule to push down cast into struct fields.
32+
///
33+
/// TODO(joe/rob): should we have this in casts?
34+
///
35+
/// This rule supports schema evolution by allowing new nullable fields to be added
36+
/// at the end of the struct, filled with null values.
3137
#[derive(Debug)]
3238
struct StructCastPushDownRule;
3339
impl ArrayParentReduceRule<StructVTable> for StructCastPushDownRule {
@@ -44,10 +50,38 @@ impl ArrayParentReduceRule<StructVTable> for StructCastPushDownRule {
4450
_child_idx: usize,
4551
) -> VortexResult<Option<ArrayRef>> {
4652
let target_fields = parent.options.as_struct_fields();
53+
let source_field_count = array.fields.len();
54+
let target_field_count = target_fields.nfields();
4755

48-
let mut new_fields = Vec::with_capacity(target_fields.nfields());
49-
for (field_array, field_dtype) in array.fields.iter().zip(target_fields.fields()) {
50-
new_fields.push(field_array.cast(field_dtype)?)
56+
// Target must have at least as many fields as source
57+
vortex_ensure!(
58+
target_field_count >= source_field_count,
59+
"Cannot cast struct: target has fewer fields ({}) than source ({})",
60+
target_field_count,
61+
source_field_count
62+
);
63+
64+
let mut new_fields = Vec::with_capacity(target_field_count);
65+
66+
// Cast existing source fields to target types
67+
for (field_array, field_dtype) in array
68+
.fields
69+
.iter()
70+
.zip(target_fields.fields().take(source_field_count))
71+
{
72+
new_fields.push(field_array.cast(field_dtype)?);
73+
}
74+
75+
// Add null arrays for any extra target fields (schema evolution)
76+
for field_dtype in target_fields.fields().skip(source_field_count) {
77+
vortex_ensure!(
78+
field_dtype.is_nullable(),
79+
"Cannot add non-nullable field during struct cast (schema evolution only supports nullable fields)"
80+
);
81+
new_fields.push(
82+
ConstantArray::new(vortex_scalar::Scalar::null(field_dtype), array.len())
83+
.into_array(),
84+
);
5185
}
5286

5387
let validity = if parent.options.is_nullable() {

vortex-array/src/arrow/executor/byte.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
use std::sync::Arc;
55

66
use arrow_array::ArrayRef as ArrowArrayRef;
7-
use arrow_array::GenericBinaryArray;
7+
use arrow_array::GenericByteArray;
88
use arrow_array::types::ByteArrayType;
99
use vortex_compute::arrow::IntoArrow;
1010
use vortex_dtype::DType;
@@ -62,8 +62,7 @@ where
6262
let data = array.bytes().clone().into_arrow_buffer();
6363

6464
let null_buffer = to_arrow_null_buffer(array.validity(), array.len(), session)?;
65-
6665
Ok(Arc::new(unsafe {
67-
GenericBinaryArray::<T::Offset>::new_unchecked(offsets, data, null_buffer)
66+
GenericByteArray::<T>::new_unchecked(offsets, data, null_buffer)
6867
}))
6968
}

vortex-array/src/arrow/executor/struct_.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ use arrow_array::StructArray;
88
use arrow_buffer::NullBuffer;
99
use arrow_schema::DataType;
1010
use arrow_schema::Fields;
11+
use itertools::Itertools;
1112
use vortex_compute::arrow::IntoArrow;
1213
use vortex_dtype::DType;
1314
use vortex_dtype::StructFields;
@@ -93,7 +94,7 @@ fn create_from_fields(
9394
session: &VortexSession,
9495
) -> VortexResult<ArrowArrayRef> {
9596
let mut arrow_fields = Vec::with_capacity(vortex_fields.len());
96-
for (field, vx_field) in fields.iter().zip(vortex_fields.into_iter()) {
97+
for (field, vx_field) in fields.iter().zip_eq(vortex_fields.into_iter()) {
9798
let arrow_field = vx_field.execute_arrow(field.data_type(), session)?;
9899
vortex_ensure!(
99100
field.is_nullable() || arrow_field.null_count() == 0,

vortex-datafusion/src/persistent/opener.rs

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ use futures::stream;
2929
use object_store::ObjectStore;
3030
use object_store::path::Path;
3131
use tracing::Instrument;
32+
use vortex::array::Array;
3233
use vortex::array::ArrayRef;
3334
use vortex::array::arrow::ArrowArrayExecutor;
3435
use vortex::dtype::FieldName;
@@ -111,10 +112,9 @@ impl FileOpener for VortexOpener {
111112
Some(indices) => Arc::new(table_schema.file_schema().project(indices)?),
112113
};
113114

114-
let schema_adapter = self.schema_adapter_factory.create(
115-
projected_schema.clone(),
116-
table_schema.table_schema().clone(),
117-
);
115+
let schema_adapter = self
116+
.schema_adapter_factory
117+
.create(projected_schema, table_schema.table_schema().clone());
118118

119119
// Update partition column access in the filter to use literals instead
120120
let partition_fields = self.table_schema.table_partition_cols().clone();
@@ -295,7 +295,8 @@ impl FileOpener for VortexOpener {
295295
.with_ordered(has_output_ordering)
296296
.map(move |chunk| {
297297
if *USE_VORTEX_OPERATORS {
298-
chunk.execute_record_batch(&projected_schema, &chunk_session)
298+
let schema = chunk.dtype().to_arrow_schema()?;
299+
chunk.execute_record_batch(&schema, &chunk_session)
299300
} else {
300301
RecordBatch::try_from(chunk.as_ref())
301302
}
@@ -809,11 +810,9 @@ mod tests {
809810
// struct column.
810811
let data = opener
811812
.open(PartitionedFile::new(file_path.to_string(), data_size))?
812-
.await
813-
.unwrap()
813+
.await?
814814
.try_collect::<Vec<_>>()
815-
.await
816-
.unwrap();
815+
.await?;
817816

818817
assert_eq!(data.len(), 1);
819818
assert_eq!(data[0].num_rows(), 3);

vortex-scan/src/arrow.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -191,24 +191,22 @@ mod tests {
191191
}
192192

193193
#[test]
194-
fn test_record_batch_iterator_adapter() {
194+
fn test_record_batch_iterator_adapter() -> VortexResult<()> {
195195
let schema = create_arrow_schema();
196196
let batch1 = RecordBatch::try_new(
197197
schema.clone(),
198198
vec![
199199
Arc::new(Int32Array::from(vec![Some(1), Some(2)])) as ArrowArrayRef,
200200
Arc::new(StringArray::from(vec![Some("Alice"), Some("Bob")])) as ArrowArrayRef,
201201
],
202-
)
203-
.unwrap();
202+
)?;
204203
let batch2 = RecordBatch::try_new(
205204
schema.clone(),
206205
vec![
207206
Arc::new(Int32Array::from(vec![None, Some(4)])) as ArrowArrayRef,
208207
Arc::new(StringArray::from(vec![Some("Charlie"), None])) as ArrowArrayRef,
209208
],
210-
)
211-
.unwrap();
209+
)?;
212210

213211
let iter = vec![Ok(batch1), Ok(batch2)].into_iter();
214212
let mut adapter = RecordBatchIteratorAdapter {
@@ -220,13 +218,15 @@ mod tests {
220218
assert_eq!(adapter.schema(), schema);
221219

222220
// Test Iterator trait
223-
let first = adapter.next().unwrap().unwrap();
221+
let first = adapter.next().unwrap()?;
224222
assert_eq!(first.num_rows(), 2);
225223

226-
let second = adapter.next().unwrap().unwrap();
224+
let second = adapter.next().unwrap()?;
227225
assert_eq!(second.num_rows(), 2);
228226

229227
assert!(adapter.next().is_none());
228+
229+
Ok(())
230230
}
231231

232232
#[test]

0 commit comments

Comments
 (0)