Skip to content

Commit a157b09

Browse files
committed
Add more arrow execution
Signed-off-by: Nicholas Gates <[email protected]>
1 parent e664eb8 commit a157b09

File tree

12 files changed

+195
-37
lines changed

12 files changed

+195
-37
lines changed

vortex-array/src/arrays/filter/array.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,4 +30,8 @@ impl FilterArray {
3030
stats: ArrayStats::default(),
3131
}
3232
}
33+
34+
/// Returns a reference to the [`Mask`] stored in this `FilterArray`.
pub fn mask(&self) -> &Mask {
    &self.mask
}
3337
}

vortex-array/src/arrays/scalar_fn/rules.rs

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
use std::any::Any;
55
use std::sync::Arc;
66

7+
use itertools::Itertools;
78
use vortex_dtype::DType;
89
use vortex_error::VortexExpect;
910
use vortex_error::VortexResult;
@@ -19,19 +20,28 @@ use crate::ArrayVisitor;
1920
use crate::IntoArray;
2021
use crate::arrays::ConstantArray;
2122
use crate::arrays::ConstantVTable;
23+
use crate::arrays::FilterArray;
24+
use crate::arrays::FilterVTable;
2225
use crate::arrays::ScalarFnArray;
2326
use crate::arrays::ScalarFnVTable;
2427
use crate::expr::ExecutionArgs;
28+
use crate::expr::ExecutionCost;
2529
use crate::expr::ReduceCtx;
2630
use crate::expr::ReduceNode;
2731
use crate::expr::ReduceNodeRef;
2832
use crate::expr::ScalarFn;
33+
use crate::optimizer::rules::ArrayParentReduceRule;
2934
use crate::optimizer::rules::ArrayReduceRule;
35+
use crate::optimizer::rules::Exact;
36+
use crate::optimizer::rules::ParentRuleSet;
3037
use crate::optimizer::rules::ReduceRuleSet;
3138

3239
pub(super) const RULES: ReduceRuleSet<ScalarFnVTable> =
3340
ReduceRuleSet::new(&[&ScalarFnConstantRule, &ScalarFnAbstractReduceRule]);
3441

42+
/// Parent-reduction rules for scalar-function arrays. The set currently holds only
/// `ScalarFnFilterPushDownRule`, lifted into the set with `ParentRuleSet::lift`.
pub(super) const PARENT_RULES: ParentRuleSet<ScalarFnVTable> =
    ParentRuleSet::new(&[ParentRuleSet::lift(&ScalarFnFilterPushDownRule)]);
44+
3545
#[derive(Debug)]
3646
struct ScalarFnConstantRule;
3747
impl ArrayReduceRule<ScalarFnVTable> for ScalarFnConstantRule {
@@ -149,3 +159,39 @@ impl ReduceCtx for ArrayReduceCtx {
149159
))
150160
}
151161
}
162+
163+
#[derive(Debug)]
164+
struct ScalarFnFilterPushDownRule;
165+
impl ArrayParentReduceRule<ScalarFnVTable> for ScalarFnFilterPushDownRule {
166+
type Parent = Exact<FilterVTable>;
167+
168+
fn parent(&self) -> Self::Parent {
169+
Exact::from(&FilterVTable)
170+
}
171+
172+
fn reduce_parent(
173+
&self,
174+
child: &ScalarFnArray,
175+
parent: &FilterArray,
176+
_child_idx: usize,
177+
) -> VortexResult<Option<ArrayRef>> {
178+
// For metadata-only scalar functions, we can push the filter down
179+
if matches!(
180+
child.scalar_fn.execution_cost(),
181+
ExecutionCost::MetadataOnly
182+
) {
183+
let new_children: Vec<_> = child
184+
.children
185+
.iter()
186+
.map(|c| c.filter(parent.mask().clone()))
187+
.try_collect()?;
188+
189+
let new_array =
190+
ScalarFnArray::try_new(child.scalar_fn.clone(), new_children, parent.len())?;
191+
192+
return Ok(Some(new_array.into_array()));
193+
}
194+
195+
Ok(None)
196+
}
197+
}

vortex-array/src/arrays/scalar_fn/vtable/mod.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ use crate::arrays::scalar_fn::array::ScalarFnArray;
2525
use crate::arrays::scalar_fn::kernel::KernelInput;
2626
use crate::arrays::scalar_fn::kernel::ScalarFnKernel;
2727
use crate::arrays::scalar_fn::metadata::ScalarFnMetadata;
28+
use crate::arrays::scalar_fn::rules::PARENT_RULES;
2829
use crate::arrays::scalar_fn::rules::RULES;
2930
use crate::expr;
3031
use crate::expr::ExprVTable;
@@ -167,6 +168,14 @@ impl VTable for ScalarFnVTable {
167168
fn reduce(array: &Self::Array) -> VortexResult<Option<ArrayRef>> {
168169
RULES.evaluate(array)
169170
}
171+
172+
fn reduce_parent(
173+
array: &Self::Array,
174+
parent: &ArrayRef,
175+
child_idx: usize,
176+
) -> VortexResult<Option<ArrayRef>> {
177+
PARENT_RULES.evaluate(array, parent, child_idx)
178+
}
170179
}
171180

172181
/// Array factory functions for scalar functions.

vortex-array/src/arrow/executor/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ mod primitive;
1212
mod run_end;
1313
mod struct_;
1414
mod temporal;
15+
mod validity;
1516

1617
use arrow_array::ArrayRef as ArrowArrayRef;
1718
use arrow_array::RecordBatch;

vortex-array/src/arrow/executor/struct_.rs

Lines changed: 67 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -5,70 +5,100 @@ use std::sync::Arc;
55

66
use arrow_array::ArrayRef as ArrowArrayRef;
77
use arrow_array::StructArray;
8+
use arrow_buffer::NullBuffer;
89
use arrow_schema::Fields;
910
use itertools::Itertools;
1011
use vortex_error::VortexResult;
12+
use vortex_error::vortex_bail;
1113
use vortex_error::vortex_ensure;
1214
use vortex_session::VortexSession;
1315

1416
use crate::Array;
1517
use crate::ArrayRef;
18+
use crate::arrays::ScalarFnVTable;
1619
use crate::arrays::StructVTable;
1720
use crate::arrow::ArrowArrayExecutor;
18-
use crate::arrow::null_buffer::to_null_buffer;
21+
use crate::arrow::executor::validity::to_arrow_null_buffer;
1922
use crate::builtins::ArrayBuiltins;
23+
use crate::expr::Pack;
24+
use crate::vtable::ValidityHelper;
2025

2126
pub(super) fn to_arrow_struct(
2227
array: ArrayRef,
2328
fields: &Fields,
2429
session: &VortexSession,
2530
) -> VortexResult<ArrowArrayRef> {
2631
let len = array.len();
27-
let validity = array.validity_mask();
2832

29-
let mut field_arrays = Vec::with_capacity(fields.len());
30-
31-
match array.try_into::<StructVTable>() {
33+
// First, we attempt to short-circuit if the array is already a StructVTable:
34+
let array = match array.try_into::<StructVTable>() {
3235
Ok(array) => {
33-
// If the array is already a struct type, then we can convert each field.
34-
for (field, child) in fields.iter().zip_eq(array.into_fields().into_iter()) {
35-
let field_array = child.execute_arrow(field.data_type(), session)?;
36-
vortex_ensure!(
37-
field.is_nullable() || field_array.null_count() == 0,
38-
"Cannot convert field '{}' to non-nullable Arrow field because it contains nulls",
39-
field.name()
40-
);
41-
field_arrays.push(field_array);
42-
}
36+
let validity = to_arrow_null_buffer(array.validity(), array.len(), session)?;
37+
return create_from_fields(fields, array.into_fields(), validity, len, session);
4338
}
44-
Err(array) => {
45-
// Otherwise, we have some options:
46-
// 1. Use get_item expression to extract each field? This is a bit sad because get_item
47-
// will perform the validity masking again.
48-
// 2. Execute a full struct vector. But this may do unnecessary work on fields that may
49-
// have a more direct conversion to the desired Arrow field type.
50-
// 3. Something else?
51-
//
52-
// For now, we go with option 1. Although we really ought to figure out CSE for this.
53-
for field in fields.iter() {
54-
let field_array = array
55-
.get_item(field.name().as_str())?
56-
.execute_arrow(field.data_type(), session)?;
57-
vortex_ensure!(
58-
field.is_nullable() || field_array.null_count() == 0,
59-
"Cannot convert field '{}' to non-nullable Arrow field because it contains nulls",
60-
field.name()
61-
);
62-
field_arrays.push(field_array);
63-
}
39+
Err(array) => array,
40+
};
41+
42+
// We can also short-circuit if the array is a `pack` scalar function:
43+
if let Some(array) = array.as_opt::<ScalarFnVTable>() {
44+
if let Some(_pack_options) = array.scalar_fn().as_opt::<Pack>() {
45+
return create_from_fields(
46+
fields,
47+
array.children().to_vec(),
48+
None, // Pack is never null,
49+
len,
50+
session,
51+
);
6452
}
6553
}
6654

55+
// Otherwise, we have some options:
56+
// 1. Use get_item expression to extract each field? This is a bit sad because get_item
57+
// will perform the validity masking again.
58+
// 2. Execute a full struct vector. But this may do unnecessary work on fields that may
59+
// have a more direct conversion to the desired Arrow field type.
60+
// 3. Something else?
61+
//
62+
// For now, we go with option 1. Although we really ought to figure out CSE for this.
63+
let field_arrays = fields
64+
.iter()
65+
.map(|f| array.get_item(f.name().as_str()))
66+
.try_collect()?;
67+
68+
if !array.all_valid() {
69+
// TODO(ngates): we should grab the nullability using the is_not_null expression.
70+
vortex_bail!(
71+
"Cannot convert nullable Struct array with nulls to Arrow\n{}",
72+
array.display_tree()
73+
);
74+
}
75+
76+
create_from_fields(fields, field_arrays, None, len, session)
77+
}
78+
79+
fn create_from_fields(
80+
fields: &Fields,
81+
vortex_fields: Vec<ArrayRef>,
82+
null_buffer: Option<NullBuffer>,
83+
len: usize,
84+
session: &VortexSession,
85+
) -> VortexResult<ArrowArrayRef> {
86+
let mut arrow_fields = Vec::with_capacity(vortex_fields.len());
87+
for (field, vx_field) in fields.iter().zip(vortex_fields.into_iter()) {
88+
let arrow_field = vx_field.execute_arrow(field.data_type(), session)?;
89+
vortex_ensure!(
90+
field.is_nullable() || arrow_field.null_count() == 0,
91+
"Cannot convert field '{}' to non-nullable Arrow field because it contains nulls",
92+
field.name()
93+
);
94+
arrow_fields.push(arrow_field);
95+
}
96+
6797
Ok(Arc::new(unsafe {
6898
StructArray::new_unchecked_with_length(
6999
fields.clone(),
70-
field_arrays.into(),
71-
to_null_buffer(validity),
100+
arrow_fields.into(),
101+
null_buffer,
72102
len,
73103
)
74104
}))

vortex-array/src/arrow/executor/validity.rs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
// SPDX-License-Identifier: Apache-2.0
2+
// SPDX-FileCopyrightText: Copyright the Vortex contributors
3+
4+
use arrow_buffer::NullBuffer;
5+
use vortex_error::VortexResult;
6+
use vortex_session::VortexSession;
7+
8+
use crate::arrow::null_buffer::to_null_buffer;
9+
use crate::mask::MaskExecutor;
10+
use crate::validity::Validity;
11+
12+
pub(super) fn to_arrow_null_buffer(
13+
validity: &Validity,
14+
len: usize,
15+
session: &VortexSession,
16+
) -> VortexResult<Option<NullBuffer>> {
17+
Ok(match validity {
18+
Validity::NonNullable | Validity::AllValid => None,
19+
Validity::AllInvalid => Some(NullBuffer::new_null(len)),
20+
Validity::Array(array) => to_null_buffer(array.execute_mask(session)?),
21+
})
22+
}

vortex-array/src/expr/exprs/get_item.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ use crate::expr::Arity;
2525
use crate::expr::ChildName;
2626
use crate::expr::EmptyOptions;
2727
use crate::expr::ExecutionArgs;
28+
use crate::expr::ExecutionCost;
2829
use crate::expr::ExprId;
2930
use crate::expr::Expression;
3031
use crate::expr::Literal;
@@ -141,6 +142,10 @@ impl VTable for GetItem {
141142
}
142143
}
143144

145+
fn execution_cost(&self, _options: &Self::Options) -> ExecutionCost {
146+
ExecutionCost::MetadataOnly
147+
}
148+
144149
fn reduce(
145150
&self,
146151
field_name: &FieldName,

vortex-array/src/expr/exprs/merge.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ use crate::arrays::StructArray;
2525
use crate::expr::Arity;
2626
use crate::expr::ChildName;
2727
use crate::expr::ExecutionArgs;
28+
use crate::expr::ExecutionCost;
2829
use crate::expr::ExprId;
2930
use crate::expr::Expression;
3031
use crate::expr::GetItem;
@@ -93,6 +94,9 @@ impl VTable for Merge {
9394
}
9495
write!(f, ")")
9596
}
97+
fn execution_cost(&self, _options: &Self::Options) -> ExecutionCost {
98+
ExecutionCost::MetadataOnly
99+
}
96100

97101
fn return_dtype(&self, options: &Self::Options, arg_dtypes: &[DType]) -> VortexResult<DType> {
98102
let mut field_names = Vec::new();

vortex-array/src/expr/exprs/pack.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ use crate::arrays::StructArray;
2929
use crate::expr::Arity;
3030
use crate::expr::ChildName;
3131
use crate::expr::ExecutionArgs;
32+
use crate::expr::ExecutionCost;
3233
use crate::expr::ExprId;
3334
use crate::expr::Expression;
3435
use crate::expr::VTable;
@@ -116,6 +117,9 @@ impl VTable for Pack {
116117
}
117118
write!(f, "){}", options.nullability)
118119
}
120+
fn execution_cost(&self, _options: &Self::Options) -> ExecutionCost {
121+
ExecutionCost::MetadataOnly
122+
}
119123

120124
fn return_dtype(&self, options: &Self::Options, arg_dtypes: &[DType]) -> VortexResult<DType> {
121125
Ok(DType::Struct(

vortex-array/src/expr/exprs/select.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ use crate::ToCanonical;
2727
use crate::expr::Arity;
2828
use crate::expr::ChildName;
2929
use crate::expr::ExecutionArgs;
30+
use crate::expr::ExecutionCost;
3031
use crate::expr::ExprId;
3132
use crate::expr::SimplifyCtx;
3233
use crate::expr::VTable;
@@ -111,6 +112,9 @@ impl VTable for Select {
111112
}
112113
}
113114
}
115+
fn execution_cost(&self, _options: &Self::Options) -> ExecutionCost {
116+
ExecutionCost::MetadataOnly
117+
}
114118

115119
fn return_dtype(
116120
&self,

0 commit comments

Comments
 (0)