Skip to content

Commit cef3f51

Browse files
committed
pipelined execution
Signed-off-by: Nicholas Gates <[email protected]>
1 parent f6815c2 commit cef3f51

File tree

5 files changed

+32
-17
lines changed

5 files changed

+32
-17
lines changed

vortex-array/src/array/operator.rs

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,12 @@
33

44
use std::sync::Arc;
55

6-
use vortex_error::{VortexResult, vortex_panic};
6+
use vortex_error::{vortex_panic, VortexResult};
77
use vortex_mask::Mask;
8-
use vortex_vector::{Vector, VectorOps, vector_matches_dtype};
8+
use vortex_vector::{vector_matches_dtype, Vector, VectorOps};
99

1010
use crate::execution::{BatchKernelRef, BindCtx, DummyExecutionCtx, ExecutionCtx};
11+
use crate::pipeline::source_driver::PipelineSourceDriver;
1112
use crate::vtable::{OperatorVTable, VTable};
1213
use crate::{Array, ArrayAdapter, ArrayRef};
1314

@@ -62,6 +63,13 @@ impl ArrayOperator for Arc<dyn Array> {
6263

6364
impl<V: VTable> ArrayOperator for ArrayAdapter<V> {
6465
fn execute_batch(&self, selection: &Mask, ctx: &mut dyn ExecutionCtx) -> VortexResult<Vector> {
66+
// Check if the array is a pipeline source, and if so use the single-node driver for now.
67+
if let Some(pipeline_source) =
68+
<V::OperatorVTable as OperatorVTable<V>>::as_pipelined_source(&self.0)
69+
{
70+
return PipelineSourceDriver::new(pipeline_source).execute(selection);
71+
}
72+
6573
let vector =
6674
<V::OperatorVTable as OperatorVTable<V>>::execute_batch(&self.0, selection, ctx)?;
6775

vortex-array/src/arrays/primitive/vtable/operator.rs

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,16 +3,16 @@
33

44
use vortex_buffer::Buffer;
55
use vortex_compute::filter::Filter;
6-
use vortex_dtype::{NativePType, PTypeDowncastExt, match_each_native_ptype};
6+
use vortex_dtype::{match_each_native_ptype, NativePType, PTypeDowncastExt};
77
use vortex_error::VortexResult;
88
use vortex_mask::Mask;
9-
use vortex_vector::VectorMut;
109
use vortex_vector::primitive::PVector;
10+
use vortex_vector::{VectorMut, VectorMutOps};
1111

1212
use crate::arrays::{MaskedVTable, PrimitiveArray, PrimitiveVTable};
13-
use crate::execution::{BatchKernelRef, BindCtx, kernel};
13+
use crate::execution::{kernel, BatchKernelRef, BindCtx};
1414
use crate::pipeline::bit_view::BitView;
15-
use crate::pipeline::{BindContext, KernelContext, N, PipelinedSource, SourceKernel};
15+
use crate::pipeline::{BindContext, KernelContext, PipelinedSource, SourceKernel, N};
1616
use crate::vtable::{OperatorVTable, ValidityHelper};
1717
use crate::{ArrayRef, IntoArray};
1818

@@ -106,16 +106,16 @@ impl<T: NativePType> SourceKernel for PrimitiveKernel<T> {
106106
// separately from copying over the elements.
107107
unsafe {
108108
out.validity_mut().append_n(true, selection.true_count());
109-
out.elements_mut().set_len(selection.true_count());
109+
let prev_len = out.len();
110+
out.elements_mut()
111+
.set_len(prev_len + selection.true_count());
110112
}
111113

112114
let source = &self.buffer.as_slice()[self.offset..];
113115

114116
let mut out_pos = 0;
115-
selection.iter_slices(|(start, end)| {
116-
print!("Slicing {} to {}\n", start, end);
117-
let len = end - start;
118-
out.as_mut()[out_pos..][..len].copy_from_slice(&source[start..end]);
117+
selection.iter_slices(|(start, len)| {
118+
out.as_mut()[out_pos..][..len].copy_from_slice(&source[start..][..len]);
119119
out_pos += len;
120120
});
121121

vortex-array/src/pipeline/bit_view.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -214,6 +214,8 @@ impl<'a> BitView<'a> {
214214

215215
pub fn iter_slices<F>(&self, mut f: F)
216216
where
217+
// FIXME(ngates): I have repeatedly assumed this to be a (start, end) slice, not a
218+
// (start, len)... I think we should wrap this in a struct to avoid confusion.
217219
F: FnMut((usize, usize)),
218220
{
219221
if self.true_count == 0 {

vortex-array/src/pipeline/source_driver.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,14 @@
22
// SPDX-FileCopyrightText: Copyright the Vortex contributors
33

44
use itertools::Itertools;
5-
use vortex_error::{VortexResult, vortex_panic};
5+
use vortex_error::{vortex_panic, VortexResult};
66
use vortex_mask::Mask;
77
use vortex_vector::{Vector, VectorMut, VectorMutOps};
88

99
use crate::pipeline::bit_view::{BitView, BitViewExt};
10-
use crate::pipeline::{BindContext, KernelContext, N, PipelinedSource, VectorId};
10+
use crate::pipeline::{BindContext, KernelContext, PipelinedSource, VectorId, N};
1111

12-
/// Temporary driver for executing a single array in a pipelined fashion.
12+
/// Temporary driver for executing a single source array in a pipelined fashion.
1313
pub struct PipelineSourceDriver<'a> {
1414
array: &'a dyn PipelinedSource,
1515
}

vortex-buffer/src/buffer_mut.rs

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ use std::ops::{Deref, DerefMut};
99

1010
use bytes::buf::UninitSlice;
1111
use bytes::{Buf, BufMut, BytesMut};
12-
use vortex_error::{VortexExpect, vortex_panic};
12+
use vortex_error::{vortex_panic, VortexExpect};
1313

1414
use crate::debug::TruncatedDebug;
1515
use crate::trusted_len::TrustedLen;
@@ -241,10 +241,15 @@ impl<T> BufferMut<T> {
241241
}
242242
}
243243

244+
/// Sets the length of the buffer.
245+
///
244246
/// # Safety
245-
/// The caller must ensure that the buffer was properly initialized up to `len`.
247+
///
248+
/// The caller must ensure that there is sufficient capacity in the buffer and that the values
249+
/// are valid up to `len`.
246250
#[inline]
247251
pub unsafe fn set_len(&mut self, len: usize) {
252+
debug_assert!(len <= self.capacity());
248253
unsafe { self.bytes.set_len(len * size_of::<T>()) };
249254
self.length = len;
250255
}
@@ -726,7 +731,7 @@ impl Write for ByteBufferMut {
726731
mod test {
727732
use bytes::{Buf, BufMut};
728733

729-
use crate::{Alignment, BufferMut, ByteBufferMut, buffer_mut};
734+
use crate::{buffer_mut, Alignment, BufferMut, ByteBufferMut};
730735

731736
#[test]
732737
fn capacity() {

0 commit comments

Comments
 (0)