Skip to content

Commit dd834f0

Browse files
committed
Sparse pipeline driver
Signed-off-by: Nicholas Gates <[email protected]>
1 parent 41362bb commit dd834f0

File tree

22 files changed

+136
-83
lines changed

22 files changed

+136
-83
lines changed

vortex-array/src/array/operator.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,13 @@
33

44
use std::sync::Arc;
55

6-
use vortex_error::{vortex_panic, VortexResult};
6+
use vortex_error::{VortexResult, vortex_panic};
77
use vortex_mask::Mask;
8-
use vortex_vector::{vector_matches_dtype, Vector, VectorOps};
8+
use vortex_vector::{Vector, VectorOps, vector_matches_dtype};
99

1010
use crate::execution::{BatchKernelRef, BindCtx, DummyExecutionCtx, ExecutionCtx};
11-
use crate::pipeline::driver::PipelineDriver;
1211
use crate::pipeline::PipelinedNode;
12+
use crate::pipeline::driver::PipelineDriver;
1313
use crate::vtable::{OperatorVTable, VTable};
1414
use crate::{Array, ArrayAdapter, ArrayRef};
1515

vortex-array/src/pipeline/bit_view.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,11 @@
44
use std::borrow::Cow;
55
use std::fmt::{Debug, Formatter};
66

7-
use crate::pipeline::{N, N_BYTES, N_WORDS};
87
use vortex_buffer::BitBuffer;
98
use vortex_error::VortexResult;
109

10+
use crate::pipeline::{N, N_BYTES, N_WORDS};
11+
1112
/// A borrowed fixed-size bit vector of length `N` bits, represented as an array of usize words.
1213
///
1314
/// This struct is designed to provide a view over a Vortex [`vortex_buffer::BitBuffer`], therefore

vortex-array/src/pipeline/driver/allocation.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,13 @@
33

44
//! Vector allocation strategy for pipelines
55
6-
use crate::pipeline::driver::{Node, NodeId};
7-
use crate::pipeline::{PipelineVector, VectorId, N};
8-
use crate::Array;
96
use vortex_error::{VortexExpect, VortexResult};
107
use vortex_vector::VectorMut;
118

9+
use crate::Array;
10+
use crate::pipeline::driver::{Node, NodeId};
11+
use crate::pipeline::{N, PipelineVector, VectorId};
12+
1213
#[derive(Debug)]
1314
pub struct VectorAllocation {
1415
/// Where each node writes its output

vortex-array/src/pipeline/driver/bind.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,13 @@
11
// SPDX-License-Identifier: Apache-2.0
22
// SPDX-FileCopyrightText: Copyright the Vortex contributors
33

4+
use vortex_error::{VortexExpect, VortexResult};
5+
use vortex_vector::Vector;
6+
47
use crate::array::ArrayOperator;
5-
use crate::pipeline::driver::allocation::VectorAllocation;
68
use crate::pipeline::driver::Node;
9+
use crate::pipeline::driver::allocation::VectorAllocation;
710
use crate::pipeline::{BindContext, Kernel, VectorId};
8-
use vortex_error::{VortexExpect, VortexResult};
9-
use vortex_vector::Vector;
1011

1112
pub(crate) fn bind_kernels(
1213
dag: &[Node],

vortex-array/src/pipeline/driver/mod.rs

Lines changed: 23 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -3,25 +3,24 @@
33

44
pub mod allocation;
55
mod bind;
6-
mod input;
76
mod toposort;
87

9-
use std::any::Any;
108
use std::hash::{BuildHasher, Hash, Hasher};
119

12-
use crate::pipeline::bit_view::{BitView, BitViewExt};
13-
use crate::pipeline::driver::allocation::{allocate_vectors, OutputTarget};
14-
use crate::pipeline::driver::bind::bind_kernels;
15-
use crate::pipeline::driver::toposort::topological_sort;
16-
use crate::pipeline::{ElementPosition, Kernel, KernelCtx, PipelineInputs, PipelineVector, N};
17-
use crate::{Array, ArrayEq, ArrayHash, ArrayOperator, ArrayRef, ArrayVisitor, Precision};
1810
use itertools::Itertools;
19-
use vortex_dtype::{DType, NativePType};
20-
use vortex_error::{vortex_bail, VortexExpect, VortexResult};
11+
use vortex_dtype::DType;
12+
use vortex_error::{VortexResult, vortex_bail};
2113
use vortex_mask::Mask;
2214
use vortex_utils::aliases::hash_map::{HashMap, RandomState};
2315
use vortex_vector::{Vector, VectorMut, VectorMutOps};
2416

17+
use crate::pipeline::bit_view::{BitView, BitViewExt};
18+
use crate::pipeline::driver::allocation::{OutputTarget, allocate_vectors};
19+
use crate::pipeline::driver::bind::bind_kernels;
20+
use crate::pipeline::driver::toposort::topological_sort;
21+
use crate::pipeline::{ElementPosition, Kernel, KernelCtx, N, PipelineInputs, PipelineVector};
22+
use crate::{Array, ArrayEq, ArrayHash, ArrayOperator, ArrayRef, ArrayVisitor, Precision};
23+
2524
/// A pipeline driver takes a Vortex array and executes it into a canonical vector.
2625
///
2726
/// The driver builds up a DAG of pipeline nodes from the array tree up to the edges of this
@@ -50,6 +49,7 @@ struct Node {
5049
// This node's underlying array.
5150
array: ArrayRef,
5251
/// The type of pipeline node.
52+
#[allow(dead_code)] // TODO(ngates): pipeline execute does not yet use this
5353
kind: NodeKind,
5454
// The indices of the pipelined children nodes in the `nodes` vector.
5555
children: Vec<NodeId>,
@@ -68,8 +68,8 @@ enum NodeKind {
6868
/// A source node provides input to the pipeline by writing into mutable output vectors one
6969
/// batch at a time.
7070
Source,
71+
/// A transform node takes pipelined inputs from its children and produces output vectors
7172
Transform,
72-
Zip,
7373
}
7474

7575
impl PipelineDriver {
@@ -247,21 +247,21 @@ impl Pipeline {
247247
// The number of _full_ chunks we need to process.
248248
let nchunks = selection.len() / N;
249249
for _ in 0..nchunks {
250-
self.step(&mut self.ctx, &BitView::all_true(), &mut output)?;
250+
self.step(&BitView::all_true(), &mut output)?;
251251
}
252252

253253
// Now process the final partial chunk, if any.
254254
let remaining = selection.len() % N;
255255
if remaining > 0 {
256256
let selection_view = BitView::with_prefix(remaining);
257-
self.step(&mut self.ctx, &selection_view, &mut output)?;
257+
self.step(&selection_view, &mut output)?;
258258
}
259259
}
260260
Mask::Values(mask_values) => {
261261
// Loop over each chunk of N elements in the mask as a bit view.
262262
let selection_bits = mask_values.bit_buffer();
263263
for selection_view in selection_bits.iter_bit_views() {
264-
self.step(&mut self.ctx, &selection_view, &mut output)?;
264+
self.step(&selection_view, &mut output)?;
265265
}
266266
}
267267
}
@@ -270,12 +270,7 @@ impl Pipeline {
270270
}
271271

272272
/// Perform a single step of the pipeline.
273-
fn step(
274-
&self,
275-
ctx: &mut KernelCtx,
276-
selection: &BitView,
277-
output: &mut VectorMut,
278-
) -> VortexResult<()> {
273+
fn step(&mut self, selection: &BitView, output: &mut VectorMut) -> VortexResult<()> {
279274
// Loop over the kernels in toposorted execution order
280275
for &node_idx in self.exec_order.iter() {
281276
let kernel = &mut self.kernels[node_idx];
@@ -284,12 +279,12 @@ impl Pipeline {
284279
// take the intermediate vector and write into that.
285280
match &self.output_targets[node_idx] {
286281
OutputTarget::ExternalOutput => {
287-
let output_len = output.len();
288-
let position = kernel.step(ctx, selection, output)?;
289-
if output.len() != output_len + N {
282+
let prev_output_len = output.len();
283+
let position = kernel.step(&self.ctx, selection, output)?;
284+
if output.len() != prev_output_len + N {
290285
vortex_bail!(
291286
"Kernel produced incorrect number of output elements, expected {}, got {}",
292-
output_len + N,
287+
prev_output_len + N,
293288
output.len()
294289
);
295290
}
@@ -304,16 +299,15 @@ impl Pipeline {
304299
ElementPosition::Compact => {
305300
// The output is already compacted, we just need to adjust the length
306301
// to cover only the selected elements.
307-
assert!(selection.true_count() <= N);
308-
unsafe { output.set_len(output_len + selection.true_count()) }
302+
output.truncate(prev_output_len + selection.true_count());
309303
}
310304
}
311305
}
312306
OutputTarget::IntermediateVector(vector_id) => {
313-
let mut out_vector = VectorMut::from(ctx.take_output(vector_id));
307+
let mut out_vector = VectorMut::from(self.ctx.take_output(vector_id));
314308
out_vector.clear();
315309

316-
let position = kernel.step(ctx, selection, &mut out_vector)?;
310+
let position = kernel.step(&self.ctx, selection, &mut out_vector)?;
317311
if out_vector.len() != N {
318312
vortex_bail!(
319313
"Kernel produced incorrect number of output elements, expected {}, got {}",
@@ -325,7 +319,7 @@ impl Pipeline {
325319
// Wrap the output vector back into a PipelineVector, indicating which position
326320
// the elements are in.
327321
let out_vector = PipelineVector::from_position(position, out_vector);
328-
ctx.replace_output(vector_id, out_vector)
322+
self.ctx.replace_output(vector_id, out_vector)
329323
}
330324
};
331325
}

vortex-array/src/pipeline/driver/toposort.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33

44
use std::collections::VecDeque;
55

6-
use vortex_error::{vortex_bail, VortexResult};
6+
use vortex_error::{VortexResult, vortex_bail};
77

88
use crate::pipeline::driver::{Node, NodeId};
99

vortex-array/src/pipeline/mod.rs

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,10 @@
44
pub mod bit_view;
55
pub mod driver;
66

7-
use std::ops::Deref;
8-
97
use vortex_error::{VortexExpect, VortexResult};
10-
use vortex_vector::{Vector, VectorMut, VectorMutOps};
8+
use vortex_vector::{Vector, VectorMut};
119

1210
use crate::pipeline::bit_view::BitView;
13-
use crate::Array;
1411

1512
/// The number of elements in each step of a Vortex evaluation operator.
1613
pub const N: usize = 1024;
@@ -28,7 +25,7 @@ pub trait PipelinedNode {
2825
fn inputs(&self) -> PipelineInputs;
2926

3027
/// Bind the node into a [`Kernel`] for pipelined execution.
31-
fn bind(&self, ctx: &mut dyn BindContext) -> VortexResult<Box<dyn Kernel>>;
28+
fn bind(&self, ctx: &dyn BindContext) -> VortexResult<Box<dyn Kernel>>;
3229
}
3330

3431
/// Describes the type of pipeline node and its input information.
@@ -144,11 +141,6 @@ impl VectorId {
144141
fn new(idx: usize) -> Self {
145142
VectorId(idx)
146143
}
147-
148-
// Non-public getter to keep the type opaque to end users.
149-
fn as_usize(&self) -> usize {
150-
self.0
151-
}
152144
}
153145

154146
/// A pipeline vector passed into and out of pipeline kernels.

vortex-array/src/vtable/operator.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,15 @@
11
// SPDX-License-Identifier: Apache-2.0
22
// SPDX-FileCopyrightText: Copyright the Vortex contributors
33

4-
use vortex_error::{vortex_bail, VortexResult};
4+
use vortex_error::{VortexResult, vortex_bail};
55
use vortex_mask::Mask;
66
use vortex_vector::Vector;
77

8+
use crate::ArrayRef;
89
use crate::array::IntoArray;
910
use crate::execution::{BatchKernelRef, BindCtx, ExecutionCtx};
1011
use crate::pipeline::PipelinedNode;
1112
use crate::vtable::{NotSupported, VTable};
12-
use crate::ArrayRef;
1313

1414
/// A vtable for the new operator-based array functionality. Eventually this vtable will be
1515
/// merged into the main `VTable`, but for now it is kept separate to allow for incremental

vortex-buffer/src/bit/buf_mut.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -267,6 +267,8 @@ impl BitBufferMut {
267267
}
268268

269269
/// Truncate the buffer to the given length.
270+
///
271+
/// If the given length is greater than the current length, this is a no-op.
270272
pub fn truncate(&mut self, len: usize) {
271273
if len > self.len {
272274
return;

vortex-vector/src/binaryview/vector_mut.rs

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,12 @@
66
use std::sync::Arc;
77

88
use vortex_buffer::{BufferMut, ByteBuffer, ByteBufferMut};
9-
use vortex_error::{vortex_ensure, VortexExpect, VortexResult};
9+
use vortex_error::{VortexExpect, VortexResult, vortex_ensure};
1010
use vortex_mask::MaskMut;
1111

12-
use crate::binaryview::vector::BinaryViewVector;
13-
use crate::binaryview::view::{validate_views, BinaryView};
1412
use crate::binaryview::BinaryViewType;
13+
use crate::binaryview::vector::BinaryViewVector;
14+
use crate::binaryview::view::{BinaryView, validate_views};
1515
use crate::{VectorMutOps, VectorOps};
1616

1717
// Default capacity for new string data buffers of 2MiB.
@@ -207,6 +207,11 @@ impl<T: BinaryViewType> VectorMutOps for BinaryViewVectorMut<T> {
207207
self.open_buffer = None;
208208
}
209209

210+
fn truncate(&mut self, len: usize) {
211+
self.views.truncate(len);
212+
self.validity.truncate(len);
213+
}
214+
210215
fn extend_from_vector(&mut self, other: &BinaryViewVector<T>) {
211216
// Close any existing views into a new buffer
212217
self.flush_open_buffer();
@@ -261,7 +266,7 @@ mod tests {
261266
use std::ops::Deref;
262267
use std::sync::Arc;
263268

264-
use vortex_buffer::{buffer, buffer_mut, ByteBuffer};
269+
use vortex_buffer::{ByteBuffer, buffer, buffer_mut};
265270
use vortex_mask::{Mask, MaskMut};
266271

267272
use crate::binaryview::view::BinaryView;

0 commit comments

Comments
 (0)