Skip to content

Commit 4fa4ec5

Browse files
authored
Pipeline execution (#5251)
Signed-off-by: Nicholas Gates <[email protected]>
1 parent 45cd6c1 commit 4fa4ec5

File tree

39 files changed

+1033
-3409
lines changed

39 files changed

+1033
-3409
lines changed

encodings/fastlanes/src/bitpacking/vtable/mod.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ mod array;
1010
mod canonical;
1111
mod encode;
1212
mod operations;
13-
mod operator;
1413
mod serde;
1514
mod validity;
1615
mod visitor;
@@ -29,7 +28,7 @@ impl VTable for BitPackedVTable {
2928
type ComputeVTable = NotSupported;
3029
type EncodeVTable = Self;
3130
type SerdeVTable = Self;
32-
type OperatorVTable = Self;
31+
type OperatorVTable = NotSupported;
3332

3433
fn id(_encoding: &Self::Encoding) -> EncodingId {
3534
EncodingId::new_ref("fastlanes.bitpacked")

encodings/fastlanes/src/for/mod.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ use vortex_scalar::Scalar;
1818
mod compress;
1919
mod compute;
2020
mod ops;
21-
mod pipeline;
2221
mod serde;
2322

2423
vtable!(FoR);
@@ -35,7 +34,7 @@ impl VTable for FoRVTable {
3534
type ComputeVTable = NotSupported;
3635
type EncodeVTable = Self;
3736
type SerdeVTable = Self;
38-
type OperatorVTable = Self;
37+
type OperatorVTable = NotSupported;
3938

4039
fn id(_encoding: &Self::Encoding) -> EncodingId {
4140
EncodingId::new_ref("fastlanes.for")

encodings/fastlanes/src/for/pipeline.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ use std::marker::PhantomData;
77
use std::sync::Arc;
88

99
use num_traits::WrappingAdd;
10-
use vortex_array::Array;
1110
use vortex_array::operator::{
1211
LengthBounds, Operator, OperatorEq, OperatorHash, OperatorId, OperatorRef,
1312
};
@@ -17,8 +16,9 @@ use vortex_array::pipeline::{
1716
BindContext, Element, Kernel, KernelContext, PipelinedOperator, RowSelection, VectorId,
1817
};
1918
use vortex_array::vtable::OperatorVTable;
20-
use vortex_dtype::{DType, NativePType, PType, match_each_integer_ptype};
21-
use vortex_error::{VortexExpect, VortexResult, vortex_bail};
19+
use vortex_array::Array;
20+
use vortex_dtype::{match_each_integer_ptype, DType, NativePType, PType};
21+
use vortex_error::{vortex_bail, VortexExpect, VortexResult};
2222
use vortex_scalar::Scalar;
2323

2424
use crate::{FoRArray, FoRVTable};
@@ -150,7 +150,7 @@ impl PipelinedOperator for FoROperator {
150150
match_each_integer_ptype!(ptype, |T| {
151151
match_each_integer_ptype!(self.encoded_ptype, |E| {
152152
Ok(Box::new(FoRKernel::<T, E> {
153-
child: ctx.children()[0],
153+
child: ctx.pipelined_input()[0],
154154
reference: self
155155
.reference
156156
.as_primitive()

encodings/fsst/src/array.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ impl VTable for FSSTVTable {
3232
type ComputeVTable = NotSupported;
3333
type EncodeVTable = Self;
3434
type SerdeVTable = Self;
35-
type OperatorVTable = Self;
35+
type OperatorVTable = NotSupported;
3636

3737
fn id(_encoding: &Self::Encoding) -> EncodingId {
3838
EncodingId::new_ref("vortex.fsst")

encodings/fsst/src/lib.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@ mod array;
1515
mod canonical;
1616
mod compress;
1717
mod compute;
18-
mod operator;
1918
mod ops;
2019
mod serde;
2120
#[cfg(test)]

encodings/fsst/src/operator.rs

Lines changed: 0 additions & 194 deletions
This file was deleted.

vortex-array/src/array/mod.rs

Lines changed: 2 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -26,12 +26,11 @@ use crate::arrays::{
2626
};
2727
use crate::builders::ArrayBuilder;
2828
use crate::compute::{ComputeFn, Cost, InvocationArgs, IsConstantOpts, Output, is_constant_opts};
29-
use crate::operator::OperatorRef;
3029
use crate::serde::ArrayChildren;
3130
use crate::stats::{Precision, Stat, StatsProviderExt, StatsSetRef};
3231
use crate::vtable::{
33-
ArrayVTable, CanonicalVTable, ComputeVTable, OperationsVTable, OperatorVTable, SerdeVTable,
34-
VTable, ValidityVTable, VisitorVTable,
32+
ArrayVTable, CanonicalVTable, ComputeVTable, OperationsVTable, SerdeVTable, VTable,
33+
ValidityVTable, VisitorVTable,
3534
};
3635
use crate::{
3736
ArrayEq, ArrayHash, Canonical, DynArrayEq, DynArrayHash, EncodingId, EncodingRef,
@@ -168,11 +167,6 @@ pub trait Array:
168167
/// call.
169168
fn invoke(&self, compute_fn: &ComputeFn, args: &InvocationArgs)
170169
-> VortexResult<Option<Output>>;
171-
172-
/// Convert the array to an operator if supported by the encoding.
173-
///
174-
/// Returns `None` if the encoding does not support operator operations.
175-
fn to_operator(&self) -> VortexResult<Option<OperatorRef>>;
176170
}
177171

178172
impl Array for Arc<dyn Array> {
@@ -275,10 +269,6 @@ impl Array for Arc<dyn Array> {
275269
) -> VortexResult<Option<Output>> {
276270
self.as_ref().invoke(compute_fn, args)
277271
}
278-
279-
fn to_operator(&self) -> VortexResult<Option<OperatorRef>> {
280-
self.as_ref().to_operator()
281-
}
282272
}
283273

284274
/// A reference counted pointer to a dynamic [`Array`] trait object.
@@ -649,10 +639,6 @@ impl<V: VTable> Array for ArrayAdapter<V> {
649639
) -> VortexResult<Option<Output>> {
650640
<V::ComputeVTable as ComputeVTable<V>>::invoke(&self.0, compute_fn, args)
651641
}
652-
653-
fn to_operator(&self) -> VortexResult<Option<OperatorRef>> {
654-
<V::OperatorVTable as OperatorVTable<V>>::to_operator(&self.0)
655-
}
656642
}
657643

658644
impl<V: VTable> ArrayHash for ArrayAdapter<V> {

vortex-array/src/array/operator.rs

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,8 @@ use vortex_mask::Mask;
88
use vortex_vector::{Vector, VectorOps, vector_matches_dtype};
99

1010
use crate::execution::{BatchKernelRef, BindCtx, DummyExecutionCtx, ExecutionCtx};
11-
use crate::vtable::{OperatorVTable, VTable};
11+
use crate::pipeline::source_driver::PipelineSourceDriver;
12+
use crate::vtable::{OperatorVTable, PipelineNode, VTable};
1213
use crate::{Array, ArrayAdapter, ArrayRef};
1314

1415
/// Array functions as provided by the `OperatorVTable`.
@@ -21,7 +22,7 @@ pub trait ArrayOperator: 'static + Send + Sync {
2122
/// # Panics
2223
///
2324
/// If the mask length does not match the array length.
24-
/// If the array's implementation returns an invalid vector (wrong length, wrong type, etc).
25+
/// If the array's implementation returns an invalid vector (wrong length, wrong type, etc.).
2526
fn execute_batch(&self, selection: &Mask, ctx: &mut dyn ExecutionCtx) -> VortexResult<Vector>;
2627

2728
/// Optimize the array by running the optimization rules.
@@ -62,6 +63,14 @@ impl ArrayOperator for Arc<dyn Array> {
6263

6364
impl<V: VTable> ArrayOperator for ArrayAdapter<V> {
6465
fn execute_batch(&self, selection: &Mask, ctx: &mut dyn ExecutionCtx) -> VortexResult<Vector> {
66+
// Check if the array is a pipeline node
67+
if let Some(pipeline_node) =
68+
<V::OperatorVTable as OperatorVTable<V>>::pipeline_node(&self.0)
69+
&& let PipelineNode::Source(source) = pipeline_node
70+
{
71+
return PipelineSourceDriver::new(source).execute(selection);
72+
}
73+
6574
let vector =
6675
<V::OperatorVTable as OperatorVTable<V>>::execute_batch(&self.0, selection, ctx)?;
6776

vortex-array/src/arrays/varbin/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,15 +5,15 @@ mod array;
55
pub use array::VarBinArray;
66

77
mod compute;
8-
pub(crate) use compute::varbin_compute_min_max; // For use in `varbinview`.
8+
pub(crate) use compute::varbin_compute_min_max;
9+
// For use in `varbinview`.
910

1011
mod vtable;
1112
pub use vtable::{VarBinEncoding, VarBinVTable};
1213

1314
pub mod builder;
1415

1516
mod accessor;
16-
mod operator;
1717

1818
use vortex_buffer::ByteBuffer;
1919
use vortex_dtype::DType;

0 commit comments

Comments
 (0)