Skip to content

Commit 66cde77

Browse files
committed
merge
Signed-off-by: Nicholas Gates <[email protected]>
1 parent c9e7553 commit 66cde77

File tree

3 files changed

+79
-127
lines changed

3 files changed

+79
-127
lines changed

encodings/alp/src/alp/operator.rs

Lines changed: 8 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -3,15 +3,13 @@
33

44
use crate::{match_each_alp_float_ptype, ALPArray, ALPFloat, ALPVTable, Exponents};
55
use std::marker::PhantomData;
6-
use vortex_array::pipeline::{
7-
BindContext, KernelContext, PipelineTransform, TransformKernel, VectorId,
8-
};
6+
use vortex_array::pipeline::{BindContext, PipelineTransform, TransformKernel};
97
use vortex_array::vtable::{OperatorVTable, PipelineNode};
108
use vortex_buffer::Buffer;
119
use vortex_dtype::{match_each_integer_ptype, NativePType, PTypeDowncastExt};
1210
use vortex_error::VortexResult;
1311
use vortex_vector::primitive::PVector;
14-
use vortex_vector::VectorMut;
12+
use vortex_vector::{Vector, VectorMut};
1513

1614
impl OperatorVTable<ALPVTable> for ALPVTable {
1715
fn pipeline_node(array: &ALPArray) -> Option<PipelineNode<'_>> {
@@ -20,22 +18,17 @@ impl OperatorVTable<ALPVTable> for ALPVTable {
2018
}
2119

2220
impl PipelineTransform for ALPArray {
23-
fn is_pipelined_child(&self, child_idx: usize) -> bool {
24-
match child_idx {
25-
0 => true, // encoded array
26-
_ => false, // patch indices + patch values
27-
}
21+
fn pipelined_child(&self) -> usize {
22+
0 // The encoded vector is the first child
2823
}
2924

3025
fn bind(&self, ctx: &mut dyn BindContext) -> VortexResult<Box<dyn TransformKernel>> {
31-
let encoded_vector_id = ctx.pipelined_input(0);
3226
let exponents = self.exponents();
3327

3428
match self.patches() {
3529
None => {
3630
match_each_alp_float_ptype!(self.ptype(), |A| {
3731
Ok(Box::new(ALPKernel::<A> {
38-
encoded_vector_id,
3932
exponents,
4033
_phantom: PhantomData,
4134
}))
@@ -50,7 +43,6 @@ impl PipelineTransform for ALPArray {
5043
let patch_indices: Buffer<P> = P::downcast(patch_idxs).into_buffer();
5144
let patch_values: PVector<A> = A::downcast(patch_vals);
5245
Ok(Box::new(PatchedALPKernel {
53-
encoded_vector_id,
5446
exponents,
5547
patch_indices,
5648
patch_values,
@@ -63,20 +55,14 @@ impl PipelineTransform for ALPArray {
6355
}
6456

6557
struct ALPKernel<A: ALPFloat> {
66-
// The encoded vector that returns `A::ALPInt` values
67-
encoded_vector_id: VectorId,
6858
// The ALP exponents
6959
exponents: Exponents,
7060
_phantom: PhantomData<A>,
7161
}
7262

7363
impl<A: ALPFloat> TransformKernel for ALPKernel<A> {
74-
fn step(&mut self, ctx: &KernelContext, out: &mut VectorMut) -> VortexResult<()> {
75-
let encoded = ctx
76-
.vector(self.encoded_vector_id)
77-
.into_primitive()
78-
.downcast::<A::ALPInt>()
79-
.into_buffer();
64+
fn step(&mut self, input: &Vector, out: &mut VectorMut) -> VortexResult<()> {
65+
let encoded = input.into_primitive().downcast::<A::ALPInt>().into_buffer();
8066

8167
let mut decoded = A::downcast(out.into_primitive());
8268
decoded.extend(
@@ -89,8 +75,6 @@ impl<A: ALPFloat> TransformKernel for ALPKernel<A> {
8975
}
9076

9177
struct PatchedALPKernel<A: ALPFloat, P: NativePType> {
92-
// The encoded vector that returns `A::ALPInt` values
93-
encoded_vector_id: VectorId,
9478
// The ALP exponents
9579
exponents: Exponents,
9680
// The patch indices and values
@@ -99,12 +83,8 @@ struct PatchedALPKernel<A: ALPFloat, P: NativePType> {
9983
}
10084

10185
impl<A: ALPFloat, P: NativePType> TransformKernel for PatchedALPKernel<A, P> {
102-
fn step(&mut self, ctx: &KernelContext, out: &mut VectorMut) -> VortexResult<()> {
103-
let encoded = ctx
104-
.vector(self.encoded_vector_id)
105-
.into_primitive()
106-
.downcast::<A::ALPInt>()
107-
.into_buffer();
86+
fn step(&mut self, input: &Vector, out: &mut VectorMut) -> VortexResult<()> {
87+
let encoded = input.into_primitive().downcast::<A::ALPInt>().into_buffer();
10888

10989
let mut decoded = out.into_primitive().downcast::<A>();
11090
decoded.extend(

vortex-array/src/pipeline/mod.rs

Lines changed: 66 additions & 96 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ pub mod source_driver;
66

77
use std::ops::Deref;
88

9-
use bit_view::BitView;
109
use vortex_error::VortexResult;
1110
use vortex_vector::{Vector, VectorMut, VectorMutOps};
1211

@@ -21,10 +20,21 @@ pub const N_BYTES: usize = N / 8;
2120
/// Number of usize words needed to store N bits
2221
pub const N_WORDS: usize = N / usize::BITS as usize;
2322

23+
/// A pipeline source node has zero pipelined inputs and produces data to feed into a pipeline.
24+
///
25+
/// All children of the array are considered to be batch inputs and will be fully computed before
26+
/// pipelined execution begins.
27+
pub trait PipelineSource: Deref<Target = dyn Array> {
28+
/// Bind the operator into a [`SourceKernel`] for pipelined execution.
29+
///
30+
/// The provided [`BindContext`] can be used to obtain vector IDs for pipelined children and
31+
/// batch IDs for batch children. Each child can only be bound once.
32+
fn bind(&self, ctx: &mut dyn BindContext) -> VortexResult<Box<dyn SourceKernel>>;
33+
}
34+
2435
/// Indicates that an array supports acting as a transformation node in a pipelined execution.
2536
///
26-
/// That is, it has one or more child arrays for which each input element produces a single output
27-
/// element. See [`PipelineSource`] for nodes that have zero pipelined children.
37+
/// Transform nodes have exactly one pipelined input, with zero or more batch inputs.
2838
pub trait PipelineTransform: Deref<Target = dyn Array> {
2939
// Whether this operator works by mutating its first child in-place.
3040
//
@@ -35,12 +45,8 @@ pub trait PipelineTransform: Deref<Target = dyn Array> {
3545
// false
3646
// }
3747

38-
/// Returns whether the nth child of this array should be passed to the kernel as a pipelined
39-
/// input vector, 1024 elements at a time.
40-
///
41-
/// Any child that reports `false` will be treated as a batch input, and the full vector will be
42-
/// computed before pipelined execution begins.
43-
fn is_pipelined_child(&self, child_idx: usize) -> bool;
48+
/// Returns the index of the array child that should be passed as a pipelined input
49+
fn pipelined_child(&self) -> usize;
4450

4551
/// Bind the operator into a [`TransformKernel`] for pipelined execution.
4652
///
@@ -49,125 +55,89 @@ pub trait PipelineTransform: Deref<Target = dyn Array> {
4955
fn bind(&self, ctx: &mut dyn BindContext) -> VortexResult<Box<dyn TransformKernel>>;
5056
}
5157

52-
/// Indicates that an array supports acting as a source node in a pipelined execution.
53-
pub trait PipelineSource: Deref<Target = dyn Array> {
54-
/// Bind the operator into a [`SourceKernel`] for pipelined execution.
58+
/// Indicates that an array supports acting as a transformation node in a pipelined execution
59+
/// with multiple pipelined inputs and zero or more batch inputs.
60+
pub trait PipelineZipTransform: Deref<Target = dyn Array> {
61+
/// Returns whether the given array child should be passed as a pipelined input
62+
fn is_pipelined_child(&self, child_idx: usize) -> bool;
63+
64+
/// Bind the operator into a [`TransformKernel`] for pipelined execution.
5565
///
5666
/// The provided [`BindContext`] can be used to obtain vector IDs for pipelined children and
5767
/// batch IDs for batch children. Each child can only be bound once.
58-
fn bind(&self, ctx: &mut dyn BindContext) -> VortexResult<Box<dyn SourceKernel>>;
68+
fn bind(&self, ctx: &mut dyn BindContext) -> VortexResult<Box<dyn ZipTransformKernel>>;
5969
}
6070

6171
/// The context used when binding an operator for execution.
6272
pub trait BindContext {
63-
/// Returns a [`VectorId`] that can be passed to the [`KernelContext`] within the body of
64-
/// the kernel to access the given child as a pipelined input vector.
65-
///
66-
/// # Panics
67-
///
68-
/// If the child index requested here was not marked as a pipelined child in
69-
/// [`PipelineTransform::is_pipelined_child`].
70-
fn pipelined_input(&self, child_idx: usize) -> VectorId;
71-
7273
/// Returns the batch input vector for the given child.
7374
///
7475
/// # Panics
7576
///
7677
/// If the child index requested here was marked as a pipelined child in
7778
/// [`PipelineTransform::pipelined_child`].
78-
fn batch_input(&self, child_idx: usize) -> BatchId;
79+
fn batch_input(&self, child_idx: usize) -> Vector;
7980
}
8081

81-
/// The ID of the vector to use.
82-
// TODO(ngates): make these opaque
83-
pub type VectorId = usize;
84-
pub type BatchId = usize;
85-
86-
/// A kernel implements the physical compute required for pipelined execution. It is driven in a
87-
/// push-based way, typically as part of a larger pipeline of kernels.
88-
///
89-
/// By passing multiple vector computations through the same operator, we can amortize
90-
/// the setup costs (such as DType validation, stats short-circuiting, etc.), and to make better
91-
/// use of CPU caches by performing all operations while the data is hot.
82+
/// A source kernel produces data to feed into pipelined execution.
9283
///
93-
/// The [`SourceKernel::step`] method will be invoked repeatedly to process chunks of data, [`N`]
94-
/// elements at a time. Each invocation is passed a selection mask indicating which elements of the
95-
/// chunk should be written to the start of the output vector.
84+
/// The kernel is provided with a mutable output vector that is guaranteed to have capacity for at
85+
/// least `2 * N` elements. Each invocation of the kernel is expected to append between `N` and
86+
/// `2 * N` elements to the output vector, except when the end of the data is reached.
9687
///
97-
/// The mutable output vector is **guaranteed** to have a capacity of at least [`N`] elements. The
98-
/// caller makes no guarantee about the initial length of the output vector; and the kernel is
99-
/// expected to append `selection.true_count()` elements.
88+
/// Vectors of `N` elements will be propagated throughout the pipeline, and any remaining elements
89+
/// will be passed back to the kernel on the next iteration and appear at the start of the output
90+
/// vector.
10091
///
101-
/// The pipeline may invoke the `SourceKernel::skip` method to skip over some number of chunks of data.
102-
/// The kernel should mutate any internal state as necessary to account for the skipped data.
92+
/// This kerfuffle allows kernels that are optimized for 1024-element chunks to operate efficiently,
93+
/// while avoiding passing very sparsely selected vectors throughout the pipeline.
10394
pub trait SourceKernel: Send {
104-
/// Skip over the given number of chunks of data.
105-
///
106-
/// For example, if `n` is 3, then the kernel should skip over `3 * N` elements of input data.
107-
fn skip(&mut self, n: usize);
108-
109-
/// Attempts to perform a single step of the operator, appending data to the output vector.
110-
///
111-
/// The provided selection mask indicates which elements of the current chunk should be
112-
/// appended to the output vector.
113-
///
114-
/// The provided output vector is guaranteed to have at least `N` elements of capacity.
115-
fn step(
116-
&mut self,
117-
ctx: &KernelContext,
118-
selection: &BitView,
119-
out: &mut VectorMut,
120-
) -> VortexResult<()>;
95+
/// Perform a single step of the kernel.
96+
fn step(&mut self, out: &mut VectorMut) -> VortexResult<()>;
12197
}
12298

99+
/// A transform kernel processes one or more input vectors and produces an output vector.
100+
///
101+
/// Besides the final chunk of data, each invocation of the kernel will be passed vectors of
102+
/// exactly `N` elements. The kernel **must** append exactly the same number of elements to its
103+
/// output vector.
104+
///
105+
/// The output vector is guaranteed to have at least `N` elements of capacity.
123106
pub trait TransformKernel: Send {
124-
/// Attempts to perform a single step of the operator, appending data to the output vector.
125-
///
126-
/// The input vectors can be accessed via the provided `KernelContext`.
127-
///
128-
/// The provided output vector is guaranteed to have at least `N` elements of capacity.
129-
fn step(&mut self, ctx: &KernelContext, out: &mut VectorMut) -> VortexResult<()>;
107+
/// Perform a single step of the kernel.
108+
fn step(&mut self, input: &Vector, out: &mut VectorMut) -> VortexResult<()>;
130109
}
131110

132-
/// Context passed to kernels during execution, providing access to vectors.
133-
pub struct KernelContext {
134-
/// The allocated vectors for intermediate results.
135-
pub(crate) vectors: Vec<Vector>,
136-
/// The batch input vectors.
137-
pub(crate) batch_inputs: Vec<Vector>,
111+
/// A transform kernel that takes multiple input vectors and produces an output vector.
112+
///
113+
/// The pipeline driver will ensure that each invocation of the kernel is passed vectors of equal
114+
/// length.
115+
///
116+
/// The output vector is guaranteed to have at least `N` elements of capacity.
117+
pub trait ZipTransformKernel: Send {
118+
/// Perform a single step of the kernel.
119+
fn step(&mut self, inputs: &[Vector], out: &mut VectorMut) -> VortexResult<()>;
138120
}
139121

140-
impl KernelContext {
141-
pub fn empty() -> Self {
142-
Self {
143-
vectors: Vec::new(),
144-
}
145-
}
146-
147-
/// Get a vector by its ID.
148-
pub fn vector(&self, vector_id: VectorId) -> &Vector {
149-
&self.vectors[vector_id]
150-
}
122+
/// A sink kernel consumes input vectors without producing output.
123+
pub trait SinkKernel: Send {
124+
/// Perform a single step of the kernel.
125+
fn step(&mut self, input: &Vector) -> VortexResult<()>;
151126

152-
/// Get a batch input by its ID.
153-
pub fn batch_input(&self, batch_id: BatchId) -> &Vector {
154-
&self.batch_inputs[batch_id]
155-
}
127+
/// Finalize the sink after all input has been processed.
128+
fn finalize(&mut self) -> VortexResult<Vector>;
156129
}
157130

158131
/// A general implementation of a source kernel that produces all null values.
159-
pub struct AllNullSourceKernel;
132+
pub struct AllNullSourceKernel {
133+
remaining: usize,
134+
}
160135

161136
impl SourceKernel for AllNullSourceKernel {
162-
fn skip(&mut self, _n: usize) {}
163-
164-
fn step(
165-
&mut self,
166-
_ctx: &KernelContext,
167-
selection: &BitView,
168-
out: &mut VectorMut,
169-
) -> VortexResult<()> {
170-
out.append_nulls(selection.true_count());
137+
fn step(&mut self, out: &mut VectorMut) -> VortexResult<()> {
138+
let to_produce = self.remaining.min(N);
139+
self.remaining -= to_produce;
140+
out.append_nulls(to_produce);
171141
Ok(())
172142
}
173143
}

vortex-array/src/vtable/operator.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,15 @@
11
// SPDX-License-Identifier: Apache-2.0
22
// SPDX-FileCopyrightText: Copyright the Vortex contributors
33

4-
use vortex_error::{VortexResult, vortex_bail};
4+
use vortex_error::{vortex_bail, VortexResult};
55
use vortex_mask::Mask;
66
use vortex_vector::Vector;
77

8-
use crate::ArrayRef;
98
use crate::array::IntoArray;
109
use crate::execution::{BatchKernelRef, BindCtx, ExecutionCtx};
11-
use crate::pipeline::{PipelineSource, PipelineTransform};
10+
use crate::pipeline::{PipelineSource, PipelineTransform, PipelineZipTransform};
1211
use crate::vtable::{NotSupported, VTable};
12+
use crate::ArrayRef;
1313

1414
/// A vtable for the new operator-based array functionality. Eventually this vtable will be
1515
/// merged into the main `VTable`, but for now it is kept separate to allow for incremental
@@ -110,6 +110,8 @@ pub enum PipelineNode<'a> {
110110
Source(&'a dyn PipelineSource),
111111
/// This node is a transformation node in a pipeline.
112112
Transform(&'a dyn PipelineTransform),
113+
/// This node is a zip transformation node in a pipeline.
114+
ZipTransform(&'a dyn PipelineZipTransform),
113115
}
114116

115117
impl<V: VTable> OperatorVTable<V> for NotSupported {

0 commit comments

Comments
 (0)