Skip to content

Commit e262a87

Browse files
authored
Pipeline Calling Convention 2 (#5291)
* Guarantees that output vectors have length == 0.
* Does not guarantee capacity.
* Defines inputs as having either N or n elements, and requires that in-place but non-selected elements _must_ be valid.

Signed-off-by: Nicholas Gates <[email protected]>
1 parent 69377e1 commit e262a87

File tree

3 files changed

+39
-77
lines changed

3 files changed

+39
-77
lines changed

vortex-array/src/pipeline/driver/allocation.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,14 +8,14 @@ use vortex_vector::VectorMut;
88

99
use crate::Array;
1010
use crate::pipeline::driver::{Node, NodeId};
11-
use crate::pipeline::{N, PipelineVector, VectorId};
11+
use crate::pipeline::{N, VectorId};
1212

1313
#[derive(Debug)]
1414
pub struct VectorAllocation {
1515
/// Where each node writes its output
1616
pub(crate) output_targets: Vec<OutputTarget>,
1717
/// The actual allocated vectors
18-
pub(crate) vectors: Vec<PipelineVector>,
18+
pub(crate) vectors: Vec<VectorMut>,
1919
}
2020

2121
// TODO(joe): support in-place view operations
@@ -86,7 +86,7 @@ pub(super) fn allocate_vectors(
8686
.collect(),
8787
vectors: allocation_types
8888
.into_iter()
89-
.map(|dtype| PipelineVector::Compact(VectorMut::with_capacity(dtype, N)))
89+
.map(|dtype| VectorMut::with_capacity(dtype, N))
9090
.collect(),
9191
})
9292
}

vortex-array/src/pipeline/driver/mod.rs

Lines changed: 20 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ use crate::pipeline::bit_view::{BitView, BitViewExt};
1919
use crate::pipeline::driver::allocation::{OutputTarget, allocate_vectors};
2020
use crate::pipeline::driver::bind::bind_kernels;
2121
use crate::pipeline::driver::toposort::topological_sort;
22-
use crate::pipeline::{Kernel, KernelCtx, N, PipelineInputs, PipelineVector};
22+
use crate::pipeline::{Kernel, KernelCtx, N, PipelineInputs};
2323
use crate::{Array, ArrayEq, ArrayHash, ArrayOperator, ArrayRef, ArrayVisitor, Precision};
2424

2525
/// A pipeline driver takes a Vortex array and executes it into a canonical vector.
@@ -278,56 +278,38 @@ impl Pipeline {
278278
// take the intermediate vector and write into that.
279279
match &self.output_targets[node_idx] {
280280
OutputTarget::ExternalOutput => {
281-
assert!(
282-
output.capacity() >= N,
283-
"Insufficient capacity in external output vector"
284-
);
285-
286-
let prev_output_len = output.len();
287-
kernel.step(&self.ctx, selection, output)?;
288-
289-
let added_len = output.len() - prev_output_len;
290-
match added_len {
291-
N => {
292-
// If the kernel added N elements, the output is in-place.
293-
// TODO(ngates): we need to filter if the true count is not N.
294-
}
295-
_ if added_len == selection.true_count() => {
296-
// If the kernel added exactly the number of selected elements,
297-
// the output is already compacted into the start of the vector.
298-
}
299-
_ => vortex_bail!(
300-
"Kernel produced incorrect number of output elements, expected to append either {} or {}, got {}",
281+
// We split off the next N elements of capacity from the external output vector.
282+
let mut tail = output.split_off(output.len());
283+
assert!(tail.is_empty());
284+
285+
kernel.step(&self.ctx, selection, &mut tail)?;
286+
if tail.len() != N && tail.len() != selection.true_count() {
287+
vortex_bail!(
288+
"Kernel produced incorrect number of output elements, expected either {} or {}, got {}",
301289
N,
302290
selection.true_count(),
303-
added_len
304-
),
291+
tail.len()
292+
);
305293
}
294+
295+
// Now we append the produced output back to the main output vector.
296+
output.unsplit(tail);
306297
}
307298
OutputTarget::IntermediateVector(vector_id) => {
308-
let mut out_vector = VectorMut::from(self.ctx.take_output(vector_id));
299+
let mut out_vector = self.ctx.take_output(vector_id);
309300
out_vector.clear();
310-
assert!(
311-
out_vector.capacity() >= N,
312-
"Insufficient capacity in intermediate vector"
313-
);
314301

302+
assert!(out_vector.is_empty());
315303
kernel.step(&self.ctx, selection, &mut out_vector)?;
316304

317305
match out_vector.len() {
318-
N => {
306+
// Valid cases are all N elements, or only the selected elements.
307+
n if n == N || n == selection.true_count() => {
319308
// The kernel produced either all N elements in-place, or only the selected elements.
320-
self.ctx
321-
.replace_output(vector_id, PipelineVector::InPlace(out_vector));
322-
}
323-
_ if out_vector.len() == selection.true_count() => {
324-
// If the kernel added exactly the number of selected elements,
325-
// the output is already compacted into the start of the vector.
326-
self.ctx
327-
.replace_output(vector_id, PipelineVector::Compact(out_vector));
309+
self.ctx.replace_output(vector_id, out_vector);
328310
}
329311
_ => vortex_bail!(
330-
"Kernel produced incorrect number of output elements, expected to append either {} or {}, got {}",
312+
"Kernel produced incorrect number of output elements, expected either {} or {}, got {}",
331313
N,
332314
selection.true_count(),
333315
out_vector.len()

vortex-array/src/pipeline/mod.rs

Lines changed: 16 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -67,19 +67,19 @@ pub trait BindContext {
6767
/// Each step of the kernel processes zero or more input vectors, and writes output to a
6868
/// pre-allocated mutable output vector.
6969
///
70-
/// Input vectors are provided via the [`KernelCtx`] and indicate the position of their elements
71-
/// as either [`PipelineVector::InPlace`] or [`PipelineVector::Compact`] based on whether
72-
/// the selected elements are in their original positions or compacted at the start of the vector
73-
/// respectively.
70+
/// Input vectors will have either length [`N`], indicating that all elements from the step are
71+
/// present, or length equal to the [`BitView::true_count`] of the selection mask,
72+
/// in which case only the selected elements are present.
7473
///
75-
/// The provided mutable output vector is guaranteed to have at least `N` elements of capacity.
76-
/// The kernel **must** append either [`BitView::true_count`] elements to the output vector (in
77-
/// which case the output elements are considered to be in the "Compact" position), or it must
78-
/// append `N` elements (in which case the output elements are considered to be in their "InPlace"
79-
/// positions). The pipeline driver will assert these conditions after each step.
74+
/// Output vectors will always be passed with length zero.
8075
///
81-
/// Note that the output vector may not be empty at the start of the step. The kernel must append
82-
/// its output to the existing contents of the output vector, rather than replacing it.
76+
/// Kernels may choose to output either all `N` elements in their original positions, or output
77+
/// only the selected elements to the first `true_count` positions of the output vector. When
78+
/// emitting `N` elements in-place, the kernel may omit expensive computations over the unselected
79+
/// elements, provided that the output elements in those positions are still valid (i.e. typically
80+
/// zeroed, rather than undefined).
81+
///
82+
/// The pipeline driver will verify these conditions before and after each step.
8383
pub trait Kernel: Send {
8484
/// Perform a single step of the kernel.
8585
fn step(
@@ -92,11 +92,11 @@ pub trait Kernel: Send {
9292

9393
/// The context provided to kernels during execution to access input vectors.
9494
pub struct KernelCtx {
95-
vectors: Vec<Option<PipelineVector>>,
95+
vectors: Vec<Option<VectorMut>>,
9696
}
9797

9898
impl KernelCtx {
99-
fn new(vectors: Vec<PipelineVector>) -> Self {
99+
fn new(vectors: Vec<VectorMut>) -> Self {
100100
Self {
101101
vectors: vectors.into_iter().map(Some).collect(),
102102
}
@@ -112,21 +112,21 @@ impl KernelCtx {
112112
///
113113
/// If the input vector at the given index is not available (typically because the vector
114114
/// happens to be currently borrowed as an output vector!).
115-
pub fn input(&mut self, id: VectorId) -> &PipelineVector {
115+
pub fn input(&mut self, id: VectorId) -> &VectorMut {
116116
self.vectors[id.0]
117117
.as_ref()
118118
.vortex_expect("Input vector at index is not available")
119119
}
120120

121121
#[inline]
122-
fn take_output(&mut self, id: &VectorId) -> PipelineVector {
122+
fn take_output(&mut self, id: &VectorId) -> VectorMut {
123123
self.vectors[id.0]
124124
.take()
125125
.vortex_expect("Output vector at index is not available")
126126
}
127127

128128
#[inline]
129-
fn replace_output(&mut self, id: &VectorId, vec: PipelineVector) {
129+
fn replace_output(&mut self, id: &VectorId, vec: VectorMut) {
130130
self.vectors[id.0] = Some(vec);
131131
}
132132
}
@@ -140,23 +140,3 @@ impl VectorId {
140140
VectorId(idx)
141141
}
142142
}
143-
144-
/// A pipeline vector passed into and out of pipeline kernels.
145-
#[derive(Debug)]
146-
pub enum PipelineVector {
147-
/// `InPlace` indicates that elements are in their original positions, where the selected
148-
/// elements are identified by true values in the selection mask.
149-
InPlace(VectorMut),
150-
/// Compact indicates that the selected elements are compacted at the start of the vector in
151-
/// positions `0..true_count`.
152-
Compact(VectorMut),
153-
}
154-
155-
impl From<PipelineVector> for VectorMut {
156-
fn from(value: PipelineVector) -> Self {
157-
match value {
158-
PipelineVector::InPlace(vec) => vec,
159-
PipelineVector::Compact(vec) => vec,
160-
}
161-
}
162-
}

0 commit comments

Comments
 (0)