Skip to content

Commit 41362bb

Browse files
committed
Sparse pipeline driver
Signed-off-by: Nicholas Gates <[email protected]>
1 parent 374e089 commit 41362bb

File tree

22 files changed

+662
-1143
lines changed

22 files changed

+662
-1143
lines changed

vortex-array/src/array/operator.rs

Lines changed: 26 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,14 @@
33

44
use std::sync::Arc;
55

6-
use vortex_error::{VortexResult, vortex_panic};
6+
use vortex_error::{vortex_panic, VortexResult};
77
use vortex_mask::Mask;
8-
use vortex_vector::{Vector, VectorOps, vector_matches_dtype};
8+
use vortex_vector::{vector_matches_dtype, Vector, VectorOps};
99

1010
use crate::execution::{BatchKernelRef, BindCtx, DummyExecutionCtx, ExecutionCtx};
11-
use crate::pipeline::source_driver::PipelineSourceDriver;
12-
use crate::vtable::{OperatorVTable, PipelineNode, VTable};
11+
use crate::pipeline::driver::PipelineDriver;
12+
use crate::pipeline::PipelinedNode;
13+
use crate::vtable::{OperatorVTable, VTable};
1314
use crate::{Array, ArrayAdapter, ArrayRef};
1415

1516
/// Array functions as provided by the `OperatorVTable`.
@@ -31,6 +32,9 @@ pub trait ArrayOperator: 'static + Send + Sync {
3132
/// Optimize the array by pushing down a parent array.
3233
fn reduce_parent(&self, parent: &ArrayRef, child_idx: usize) -> VortexResult<Option<ArrayRef>>;
3334

35+
/// Returns the array as a pipeline node, if supported.
36+
fn as_pipelined(&self) -> Option<&dyn PipelinedNode>;
37+
3438
/// Bind the array to a batch kernel. This is an internal function
3539
fn bind(
3640
&self,
@@ -52,6 +56,10 @@ impl ArrayOperator for Arc<dyn Array> {
5256
self.as_ref().reduce_parent(parent, child_idx)
5357
}
5458

59+
fn as_pipelined(&self) -> Option<&dyn PipelinedNode> {
60+
self.as_ref().as_pipelined()
61+
}
62+
5563
fn bind(
5664
&self,
5765
selection: Option<&ArrayRef>,
@@ -63,14 +71,6 @@ impl ArrayOperator for Arc<dyn Array> {
6371

6472
impl<V: VTable> ArrayOperator for ArrayAdapter<V> {
6573
fn execute_batch(&self, selection: &Mask, ctx: &mut dyn ExecutionCtx) -> VortexResult<Vector> {
66-
// Check if the array is a pipeline node
67-
if let Some(pipeline_node) =
68-
<V::OperatorVTable as OperatorVTable<V>>::pipeline_node(&self.0)
69-
&& let PipelineNode::Source(source) = pipeline_node
70-
{
71-
return PipelineSourceDriver::new(source).execute(selection);
72-
}
73-
7474
let vector =
7575
<V::OperatorVTable as OperatorVTable<V>>::execute_batch(&self.0, selection, ctx)?;
7676

@@ -104,6 +104,10 @@ impl<V: VTable> ArrayOperator for ArrayAdapter<V> {
104104
<V::OperatorVTable as OperatorVTable<V>>::reduce_parent(&self.0, parent, child_idx)
105105
}
106106

107+
fn as_pipelined(&self) -> Option<&dyn PipelinedNode> {
108+
<V::OperatorVTable as OperatorVTable<V>>::pipeline_node(&self.0)
109+
}
110+
107111
fn bind(
108112
&self,
109113
selection: Option<&ArrayRef>,
@@ -126,11 +130,17 @@ impl BindCtx for () {
126130

127131
impl dyn Array + '_ {
128132
pub fn execute(&self) -> VortexResult<Vector> {
129-
self.execute_batch(&Mask::new_true(self.len()), &mut DummyExecutionCtx)
133+
self.execute_with_selection(&Mask::new_true(self.len()))
130134
}
131135

132-
pub fn execute_with_selection(&self, mask: &Mask) -> VortexResult<Vector> {
133-
assert_eq!(self.len(), mask.len());
134-
self.execute_batch(mask, &mut DummyExecutionCtx)
136+
pub fn execute_with_selection(&self, selection: &Mask) -> VortexResult<Vector> {
137+
assert_eq!(self.len(), selection.len());
138+
139+
// Check if the array is a pipeline node
140+
if self.as_pipelined().is_some() {
141+
return PipelineDriver::new(self.to_array()).execute(selection);
142+
}
143+
144+
self.execute_batch(selection, &mut DummyExecutionCtx)
135145
}
136146
}

vortex-array/src/pipeline/bit_view.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,10 @@
44
use std::borrow::Cow;
55
use std::fmt::{Debug, Formatter};
66

7+
use crate::pipeline::{N, N_BYTES, N_WORDS};
78
use vortex_buffer::BitBuffer;
89
use vortex_error::VortexResult;
910

10-
use crate::pipeline::{N, N_BYTES, N_WORDS};
11-
1211
/// A borrowed fixed-size bit vector of length `N` bits, represented as an array of usize words.
1312
///
1413
/// This struct is designed to provide a view over a Vortex [`vortex_buffer::BitBuffer`], therefore

vortex-array/src/pipeline/operator/buffers.rs renamed to vortex-array/src/pipeline/driver/allocation.rs

Lines changed: 18 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -3,22 +3,18 @@
33

44
//! Vector allocation strategy for pipelines
55
6-
#![allow(dead_code)]
7-
8-
use std::cell::RefCell;
9-
6+
use crate::pipeline::driver::{Node, NodeId};
7+
use crate::pipeline::{PipelineVector, VectorId, N};
8+
use crate::Array;
109
use vortex_error::{VortexExpect, VortexResult};
11-
12-
use crate::pipeline::operator::{NodeId, PipelineNode};
13-
use crate::pipeline::vec::Vector;
14-
use crate::pipeline::{VType, VectorId};
10+
use vortex_vector::VectorMut;
1511

1612
#[derive(Debug)]
17-
pub struct VectorAllocationPlan {
13+
pub struct VectorAllocation {
1814
/// Where each node writes its output
1915
pub(crate) output_targets: Vec<OutputTarget>,
2016
/// The actual allocated vectors
21-
pub(crate) vectors: Vec<RefCell<Vector>>,
17+
pub(crate) vectors: Vec<PipelineVector>,
2218
}
2319

2420
// TODO(joe): support in-place view operations
@@ -30,41 +26,34 @@ pub(crate) enum OutputTarget {
3026
/// Node writes to the top-level provided output
3127
ExternalOutput,
3228
/// Node writes to an allocated intermediate vector
33-
IntermediateVector(usize), // vector idx
29+
IntermediateVector(VectorId), // vector idx
3430
}
3531

3632
impl OutputTarget {
3733
pub fn vector_id(&self) -> Option<VectorId> {
3834
match self {
39-
OutputTarget::IntermediateVector(idx) => Some(*idx),
35+
OutputTarget::IntermediateVector(vector_id) => Some(*vector_id),
4036
OutputTarget::ExternalOutput => None,
4137
}
4238
}
4339
}
4440

45-
/// Represents an allocated vector that can be reused
46-
#[derive(Debug, Clone)]
47-
struct VectorAllocation {
48-
/// Type of elements in this vector
49-
element_type: VType,
50-
}
51-
5241
// ============================================================================
5342
// Improved Pipeline with vector allocation
5443
// ============================================================================
5544

5645
/// Allocate vectors with lifetime analysis and zero-copy optimization
5746
pub(super) fn allocate_vectors(
58-
dag: &[PipelineNode],
47+
dag: &[Node],
5948
execution_order: &[NodeId],
60-
) -> VortexResult<VectorAllocationPlan> {
49+
) -> VortexResult<VectorAllocation> {
6150
let mut output_targets: Vec<Option<OutputTarget>> = vec![None; dag.len()];
62-
let mut allocations = Vec::new();
51+
let mut allocation_types = Vec::new();
6352

6453
// Process nodes in reverse execution order (top-down for output propagation)
6554
for &node_idx in execution_order.iter().rev() {
6655
let node = &dag[node_idx];
67-
let operator = &node.operator;
56+
let array = &node.array;
6857

6958
// Determine output target
7059
let output_target = if node.parents.is_empty() {
@@ -81,24 +70,22 @@ pub(super) fn allocate_vectors(
8170
// 3. Verify no conflicts with parallel execution paths
8271
// 4. Ensure proper vector lifetime management
8372

84-
let alloc_id = allocations.len();
85-
allocations.push(VectorAllocation {
86-
element_type: operator.dtype().into(),
87-
});
88-
OutputTarget::IntermediateVector(alloc_id)
73+
let vector_id = VectorId::new(allocation_types.len());
74+
allocation_types.push(array.dtype());
75+
OutputTarget::IntermediateVector(vector_id)
8976
};
9077

9178
output_targets[node_idx] = Some(output_target);
9279
}
9380

94-
Ok(VectorAllocationPlan {
81+
Ok(VectorAllocation {
9582
output_targets: output_targets
9683
.into_iter()
9784
.map(|target| target.vortex_expect("missing target"))
9885
.collect(),
99-
vectors: allocations
86+
vectors: allocation_types
10087
.into_iter()
101-
.map(|alloc| RefCell::new(Vector::new_with_vtype(alloc.element_type)))
88+
.map(|dtype| PipelineVector::Compact(VectorMut::with_capacity(dtype, 2 * N)))
10289
.collect(),
10390
})
10491
}
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
// SPDX-License-Identifier: Apache-2.0
2+
// SPDX-FileCopyrightText: Copyright the Vortex contributors
3+
4+
use crate::array::ArrayOperator;
5+
use crate::pipeline::driver::allocation::VectorAllocation;
6+
use crate::pipeline::driver::Node;
7+
use crate::pipeline::{BindContext, Kernel, VectorId};
8+
use vortex_error::{VortexExpect, VortexResult};
9+
use vortex_vector::Vector;
10+
11+
pub(crate) fn bind_kernels(
12+
dag: &[Node],
13+
allocation_plan: &VectorAllocation,
14+
all_batch_inputs: &[Vector],
15+
) -> VortexResult<Vec<Box<dyn Kernel>>> {
16+
let mut kernels = Vec::with_capacity(dag.len());
17+
for node in dag {
18+
let input_ids = node
19+
.children
20+
.iter()
21+
.map(|node_id| {
22+
allocation_plan.output_targets[*node_id]
23+
.vector_id()
24+
.vortex_expect("Input node must have an output vector ID")
25+
})
26+
.collect::<Vec<_>>();
27+
28+
let batch_inputs: Vec<_> = node
29+
.batch_inputs
30+
.iter()
31+
.map(|idx| all_batch_inputs[*idx].clone())
32+
.collect();
33+
34+
let bind_context = PipelineBindContext {
35+
children: &input_ids,
36+
batch_inputs: &batch_inputs,
37+
};
38+
39+
let pipelined = node
40+
.array
41+
.as_pipelined()
42+
.vortex_expect("Array in pipeline DAG does not support pipelined execution");
43+
44+
kernels.push(pipelined.bind(&bind_context)?);
45+
}
46+
Ok(kernels)
47+
}
48+
49+
struct PipelineBindContext<'a> {
50+
children: &'a [VectorId],
51+
batch_inputs: &'a [Vector],
52+
}
53+
54+
impl BindContext for PipelineBindContext<'_> {
55+
fn pipelined_input(&self, pipelined_child_idx: usize) -> VectorId {
56+
self.children[pipelined_child_idx]
57+
}
58+
59+
fn batch_input(&self, batch_child_idx: usize) -> Vector {
60+
self.batch_inputs[batch_child_idx].clone()
61+
}
62+
}

0 commit comments

Comments
 (0)