Skip to content

Commit 6319414

Browse files
Remove default arrow filter impl. (#3256)
Make it explicit to register a kernel with that impl --------- Signed-off-by: Joe Isaacs <[email protected]>
1 parent c59a631 commit 6319414

File tree

9 files changed

+81
-135
lines changed

9 files changed

+81
-135
lines changed
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
use vortex_error::VortexResult;
2+
use vortex_mask::Mask;
3+
4+
use crate::arrays::ListEncoding;
5+
use crate::compute::{FilterKernel, FilterKernelAdapter, arrow_filter_fn};
6+
use crate::{ArrayRef, register_kernel};
7+
8+
impl FilterKernel for ListEncoding {
9+
fn filter(&self, array: &Self::Array, mask: &Mask) -> VortexResult<ArrayRef> {
10+
arrow_filter_fn(array, mask)
11+
}
12+
}
13+
14+
register_kernel!(FilterKernelAdapter(ListEncoding).lift());

vortex-array/src/arrays/list/compute/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
mod filter;
12
mod mask;
23

34
use vortex_error::VortexResult;
Lines changed: 0 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -1,57 +0,0 @@
1-
use std::sync::Arc;
2-
3-
use arrow_array::ArrayRef;
4-
use arrow_schema::{DataType, Field, FieldRef};
5-
use vortex_dtype::PType;
6-
use vortex_error::{VortexResult, vortex_bail};
7-
8-
use crate::arrays::{ListArray, ListEncoding};
9-
use crate::arrow::IntoArrowArray;
10-
use crate::compute::{ToArrowFn, cast};
11-
use crate::{Array, ToCanonical};
12-
13-
impl ToArrowFn<&ListArray> for ListEncoding {
14-
fn to_arrow(&self, array: &ListArray, data_type: &DataType) -> VortexResult<Option<ArrayRef>> {
15-
let (cast_ptype, element_dtype) = match data_type {
16-
DataType::List(field) => (PType::I32, field.data_type()),
17-
DataType::LargeList(field) => (PType::I64, field.data_type()),
18-
_ => {
19-
vortex_bail!("Unsupported data type: {data_type}");
20-
}
21-
};
22-
23-
let offsets = array
24-
.offsets()
25-
.to_primitive()
26-
.map_err(|err| err.with_context("Failed to canonicalize offsets"))?;
27-
28-
let arrow_offsets = cast(&offsets, cast_ptype.into())
29-
.map_err(|err| err.with_context("Failed to cast offsets to PrimitiveArray"))?
30-
.to_primitive()?;
31-
32-
let values = array.elements().clone().into_arrow(element_dtype)?;
33-
34-
let field_ref = FieldRef::new(Field::new_list_field(
35-
values.data_type().clone(),
36-
array.elements().dtype().nullability().into(),
37-
));
38-
39-
let nulls = array.validity_mask()?.to_null_buffer();
40-
41-
Ok(Some(match arrow_offsets.ptype() {
42-
PType::I32 => Arc::new(arrow_array::ListArray::try_new(
43-
field_ref,
44-
arrow_offsets.buffer::<i32>().into_arrow_offset_buffer(),
45-
values,
46-
nulls,
47-
)?),
48-
PType::I64 => Arc::new(arrow_array::LargeListArray::try_new(
49-
field_ref,
50-
arrow_offsets.buffer::<i64>().into_arrow_offset_buffer(),
51-
values,
52-
nulls,
53-
)?),
54-
_ => vortex_bail!("Invalid offsets type {}", arrow_offsets.ptype()),
55-
}))
56-
}
57-
}
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
use vortex_error::VortexResult;
2+
use vortex_mask::Mask;
3+
4+
use crate::arrays::{StructArray, StructEncoding};
5+
use crate::compute::{FilterKernel, FilterKernelAdapter, filter};
6+
use crate::{Array, ArrayRef, register_kernel};
7+
8+
impl FilterKernel for StructEncoding {
9+
fn filter(&self, array: &StructArray, mask: &Mask) -> VortexResult<ArrayRef> {
10+
let validity = array.validity().filter(mask)?;
11+
12+
let fields: Vec<ArrayRef> = array
13+
.fields()
14+
.iter()
15+
.map(|field| filter(field, mask))
16+
.try_collect()?;
17+
let length = fields
18+
.first()
19+
.map(|a| a.len())
20+
.unwrap_or_else(|| mask.true_count());
21+
22+
StructArray::try_new_with_dtype(fields, array.struct_dtype().clone(), length, validity)
23+
.map(|a| a.into_array())
24+
}
25+
}
26+
27+
register_kernel!(FilterKernelAdapter(StructEncoding).lift());

vortex-array/src/arrays/struct_/compute/mod.rs

Lines changed: 3 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,15 @@
11
mod cast;
2+
mod filter;
23
mod mask;
34

45
use itertools::Itertools;
56
use vortex_error::VortexResult;
6-
use vortex_mask::Mask;
77

88
use crate::arrays::StructEncoding;
99
use crate::arrays::struct_::StructArray;
1010
use crate::compute::{
11-
FilterKernel, FilterKernelAdapter, IsConstantKernel, IsConstantKernelAdapter, IsConstantOpts,
12-
MinMaxKernel, MinMaxKernelAdapter, MinMaxResult, TakeKernel, TakeKernelAdapter, filter,
13-
is_constant_opts, take,
11+
IsConstantKernel, IsConstantKernelAdapter, IsConstantOpts, MinMaxKernel, MinMaxKernelAdapter,
12+
MinMaxResult, TakeKernel, TakeKernelAdapter, is_constant_opts, take,
1413
};
1514
use crate::{Array, ArrayRef, ArrayVisitor, register_kernel};
1615

@@ -32,27 +31,6 @@ impl TakeKernel for StructEncoding {
3231

3332
register_kernel!(TakeKernelAdapter(StructEncoding).lift());
3433

35-
impl FilterKernel for StructEncoding {
36-
fn filter(&self, array: &StructArray, mask: &Mask) -> VortexResult<ArrayRef> {
37-
let validity = array.validity().filter(mask)?;
38-
39-
let fields: Vec<ArrayRef> = array
40-
.fields()
41-
.iter()
42-
.map(|field| filter(field, mask))
43-
.try_collect()?;
44-
let length = fields
45-
.first()
46-
.map(|a| a.len())
47-
.unwrap_or_else(|| mask.true_count());
48-
49-
StructArray::try_new_with_dtype(fields, array.struct_dtype().clone(), length, validity)
50-
.map(|a| a.into_array())
51-
}
52-
}
53-
54-
register_kernel!(FilterKernelAdapter(StructEncoding).lift());
55-
5634
impl MinMaxKernel for StructEncoding {
5735
fn min_max(&self, _array: &StructArray) -> VortexResult<Option<MinMaxResult>> {
5836
// TODO(joe): Implement struct min max
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
use vortex_error::VortexResult;
2+
use vortex_mask::Mask;
3+
4+
use crate::arrays::VarBinViewEncoding;
5+
use crate::compute::{FilterKernel, FilterKernelAdapter, arrow_filter_fn};
6+
use crate::{ArrayRef, register_kernel};
7+
8+
impl FilterKernel for VarBinViewEncoding {
9+
fn filter(&self, array: &Self::Array, mask: &Mask) -> VortexResult<ArrayRef> {
10+
arrow_filter_fn(array, mask)
11+
}
12+
}
13+
14+
register_kernel!(FilterKernelAdapter(VarBinViewEncoding).lift());

vortex-array/src/arrays/varbinview/compute/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
mod cast;
2+
mod filter;
23
mod is_constant;
34
mod is_sorted;
45
mod mask;

vortex-array/src/arrays/varbinview/compute/to_arrow.rs

Lines changed: 0 additions & 39 deletions
This file was deleted.

vortex-array/src/compute/filter.rs

Lines changed: 21 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -82,15 +82,6 @@ impl ComputeFnVTable for Filter {
8282
return Ok(array.to_array().into());
8383
}
8484

85-
// Since we handle the AllTrue and AllFalse cases in the entry-point filter function,
86-
// implementations can use `AllOr::expect_some` to unwrap the mixed values variant.
87-
let values = match &mask {
88-
Mask::AllTrue(_) => return Ok(array.to_array().into()),
89-
Mask::AllFalse(_) => return Ok(Canonical::empty(array.dtype()).into_array().into()),
90-
Mask::Values(values) => values,
91-
};
92-
93-
// Check each kernel for the array
9485
for kernel in kernels {
9586
if let Some(output) = kernel.invoke(args)? {
9687
return Ok(output);
@@ -108,14 +99,17 @@ impl ComputeFnVTable for Filter {
10899
.into());
109100
}
110101

111-
// Fallback: implement using Arrow kernels.
112102
log::debug!("No filter implementation found for {}", array.encoding(),);
113103

114-
let array_ref = array.to_array().into_arrow_preferred()?;
115-
let mask_array = BooleanArray::new(values.boolean_buffer().clone(), None);
116-
let filtered = arrow_select::filter::filter(array_ref.as_ref(), &mask_array)?;
104+
if !array.is_canonical() {
105+
let canonical = array.to_canonical()?.into_array();
106+
return filter(&canonical, mask).map(Into::into);
107+
};
117108

118-
Ok(ArrayRef::from_arrow(filtered, array.dtype().is_nullable()).into())
109+
vortex_bail!(
110+
"No filter implementation found for array {}",
111+
array.encoding()
112+
)
119113
}
120114

121115
fn return_dtype(&self, args: &InvocationArgs) -> VortexResult<DType> {
@@ -234,6 +228,19 @@ impl TryFrom<&dyn Array> for Mask {
234228
}
235229
}
236230

231+
pub fn arrow_filter_fn(array: &dyn Array, mask: &Mask) -> VortexResult<ArrayRef> {
232+
let values = match &mask {
233+
Mask::Values(values) => values,
234+
_ => unreachable!("check in filter invoke"),
235+
};
236+
237+
let array_ref = array.to_array().into_arrow_preferred()?;
238+
let mask_array = BooleanArray::new(values.boolean_buffer().clone(), None);
239+
let filtered = arrow_select::filter::filter(array_ref.as_ref(), &mask_array)?;
240+
241+
Ok(ArrayRef::from_arrow(filtered, array.dtype().is_nullable()))
242+
}
243+
237244
#[cfg(test)]
238245
mod test {
239246
use super::*;

0 commit comments

Comments
 (0)