Skip to content

Commit ef87e49

Browse files
authored
Clarify calling convention for pipeline kernels (#5286)
And implement a zero-copy input kernel --------- Signed-off-by: Nicholas Gates <[email protected]>
1 parent fcf2d98 commit ef87e49

File tree

13 files changed

+252
-87
lines changed

13 files changed

+252
-87
lines changed

vortex-array/src/pipeline/bit_view.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -216,6 +216,8 @@ impl<'a> BitView<'a> {
216216
///
217217
/// The function `f` receives a [`BitSlice`] containing the inclusive `start` bit as well as
218218
/// the length.
219+
///
220+
/// FIXME(ngates): this is still broken.
219221
pub fn iter_slices<F>(&self, mut f: F)
220222
where
221223
F: FnMut(BitSlice),

vortex-array/src/pipeline/driver/allocation.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ pub(super) fn allocate_vectors(
8686
.collect(),
8787
vectors: allocation_types
8888
.into_iter()
89-
.map(|dtype| PipelineVector::Compact(VectorMut::with_capacity(dtype, 2 * N)))
89+
.map(|dtype| PipelineVector::Compact(VectorMut::with_capacity(dtype, N)))
9090
.collect(),
9191
})
9292
}

vortex-array/src/pipeline/driver/bind.rs

Lines changed: 31 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -2,17 +2,18 @@
22
// SPDX-FileCopyrightText: Copyright the Vortex contributors
33

44
use vortex_error::{VortexExpect, VortexResult};
5-
use vortex_vector::Vector;
5+
use vortex_vector::{Vector, VectorOps};
66

77
use crate::array::ArrayOperator;
8-
use crate::pipeline::driver::Node;
98
use crate::pipeline::driver::allocation::VectorAllocation;
9+
use crate::pipeline::driver::input::InputKernel;
10+
use crate::pipeline::driver::{Node, NodeKind};
1011
use crate::pipeline::{BindContext, Kernel, VectorId};
1112

1213
pub(crate) fn bind_kernels(
1314
dag: &[Node],
1415
allocation_plan: &VectorAllocation,
15-
all_batch_inputs: &[Vector],
16+
mut all_batch_inputs: Vec<Option<Vector>>,
1617
) -> VortexResult<Vec<Box<dyn Kernel>>> {
1718
let mut kernels = Vec::with_capacity(dag.len());
1819
for node in dag {
@@ -26,38 +27,51 @@ pub(crate) fn bind_kernels(
2627
})
2728
.collect::<Vec<_>>();
2829

29-
let batch_inputs: Vec<_> = node
30+
let mut batch_inputs: Vec<_> = node
3031
.batch_inputs
3132
.iter()
32-
.map(|idx| all_batch_inputs[*idx].clone())
33+
.map(|idx| all_batch_inputs[*idx].take())
3334
.collect();
3435

35-
let bind_context = PipelineBindContext {
36-
children: &input_ids,
37-
batch_inputs: &batch_inputs,
38-
};
36+
kernels.push(match node.array.as_pipelined() {
37+
None => {
38+
// If the node cannot be pipelined, it must be an input node
39+
assert_eq!(node.kind, NodeKind::Input);
40+
assert_eq!(node.batch_inputs.len(), 1);
41+
let batch_id = node.batch_inputs[0];
3942

40-
let pipelined = node
41-
.array
42-
.as_pipelined()
43-
.vortex_expect("Array in pipeline DAG does not support pipelined execution");
43+
let batch = batch_inputs[batch_id]
44+
.take()
45+
.vortex_expect("Batch input vector has already been consumed")
46+
.into_mut();
4447

45-
kernels.push(pipelined.bind(&bind_context)?);
48+
Box::new(InputKernel::new(batch))
49+
}
50+
Some(pipelined) => {
51+
let bind_context = PipelineBindContext {
52+
children: &input_ids,
53+
batch_inputs: &mut batch_inputs,
54+
};
55+
pipelined.bind(&bind_context)?
56+
}
57+
});
4658
}
4759
Ok(kernels)
4860
}
4961

5062
struct PipelineBindContext<'a> {
5163
children: &'a [VectorId],
52-
batch_inputs: &'a [Vector],
64+
batch_inputs: &'a mut [Option<Vector>],
5365
}
5466

5567
impl BindContext for PipelineBindContext<'_> {
5668
fn pipelined_input(&self, pipelined_child_idx: usize) -> VectorId {
5769
self.children[pipelined_child_idx]
5870
}
5971

60-
fn batch_input(&self, batch_child_idx: usize) -> Vector {
61-
self.batch_inputs[batch_child_idx].clone()
72+
fn batch_input(&mut self, batch_child_idx: usize) -> Vector {
73+
self.batch_inputs[batch_child_idx]
74+
.take()
75+
.vortex_expect("Batch input vector has already been consumed")
6276
}
6377
}
Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
// SPDX-License-Identifier: Apache-2.0
2+
// SPDX-FileCopyrightText: Copyright the Vortex contributors
3+
4+
use vortex_error::{VortexExpect, VortexResult};
5+
use vortex_vector::{VectorMut, VectorMutOps, VectorOps};
6+
7+
use crate::pipeline::bit_view::BitView;
8+
use crate::pipeline::{Kernel, KernelCtx, N};
9+
10+
/// A kernel that feeds a batch vector into the pipeline in chunks of size `N` with zero-copy.
11+
pub(super) struct InputKernel {
12+
// The batch vector to be fed into the pipeline.
13+
batch: Option<VectorMut>,
14+
}
15+
16+
impl InputKernel {
17+
/// Create a new input kernel with the given batch vector.
18+
pub(super) fn new(batch: VectorMut) -> Self {
19+
Self { batch: Some(batch) }
20+
}
21+
}
22+
23+
impl Kernel for InputKernel {
24+
fn step(
25+
&mut self,
26+
_ctx: &KernelCtx,
27+
selection: &BitView,
28+
out: &mut VectorMut,
29+
) -> VortexResult<()> {
30+
let mut batch = self
31+
.batch
32+
.take()
33+
.vortex_expect("Input kernel has already been exhausted");
34+
let remaining = batch.len();
35+
36+
// The ideal thing to do here is to split off a chunk of size N from our owned batch vector,
37+
// and then unsplit it onto the output vector. This should be a zero-copy operation in both
38+
// cases, regardless of whether the output vector is the root output of the pipeline or an
39+
// intermediate vector that gets cleared on each iteration.
40+
//
41+
// The only case this doesn't work, is when we have fewer than N elements left in our batch
42+
// vector, _and_ the selection vector is not simply a dense prefix. In this case, we copy
43+
// the remaining elements into the output.
44+
if remaining < N && selection.true_count() < remaining {
45+
// TODO(ngates): this is slow. We should instead unsplit the vector, and then manually
46+
// run a compaction over the vector.
47+
let immutable = batch.freeze();
48+
selection.iter_ones(|idx| {
49+
out.extend_from_vector(&immutable.slice(idx..idx + 1));
50+
});
51+
return Ok(());
52+
}
53+
54+
// We split off from our owned batch vector in chunks of size N, and then unsplit onto the
55+
// output vector. Both of these operations should be zero-copy.
56+
let mut split = batch.split_off(N.min(remaining));
57+
58+
// Split-off leaves [0, at) in self.batch, and returns [at, ..)
59+
// So we swap the remainder back into self.batch for the next iteration
60+
std::mem::swap(&mut split, &mut batch);
61+
62+
// If the output vector is the end of the pipeline, then each step we will be given back
63+
// the same output to append to, and unsplit will be zero-copy.
64+
// If the output vector is an intermediate vector, then it will be empty at the start of
65+
// each step, and unsplit will also be zero-copy.
66+
out.unsplit(split);
67+
68+
self.batch = Some(batch);
69+
70+
Ok(())
71+
}
72+
}
73+
74+
#[cfg(test)]
75+
mod test {
76+
use vortex_buffer::{bitbuffer, buffer};
77+
use vortex_dtype::PTypeDowncastExt;
78+
use vortex_mask::Mask;
79+
80+
use crate::pipeline::driver::PipelineDriver;
81+
use crate::{Array, ArrayOperator, IntoArray};
82+
83+
#[test]
84+
fn test_pipeline_input() {
85+
let array = buffer![123u32; 8000].into_array();
86+
assert!(
87+
array.as_pipelined().is_none(),
88+
"We're explicitly testing non-pipelined arrays"
89+
);
90+
91+
let selection = Mask::new_true(array.len());
92+
let vector = PipelineDriver::new(array)
93+
.execute(&selection)
94+
.unwrap()
95+
.into_primitive()
96+
.downcast::<u32>();
97+
assert_eq!(vector.elements().as_ref(), &[123u32; 8000]);
98+
}
99+
100+
#[test]
101+
fn test_pipeline_input_with_selection() {
102+
let array = buffer![0u32, 1, 2, 3, 4].into_array();
103+
assert!(
104+
array.as_pipelined().is_none(),
105+
"We're explicitly testing non-pipelined arrays"
106+
);
107+
108+
let selection = Mask::from(bitbuffer![1 0 1 0 1]);
109+
let vector = PipelineDriver::new(array)
110+
.execute(&selection)
111+
.unwrap()
112+
.into_primitive()
113+
.downcast::<u32>();
114+
assert_eq!(vector.elements().as_ref(), &[0u32, 2, 4]);
115+
}
116+
}

vortex-array/src/pipeline/driver/mod.rs

Lines changed: 51 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
pub mod allocation;
55
mod bind;
6+
mod input;
67
mod toposort;
78

89
use std::hash::{BuildHasher, Hash, Hasher};
@@ -18,7 +19,7 @@ use crate::pipeline::bit_view::{BitView, BitViewExt};
1819
use crate::pipeline::driver::allocation::{OutputTarget, allocate_vectors};
1920
use crate::pipeline::driver::bind::bind_kernels;
2021
use crate::pipeline::driver::toposort::topological_sort;
21-
use crate::pipeline::{ElementPosition, Kernel, KernelCtx, N, PipelineInputs, PipelineVector};
22+
use crate::pipeline::{Kernel, KernelCtx, N, PipelineInputs, PipelineVector};
2223
use crate::{Array, ArrayEq, ArrayHash, ArrayOperator, ArrayRef, ArrayVisitor, Precision};
2324

2425
/// A pipeline driver takes a Vortex array and executes it into a canonical vector.
@@ -59,12 +60,10 @@ struct Node {
5960
batch_inputs: Vec<BatchId>,
6061
}
6162

62-
#[derive(Debug, Clone, Copy)]
63+
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
6364
enum NodeKind {
64-
/// A view node acts as a pipeline source, but is fed into the pipeline by taking zero-copy
65-
/// slices of a batch vector. This occurs when a node declares its child as pipelined, but the
66-
/// child itself doesn't support pipelined execution.
67-
View,
65+
/// An input node feeds a batch vector into the pipeline chunk-by-chunk.
66+
Input,
6867
/// A source node provides input to the pipeline by writing into mutable output vectors one
6968
/// batch at a time.
7069
Source,
@@ -102,7 +101,7 @@ impl PipelineDriver {
102101

103102
Node {
104103
array,
105-
kind: NodeKind::View,
104+
kind: NodeKind::Input,
106105
children: vec![],
107106
parents: vec![],
108107
batch_inputs: vec![batch_id],
@@ -200,7 +199,7 @@ impl PipelineDriver {
200199
let batch_inputs: Vec<_> = self
201200
.batch_inputs
202201
.into_iter()
203-
.map(|array| array.execute())
202+
.map(|array| array.execute().map(Some))
204203
.try_collect()?;
205204

206205
// Compute the toposort of the DAG
@@ -210,7 +209,7 @@ impl PipelineDriver {
210209
let allocation_plan = allocate_vectors(&self.dag, &exec_order)?;
211210

212211
// Bind each node in the DAG to create its kernel
213-
let kernels = bind_kernels(&self.dag, &allocation_plan, &batch_inputs)?;
212+
let kernels = bind_kernels(&self.dag, &allocation_plan, batch_inputs)?;
214213

215214
// Construct the kernel execution context
216215
let ctx = KernelCtx::new(allocation_plan.vectors);
@@ -279,47 +278,61 @@ impl Pipeline {
279278
// take the intermediate vector and write into that.
280279
match &self.output_targets[node_idx] {
281280
OutputTarget::ExternalOutput => {
281+
assert!(
282+
output.capacity() >= N,
283+
"Insufficient capacity in external output vector"
284+
);
285+
282286
let prev_output_len = output.len();
283-
let position = kernel.step(&self.ctx, selection, output)?;
284-
if output.len() != prev_output_len + N {
285-
vortex_bail!(
286-
"Kernel produced incorrect number of output elements, expected {}, got {}",
287-
prev_output_len + N,
288-
output.len()
289-
);
290-
}
287+
kernel.step(&self.ctx, selection, output)?;
291288

292-
match position {
293-
ElementPosition::Sparse => {
294-
// The output is in sparse form, we need to compact it based on the
295-
// selection mask.
296-
// TODO(ngates): we need to implement compaction here.
297-
todo!()
289+
let added_len = output.len() - prev_output_len;
290+
match added_len {
291+
N => {
292+
// If the kernel added N elements, the output is in-place.
293+
// TODO(ngates): we need to filter if the true count is not N.
298294
}
299-
ElementPosition::Compact => {
300-
// The output is already compacted, we just need to adjust the length
301-
// to cover only the selected elements.
302-
output.truncate(prev_output_len + selection.true_count());
295+
_ if added_len == selection.true_count() => {
296+
// If the kernel added exactly the number of selected elements,
297+
// the output is already compacted into the start of the vector.
303298
}
299+
_ => vortex_bail!(
300+
"Kernel produced incorrect number of output elements, expected to append either {} or {}, got {}",
301+
N,
302+
selection.true_count(),
303+
added_len
304+
),
304305
}
305306
}
306307
OutputTarget::IntermediateVector(vector_id) => {
307308
let mut out_vector = VectorMut::from(self.ctx.take_output(vector_id));
308309
out_vector.clear();
309-
310-
let position = kernel.step(&self.ctx, selection, &mut out_vector)?;
311-
if out_vector.len() != N {
312-
vortex_bail!(
313-
"Kernel produced incorrect number of output elements, expected {}, got {}",
310+
assert!(
311+
out_vector.capacity() >= N,
312+
"Insufficient capacity in intermediate vector"
313+
);
314+
315+
kernel.step(&self.ctx, selection, &mut out_vector)?;
316+
317+
match out_vector.len() {
318+
N => {
319+
// If the kernel added N elements, the output is in-place.
320+
self.ctx
321+
.replace_output(vector_id, PipelineVector::InPlace(out_vector));
322+
}
323+
_ if out_vector.len() == selection.true_count() => {
324+
// If the kernel added exactly the number of selected elements,
325+
// the output is already compacted into the start of the vector.
326+
self.ctx
327+
.replace_output(vector_id, PipelineVector::Compact(out_vector));
328+
}
329+
_ => vortex_bail!(
330+
"Kernel produced incorrect number of output elements, expected to append either {} or {}, got {}",
314331
N,
332+
selection.true_count(),
315333
out_vector.len()
316-
);
334+
),
317335
}
318-
319-
// Wrap the output vector back into a PipelineVector, indicating which position
320-
// the elements are in.
321-
let out_vector = PipelineVector::from_position(position, out_vector);
322-
self.ctx.replace_output(vector_id, out_vector)
323336
}
324337
};
325338
}

0 commit comments

Comments
 (0)