Skip to content

Commit 54ae1e4

Browse files
committed
pipelined execution
Signed-off-by: Nicholas Gates <[email protected]>
1 parent fe16812 commit 54ae1e4

File tree

4 files changed

+47
-195
lines changed

4 files changed

+47
-195
lines changed
Lines changed: 2 additions & 126 deletions
Original file line numberDiff line numberDiff line change
@@ -1,28 +1,18 @@
11
// SPDX-License-Identifier: Apache-2.0
22
// SPDX-FileCopyrightText: Copyright the Vortex contributors
33

4-
use vortex_buffer::{BitBuffer, Buffer};
4+
use vortex_buffer::Buffer;
55
use vortex_compute::filter::Filter;
6-
use vortex_dtype::{NativePType, PTypeDowncastExt, match_each_native_ptype};
6+
use vortex_dtype::match_each_native_ptype;
77
use vortex_error::VortexResult;
88
use vortex_vector::primitive::PVector;
9-
use vortex_vector::{VectorMut, VectorMutOps};
109

1110
use crate::arrays::{MaskedVTable, PrimitiveArray, PrimitiveVTable};
1211
use crate::execution::{BatchKernelRef, BindCtx, kernel};
13-
use crate::pipeline::bit_view::{BitSlice, BitView};
14-
use crate::pipeline::{
15-
AllNullSourceKernel, BindContext, KernelContext, N, PipelinedSource, SourceKernel,
16-
};
17-
use crate::validity::Validity;
1812
use crate::vtable::{OperatorVTable, ValidityHelper};
1913
use crate::{ArrayRef, IntoArray};
2014

2115
impl OperatorVTable<PrimitiveVTable> for PrimitiveVTable {
22-
fn as_pipelined_source(array: &PrimitiveArray) -> Option<&dyn PipelinedSource> {
23-
Some(array)
24-
}
25-
2616
fn bind(
2717
array: &PrimitiveArray,
2818
selection: Option<&ArrayRef>,
@@ -71,117 +61,3 @@ impl OperatorVTable<PrimitiveVTable> for PrimitiveVTable {
7161
Ok(None)
7262
}
7363
}
74-
75-
impl PipelinedSource for PrimitiveArray {
76-
fn bind_source(&self, ctx: &mut dyn BindContext) -> VortexResult<Box<dyn SourceKernel>> {
77-
match self.validity() {
78-
Validity::NonNullable | Validity::AllValid => {
79-
match_each_native_ptype!(self.ptype(), |T| {
80-
let primitive_kernel = NonNullablePrimitiveKernel {
81-
buffer: self.buffer::<T>(),
82-
offset: 0,
83-
};
84-
Ok(Box::new(primitive_kernel))
85-
})
86-
}
87-
Validity::AllInvalid => Ok(Box::new(AllNullSourceKernel)),
88-
Validity::Array(_) => {
89-
let validity = ctx.batch_input(0).into_bool();
90-
// Validity is non-nullable, so we extract the inner bit buffer.
91-
let (validity, _) = validity.into_parts();
92-
93-
match_each_native_ptype!(self.ptype(), |T| {
94-
let primitive_kernel = NullablePrimitiveKernel {
95-
buffer: self.buffer::<T>(),
96-
validity,
97-
offset: 0,
98-
};
99-
Ok(Box::new(primitive_kernel))
100-
})
101-
}
102-
}
103-
}
104-
}
105-
106-
struct NonNullablePrimitiveKernel<T: NativePType> {
107-
buffer: Buffer<T>,
108-
offset: usize,
109-
}
110-
111-
impl<T: NativePType> SourceKernel for NonNullablePrimitiveKernel<T> {
112-
fn skip(&mut self, n: usize) {
113-
self.offset += n * N;
114-
}
115-
116-
fn step(
117-
&mut self,
118-
_ctx: &KernelContext,
119-
selection: &BitView,
120-
out: &mut VectorMut,
121-
) -> VortexResult<()> {
122-
let out = out.as_primitive_mut().downcast::<T>();
123-
124-
// SAFETY: we know the output has sufficient capacity.
125-
unsafe {
126-
out.validity_mut().append_n(true, selection.true_count());
127-
let prev_len = out.len();
128-
out.elements_mut()
129-
.set_len(prev_len + selection.true_count());
130-
}
131-
132-
let source = &self.buffer.as_slice()[self.offset..];
133-
let mut out_pos = 0;
134-
selection.iter_slices(|BitSlice { start, len }| {
135-
out.as_mut()[out_pos..][..len].copy_from_slice(&source[start..][..len]);
136-
out_pos += len;
137-
});
138-
139-
Ok(())
140-
}
141-
}
142-
143-
struct NullablePrimitiveKernel<T: NativePType> {
144-
buffer: Buffer<T>,
145-
#[allow(dead_code)] // TODO(ngates): implement appending validity bits
146-
validity: BitBuffer,
147-
offset: usize,
148-
}
149-
150-
impl<T: NativePType> SourceKernel for NullablePrimitiveKernel<T> {
151-
fn skip(&mut self, n: usize) {
152-
self.offset += n * N;
153-
}
154-
155-
fn step(
156-
&mut self,
157-
_ctx: &KernelContext,
158-
selection: &BitView,
159-
out: &mut VectorMut,
160-
) -> VortexResult<()> {
161-
let out = out.as_primitive_mut().downcast::<T>();
162-
163-
// SAFETY: we know the output has sufficient capacity. We just have to append nulls
164-
// separately from copying over the elements.
165-
unsafe {
166-
out.validity_mut().append_n(true, selection.true_count());
167-
let prev_len = out.len();
168-
out.elements_mut()
169-
.set_len(prev_len + selection.true_count());
170-
}
171-
172-
let source = &self.buffer.as_slice()[self.offset..];
173-
174-
let mut out_pos = 0;
175-
selection.iter_slices(|BitSlice { start, len }| {
176-
// Copy over the elements.
177-
out.as_mut()[out_pos..][..len].copy_from_slice(&source[start..][..len]);
178-
out_pos += len;
179-
180-
// Append the validity bits.
181-
let _validity = unsafe { out.validity_mut() };
182-
todo!("Append validity bits correctly and optimally!");
183-
});
184-
185-
Ok(())
186-
}
187-
}

vortex-array/src/pipeline/mod.rs

Lines changed: 28 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,6 @@
11
// SPDX-License-Identifier: Apache-2.0
22
// SPDX-FileCopyrightText: Copyright the Vortex contributors
33

4-
//! Vortex crate containing vectorized operator processing.
5-
//!
6-
//! This module contains experiments into pipelined data processing within Vortex.
7-
//!
8-
//! Arrays (and eventually Layouts) will be convertible into a [`Kernel`] that can then be
9-
//! exported into a [`ViewMut`] one chunk of [`N`] elements at a time. This allows us to keep
10-
//! compute largely within the L1 cache, as well as to write out canonical data into externally
11-
//! provided buffers.
12-
//!
13-
//! Each chunk is represented in a canonical physical form, as determined by the logical
14-
//! [`vortex_dtype::DType`] of the array. This provides a predictable base on which to perform
15-
//! compute. Unlike DuckDB and other vectorized systems, we force a single canonical representation
16-
//! instead of supporting multiple encodings because compute push-down is applied a priori to the
17-
//! logical representation.
18-
//!
19-
//! It is a work-in-progress and is not yet used in production.
20-
214
pub mod bit_view;
225
pub mod source_driver;
236

@@ -38,8 +21,11 @@ pub const N_BYTES: usize = N / 8;
3821
/// Number of usize words needed to store N bits
3922
pub const N_WORDS: usize = N / usize::BITS as usize;
4023

41-
/// Returned by an array to indicate that it can be executed in a pipelined fashion.
42-
pub trait PipelinedOperator: Array {
24+
/// Indicates that an array supports acting as a transformation node in a pipelined execution.
25+
///
26+
/// That is, it has one or more child arrays for which each input element produces a single output
27+
/// element. See [`PipelineSource`] for nodes that have zero pipelined children.
28+
pub trait PipelineTransform: Deref<Target = dyn Array> {
4329
// Whether this operator works by mutating its first child in-place.
4430
//
4531
// If `true`, the operator is invoked with the first child's input data passed via the
@@ -56,36 +42,39 @@ pub trait PipelinedOperator: Array {
5642
/// computed before pipelined execution begins.
5743
fn is_pipelined_child(&self, child_idx: usize) -> bool;
5844

59-
/// Bind the operator into a [`Kernel`] for pipelined execution.
45+
/// Bind the operator into a [`TransformKernel`] for pipelined execution.
6046
///
6147
/// The provided [`BindContext`] can be used to obtain vector IDs for pipelined children and
6248
/// batch IDs for batch children. Each child can only be bound once.
63-
fn bind(&self, ctx: &mut dyn BindContext) -> VortexResult<Box<dyn OperatorKernel>>;
49+
fn bind(&self, ctx: &mut dyn BindContext) -> VortexResult<Box<dyn TransformKernel>>;
6450
}
6551

66-
pub trait PipelinedSource: Deref<Target = dyn Array> {
67-
/// Bind the operator into a [`Kernel`] for pipelined execution.
52+
/// Indicates that an array supports acting as a source node in a pipelined execution.
53+
pub trait PipelineSource: Deref<Target = dyn Array> {
54+
/// Bind the operator into a [`SourceKernel`] for pipelined execution.
6855
///
6956
/// The provided [`BindContext`] can be used to obtain vector IDs for pipelined children and
7057
/// batch IDs for batch children. Each child can only be bound once.
71-
fn bind_source(&self, ctx: &mut dyn BindContext) -> VortexResult<Box<dyn SourceKernel>>;
58+
fn bind(&self, ctx: &mut dyn BindContext) -> VortexResult<Box<dyn SourceKernel>>;
7259
}
7360

7461
/// The context used when binding an operator for execution.
7562
pub trait BindContext {
7663
/// Returns a [`VectorId`] that can be passed to the [`KernelContext`] within the body of
77-
/// the [`Kernel`] to access the given child as a pipelined input vector.
64+
/// the kernel to access the given child as a pipelined input vector.
7865
///
7966
/// # Panics
8067
///
81-
/// If the child index requested here was not listed in [`Pipelined::pipelined_children`].
68+
/// If the child index requested here was not marked as a pipelined child in
69+
/// [`PipelineTransform::is_pipelined_child`].
8270
fn pipelined_input(&self, child_idx: usize) -> VectorId;
8371

8472
/// Returns the batch input vector for the given child.
8573
///
8674
/// # Panics
8775
///
88-
/// If the child index requested here was listed in [`Pipelined::pipelined_children`].
76+
/// If the child index requested here was marked as a pipelined child in
77+
/// [`PipelineTransform::is_pipelined_child`].
8978
fn batch_input(&self, child_idx: usize) -> Vector;
9079
}
9180

@@ -115,7 +104,12 @@ pub trait SourceKernel: Send {
115104
/// For example, if `n` is 3, then the kernel should skip over `3 * N` elements of input data.
116105
fn skip(&mut self, n: usize);
117106

118-
/// Attempts to perform a single step of the operator, writing data to the output vector.
107+
/// Attempts to perform a single step of the operator, appending data to the output vector.
108+
///
109+
/// The provided selection mask indicates which elements of the current chunk should be
110+
/// appended to the output vector.
111+
///
112+
/// The provided output vector is guaranteed to have at least `N` elements of capacity.
119113
fn step(
120114
&mut self,
121115
ctx: &KernelContext,
@@ -124,12 +118,13 @@ pub trait SourceKernel: Send {
124118
) -> VortexResult<()>;
125119
}
126120

127-
pub trait OperatorKernel: Send {
128-
/// Attempts to perform a single step of the operator, writing data to the output vector.
121+
pub trait TransformKernel: Send {
122+
/// Attempts to perform a single step of the operator, appending data to the output vector.
123+
///
124+
/// The input vectors can be accessed via the provided `KernelContext`.
129125
///
130-
/// The output vector has length equal to the number of valid elements in the input vectors.
131-
/// This number of values should be written to the output vector.
132-
fn step(&self, ctx: &KernelContext, out: &mut VectorMut) -> VortexResult<()>;
126+
/// The provided output vector is guaranteed to have at least `N` elements of capacity.
127+
fn step(&mut self, ctx: &KernelContext, out: &mut VectorMut) -> VortexResult<()>;
133128
}
134129

135130
/// Context passed to kernels during execution, providing access to vectors.

vortex-array/src/pipeline/source_driver.rs

Lines changed: 4 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -7,15 +7,15 @@ use vortex_mask::Mask;
77
use vortex_vector::{Vector, VectorMut, VectorMutOps};
88

99
use crate::pipeline::bit_view::{BitView, BitViewExt};
10-
use crate::pipeline::{BindContext, KernelContext, N, PipelinedSource, VectorId};
10+
use crate::pipeline::{BindContext, KernelContext, N, PipelineSource, VectorId};
1111

1212
/// Temporary driver for executing a single source array in a pipelined fashion.
1313
pub struct PipelineSourceDriver<'a> {
14-
array: &'a dyn PipelinedSource,
14+
array: &'a dyn PipelineSource,
1515
}
1616

1717
impl<'a> PipelineSourceDriver<'a> {
18-
pub fn new(array: &'a dyn PipelinedSource) -> Self {
18+
pub fn new(array: &'a dyn PipelineSource) -> Self {
1919
Self { array }
2020
}
2121

@@ -34,7 +34,7 @@ impl<'a> PipelineSourceDriver<'a> {
3434
let mut bind_ctx = PipelineSourceBindCtx {
3535
batch_inputs: &batch_inputs,
3636
};
37-
let mut kernel = self.array.bind_source(&mut bind_ctx)?;
37+
let mut kernel = self.array.bind(&mut bind_ctx)?;
3838
let kernel_ctx = KernelContext::empty();
3939

4040
// Allocate an output vector, with at least N elements of padding to ensure every call to
@@ -46,7 +46,6 @@ impl<'a> PipelineSourceDriver<'a> {
4646
selection.true_count().next_multiple_of(N) + N,
4747
);
4848

49-
// TODO(ngates): change behaviour based on the density of the selection mask.
5049
match selection {
5150
Mask::AllTrue(_) => {
5251
// Select everything, so we can just run the kernel in a tight loop.
@@ -101,31 +100,3 @@ impl BindContext for PipelineSourceBindCtx<'_> {
101100
self.batch_inputs[child_idx].clone()
102101
}
103102
}
104-
105-
#[cfg(test)]
106-
mod test {
107-
use vortex_buffer::buffer;
108-
use vortex_dtype::PTypeDowncastExt;
109-
use vortex_mask::Mask;
110-
use vortex_vector::VectorOps;
111-
112-
use crate::arrays::PrimitiveArray;
113-
use crate::pipeline::source_driver::PipelineSourceDriver;
114-
use crate::validity::Validity;
115-
116-
#[test]
117-
fn test_primitive() {
118-
let array = PrimitiveArray::new::<u32>(buffer![0..100000u32], Validity::AllValid);
119-
120-
// Create a selection mask with some ranges.
121-
let mask = Mask::from_iter((0..100000).map(|i| i % 30 < 20));
122-
123-
let out = PipelineSourceDriver::new(&array)
124-
.execute(&mask)
125-
.unwrap()
126-
.into_primitive()
127-
.downcast::<u32>();
128-
129-
assert_eq!(out.len(), mask.true_count());
130-
}
131-
}

vortex-array/src/vtable/operator.rs

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use vortex_vector::Vector;
88
use crate::ArrayRef;
99
use crate::array::IntoArray;
1010
use crate::execution::{BatchKernelRef, BindCtx, ExecutionCtx};
11-
use crate::pipeline::PipelinedSource;
11+
use crate::pipeline::{PipelineSource, PipelineTransform};
1212
use crate::vtable::{NotSupported, VTable};
1313

1414
/// A vtable for the new operator-based array functionality. Eventually this vtable will be
@@ -40,8 +40,10 @@ pub trait OperatorVTable<V: VTable> {
4040
Self::bind(array, Some(&selection.clone().into_array()), &mut ())?.execute()
4141
}
4242

43-
/// Downcast this array into a [`PipelinedSource`] if it supports pipelined execution.
44-
fn as_pipelined_source(_array: &V::Array) -> Option<&dyn PipelinedSource> {
43+
/// Downcast this array into a [`PipelineNode`] if it supports pipelined execution.
44+
///
45+
/// Each node is either a source node or a transformation node.
46+
fn pipeline_node(_array: &V::Array) -> Option<PipelineNode<'_>> {
4547
None
4648
}
4749

@@ -102,6 +104,14 @@ pub trait OperatorVTable<V: VTable> {
102104
}
103105
}
104106

107+
/// An enum over the types of pipeline nodes.
108+
pub enum PipelineNode<'a> {
109+
/// This node is a source node in a pipeline.
110+
Source(&'a dyn PipelineSource),
111+
/// This node is a transformation node in a pipeline.
112+
Transform(&'a dyn PipelineTransform),
113+
}
114+
105115
impl<V: VTable> OperatorVTable<V> for NotSupported {
106116
fn bind(
107117
array: &V::Array,

0 commit comments

Comments
 (0)