Skip to content

Commit 9a76489

Browse files
authored
Fix Arrow struct export (#5731)
This commit also: * Adds a unary filter push-down rule. * Fixes the ordering of filter + project in the layout reader. Signed-off-by: Nicholas Gates <[email protected]>
1 parent b7fb836 commit 9a76489

File tree

4 files changed

+80
-28
lines changed

4 files changed

+80
-28
lines changed

vortex-array/src/arrays/filter/array.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,8 @@ impl FilterArray {
3131
}
3232
}
3333

34-
pub fn mask(&self) -> &Mask {
34+
/// Returns a reference to the mask used to filter the child array.
35+
pub fn filter_mask(&self) -> &Mask {
3536
&self.mask
3637
}
3738
}

vortex-array/src/arrays/scalar_fn/rules.rs

Lines changed: 53 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,21 +19,26 @@ use crate::ArrayVisitor;
1919
use crate::IntoArray;
2020
use crate::arrays::ConstantArray;
2121
use crate::arrays::ConstantVTable;
22+
use crate::arrays::FilterArray;
23+
use crate::arrays::FilterVTable;
2224
use crate::arrays::ScalarFnArray;
2325
use crate::arrays::ScalarFnVTable;
2426
use crate::expr::ExecutionArgs;
2527
use crate::expr::ReduceCtx;
2628
use crate::expr::ReduceNode;
2729
use crate::expr::ReduceNodeRef;
2830
use crate::expr::ScalarFn;
31+
use crate::optimizer::rules::ArrayParentReduceRule;
2932
use crate::optimizer::rules::ArrayReduceRule;
33+
use crate::optimizer::rules::Exact;
3034
use crate::optimizer::rules::ParentRuleSet;
3135
use crate::optimizer::rules::ReduceRuleSet;
3236

3337
pub(super) const RULES: ReduceRuleSet<ScalarFnVTable> =
3438
ReduceRuleSet::new(&[&ScalarFnConstantRule, &ScalarFnAbstractReduceRule]);
3539

36-
pub(super) const PARENT_RULES: ParentRuleSet<ScalarFnVTable> = ParentRuleSet::new(&[]);
40+
pub(super) const PARENT_RULES: ParentRuleSet<ScalarFnVTable> =
41+
ParentRuleSet::new(&[ParentRuleSet::lift(&ScalarFnUnaryFilterPushDownRule)]);
3742

3843
#[derive(Debug)]
3944
struct ScalarFnConstantRule;
@@ -152,3 +157,50 @@ impl ReduceCtx for ArrayReduceCtx {
152157
))
153158
}
154159
}
160+
161+
/// Parent-reduce rule that pushes a [`FilterArray`] parent down into the children of a
/// [`ScalarFnArray`] when exactly one of those children is not a constant array.
///
/// The rewrite turns `filter(scalar_fn(children...))` into
/// `scalar_fn(filter(non_constant_child), resized_constants...)`.
#[derive(Debug)]
162+
struct ScalarFnUnaryFilterPushDownRule;
163+
164+
impl ArrayParentReduceRule<ScalarFnVTable> for ScalarFnUnaryFilterPushDownRule {
165+
// The parent matcher for this rule is `Exact<FilterVTable>`.
type Parent = Exact<FilterVTable>;
166+
167+
/// Builds the parent matcher from the `FilterVTable`.
fn parent(&self) -> Self::Parent {
168+
Exact::from(&FilterVTable)
169+
}
170+
171+
/// Attempts to push the `parent` filter below the `child` scalar function.
///
/// Returns `Ok(Some(array))` with the rewritten scalar function array when `child` has
/// exactly one non-constant child, and `Ok(None)` otherwise. An error is returned only if
/// re-assembling the scalar function array via `ScalarFnArray::try_new` fails.
///
/// `_child_idx` is unused.
fn reduce_parent(
172+
&self,
173+
child: &ScalarFnArray,
174+
parent: &FilterArray,
175+
_child_idx: usize,
176+
) -> VortexResult<Option<ArrayRef>> {
177+
// If we only have one non-constant child, then it is _always_ cheaper to push down the
178+
// filter over the children of the scalar function array.
179+
if child
180+
.children
181+
.iter()
182+
.filter(|c| !c.is::<ConstantVTable>())
183+
.count()
184+
== 1
185+
{
186+
let new_children: Vec<_> = child
187+
.children
188+
.iter()
189+
.map(|c| match c.as_opt::<ConstantVTable>() {
190+
// Constant children are not filtered; they are re-created from the same scalar
// with the length of the parent filter array.
Some(array) => {
191+
ConstantArray::new(array.scalar().clone(), parent.len()).into_array()
192+
}
193+
// Any other child is wrapped in a new filter that reuses the parent's mask.
None => FilterArray::new(c.clone(), parent.filter_mask().clone()).into_array(),
194+
})
195+
.collect();
196+
197+
// Re-assemble the same scalar function over the rewritten children, using the
// parent's length as the length of the new array.
let new_array =
198+
ScalarFnArray::try_new(child.scalar_fn.clone(), new_children, parent.len())?
199+
.into_array();
200+
201+
return Ok(Some(new_array));
202+
}
203+
204+
// Zero or several non-constant children: no rewrite is produced
// (presumably `None` signals that the rule did not apply — confirm against the optimizer).
Ok(None)
205+
}
206+
}

vortex-array/src/arrow/executor/struct_.rs

Lines changed: 18 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -6,15 +6,20 @@ use std::sync::Arc;
66
use arrow_array::ArrayRef as ArrowArrayRef;
77
use arrow_array::StructArray;
88
use arrow_buffer::NullBuffer;
9+
use arrow_schema::DataType;
910
use arrow_schema::Fields;
10-
use itertools::Itertools;
11+
use vortex_compute::arrow::IntoArrow;
12+
use vortex_dtype::DType;
13+
use vortex_dtype::StructFields;
14+
use vortex_dtype::arrow::FromArrowType;
15+
use vortex_error::VortexError;
1116
use vortex_error::VortexResult;
12-
use vortex_error::vortex_bail;
1317
use vortex_error::vortex_ensure;
1418
use vortex_session::VortexSession;
1519

1620
use crate::Array;
1721
use crate::ArrayRef;
22+
use crate::VectorExecutor;
1823
use crate::arrays::ScalarFnVTable;
1924
use crate::arrays::StructVTable;
2025
use crate::arrow::ArrowArrayExecutor;
@@ -52,28 +57,19 @@ pub(super) fn to_arrow_struct(
5257
);
5358
}
5459

55-
// Otherwise, we have some options:
56-
// 1. Use get_item expression to extract each field? This is a bit sad because get_item
57-
// will perform the validity masking again.
58-
// 2. Execute a full struct vector. But this may do unnecessary work on fields that may
59-
// have a more direct conversion to the desired Arrow field type.
60-
// 3. Something else?
61-
//
62-
// For now, we go with option 1. Although we really ought to figure out CSE for this.
63-
let field_arrays = fields
64-
.iter()
65-
.map(|f| array.get_item(f.name().as_str()))
66-
.try_collect()?;
60+
// Otherwise, we fall back to executing the full struct vector.
61+
// First we apply a cast to ensure we push down casting where possible into the struct fields.
62+
let vx_fields = StructFields::from_arrow(fields);
63+
let array = array.cast(DType::Struct(
64+
vx_fields,
65+
vortex_dtype::Nullability::Nullable,
66+
))?;
6767

68-
if !array.all_valid() {
69-
// TODO(ngates): we should grab the nullability using the is_not_null expression.
70-
vortex_bail!(
71-
"Cannot convert nullable Struct array with nulls to Arrow\n{}",
72-
array.display_tree()
73-
);
74-
}
68+
let struct_array = array.execute_vector(session)?.into_struct().into_arrow()?;
7569

76-
create_from_fields(fields, field_arrays, None, len, session)
70+
// Finally, we cast to Arrow to ensure any types not representable by Vortex (e.g. Dictionary)
71+
// are properly converted.
72+
arrow_cast::cast(&struct_array, &DataType::Struct(fields.clone())).map_err(VortexError::from)
7773
}
7874

7975
fn create_from_fields(

vortex-layout/src/layouts/flat/reader.rs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -219,14 +219,17 @@ impl LayoutReader for FlatReader {
219219
}
220220

221221
Ok(if *USE_VORTEX_OPERATORS {
222-
// Evaluate the projection expression.
223-
array = array.apply(&expr)?;
224-
225-
// Filter the array based on the row mask.
222+
// First apply the filter to the array.
223+
// NOTE(ngates): we *must* filter first before applying the expression, as the
224+
// expression may depend on the filtered rows being removed e.g.
225+
// `CAST(a, u8) WHERE a < 256`
226226
if !mask.all_true() {
227227
array = array.filter(mask)?;
228228
}
229229

230+
// Evaluate the projection expression.
231+
array = array.apply(&expr)?;
232+
230233
array
231234
} else {
232235
// Filter the array based on the row mask.

0 commit comments

Comments
 (0)