36 changes: 34 additions & 2 deletions vortex-array/src/arrays/scalar_fn/rules.rs
@@ -24,7 +24,9 @@ use crate::arrays::FilterArray;
 use crate::arrays::FilterVTable;
 use crate::arrays::ScalarFnArray;
 use crate::arrays::ScalarFnVTable;
+use crate::arrays::StructArray;
 use crate::expr::ExecutionArgs;
+use crate::expr::Pack;
 use crate::expr::ReduceCtx;
 use crate::expr::ReduceNode;
 use crate::expr::ReduceNodeRef;
@@ -34,13 +36,43 @@ use crate::optimizer::rules::ArrayParentReduceRule;
 use crate::optimizer::rules::ArrayReduceRule;
 use crate::optimizer::rules::ParentRuleSet;
 use crate::optimizer::rules::ReduceRuleSet;
+use crate::validity::Validity;

-pub(super) const RULES: ReduceRuleSet<ScalarFnVTable> =
-    ReduceRuleSet::new(&[&ScalarFnConstantRule, &ScalarFnAbstractReduceRule]);
+pub(super) const RULES: ReduceRuleSet<ScalarFnVTable> = ReduceRuleSet::new(&[
+    &ScalarFnPackToStructRule,
+    &ScalarFnConstantRule,
+    &ScalarFnAbstractReduceRule,
+]);

 pub(super) const PARENT_RULES: ParentRuleSet<ScalarFnVTable> =
     ParentRuleSet::new(&[ParentRuleSet::lift(&ScalarFnUnaryFilterPushDownRule)]);

+/// Converts a ScalarFnArray with Pack into a StructArray directly.
+#[derive(Debug)]
[comment] @0ax1 (Contributor), Jan 6, 2026: Why is derive(Debug) needed?
+struct ScalarFnPackToStructRule;
+impl ArrayReduceRule<ScalarFnVTable> for ScalarFnPackToStructRule {
+    fn reduce(&self, array: &ScalarFnArray) -> VortexResult<Option<ArrayRef>> {
+        let Some(pack_options) = array.scalar_fn.as_opt::<Pack>() else {
+            return Ok(None);
+        };
+
+        let validity = match pack_options.nullability {
+            vortex_dtype::Nullability::NonNullable => Validity::NonNullable,
+            vortex_dtype::Nullability::Nullable => Validity::AllValid,
+        };
+
+        Ok(Some(
+            StructArray::try_new(
+                pack_options.names.clone(),
+                array.children.clone(),
+                array.len,
+                validity,
+            )?
+            .into_array(),
+        ))
+    }
+}

 #[derive(Debug)]
 struct ScalarFnConstantRule;
 impl ArrayReduceRule<ScalarFnVTable> for ScalarFnConstantRule {
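The validity mapping in the new rule encodes the fact that Pack never introduces top-level nulls: a non-nullable result carries no mask at all, and a nullable result can use the cheap all-valid representation. A minimal sketch of that mapping, using stand-in enums rather than the real vortex_dtype/vortex-array types:

```rust
// Stand-ins for vortex_dtype::Nullability and Validity; the names mirror
// the diff above, but this is an illustration, not the real API.
#[derive(Debug, PartialEq)]
enum Nullability {
    NonNullable,
    Nullable,
}

#[derive(Debug, PartialEq)]
enum Validity {
    NonNullable, // no validity mask at all
    AllValid,    // nullable dtype, but every row is valid
}

fn pack_validity(n: Nullability) -> Validity {
    match n {
        Nullability::NonNullable => Validity::NonNullable,
        // Packing children into a struct cannot produce top-level nulls,
        // so "all valid" is always correct for a nullable result dtype.
        Nullability::Nullable => Validity::AllValid,
    }
}

fn main() {
    assert_eq!(pack_validity(Nullability::Nullable), Validity::AllValid);
}
```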
42 changes: 38 additions & 4 deletions vortex-array/src/arrays/struct_/vtable/rules.rs
@@ -2,6 +2,7 @@
 // SPDX-FileCopyrightText: Copyright the Vortex contributors

 use vortex_error::VortexResult;
+use vortex_error::vortex_ensure;
 use vortex_error::vortex_err;

 use crate::ArrayRef;
@@ -27,7 +28,12 @@ pub(super) const PARENT_RULES: ParentRuleSet<StructVTable> = ParentRuleSet::new(
     ParentRuleSet::lift(&StructGetItemRule),
 ]);

-/// Rule to push down cast into struct fields
+/// Rule to push down cast into struct fields.
+///
+/// TODO(joe/rob): should we have this in casts?
+///
+/// This rule supports schema evolution by allowing new nullable fields to be added
+/// at the end of the struct, filled with null values.
 #[derive(Debug)]
 struct StructCastPushDownRule;
 impl ArrayParentReduceRule<StructVTable> for StructCastPushDownRule {
@@ -44,10 +50,38 @@ impl ArrayParentReduceRule<StructVTable> for StructCastPushDownRule {
         _child_idx: usize,
     ) -> VortexResult<Option<ArrayRef>> {
         let target_fields = parent.options.as_struct_fields();
+        let source_field_count = array.fields.len();
+        let target_field_count = target_fields.nfields();

-        let mut new_fields = Vec::with_capacity(target_fields.nfields());
-        for (field_array, field_dtype) in array.fields.iter().zip(target_fields.fields()) {
-            new_fields.push(field_array.cast(field_dtype)?)
+        // Target must have at least as many fields as source
+        vortex_ensure!(
+            target_field_count >= source_field_count,
+            "Cannot cast struct: target has fewer fields ({}) than source ({})",
+            target_field_count,
+            source_field_count
+        );
+
+        let mut new_fields = Vec::with_capacity(target_field_count);
+
+        // Cast existing source fields to target types
+        for (field_array, field_dtype) in array
+            .fields
+            .iter()
+            .zip(target_fields.fields().take(source_field_count))
+        {
+            new_fields.push(field_array.cast(field_dtype)?);
+        }
+
+        // Add null arrays for any extra target fields (schema evolution)
+        for field_dtype in target_fields.fields().skip(source_field_count) {
+            vortex_ensure!(
+                field_dtype.is_nullable(),
+                "Cannot add non-nullable field during struct cast (schema evolution only supports nullable fields)"
+            );
+            new_fields.push(
+                ConstantArray::new(vortex_scalar::Scalar::null(field_dtype), array.len())
+                    .into_array(),
+            );
+        }

         let validity = if parent.options.is_nullable() {
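The shape of the schema-evolution logic, restated as a self-contained sketch over Vec-backed columns (the Field type and error strings here are stand-ins, not the Vortex API):

```rust
#[derive(Clone)]
struct Field {
    name: String,
    nullable: bool,
}

/// Cast a struct's columns to a (possibly wider) target schema: existing
/// fields are kept, extra target fields must be nullable and are null-filled.
fn cast_struct_columns(
    source: &[Vec<Option<i64>>],
    target: &[Field],
    len: usize,
) -> Result<Vec<Vec<Option<i64>>>, String> {
    // Target must have at least as many fields as source.
    if target.len() < source.len() {
        return Err(format!(
            "target has fewer fields ({}) than source ({})",
            target.len(),
            source.len()
        ));
    }

    let mut out = Vec::with_capacity(target.len());

    // Existing source columns carry over (the real rule casts each one).
    for column in source {
        out.push(column.clone());
    }

    // Extra target fields are the schema-evolution case: nullable only,
    // filled with nulls for every row.
    for field in &target[source.len()..] {
        if !field.nullable {
            return Err(format!("cannot add non-nullable field {:?}", field.name));
        }
        out.push(vec![None; len]);
    }

    Ok(out)
}
```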
5 changes: 2 additions & 3 deletions vortex-array/src/arrow/executor/byte.rs
@@ -4,7 +4,7 @@
 use std::sync::Arc;

 use arrow_array::ArrayRef as ArrowArrayRef;
-use arrow_array::GenericBinaryArray;
+use arrow_array::GenericByteArray;
 use arrow_array::types::ByteArrayType;
 use vortex_compute::arrow::IntoArrow;
 use vortex_dtype::DType;
@@ -62,8 +62,7 @@ where
         let data = array.bytes().clone().into_arrow_buffer();

         let null_buffer = to_arrow_null_buffer(array.validity(), array.len(), session)?;
-
         Ok(Arc::new(unsafe {
-            GenericBinaryArray::<T::Offset>::new_unchecked(offsets, data, null_buffer)
+            GenericByteArray::<T>::new_unchecked(offsets, data, null_buffer)
         }))
     }
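Context for the GenericBinaryArray → GenericByteArray swap: in arrow-rs, both the binary and string array types are aliases of GenericByteArray over a ByteArrayType, so constructing through GenericByteArray::<T> lets this one executor cover UTF-8 as well as binary data. Restating the alias relationship (illustrative aliases; the real names live in arrow_array):

```rust
use arrow_array::GenericByteArray;
use arrow_array::types::{BinaryType, Utf8Type};

// Both share the offsets-plus-byte-buffer layout; only the logical type
// (and its validation rules, e.g. UTF-8 checking) differs.
type AlsoBinaryArray = GenericByteArray<BinaryType>; // == arrow_array::BinaryArray
type AlsoStringArray = GenericByteArray<Utf8Type>; // == arrow_array::StringArray
```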
3 changes: 2 additions & 1 deletion vortex-array/src/arrow/executor/struct_.rs
@@ -8,6 +8,7 @@ use arrow_array::StructArray;
 use arrow_buffer::NullBuffer;
 use arrow_schema::DataType;
 use arrow_schema::Fields;
+use itertools::Itertools;
 use vortex_compute::arrow::IntoArrow;
 use vortex_dtype::DType;
 use vortex_dtype::StructFields;
@@ -93,7 +94,7 @@ fn create_from_fields(
     session: &VortexSession,
 ) -> VortexResult<ArrowArrayRef> {
     let mut arrow_fields = Vec::with_capacity(vortex_fields.len());
-    for (field, vx_field) in fields.iter().zip(vortex_fields.into_iter()) {
+    for (field, vx_field) in fields.iter().zip_eq(vortex_fields.into_iter()) {
[comment] Contributor: 👍

         let arrow_field = vx_field.execute_arrow(field.data_type(), session)?;
         vortex_ensure!(
             field.is_nullable() || arrow_field.null_count() == 0,
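The zip → zip_eq change hardens the loop: Iterator::zip stops silently at the shorter side, which here would quietly drop trailing struct fields on a count mismatch, whereas itertools' zip_eq panics and surfaces the bug. A toy demonstration:

```rust
use itertools::Itertools;

fn main() {
    let arrow_fields = ["a", "b", "c"];
    let vortex_fields = ["a", "b"];

    // zip: yields two pairs and silently ignores "c".
    assert_eq!(arrow_fields.iter().zip(vortex_fields.iter()).count(), 2);

    // zip_eq: panics when one side runs out before the other.
    let panicked = std::panic::catch_unwind(|| {
        arrow_fields.iter().zip_eq(vortex_fields.iter()).count()
    })
    .is_err();
    assert!(panicked);
}
```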
17 changes: 8 additions & 9 deletions vortex-datafusion/src/persistent/opener.rs
@@ -29,6 +29,7 @@ use futures::stream;
 use object_store::ObjectStore;
 use object_store::path::Path;
 use tracing::Instrument;
+use vortex::array::Array;
 use vortex::array::ArrayRef;
 use vortex::array::arrow::ArrowArrayExecutor;
 use vortex::dtype::FieldName;
@@ -111,10 +112,9 @@ impl FileOpener for VortexOpener {
             Some(indices) => Arc::new(table_schema.file_schema().project(indices)?),
         };

-        let schema_adapter = self.schema_adapter_factory.create(
-            projected_schema.clone(),
-            table_schema.table_schema().clone(),
-        );
+        let schema_adapter = self
+            .schema_adapter_factory
+            .create(projected_schema, table_schema.table_schema().clone());

         // Update partition column access in the filter to use literals instead
         let partition_fields = self.table_schema.table_partition_cols().clone();
@@ -295,7 +295,8 @@
                 .with_ordered(has_output_ordering)
                 .map(move |chunk| {
                     if *USE_VORTEX_OPERATORS {
-                        chunk.execute_record_batch(&projected_schema, &chunk_session)
+                        let schema = chunk.dtype().to_arrow_schema()?;
[comment] Contributor: you can get the schema once from the scan and projection expression and use it here (I'm doing it in the DF52 PR, so not a blocker)
+                        chunk.execute_record_batch(&schema, &chunk_session)
                     } else {
                         RecordBatch::try_from(chunk.as_ref())
                     }
@@ -809,11 +810,9 @@
         // struct column.
        let data = opener
            .open(PartitionedFile::new(file_path.to_string(), data_size))?
-            .await
-            .unwrap()
+            .await?
            .try_collect::<Vec<_>>()
-            .await
-            .unwrap();
+            .await?;

        assert_eq!(data.len(), 1);
        assert_eq!(data[0].num_rows(), 3);
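On the review comment about computing the schema once: the suggestion is a standard hoist of a per-stream invariant out of the per-chunk closure. In miniature (the names below are illustrative, not the Vortex or DataFusion API):

```rust
fn main() {
    let chunks = vec![vec![1, 2], vec![3, 4, 5]];

    // Computed once per stream rather than once per chunk, then moved
    // into the closure and reused for every item.
    let schema = "id:i32,name:utf8".to_string();

    let batches: Vec<(String, usize)> = chunks
        .into_iter()
        .map(move |chunk| (schema.clone(), chunk.len()))
        .collect();

    assert_eq!(batches.len(), 2);
}
```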
14 changes: 7 additions & 7 deletions vortex-scan/src/arrow.rs
@@ -191,24 +191,22 @@
     }

     #[test]
-    fn test_record_batch_iterator_adapter() {
+    fn test_record_batch_iterator_adapter() -> VortexResult<()> {
         let schema = create_arrow_schema();
         let batch1 = RecordBatch::try_new(
             schema.clone(),
             vec![
                 Arc::new(Int32Array::from(vec![Some(1), Some(2)])) as ArrowArrayRef,
                 Arc::new(StringArray::from(vec![Some("Alice"), Some("Bob")])) as ArrowArrayRef,
             ],
-        )
-        .unwrap();
+        )?;
         let batch2 = RecordBatch::try_new(
             schema.clone(),
             vec![
                 Arc::new(Int32Array::from(vec![None, Some(4)])) as ArrowArrayRef,
                 Arc::new(StringArray::from(vec![Some("Charlie"), None])) as ArrowArrayRef,
             ],
-        )
-        .unwrap();
+        )?;

         let iter = vec![Ok(batch1), Ok(batch2)].into_iter();
         let mut adapter = RecordBatchIteratorAdapter {
@@ -220,13 +218,15 @@
         assert_eq!(adapter.schema(), schema);

         // Test Iterator trait
-        let first = adapter.next().unwrap().unwrap();
+        let first = adapter.next().unwrap()?;
         assert_eq!(first.num_rows(), 2);

-        let second = adapter.next().unwrap().unwrap();
+        let second = adapter.next().unwrap()?;
         assert_eq!(second.num_rows(), 2);

         assert!(adapter.next().is_none());
+
+        Ok(())
     }

     #[test]
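Both test cleanups above follow the same Rust idiom: a test may return Result<(), E> for any E: Debug, which lets ? replace .unwrap() chains and reports the error value on failure instead of a panic backtrace. In miniature, under a stand-in error type:

```rust
#[derive(Debug)]
struct MyError;

fn fallible() -> Result<u32, MyError> {
    Ok(42)
}

#[test]
fn returns_result() -> Result<(), MyError> {
    let value = fallible()?; // no .unwrap() needed
    assert_eq!(value, 42);
    Ok(())
}
```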