Skip to content

Commit afc449a

Browse files
committed
ALP pipeline kernel
Signed-off-by: Nicholas Gates <[email protected]>
2 parents 72586b2 + bfb7477 commit afc449a

File tree

4 files changed

+69
-151
lines changed

4 files changed

+69
-151
lines changed

encodings/alp/src/alp/operator.rs

Lines changed: 30 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,12 @@ use crate::{match_each_alp_float_ptype, ALPArray, ALPFloat, ALPVTable, Exponents
55
use std::marker::PhantomData;
66
use vortex_array::pipeline::bit_view::BitView;
77
use vortex_array::pipeline::{
8-
BindContext, Kernel, KernelCtx, PipelineInputs, PipelinedNode, Position, VectorId, N,
8+
BindContext, Kernel, KernelCtx, PipelineInputs, PipelinedNode, VectorId, N,
99
};
1010
use vortex_array::vtable::OperatorVTable;
1111
use vortex_dtype::PTypeDowncastExt;
1212
use vortex_error::{vortex_bail, VortexResult};
13-
use vortex_vector::VectorMut;
13+
use vortex_vector::{VectorMut, VectorMutOps};
1414

1515
impl OperatorVTable<ALPVTable> for ALPVTable {
1616
fn pipeline_node(array: &ALPArray) -> Option<&dyn PipelinedNode> {
@@ -69,49 +69,37 @@ impl<A: ALPFloat> Kernel for UnpatchedALPKernel<A> {
6969

7070
let decoded = out.as_primitive_mut().downcast::<A>();
7171

72-
match encoded.position() {
73-
Position::InPlace => {
74-
// TODO(ngates): tune the threshold
75-
if selection.true_count() < (N / 8) {
76-
// Operate only over the selected elements, appending `true_count` elements
77-
unsafe {
78-
decoded
79-
.validity_mut()
80-
.append_n(true, selection.true_count())
81-
};
82-
unsafe { decoded.elements_mut().set_len(selection.true_count()) };
83-
let decoded_buf = unsafe { decoded.elements_mut() };
72+
// If our input is in-place, and we have only a few selected elements, then iterate only
73+
// the selected elements and write them to the output.
74+
if encoded_buf.len() == N && selection.true_count() < (N / 8) {
75+
// Reserve capacity for the true_count elements.
76+
decoded.reserve(selection.true_count().saturating_sub(decoded.capacity()));
8477

85-
let mut out_pos = 0;
86-
selection.iter_ones(|idx| {
87-
let encoded = unsafe { encoded_buf.get_unchecked(idx) };
88-
let decoded = A::decode_single(*encoded, self.exponents);
89-
*unsafe { decoded_buf.get_unchecked_mut(out_pos) } = decoded;
90-
out_pos += 1;
91-
})
92-
} else {
93-
// Operate over all N elements, appending N elements
94-
assert_eq!(encoded_buf.len(), N);
95-
decoded.extend(
96-
encoded_buf
97-
.iter()
98-
.map(|e| A::decode_single(*e, self.exponents)),
99-
);
100-
}
101-
}
102-
Position::Compact => {
103-
// Loop over the compacted input elements
104-
decoded.extend(
105-
encoded
106-
.as_primitive()
107-
.downcast::<A::ALPInt>()
108-
.elements()
109-
.iter()
110-
.map(|e| A::decode_single(*e, self.exponents)),
111-
)
112-
}
78+
// SAFETY: the output starts empty, so appending `true_count` validity bits and setting the elements len to `true_count` keeps the two lengths equal.
79+
unsafe { decoded.validity_mut() }.append_n(true, selection.true_count());
80+
unsafe { decoded.elements_mut().set_len(selection.true_count()) };
81+
82+
// SAFETY: we reserved capacity above.
83+
let elements = unsafe { decoded.elements_mut() };
84+
85+
let mut out_pos = 0;
86+
selection.iter_ones(|idx| {
87+
let encoded = unsafe { encoded_buf.get_unchecked(idx) };
88+
let decoded_value = A::decode_single(*encoded, self.exponents);
89+
unsafe { *elements.get_unchecked_mut(out_pos) = decoded_value };
90+
out_pos += 1;
91+
});
92+
93+
debug_assert_eq!(decoded.validity().len(), decoded.elements().len());
94+
return Ok(());
11395
}
11496

97+
// Otherwise, iterate the entire input.
98+
decoded.extend(
99+
encoded_buf
100+
.iter()
101+
.map(|e| A::decode_single(*e, self.exponents)),
102+
);
115103
Ok(())
116104
}
117105
}

vortex-array/src/pipeline/driver/allocation.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,15 +7,15 @@ use vortex_error::{VortexExpect, VortexResult};
77
use vortex_vector::VectorMut;
88

99
use crate::pipeline::driver::{Node, NodeId};
10-
use crate::pipeline::{PipelineVector, VectorId, N};
10+
use crate::pipeline::{VectorId, N};
1111
use crate::Array;
1212

1313
#[derive(Debug)]
1414
pub struct VectorAllocation {
1515
/// Where each node writes its output
1616
pub(crate) output_targets: Vec<OutputTarget>,
1717
/// The actual allocated vectors
18-
pub(crate) vectors: Vec<PipelineVector>,
18+
pub(crate) vectors: Vec<VectorMut>,
1919
}
2020

2121
// TODO(joe): support in-place view operations
@@ -86,7 +86,7 @@ pub(super) fn allocate_vectors(
8686
.collect(),
8787
vectors: allocation_types
8888
.into_iter()
89-
.map(|dtype| PipelineVector::new_compact(VectorMut::with_capacity(dtype, N)))
89+
.map(|dtype| VectorMut::with_capacity(dtype, N))
9090
.collect(),
9191
})
9292
}

vortex-array/src/pipeline/driver/mod.rs

Lines changed: 20 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ use crate::pipeline::bit_view::{BitView, BitViewExt};
1919
use crate::pipeline::driver::allocation::{allocate_vectors, OutputTarget};
2020
use crate::pipeline::driver::bind::bind_kernels;
2121
use crate::pipeline::driver::toposort::topological_sort;
22-
use crate::pipeline::{Kernel, KernelCtx, PipelineInputs, PipelineVector, N};
22+
use crate::pipeline::{Kernel, KernelCtx, PipelineInputs, N};
2323
use crate::{Array, ArrayEq, ArrayHash, ArrayOperator, ArrayRef, ArrayVisitor, Precision};
2424

2525
/// A pipeline driver takes a Vortex array and executes it into a canonical vector.
@@ -278,58 +278,38 @@ impl Pipeline {
278278
// take the intermediate vector and write into that.
279279
match &self.output_targets[node_idx] {
280280
OutputTarget::ExternalOutput => {
281-
assert!(
282-
output.capacity() >= N,
283-
"Insufficient capacity in external output vector"
284-
);
285-
286-
let prev_output_len = output.len();
287-
kernel.step(&self.ctx, selection, output)?;
288-
289-
let added_len = output.len() - prev_output_len;
290-
match added_len {
291-
N => {
292-
// If the kernel added N elements, the output is in-place.
293-
// TODO(ngates): we need to filter if the true count is not N.
294-
}
295-
_ if added_len == selection.true_count() => {
296-
// If the kernel added exactly the number of selected elements,
297-
// the output is already compacted into the start of the vector.
298-
}
299-
_ => vortex_bail!(
300-
"Kernel produced incorrect number of output elements, expected to append either {} or {}, got {}",
281+
// We split off the next N elements of capacity from the external output vector.
282+
let mut tail = output.split_off(output.len());
283+
assert!(tail.is_empty());
284+
285+
kernel.step(&self.ctx, selection, &mut tail)?;
286+
if tail.len() != N && tail.len() != selection.true_count() {
287+
vortex_bail!(
288+
"Kernel produced incorrect number of output elements, expected either {} or {}, got {}",
301289
N,
302290
selection.true_count(),
303-
added_len
304-
),
291+
tail.len()
292+
);
305293
}
294+
295+
// Now we append the produced output back to the main output vector.
296+
output.unsplit(tail);
306297
}
307298
OutputTarget::IntermediateVector(vector_id) => {
308-
let mut out_vector = self.ctx.take_output(vector_id).into_vector();
299+
let mut out_vector = self.ctx.take_output(vector_id);
309300
out_vector.clear();
310-
assert!(
311-
out_vector.capacity() >= N,
312-
"Insufficient capacity in intermediate vector"
313-
);
314301

302+
assert!(out_vector.is_empty());
315303
kernel.step(&self.ctx, selection, &mut out_vector)?;
316304

317305
match out_vector.len() {
318-
N => {
306+
// Valid cases are all N elements, or only the selected elements.
307+
n if n == N || n == selection.true_count() => {
319308
// The output length (N or true_count) tells consumers whether it is in-place or compacted.
320-
self.ctx.replace_output(
321-
vector_id,
322-
PipelineVector::new_in_place(out_vector),
323-
);
324-
}
325-
_ if out_vector.len() == selection.true_count() => {
326-
// If the kernel added exactly the number of selected elements,
327-
// the output is already compacted into the start of the vector.
328-
self.ctx
329-
.replace_output(vector_id, PipelineVector::new_compact(out_vector));
309+
self.ctx.replace_output(vector_id, out_vector);
330310
}
331311
_ => vortex_bail!(
332-
"Kernel produced incorrect number of output elements, expected to append either {} or {}, got {}",
312+
"Kernel produced incorrect number of output elements, expected either {} or {}, got {}",
333313
N,
334314
selection.true_count(),
335315
out_vector.len()

vortex-array/src/pipeline/mod.rs

Lines changed: 16 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -68,19 +68,19 @@ pub trait BindContext {
6868
/// Each step of the kernel processes zero or more input vectors, and writes output to a
6969
/// pre-allocated mutable output vector.
7070
///
71-
/// Input vectors are provided via the [`KernelCtx`] and indicate the position of their elements
72-
/// as either [`PipelineVector::InPlace`] or [`PipelineVector::Compact`] based on whether
73-
/// the selected elements are in their original positions or compacted at the start of the vector
74-
/// respectively.
71+
/// Input vectors will either have length [`N`], indicating that all elements from the step are
72+
/// present, or they will have length equal to the [`BitView::true_count`] of the selection mask,
73+
/// in which case only the selected elements are present.
7574
///
76-
/// The provided mutable output vector is guaranteed to have at least `N` elements of capacity.
77-
/// The kernel **must** append either [`BitView::true_count`] elements to the output vector (in
78-
/// which case the output elements are considered to be in the "Compact" position), or it must
79-
/// append `N` elements (in which case the output elements are considered to be in their "InPlace"
80-
/// positions). The pipeline driver will assert these conditions after each step.
75+
/// Output vectors will always be passed with length zero.
8176
///
82-
/// Note that the output vector may not be empty at the start of the step. The kernel must append
83-
/// its output to the existing contents of the output vector, rather than replacing it.
77+
/// Kernels may choose to output either all `N` elements in their original positions, or output
78+
/// only the selected elements to the first `true_count` positions of the output vector. When
79+
/// emitting `N` elements in-place, the kernel may omit expensive computations over the unselected
80+
/// elements, provided that the output elements in those positions are still valid (i.e. typically
81+
/// zeroed, rather than undefined).
82+
///
83+
/// The pipeline driver will verify these conditions before and after each step.
8484
pub trait Kernel: Send {
8585
/// Perform a single step of the kernel.
8686
fn step(
@@ -93,11 +93,11 @@ pub trait Kernel: Send {
9393

9494
/// The context provided to kernels during execution to access input vectors.
9595
pub struct KernelCtx {
96-
vectors: Vec<Option<PipelineVector>>,
96+
vectors: Vec<Option<VectorMut>>,
9797
}
9898

9999
impl KernelCtx {
100-
fn new(vectors: Vec<PipelineVector>) -> Self {
100+
fn new(vectors: Vec<VectorMut>) -> Self {
101101
Self {
102102
vectors: vectors.into_iter().map(Some).collect(),
103103
}
@@ -113,21 +113,21 @@ impl KernelCtx {
113113
///
114114
/// If the input vector at the given index is not available (typically because the vector
115115
/// happens to be currently borrowed as an output vector!).
116-
pub fn input(&self, id: VectorId) -> &PipelineVector {
116+
pub fn input(&self, id: VectorId) -> &VectorMut {
117117
self.vectors[id.0]
118118
.as_ref()
119119
.vortex_expect("Input vector at index is not available")
120120
}
121121

122122
#[inline]
123-
fn take_output(&mut self, id: &VectorId) -> PipelineVector {
123+
fn take_output(&mut self, id: &VectorId) -> VectorMut {
124124
self.vectors[id.0]
125125
.take()
126126
.vortex_expect("Output vector at index is not available")
127127
}
128128

129129
#[inline]
130-
fn replace_output(&mut self, id: &VectorId, vec: PipelineVector) {
130+
fn replace_output(&mut self, id: &VectorId, vec: VectorMut) {
131131
self.vectors[id.0] = Some(vec);
132132
}
133133
}
@@ -141,53 +141,3 @@ impl VectorId {
141141
VectorId(idx)
142142
}
143143
}
144-
145-
/// A pipeline vector passed into and out of pipeline kernels.
146-
#[derive(Debug)]
147-
pub struct PipelineVector {
148-
vector: VectorMut,
149-
position: Position,
150-
}
151-
152-
/// Describes the position of the selected elements in a pipeline vector.
153-
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
154-
pub enum Position {
155-
/// `InPlace` indicates that elements are in their original positions, where the selected
156-
/// elements are identified by true values in the selection mask.
157-
InPlace,
158-
/// Compact indicates that the selected elements are compacted at the start of the vector in
159-
/// positions `0..true_count`.
160-
Compact,
161-
}
162-
163-
impl PipelineVector {
164-
pub fn new_in_place(vector: VectorMut) -> Self {
165-
Self {
166-
vector,
167-
position: Position::InPlace,
168-
}
169-
}
170-
171-
pub fn new_compact(vector: VectorMut) -> Self {
172-
Self {
173-
vector,
174-
position: Position::Compact,
175-
}
176-
}
177-
178-
pub fn position(&self) -> Position {
179-
self.position
180-
}
181-
182-
pub fn into_vector(self) -> VectorMut {
183-
self.vector
184-
}
185-
}
186-
187-
impl Deref for PipelineVector {
188-
type Target = VectorMut;
189-
190-
fn deref(&self) -> &Self::Target {
191-
&self.vector
192-
}
193-
}

0 commit comments

Comments
 (0)