diff --git a/vortex-array/src/arrays/scalar_fn/rules.rs b/vortex-array/src/arrays/scalar_fn/rules.rs
index 201ef41342e..af96cbc5533 100644
--- a/vortex-array/src/arrays/scalar_fn/rules.rs
+++ b/vortex-array/src/arrays/scalar_fn/rules.rs
@@ -24,7 +24,9 @@ use crate::arrays::FilterArray;
 use crate::arrays::FilterVTable;
 use crate::arrays::ScalarFnArray;
 use crate::arrays::ScalarFnVTable;
+use crate::arrays::StructArray;
 use crate::expr::ExecutionArgs;
+use crate::expr::Pack;
 use crate::expr::ReduceCtx;
 use crate::expr::ReduceNode;
 use crate::expr::ReduceNodeRef;
@@ -34,13 +36,43 @@ use crate::optimizer::rules::ArrayParentReduceRule;
 use crate::optimizer::rules::ArrayReduceRule;
 use crate::optimizer::rules::ParentRuleSet;
 use crate::optimizer::rules::ReduceRuleSet;
+use crate::validity::Validity;
 
-pub(super) const RULES: ReduceRuleSet =
-    ReduceRuleSet::new(&[&ScalarFnConstantRule, &ScalarFnAbstractReduceRule]);
+pub(super) const RULES: ReduceRuleSet = ReduceRuleSet::new(&[
+    &ScalarFnPackToStructRule,
+    &ScalarFnConstantRule,
+    &ScalarFnAbstractReduceRule,
+]);
 
 pub(super) const PARENT_RULES: ParentRuleSet =
     ParentRuleSet::new(&[ParentRuleSet::lift(&ScalarFnUnaryFilterPushDownRule)]);
 
+/// Converts a `ScalarFnArray` whose function is `Pack` directly into a `StructArray`.
+#[derive(Debug)]
+struct ScalarFnPackToStructRule;
+impl ArrayReduceRule for ScalarFnPackToStructRule {
+    fn reduce(&self, array: &ScalarFnArray) -> VortexResult<Option<ArrayRef>> {
+        let Some(pack_options) = array.scalar_fn.as_opt::<Pack>() else {
+            return Ok(None);
+        };
+
+        let validity = match pack_options.nullability {
+            vortex_dtype::Nullability::NonNullable => Validity::NonNullable,
+            vortex_dtype::Nullability::Nullable => Validity::AllValid,
+        };
+
+        Ok(Some(
+            StructArray::try_new(
+                pack_options.names.clone(),
+                array.children.clone(),
+                array.len,
+                validity,
+            )?
+            .into_array(),
+        ))
+    }
+}
+
 #[derive(Debug)]
 struct ScalarFnConstantRule;
 impl ArrayReduceRule for ScalarFnConstantRule {
diff --git a/vortex-array/src/arrays/struct_/vtable/rules.rs b/vortex-array/src/arrays/struct_/vtable/rules.rs
index 9a02c898f90..3f6bbbdb156 100644
--- a/vortex-array/src/arrays/struct_/vtable/rules.rs
+++ b/vortex-array/src/arrays/struct_/vtable/rules.rs
@@ -2,6 +2,7 @@
 // SPDX-FileCopyrightText: Copyright the Vortex contributors
 
 use vortex_error::VortexResult;
+use vortex_error::vortex_ensure;
 use vortex_error::vortex_err;
 
 use crate::ArrayRef;
@@ -27,7 +28,12 @@ pub(super) const PARENT_RULES: ParentRuleSet = ParentRuleSet::new(
     ParentRuleSet::lift(&StructGetItemRule),
 ]);
 
-/// Rule to push down cast into struct fields
+/// Rule to push a cast down into struct fields.
+///
+/// TODO(joe/rob): should we have this in casts?
+///
+/// This rule supports schema evolution by allowing new nullable fields to be added
+/// at the end of the struct, filled with null values.
 #[derive(Debug)]
 struct StructCastPushDownRule;
 impl ArrayParentReduceRule for StructCastPushDownRule {
@@ -44,10 +50,40 @@ impl ArrayParentReduceRule for StructCastPushDownRule {
         _child_idx: usize,
     ) -> VortexResult<Option<ArrayRef>> {
         let target_fields = parent.options.as_struct_fields();
+        let source_field_count = array.fields.len();
+        let target_field_count = target_fields.nfields();
 
-        let mut new_fields = Vec::with_capacity(target_fields.nfields());
-        for (field_array, field_dtype) in array.fields.iter().zip(target_fields.fields()) {
-            new_fields.push(field_array.cast(field_dtype)?)
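+        // Source fields are matched to target fields by position; the target may
+        // additionally append new nullable fields (schema evolution).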
+        // Target must have at least as many fields as source
+        vortex_ensure!(
+            target_field_count >= source_field_count,
+            "Cannot cast struct: target has fewer fields ({}) than source ({})",
+            target_field_count,
+            source_field_count
+        );
+
+        let mut new_fields = Vec::with_capacity(target_field_count);
+
+        // Cast existing source fields to target types
+        for (field_array, field_dtype) in array
+            .fields
+            .iter()
+            .zip(target_fields.fields().take(source_field_count))
+        {
+            new_fields.push(field_array.cast(field_dtype)?);
+        }
+
+        // Add null arrays for any extra target fields (schema evolution)
+        for field_dtype in target_fields.fields().skip(source_field_count) {
+            vortex_ensure!(
+                field_dtype.is_nullable(),
+                "Cannot add non-nullable field during struct cast (schema evolution only supports nullable fields)"
+            );
+            new_fields.push(
+                ConstantArray::new(vortex_scalar::Scalar::null(field_dtype), array.len())
+                    .into_array(),
+            );
         }
 
         let validity = if parent.options.is_nullable() {
diff --git a/vortex-array/src/arrow/executor/byte.rs b/vortex-array/src/arrow/executor/byte.rs
index 296eab3a8fa..63823377bd2 100644
--- a/vortex-array/src/arrow/executor/byte.rs
+++ b/vortex-array/src/arrow/executor/byte.rs
@@ -4,7 +4,7 @@
 use std::sync::Arc;
 
 use arrow_array::ArrayRef as ArrowArrayRef;
-use arrow_array::GenericBinaryArray;
+use arrow_array::GenericByteArray;
 use arrow_array::types::ByteArrayType;
 use vortex_compute::arrow::IntoArrow;
 use vortex_dtype::DType;
@@ -62,8 +62,8 @@ where
     let data = array.bytes().clone().into_arrow_buffer();
 
     let null_buffer = to_arrow_null_buffer(array.validity(), array.len(), session)?;
 
-    Ok(Arc::new(unsafe {
-        GenericBinaryArray::<T::Offset>::new_unchecked(offsets, data, null_buffer)
-    }))
+    Ok(Arc::new(unsafe {
+        GenericByteArray::<T>::new_unchecked(offsets, data, null_buffer)
+    }))
 }
diff --git a/vortex-array/src/arrow/executor/struct_.rs b/vortex-array/src/arrow/executor/struct_.rs
index b0346b8ed20..bfdfcf55fc6 100644
--- a/vortex-array/src/arrow/executor/struct_.rs
+++ b/vortex-array/src/arrow/executor/struct_.rs
@@ -8,6 +8,7 @@ use arrow_array::StructArray;
 use arrow_buffer::NullBuffer;
 use arrow_schema::DataType;
 use arrow_schema::Fields;
+use itertools::Itertools;
 use vortex_compute::arrow::IntoArrow;
 use vortex_dtype::DType;
 use vortex_dtype::StructFields;
@@ -93,7 +94,7 @@ fn create_from_fields(
     session: &VortexSession,
 ) -> VortexResult<ArrowArrayRef> {
     let mut arrow_fields = Vec::with_capacity(vortex_fields.len());
-    for (field, vx_field) in fields.iter().zip(vortex_fields.into_iter()) {
+    for (field, vx_field) in fields.iter().zip_eq(vortex_fields.into_iter()) {
         let arrow_field = vx_field.execute_arrow(field.data_type(), session)?;
         vortex_ensure!(
             field.is_nullable() || arrow_field.null_count() == 0,
diff --git a/vortex-datafusion/src/persistent/opener.rs b/vortex-datafusion/src/persistent/opener.rs
index 91755e7a92f..5553ef994f8 100644
--- a/vortex-datafusion/src/persistent/opener.rs
+++ b/vortex-datafusion/src/persistent/opener.rs
@@ -29,6 +29,7 @@ use futures::stream;
 use object_store::ObjectStore;
 use object_store::path::Path;
 use tracing::Instrument;
+use vortex::array::Array;
 use vortex::array::ArrayRef;
 use vortex::array::arrow::ArrowArrayExecutor;
 use vortex::dtype::FieldName;
@@ -111,10 +112,9 @@ impl FileOpener for VortexOpener {
             Some(indices) => Arc::new(table_schema.file_schema().project(indices)?),
         };
 
-        let schema_adapter = self.schema_adapter_factory.create(
-            projected_schema.clone(),
-            table_schema.table_schema().clone(),
-        );
+        let schema_adapter = self
+            .schema_adapter_factory
+            .create(projected_schema, table_schema.table_schema().clone());
 
         // Update partition column access in the filter to use literals instead
         let partition_fields = self.table_schema.table_partition_cols().clone();
@@ -295,7 +295,8 @@ impl FileOpener for VortexOpener {
                 .with_ordered(has_output_ordering)
                 .map(move |chunk| {
                     if *USE_VORTEX_OPERATORS {
-                        chunk.execute_record_batch(&projected_schema, &chunk_session)
+                        let schema = chunk.dtype().to_arrow_schema()?;
+                        chunk.execute_record_batch(&schema, &chunk_session)
                     } else {
                         RecordBatch::try_from(chunk.as_ref())
                     }
@@ -809,11 +810,9 @@ mod tests {
         // struct column.
         let data = opener
             .open(PartitionedFile::new(file_path.to_string(), data_size))?
-            .await
-            .unwrap()
+            .await?
             .try_collect::<Vec<_>>()
-            .await
-            .unwrap();
+            .await?;
 
         assert_eq!(data.len(), 1);
         assert_eq!(data[0].num_rows(), 3);
diff --git a/vortex-scan/src/arrow.rs b/vortex-scan/src/arrow.rs
index b050be8779f..6431898e7e3 100644
--- a/vortex-scan/src/arrow.rs
+++ b/vortex-scan/src/arrow.rs
@@ -191,7 +191,7 @@ mod tests {
     }
 
     #[test]
-    fn test_record_batch_iterator_adapter() {
+    fn test_record_batch_iterator_adapter() -> VortexResult<()> {
         let schema = create_arrow_schema();
         let batch1 = RecordBatch::try_new(
             schema.clone(),
@@ -199,16 +199,14 @@
             vec![
                 Arc::new(Int32Array::from(vec![Some(1), Some(2)])) as ArrowArrayRef,
                 Arc::new(StringArray::from(vec![Some("Alice"), Some("Bob")])) as ArrowArrayRef,
             ],
-        )
-        .unwrap();
+        )?;
         let batch2 = RecordBatch::try_new(
             schema.clone(),
             vec![
                 Arc::new(Int32Array::from(vec![None, Some(4)])) as ArrowArrayRef,
                 Arc::new(StringArray::from(vec![Some("Charlie"), None])) as ArrowArrayRef,
             ],
-        )
-        .unwrap();
+        )?;
         let iter = vec![Ok(batch1), Ok(batch2)].into_iter();
         let mut adapter = RecordBatchIteratorAdapter {
@@ -220,13 +218,15 @@
         assert_eq!(adapter.schema(), schema);
 
         // Test Iterator trait
-        let first = adapter.next().unwrap().unwrap();
+        let first = adapter.next().unwrap()?;
         assert_eq!(first.num_rows(), 2);
 
-        let second = adapter.next().unwrap().unwrap();
+        let second = adapter.next().unwrap()?;
         assert_eq!(second.num_rows(), 2);
 
         assert!(adapter.next().is_none());
+
+        Ok(())
     }
 
     #[test]