Skip to content

Commit a447093

Browse files
committed
Filter pushdown
Signed-off-by: Nicholas Gates <[email protected]>
1 parent 813aff7 commit a447093

File tree

15 files changed

+283
-79
lines changed

15 files changed

+283
-79
lines changed
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
// SPDX-License-Identifier: Apache-2.0
2+
// SPDX-FileCopyrightText: Copyright the Vortex contributors
3+
4+
use vortex_compute::filter::Filter;
5+
use vortex_dtype::DType;
6+
use vortex_error::VortexResult;
7+
use vortex_mask::Mask;
8+
use vortex_vector::Vector;
9+
10+
use crate::kernel::Kernel;
11+
use crate::kernel::KernelRef;
12+
use crate::kernel::PushDownResult;
13+
14+
#[derive(Debug)]
15+
pub struct FilterKernel {
16+
child: KernelRef,
17+
mask: Mask,
18+
// Used for estimating filter cost
19+
dtype: DType,
20+
}
21+
22+
impl FilterKernel {
23+
pub fn new(child: KernelRef, mask: Mask, dtype: DType) -> Self {
24+
Self { child, mask, dtype }
25+
}
26+
}
27+
28+
impl Kernel for FilterKernel {
29+
fn execute(self: Box<Self>) -> VortexResult<Vector> {
30+
Ok(Filter::filter(&self.child.execute()?, &self.mask))
31+
}
32+
33+
fn cost_estimate(&self, selection: &Mask) -> f64 {
34+
cost_for_dtype(&self.dtype, selection)
35+
}
36+
37+
fn push_down_filter(self: Box<Self>, selection: &Mask) -> VortexResult<PushDownResult> {
38+
let new_mask = self.mask.intersect_by_rank(selection);
39+
Ok(match self.child.push_down_filter(&new_mask)? {
40+
PushDownResult::NotPushed(k) => PushDownResult::NotPushed(Box::new(FilterKernel {
41+
child: k,
42+
mask: new_mask,
43+
dtype: self.dtype.clone(),
44+
})),
45+
PushDownResult::Pushed(new_k) => PushDownResult::Pushed(new_k),
46+
})
47+
}
48+
}
49+
50+
fn cost_for_dtype(dtype: &DType, selection: &Mask) -> f64 {
51+
match dtype {
52+
DType::Null => 0.0,
53+
DType::Extension(ext) => cost_for_dtype(ext.storage_dtype(), selection),
54+
_ => f64::INFINITY,
55+
}
56+
}

vortex-array/src/arrays/filter/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,9 @@
22
// SPDX-FileCopyrightText: Copyright the Vortex contributors
33

44
mod array;
5+
mod kernel;
56
mod vtable;
67

78
pub use array::*;
9+
pub use kernel::*;
810
pub use vtable::*;

vortex-array/src/arrays/filter/vtable.rs

Lines changed: 46 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -7,26 +7,18 @@ use std::ops::Range;
77
use vortex_buffer::BufferHandle;
88
use vortex_compute::filter::Filter;
99
use vortex_dtype::DType;
10+
use vortex_error::vortex_bail;
1011
use vortex_error::VortexExpect;
1112
use vortex_error::VortexResult;
12-
use vortex_error::vortex_bail;
1313
use vortex_mask::Mask;
1414
use vortex_scalar::Scalar;
1515

16-
use crate::Array;
17-
use crate::ArrayBufferVisitor;
18-
use crate::ArrayChildVisitor;
19-
use crate::ArrayEq;
20-
use crate::ArrayHash;
21-
use crate::ArrayRef;
22-
use crate::Canonical;
23-
use crate::IntoArray;
24-
use crate::Precision;
25-
use crate::arrays::LEGACY_SESSION;
2616
use crate::arrays::filter::array::FilterArray;
17+
use crate::arrays::filter::kernel::FilterKernel;
18+
use crate::arrays::LEGACY_SESSION;
2719
use crate::kernel::BindCtx;
2820
use crate::kernel::KernelRef;
29-
use crate::kernel::kernel;
21+
use crate::kernel::PushDownResult;
3022
use crate::serde::ArrayChildren;
3123
use crate::stats::StatsSetRef;
3224
use crate::vectors::VectorIntoArray;
@@ -41,6 +33,15 @@ use crate::vtable::OperationsVTable;
4133
use crate::vtable::VTable;
4234
use crate::vtable::ValidityVTable;
4335
use crate::vtable::VisitorVTable;
36+
use crate::Array;
37+
use crate::ArrayBufferVisitor;
38+
use crate::ArrayChildVisitor;
39+
use crate::ArrayEq;
40+
use crate::ArrayHash;
41+
use crate::ArrayRef;
42+
use crate::Canonical;
43+
use crate::IntoArray;
44+
use crate::Precision;
4445

4546
vtable!(Filter);
4647

@@ -95,9 +96,40 @@ impl VTable for FilterVTable {
9596
}
9697

9798
fn bind_kernel(array: &Self::Array, ctx: &mut BindCtx) -> VortexResult<KernelRef> {
98-
let child = array.child.bind_kernel(ctx)?;
99+
let mut child = array.child.bind_kernel(ctx)?;
99100
let mask = array.mask.clone();
100-
Ok(kernel(move || Ok(Filter::filter(&child.execute()?, &mask))))
101+
102+
let full_cost = child.cost_estimate(&Mask::new_true(array.child.len()));
103+
let pushdown_cost = child.cost_estimate(&mask);
104+
log::debug!(
105+
"Filter kernel cost estimate: full={}, pushdown={}",
106+
full_cost,
107+
pushdown_cost
108+
);
109+
110+
if pushdown_cost < full_cost {
111+
// Try to push down the filter to the child if it's cheaper.
112+
child = match child.push_down_filter(&mask)? {
113+
PushDownResult::Pushed(new_k) => {
114+
log::debug!("Filter push down kernel:\n{:?}", new_k);
115+
return Ok(new_k);
116+
}
117+
PushDownResult::NotPushed(child) => {
118+
log::debug!(
119+
"Filter pushdown was cheaper but not supported by child array {}",
120+
array.child.display_tree()
121+
);
122+
child
123+
}
124+
};
125+
}
126+
127+
// Otherwise, wrap up the child in a filter kernel.
128+
Ok(Box::new(FilterKernel::new(
129+
child,
130+
mask,
131+
array.dtype().clone(),
132+
)))
101133
}
102134
}
103135

vortex-array/src/arrays/scalar_fn/kernel.rs

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,17 @@
44
use itertools::Itertools;
55
use vortex_dtype::DType;
66
use vortex_error::VortexResult;
7+
use vortex_mask::Mask;
78
use vortex_vector::Datum;
89
use vortex_vector::Scalar;
910
use vortex_vector::Vector;
1011

12+
use crate::arrays::FilterKernel;
1113
use crate::expr::ExecutionArgs;
1214
use crate::expr::ScalarFn;
1315
use crate::kernel::Kernel;
1416
use crate::kernel::KernelRef;
17+
use crate::kernel::PushDownResult;
1518

1619
/// A CPU kernel for executing scalar functions.
1720
#[derive(Debug)]
@@ -55,4 +58,46 @@ impl Kernel for ScalarFnKernel {
5558

5659
Ok(self.scalar_fn.execute(args)?.ensure_vector(self.row_count))
5760
}
61+
62+
fn cost_estimate(&self, selection: &Mask) -> f64 {
63+
let self_cost = self.scalar_fn.cost_estimate(selection);
64+
let child_cost = self
65+
.inputs
66+
.iter()
67+
.map(|input| match input {
68+
KernelInput::Scalar(_) => 0.0,
69+
KernelInput::Vector(k) => k.cost_estimate(selection),
70+
})
71+
.sum::<f64>();
72+
73+
self_cost + child_cost
74+
}
75+
76+
fn push_down_filter(self: Box<Self>, selection: &Mask) -> VortexResult<PushDownResult> {
77+
let mut new_inputs = Vec::with_capacity(self.inputs.len());
78+
for (input, dtype) in self.inputs.into_iter().zip(&self.input_dtypes) {
79+
match input {
80+
KernelInput::Scalar(s) => {
81+
new_inputs.push(KernelInput::Scalar(s.clone()));
82+
}
83+
KernelInput::Vector(k) => match k.push_down_filter(selection)? {
84+
PushDownResult::Pushed(new_k) => {
85+
new_inputs.push(KernelInput::Vector(new_k));
86+
}
87+
PushDownResult::NotPushed(k) => {
88+
let new_k = FilterKernel::new(k, selection.clone(), dtype.clone());
89+
new_inputs.push(KernelInput::Vector(Box::new(new_k)));
90+
}
91+
},
92+
}
93+
}
94+
95+
Ok(PushDownResult::Pushed(Box::new(ScalarFnKernel {
96+
scalar_fn: self.scalar_fn,
97+
inputs: new_inputs,
98+
input_dtypes: self.input_dtypes,
99+
row_count: self.row_count,
100+
return_dtype: self.return_dtype,
101+
})))
102+
}
58103
}

vortex-array/src/arrays/scalar_fn/vtable/canonical.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,15 +4,15 @@
44
use itertools::Itertools;
55
use vortex_error::VortexExpect;
66

7-
use crate::Array;
8-
use crate::Canonical;
9-
use crate::arrays::LEGACY_SESSION;
107
use crate::arrays::scalar_fn::array::ScalarFnArray;
118
use crate::arrays::scalar_fn::vtable::ScalarFnVTable;
9+
use crate::arrays::LEGACY_SESSION;
1210
use crate::executor::VectorExecutor;
1311
use crate::expr::ExecutionArgs;
1412
use crate::vectors::VectorIntoArray;
1513
use crate::vtable::CanonicalVTable;
14+
use crate::Array;
15+
use crate::Canonical;
1616

1717
impl CanonicalVTable<ScalarFnVTable> for ScalarFnVTable {
1818
fn canonicalize(array: &ScalarFnArray) -> Canonical {
@@ -34,12 +34,12 @@ impl CanonicalVTable<ScalarFnVTable> for ScalarFnVTable {
3434
return_dtype: array.dtype.clone(),
3535
};
3636

37+
let len = array.len;
3738
let result_vector = array
3839
.scalar_fn
3940
.execute(ctx)
4041
.vortex_expect("Canonicalize should be fallible")
41-
.into_vector()
42-
.vortex_expect("Canonicalize should return a vector");
42+
.ensure_vector(len);
4343

4444
result_vector.into_array(&array.dtype).to_canonical()
4545
}

vortex-array/src/expr/exprs/get_item.rs

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -9,17 +9,18 @@ use vortex_dtype::DType;
99
use vortex_dtype::FieldName;
1010
use vortex_dtype::FieldPath;
1111
use vortex_dtype::Nullability;
12+
use vortex_error::vortex_err;
1213
use vortex_error::VortexExpect;
1314
use vortex_error::VortexResult;
14-
use vortex_error::vortex_err;
15+
use vortex_mask::Mask;
1516
use vortex_proto::expr as pb;
1617
use vortex_vector::Datum;
1718
use vortex_vector::ScalarOps;
1819
use vortex_vector::VectorOps;
1920

20-
use crate::ArrayRef;
21-
use crate::ToCanonical;
2221
use crate::compute::mask;
22+
use crate::expr::exprs::root::root;
23+
use crate::expr::stats::Stat;
2324
use crate::expr::Arity;
2425
use crate::expr::ChildName;
2526
use crate::expr::ExecutionArgs;
@@ -30,8 +31,8 @@ use crate::expr::SimplifyCtx;
3031
use crate::expr::StatsCatalog;
3132
use crate::expr::VTable;
3233
use crate::expr::VTableExt;
33-
use crate::expr::exprs::root::root;
34-
use crate::expr::stats::Stat;
34+
use crate::ArrayRef;
35+
use crate::ToCanonical;
3536

3637
pub struct GetItem;
3738

@@ -189,6 +190,11 @@ impl VTable for GetItem {
189190
// If this type-checks its infallible.
190191
false
191192
}
193+
194+
fn cost_estimate(&self, _options: &Self::Options, _selection: &Mask) -> f64 {
195+
// This is largely a metadata-only operation.
196+
0.0
197+
}
192198
}
193199

194200
/// Creates an expression that accesses a field from the root array.
@@ -226,15 +232,15 @@ mod tests {
226232
use vortex_dtype::StructFields;
227233
use vortex_scalar::Scalar;
228234

229-
use crate::Array;
230-
use crate::IntoArray;
231235
use crate::arrays::StructArray;
232236
use crate::expr::exprs::binary::checked_add;
233237
use crate::expr::exprs::get_item::get_item;
234238
use crate::expr::exprs::literal::lit;
235239
use crate::expr::exprs::pack::pack;
236240
use crate::expr::exprs::root::root;
237241
use crate::validity::Validity;
242+
use crate::Array;
243+
use crate::IntoArray;
238244

239245
fn test_array() -> StructArray {
240246
StructArray::from_fields(&[

vortex-array/src/expr/exprs/merge.rs

Lines changed: 20 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -11,17 +11,16 @@ use vortex_dtype::DType;
1111
use vortex_dtype::FieldNames;
1212
use vortex_dtype::Nullability;
1313
use vortex_dtype::StructFields;
14+
use vortex_error::vortex_bail;
1415
use vortex_error::VortexExpect;
1516
use vortex_error::VortexResult;
16-
use vortex_error::vortex_bail;
17+
use vortex_mask::Mask;
1718
use vortex_utils::aliases::hash_set::HashSet;
1819
use vortex_vector::Datum;
1920

20-
use crate::Array;
21-
use crate::ArrayRef;
22-
use crate::IntoArray as _;
23-
use crate::ToCanonical;
2421
use crate::arrays::StructArray;
22+
use crate::expr::get_item;
23+
use crate::expr::pack;
2524
use crate::expr::Arity;
2625
use crate::expr::ChildName;
2726
use crate::expr::ExecutionArgs;
@@ -30,9 +29,11 @@ use crate::expr::Expression;
3029
use crate::expr::SimplifyCtx;
3130
use crate::expr::VTable;
3231
use crate::expr::VTableExt;
33-
use crate::expr::get_item;
34-
use crate::expr::pack;
3532
use crate::validity::Validity;
33+
use crate::Array;
34+
use crate::ArrayRef;
35+
use crate::IntoArray as _;
36+
use crate::ToCanonical;
3637

3738
/// Merge zero or more expressions that ALL return structs.
3839
///
@@ -185,6 +186,11 @@ impl VTable for Merge {
185186
todo!()
186187
}
187188

189+
fn cost_estimate(&self, _options: &Self::Options, _selection: &Mask) -> f64 {
190+
// This is largely a metadata-only operation.
191+
0.0
192+
}
193+
188194
fn simplify(
189195
&self,
190196
options: &Self::Options,
@@ -298,21 +304,21 @@ mod tests {
298304
use vortex_dtype::PType::I64;
299305
use vortex_dtype::PType::U32;
300306
use vortex_dtype::PType::U64;
301-
use vortex_error::VortexResult;
302307
use vortex_error::vortex_bail;
308+
use vortex_error::VortexResult;
303309

304310
use super::merge;
305-
use crate::Array;
306-
use crate::IntoArray;
307-
use crate::ToCanonical;
308311
use crate::arrays::PrimitiveArray;
309312
use crate::arrays::StructArray;
310-
use crate::expr::Expression;
311-
use crate::expr::Pack;
312313
use crate::expr::exprs::get_item::get_item;
313-
use crate::expr::exprs::merge::DuplicateHandling;
314314
use crate::expr::exprs::merge::merge_opts;
315+
use crate::expr::exprs::merge::DuplicateHandling;
315316
use crate::expr::exprs::root::root;
317+
use crate::expr::Expression;
318+
use crate::expr::Pack;
319+
use crate::Array;
320+
use crate::IntoArray;
321+
use crate::ToCanonical;
316322

317323
fn primitive_field(array: &dyn Array, field_path: &[&str]) -> VortexResult<PrimitiveArray> {
318324
let mut field_path = field_path.iter();

0 commit comments

Comments
 (0)