Skip to content

Commit fe16812

Browse files
committed
pipelined execution
Signed-off-by: Nicholas Gates <[email protected]>
1 parent cef3f51 commit fe16812

File tree

7 files changed

+136
-43
lines changed

7 files changed

+136
-43
lines changed

vortex-array/src/array/operator.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,9 @@
33

44
use std::sync::Arc;
55

6-
use vortex_error::{vortex_panic, VortexResult};
6+
use vortex_error::{VortexResult, vortex_panic};
77
use vortex_mask::Mask;
8-
use vortex_vector::{vector_matches_dtype, Vector, VectorOps};
8+
use vortex_vector::{Vector, VectorOps, vector_matches_dtype};
99

1010
use crate::execution::{BatchKernelRef, BindCtx, DummyExecutionCtx, ExecutionCtx};
1111
use crate::pipeline::source_driver::PipelineSourceDriver;

vortex-array/src/arrays/primitive/vtable/operator.rs

Lines changed: 82 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,20 @@
11
// SPDX-License-Identifier: Apache-2.0
22
// SPDX-FileCopyrightText: Copyright the Vortex contributors
33

4-
use vortex_buffer::Buffer;
4+
use vortex_buffer::{BitBuffer, Buffer};
55
use vortex_compute::filter::Filter;
6-
use vortex_dtype::{match_each_native_ptype, NativePType, PTypeDowncastExt};
6+
use vortex_dtype::{NativePType, PTypeDowncastExt, match_each_native_ptype};
77
use vortex_error::VortexResult;
8-
use vortex_mask::Mask;
98
use vortex_vector::primitive::PVector;
109
use vortex_vector::{VectorMut, VectorMutOps};
1110

1211
use crate::arrays::{MaskedVTable, PrimitiveArray, PrimitiveVTable};
13-
use crate::execution::{kernel, BatchKernelRef, BindCtx};
14-
use crate::pipeline::bit_view::BitView;
15-
use crate::pipeline::{BindContext, KernelContext, PipelinedSource, SourceKernel, N};
12+
use crate::execution::{BatchKernelRef, BindCtx, kernel};
13+
use crate::pipeline::bit_view::{BitSlice, BitView};
14+
use crate::pipeline::{
15+
AllNullSourceKernel, BindContext, KernelContext, N, PipelinedSource, SourceKernel,
16+
};
17+
use crate::validity::Validity;
1618
use crate::vtable::{OperatorVTable, ValidityHelper};
1719
use crate::{ArrayRef, IntoArray};
1820

@@ -71,25 +73,81 @@ impl OperatorVTable<PrimitiveVTable> for PrimitiveVTable {
7173
}
7274

7375
impl PipelinedSource for PrimitiveArray {
74-
fn bind_source(&self, _ctx: &mut dyn BindContext) -> VortexResult<Box<dyn SourceKernel>> {
75-
match_each_native_ptype!(self.ptype(), |T| {
76-
let primitive_kernel = PrimitiveKernel {
77-
buffer: self.buffer::<T>().clone(),
78-
validity: self.validity_mask(),
79-
offset: 0,
80-
};
81-
Ok(Box::new(primitive_kernel))
82-
})
76+
fn bind_source(&self, ctx: &mut dyn BindContext) -> VortexResult<Box<dyn SourceKernel>> {
77+
match self.validity() {
78+
Validity::NonNullable | Validity::AllValid => {
79+
match_each_native_ptype!(self.ptype(), |T| {
80+
let primitive_kernel = NonNullablePrimitiveKernel {
81+
buffer: self.buffer::<T>(),
82+
offset: 0,
83+
};
84+
Ok(Box::new(primitive_kernel))
85+
})
86+
}
87+
Validity::AllInvalid => Ok(Box::new(AllNullSourceKernel)),
88+
Validity::Array(_) => {
89+
let validity = ctx.batch_input(0).into_bool();
90+
// Validity is non-nullable, so we extract the inner bit buffer.
91+
let (validity, _) = validity.into_parts();
92+
93+
match_each_native_ptype!(self.ptype(), |T| {
94+
let primitive_kernel = NullablePrimitiveKernel {
95+
buffer: self.buffer::<T>(),
96+
validity,
97+
offset: 0,
98+
};
99+
Ok(Box::new(primitive_kernel))
100+
})
101+
}
102+
}
103+
}
104+
}
105+
106+
struct NonNullablePrimitiveKernel<T: NativePType> {
107+
buffer: Buffer<T>,
108+
offset: usize,
109+
}
110+
111+
impl<T: NativePType> SourceKernel for NonNullablePrimitiveKernel<T> {
112+
fn skip(&mut self, n: usize) {
113+
self.offset += n * N;
114+
}
115+
116+
fn step(
117+
&mut self,
118+
_ctx: &KernelContext,
119+
selection: &BitView,
120+
out: &mut VectorMut,
121+
) -> VortexResult<()> {
122+
let out = out.as_primitive_mut().downcast::<T>();
123+
124+
// SAFETY: we know the output has sufficient capacity.
125+
unsafe {
126+
out.validity_mut().append_n(true, selection.true_count());
127+
let prev_len = out.len();
128+
out.elements_mut()
129+
.set_len(prev_len + selection.true_count());
130+
}
131+
132+
let source = &self.buffer.as_slice()[self.offset..];
133+
let mut out_pos = 0;
134+
selection.iter_slices(|BitSlice { start, len }| {
135+
out.as_mut()[out_pos..][..len].copy_from_slice(&source[start..][..len]);
136+
out_pos += len;
137+
});
138+
139+
Ok(())
83140
}
84141
}
85142

86-
struct PrimitiveKernel<T: NativePType> {
143+
struct NullablePrimitiveKernel<T: NativePType> {
87144
buffer: Buffer<T>,
88-
validity: Mask,
145+
#[allow(dead_code)] // TODO(ngates): implement appending validity bits
146+
validity: BitBuffer,
89147
offset: usize,
90148
}
91149

92-
impl<T: NativePType> SourceKernel for PrimitiveKernel<T> {
150+
impl<T: NativePType> SourceKernel for NullablePrimitiveKernel<T> {
93151
fn skip(&mut self, n: usize) {
94152
self.offset += n * N;
95153
}
@@ -114,9 +172,14 @@ impl<T: NativePType> SourceKernel for PrimitiveKernel<T> {
114172
let source = &self.buffer.as_slice()[self.offset..];
115173

116174
let mut out_pos = 0;
117-
selection.iter_slices(|(start, len)| {
175+
selection.iter_slices(|BitSlice { start, len }| {
176+
// Copy over the elements.
118177
out.as_mut()[out_pos..][..len].copy_from_slice(&source[start..][..len]);
119178
out_pos += len;
179+
180+
// Append the validity bits.
181+
let _validity = unsafe { out.validity_mut() };
182+
todo!("Append validity bits correctly and optimally!");
120183
});
121184

122185
Ok(())

vortex-array/src/pipeline/bit_view.rs

Lines changed: 26 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -212,11 +212,13 @@ impl<'a> BitView<'a> {
212212
}
213213
}
214214

215+
/// Runs the provided function `f` for each range of `true` bits in the view.
216+
///
217+
/// The function `f` receives a [`BitSlice`] containing the inclusive `start` bit as well as
218+
/// the length.
215219
pub fn iter_slices<F>(&self, mut f: F)
216220
where
217-
// FIXME(ngates): I have repeatedly assumed this to be a (start, end) slice, not a
218-
// (start, len)... I think we should wrap this in a struct to avoid confusion.
219-
F: FnMut((usize, usize)),
221+
F: FnMut(BitSlice),
220222
{
221223
if self.true_count == 0 {
222224
return;
@@ -231,7 +233,10 @@ impl<'a> BitView<'a> {
231233
0 => {
232234
// If a slice was being tracked, the run ends at the start of this word.
233235
if slice_length > 0 {
234-
f((slice_start_bit, slice_length));
236+
f(BitSlice {
237+
start: slice_start_bit,
238+
len: slice_length,
239+
});
235240
slice_length = 0;
236241
}
237242
}
@@ -250,7 +255,10 @@ impl<'a> BitView<'a> {
250255

251256
// If a run was open, and we hit a zero gap, report the finished slice
252257
if slice_length > 0 && zeros > 0 {
253-
f((slice_start_bit, slice_length));
258+
f(BitSlice {
259+
start: slice_start_bit,
260+
len: slice_length,
261+
});
254262
slice_length = 0; // Reset state for a new slice
255263
}
256264

@@ -284,22 +292,26 @@ impl<'a> BitView<'a> {
284292
}
285293

286294
if slice_length > 0 {
287-
f((slice_start_bit, slice_length));
295+
f(BitSlice {
296+
start: slice_start_bit,
297+
len: slice_length,
298+
});
288299
}
289300
}
290301

291-
/// Runs the provided function `f` for each range of `true` bits in the view.
292-
///
293-
/// The function `f` receives a tuple `(start, len)` where `start` is the index of the first
294-
/// `true` bit and `len` is the number of consecutive `true` bits.
295-
///
296-
/// FIXME(ngates): this code is broken.
297-
298302
pub fn as_raw(&self) -> &[u8; N_BYTES] {
299303
self.bits.as_ref()
300304
}
301305
}
302306

307+
/// A slice of bits within a [`BitBuffer`].
308+
///
309+
/// We use this struct to avoid a common mistake of assuming the slices represent (start, end) ranges,
310+
pub struct BitSlice {
311+
pub start: usize,
312+
pub len: usize,
313+
}
314+
303315
pub trait BitViewExt {
304316
/// Iterate the [`BitBuffer`] in fixed-size chunks of [`BitView`].
305317
///
@@ -319,7 +331,7 @@ impl BitViewExt for BitBuffer {
319331
0,
320332
"BitView iteration requires zero bit offset"
321333
);
322-
let n_views = (self.len() + N - 1) / N;
334+
let n_views = self.len().div_ceil(N);
323335
BitViewIterator {
324336
bits: self.inner().as_ref(),
325337
view_idx: 0,
@@ -365,8 +377,6 @@ impl<'a> Iterator for BitViewIterator<'a> {
365377

366378
#[cfg(test)]
367379
mod tests {
368-
use std::usize;
369-
370380
use super::*;
371381

372382
#[test]
@@ -591,7 +601,6 @@ mod tests {
591601
view.iter_slices(|slice| slices.push(slice));
592602

593603
assert_eq!(slices.len(), 1);
594-
assert_eq!(slices[0], (0, i));
595604
}
596605
}
597606
}

vortex-array/src/pipeline/mod.rs

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ use std::ops::Deref;
2525

2626
use bit_view::BitView;
2727
use vortex_error::VortexResult;
28-
use vortex_vector::{Vector, VectorMut};
28+
use vortex_vector::{Vector, VectorMut, VectorMutOps};
2929

3030
use crate::Array;
3131

@@ -150,3 +150,20 @@ impl KernelContext {
150150
&self.vectors[vector_id]
151151
}
152152
}
153+
154+
/// A general implementation of a source kernel that produces all null values.
155+
pub struct AllNullSourceKernel;
156+
157+
impl SourceKernel for AllNullSourceKernel {
158+
fn skip(&mut self, _n: usize) {}
159+
160+
fn step(
161+
&mut self,
162+
_ctx: &KernelContext,
163+
selection: &BitView,
164+
out: &mut VectorMut,
165+
) -> VortexResult<()> {
166+
out.append_nulls(selection.true_count());
167+
Ok(())
168+
}
169+
}

vortex-array/src/pipeline/source_driver.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,12 @@
22
// SPDX-FileCopyrightText: Copyright the Vortex contributors
33

44
use itertools::Itertools;
5-
use vortex_error::{vortex_panic, VortexResult};
5+
use vortex_error::{VortexResult, vortex_panic};
66
use vortex_mask::Mask;
77
use vortex_vector::{Vector, VectorMut, VectorMutOps};
88

99
use crate::pipeline::bit_view::{BitView, BitViewExt};
10-
use crate::pipeline::{BindContext, KernelContext, PipelinedSource, VectorId, N};
10+
use crate::pipeline::{BindContext, KernelContext, N, PipelinedSource, VectorId};
1111

1212
/// Temporary driver for executing a single source array in a pipelined fashion.
1313
pub struct PipelineSourceDriver<'a> {

vortex-buffer/src/buffer_mut.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ use std::ops::{Deref, DerefMut};
99

1010
use bytes::buf::UninitSlice;
1111
use bytes::{Buf, BufMut, BytesMut};
12-
use vortex_error::{vortex_panic, VortexExpect};
12+
use vortex_error::{VortexExpect, vortex_panic};
1313

1414
use crate::debug::TruncatedDebug;
1515
use crate::trusted_len::TrustedLen;
@@ -731,7 +731,7 @@ impl Write for ByteBufferMut {
731731
mod test {
732732
use bytes::{Buf, BufMut};
733733

734-
use crate::{buffer_mut, Alignment, BufferMut, ByteBufferMut};
734+
use crate::{Alignment, BufferMut, ByteBufferMut, buffer_mut};
735735

736736
#[test]
737737
fn capacity() {

vortex-mask/src/mask_mut.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,10 @@ impl MaskMut {
9595
}
9696

9797
/// Set the length of the mask.
98+
///
99+
/// # Safety
100+
///
101+
/// The caller must ensure that `new_len` is less than the capacity of the mask.
98102
pub unsafe fn set_len(&mut self, new_len: usize) {
99103
debug_assert!(new_len < self.capacity());
100104
match &mut self.0 {

0 commit comments

Comments
 (0)