Skip to content

Commit 156d071

Browse files
committed
Clarify docs in session accessors
Signed-off-by: Nicholas Gates <[email protected]>
2 parents 8d28f51 + c79936a commit 156d071

File tree

45 files changed

+995
-242
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

45 files changed

+995
-242
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

encodings/runend/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ vortex-dtype = { workspace = true }
2424
vortex-error = { workspace = true }
2525
vortex-mask = { workspace = true }
2626
vortex-scalar = { workspace = true }
27+
vortex-session = { workspace = true }
2728

2829
[lints]
2930
workspace = true

encodings/runend/src/lib.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ pub mod compress;
1111
mod compute;
1212
mod iter;
1313
mod ops;
14+
mod rules;
1415

1516
#[doc(hidden)]
1617
pub mod _benchmarking {
@@ -23,11 +24,17 @@ pub mod _benchmarking {
2324
use vortex_array::ArrayBufferVisitor;
2425
use vortex_array::ArrayChildVisitor;
2526
use vortex_array::Canonical;
27+
use vortex_array::session::ArraySession;
28+
use vortex_array::session::ArraySessionExt;
29+
use vortex_array::vtable::ArrayVTableExt;
2630
use vortex_array::vtable::EncodeVTable;
2731
use vortex_array::vtable::VisitorVTable;
2832
use vortex_error::VortexResult;
33+
use vortex_session::SessionExt;
34+
use vortex_session::VortexSession;
2935

3036
use crate::compress::runend_encode;
37+
use crate::rules::RunEndScalarFnRule;
3138

3239
impl EncodeVTable<RunEndVTable> for RunEndVTable {
3340
fn encode(
@@ -59,6 +66,15 @@ impl VisitorVTable<RunEndVTable> for RunEndVTable {
5966
}
6067
}
6168

69+
/// Initialize run-end encoding in the given session.
70+
pub fn initialize(session: &mut VortexSession) {
71+
session.arrays().register(RunEndVTable.as_vtable());
72+
session
73+
.get_mut::<ArraySession>()
74+
.optimizer_mut()
75+
.register_parent_rule(RunEndScalarFnRule);
76+
}
77+
6278
#[cfg(test)]
6379
mod tests {
6480
use vortex_array::ProstMetadata;

encodings/runend/src/rules.rs

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
// SPDX-License-Identifier: Apache-2.0
2+
// SPDX-FileCopyrightText: Copyright the Vortex contributors
3+
4+
use vortex_array::ArrayRef;
5+
use vortex_array::IntoArray;
6+
use vortex_array::arrays::AnyScalarFn;
7+
use vortex_array::arrays::ConstantArray;
8+
use vortex_array::arrays::ConstantVTable;
9+
use vortex_array::arrays::ScalarFnArray;
10+
use vortex_array::optimizer::rules::ArrayParentReduceRule;
11+
use vortex_array::optimizer::rules::Exact;
12+
use vortex_dtype::DType;
13+
use vortex_error::VortexResult;
14+
15+
use crate::RunEndArray;
16+
use crate::RunEndVTable;
17+
18+
/// A rule to push down scalar functions through run-end encoding into the values array.
19+
///
20+
/// This only works if all other children of the scalar function array are constants.
21+
#[derive(Debug)]
22+
pub(crate) struct RunEndScalarFnRule;
23+
24+
impl ArrayParentReduceRule<Exact<RunEndVTable>, AnyScalarFn> for RunEndScalarFnRule {
25+
fn child(&self) -> Exact<RunEndVTable> {
26+
Exact::from(&RunEndVTable)
27+
}
28+
29+
fn parent(&self) -> AnyScalarFn {
30+
AnyScalarFn
31+
}
32+
33+
fn reduce_parent(
34+
&self,
35+
run_end: &RunEndArray,
36+
parent: &ScalarFnArray,
37+
child_idx: usize,
38+
) -> VortexResult<Option<ArrayRef>> {
39+
for (idx, child) in parent.children().iter().enumerate() {
40+
if idx == child_idx {
41+
// Skip ourselves
42+
continue;
43+
}
44+
45+
if !child.is::<ConstantVTable>() {
46+
// We can only push down if all other children are constants
47+
return Ok(None);
48+
}
49+
}
50+
51+
// TODO(ngates): relax this constraint and implement run-end decoding for all vector types.
52+
if !matches!(parent.dtype(), DType::Bool(_) | DType::Primitive(..)) {
53+
return Ok(None);
54+
}
55+
56+
let values_len = run_end.values().len();
57+
let mut new_children = parent.children();
58+
for (idx, child) in new_children.iter_mut().enumerate() {
59+
if idx == child_idx {
60+
// Replace ourselves with run end values
61+
*child = run_end.values().clone();
62+
continue;
63+
}
64+
65+
// Replace other children with their constant scalar value with length adjusted
66+
// to the length of the run end values.
67+
let constant = child.as_::<ConstantVTable>();
68+
*child = ConstantArray::new(constant.scalar().clone(), values_len).into_array();
69+
}
70+
71+
let new_values =
72+
ScalarFnArray::try_new(parent.scalar_fn().clone(), new_children, values_len)?
73+
.into_array();
74+
75+
Ok(Some(
76+
RunEndArray::try_new_offset_length(
77+
run_end.ends().clone(),
78+
new_values,
79+
run_end.offset(),
80+
run_end.len(),
81+
)?
82+
.into_array(),
83+
))
84+
}
85+
}

vortex-array/src/arrays/chunked/vtable/mod.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ use vortex_dtype::PType;
99
use vortex_error::VortexResult;
1010
use vortex_error::vortex_bail;
1111
use vortex_error::vortex_err;
12+
use vortex_mask::Mask;
1213
use vortex_vector::Vector;
1314
use vortex_vector::VectorMut;
1415
use vortex_vector::VectorMutOps;
@@ -20,6 +21,7 @@ use crate::arrays::PrimitiveArray;
2021
use crate::kernel::BindCtx;
2122
use crate::kernel::Kernel;
2223
use crate::kernel::KernelRef;
24+
use crate::kernel::PushDownResult;
2325
use crate::serde::ArrayChildren;
2426
use crate::validity::Validity;
2527
use crate::vtable;
@@ -157,4 +159,8 @@ impl Kernel for ChunkedKernel {
157159
}
158160
Ok(vector.freeze())
159161
}
162+
163+
fn push_down_filter(self: Box<Self>, _selection: &Mask) -> VortexResult<PushDownResult> {
164+
Ok(PushDownResult::NotPushed(self))
165+
}
160166
}
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
// SPDX-License-Identifier: Apache-2.0
2+
// SPDX-FileCopyrightText: Copyright the Vortex contributors
3+
4+
use vortex_compute::filter::Filter;
5+
use vortex_error::VortexResult;
6+
use vortex_mask::Mask;
7+
use vortex_vector::Vector;
8+
9+
use crate::kernel::Kernel;
10+
use crate::kernel::KernelRef;
11+
use crate::kernel::PushDownResult;
12+
13+
#[derive(Debug)]
14+
pub struct FilterKernel {
15+
child: KernelRef,
16+
mask: Mask,
17+
}
18+
19+
impl FilterKernel {
20+
pub fn new(child: KernelRef, mask: Mask) -> Self {
21+
Self { child, mask }
22+
}
23+
}
24+
25+
impl Kernel for FilterKernel {
26+
fn execute(self: Box<Self>) -> VortexResult<Vector> {
27+
Ok(Filter::filter(&self.child.execute()?, &self.mask))
28+
}
29+
30+
fn push_down_filter(self: Box<Self>, selection: &Mask) -> VortexResult<PushDownResult> {
31+
let new_mask = self.mask.intersect_by_rank(selection);
32+
Ok(match self.child.push_down_filter(&new_mask)? {
33+
PushDownResult::NotPushed(k) => PushDownResult::NotPushed(Box::new(FilterKernel {
34+
child: k,
35+
mask: new_mask,
36+
})),
37+
PushDownResult::Pushed(new_k) => PushDownResult::Pushed(new_k),
38+
})
39+
}
40+
}

vortex-array/src/arrays/filter/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,9 @@
22
// SPDX-FileCopyrightText: Copyright the Vortex contributors
33

44
mod array;
5+
mod kernel;
56
mod vtable;
67

78
pub use array::*;
9+
pub use kernel::*;
810
pub use vtable::*;

vortex-array/src/arrays/filter/vtable.rs

Lines changed: 31 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,10 @@ use crate::IntoArray;
2424
use crate::Precision;
2525
use crate::arrays::LEGACY_SESSION;
2626
use crate::arrays::filter::array::FilterArray;
27+
use crate::arrays::filter::kernel::FilterKernel;
2728
use crate::kernel::BindCtx;
2829
use crate::kernel::KernelRef;
29-
use crate::kernel::kernel;
30+
use crate::kernel::PushDownResult;
3031
use crate::serde::ArrayChildren;
3132
use crate::stats::StatsSetRef;
3233
use crate::vectors::VectorIntoArray;
@@ -82,22 +83,46 @@ impl VTable for FilterVTable {
8283
&self,
8384
dtype: &DType,
8485
len: usize,
85-
metadata: &Self::Metadata,
86+
selection_mask: &Mask,
8687
_buffers: &[BufferHandle],
8788
children: &dyn ArrayChildren,
8889
) -> VortexResult<Self::Array> {
89-
let child = children.get(0, dtype, len)?;
90+
assert_eq!(len, selection_mask.true_count());
91+
let child = children.get(0, dtype, selection_mask.len())?;
9092
Ok(FilterArray {
9193
child,
92-
mask: metadata.clone(),
94+
mask: selection_mask.clone(),
9395
stats: Default::default(),
9496
})
9597
}
9698

9799
fn bind_kernel(array: &Self::Array, ctx: &mut BindCtx) -> VortexResult<KernelRef> {
98-
let child = array.child.bind_kernel(ctx)?;
100+
let mut child = array.child.bind_kernel(ctx)?;
99101
let mask = array.mask.clone();
100-
Ok(kernel(move || Ok(Filter::filter(&child.execute()?, &mask))))
102+
103+
// NOTE(ngates): for now we keep the same behavior as develop where we push-down any
104+
// query with <20% true values.
105+
let pushdown = array.mask.density() < 0.2;
106+
107+
if pushdown {
108+
// Try to push down the filter to the child if it's cheaper.
109+
child = match child.push_down_filter(&mask)? {
110+
PushDownResult::Pushed(new_k) => {
111+
tracing::debug!("Filter push down kernel:\n{:?}", new_k);
112+
return Ok(new_k);
113+
}
114+
PushDownResult::NotPushed(child) => {
115+
tracing::warn!(
116+
"Filter pushdown was cheaper but not supported by child array {}",
117+
array.child.display_tree()
118+
);
119+
child
120+
}
121+
};
122+
}
123+
124+
// Otherwise, wrap up the child in a filter kernel.
125+
Ok(Box::new(FilterKernel::new(child, mask)))
101126
}
102127
}
103128

vortex-array/src/arrays/scalar_fn/array.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,4 +44,9 @@ impl ScalarFnArray {
4444
stats: Default::default(),
4545
})
4646
}
47+
48+
/// Get the scalar function bound to this array.
49+
pub fn scalar_fn(&self) -> &ScalarFn {
50+
&self.scalar_fn
51+
}
4752
}

vortex-array/src/arrays/scalar_fn/kernel.rs

Lines changed: 35 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
// SPDX-License-Identifier: Apache-2.0
22
// SPDX-FileCopyrightText: Copyright the Vortex contributors
33

4-
use itertools::Itertools;
54
use vortex_dtype::DType;
65
use vortex_error::VortexResult;
6+
use vortex_mask::Mask;
77
use vortex_vector::Datum;
88
use vortex_vector::Scalar;
99
use vortex_vector::Vector;
@@ -12,6 +12,7 @@ use crate::expr::ExecutionArgs;
1212
use crate::expr::ScalarFn;
1313
use crate::kernel::Kernel;
1414
use crate::kernel::KernelRef;
15+
use crate::kernel::PushDownResult;
1516

1617
/// A CPU kernel for executing scalar functions.
1718
#[derive(Debug)]
@@ -37,14 +38,17 @@ pub(super) enum KernelInput {
3738

3839
impl Kernel for ScalarFnKernel {
3940
fn execute(self: Box<Self>) -> VortexResult<Vector> {
40-
let datums: Vec<_> = self
41-
.inputs
42-
.into_iter()
43-
.map(|input| match input {
44-
KernelInput::Scalar(s) => Ok(Datum::Scalar(s)),
45-
KernelInput::Vector(k) => k.execute().map(Datum::Vector),
46-
})
47-
.try_collect()?;
41+
let mut datums: Vec<Datum> = Vec::with_capacity(self.inputs.len());
42+
for input in self.inputs {
43+
match input {
44+
KernelInput::Scalar(s) => {
45+
datums.push(Datum::Scalar(s));
46+
}
47+
KernelInput::Vector(kernel) => {
48+
datums.push(Datum::Vector(kernel.execute()?));
49+
}
50+
}
51+
}
4852

4953
let args = ExecutionArgs {
5054
datums,
@@ -55,4 +59,26 @@ impl Kernel for ScalarFnKernel {
5559

5660
Ok(self.scalar_fn.execute(args)?.ensure_vector(self.row_count))
5761
}
62+
63+
fn push_down_filter(self: Box<Self>, selection: &Mask) -> VortexResult<PushDownResult> {
64+
let mut new_inputs = Vec::with_capacity(self.inputs.len());
65+
for input in self.inputs {
66+
match input {
67+
KernelInput::Scalar(s) => {
68+
new_inputs.push(KernelInput::Scalar(s.clone()));
69+
}
70+
KernelInput::Vector(k) => {
71+
new_inputs.push(KernelInput::Vector(k.force_push_down_filter(selection)?));
72+
}
73+
}
74+
}
75+
76+
Ok(PushDownResult::Pushed(Box::new(ScalarFnKernel {
77+
scalar_fn: self.scalar_fn,
78+
inputs: new_inputs,
79+
input_dtypes: self.input_dtypes,
80+
row_count: selection.true_count(),
81+
return_dtype: self.return_dtype,
82+
})))
83+
}
5884
}

0 commit comments

Comments
 (0)