Commit c4240f1

API and partial implementation of sparse pipelined execution (#5284)
Signed-off-by: Nicholas Gates <[email protected]>
1 parent 374e089 commit c4240f1

25 files changed: +705 -1133 lines

vortex-array/src/array/operator.rs

Lines changed: 24 additions & 14 deletions
@@ -8,8 +8,9 @@ use vortex_mask::Mask;
 use vortex_vector::{Vector, VectorOps, vector_matches_dtype};
 
 use crate::execution::{BatchKernelRef, BindCtx, DummyExecutionCtx, ExecutionCtx};
-use crate::pipeline::source_driver::PipelineSourceDriver;
-use crate::vtable::{OperatorVTable, PipelineNode, VTable};
+use crate::pipeline::PipelinedNode;
+use crate::pipeline::driver::PipelineDriver;
+use crate::vtable::{OperatorVTable, VTable};
 use crate::{Array, ArrayAdapter, ArrayRef};
 
 /// Array functions as provided by the `OperatorVTable`.
@@ -31,6 +32,9 @@ pub trait ArrayOperator: 'static + Send + Sync {
     /// Optimize the array by pushing down a parent array.
     fn reduce_parent(&self, parent: &ArrayRef, child_idx: usize) -> VortexResult<Option<ArrayRef>>;
 
+    /// Returns the array as a pipeline node, if supported.
+    fn as_pipelined(&self) -> Option<&dyn PipelinedNode>;
+
     /// Bind the array to a batch kernel. This is an internal function
     fn bind(
         &self,
@@ -52,6 +56,10 @@ impl ArrayOperator for Arc<dyn Array> {
         self.as_ref().reduce_parent(parent, child_idx)
     }
 
+    fn as_pipelined(&self) -> Option<&dyn PipelinedNode> {
+        self.as_ref().as_pipelined()
+    }
+
     fn bind(
         &self,
         selection: Option<&ArrayRef>,
@@ -63,14 +71,6 @@ impl ArrayOperator for Arc<dyn Array> {
 
 impl<V: VTable> ArrayOperator for ArrayAdapter<V> {
     fn execute_batch(&self, selection: &Mask, ctx: &mut dyn ExecutionCtx) -> VortexResult<Vector> {
-        // Check if the array is a pipeline node
-        if let Some(pipeline_node) =
-            <V::OperatorVTable as OperatorVTable<V>>::pipeline_node(&self.0)
-            && let PipelineNode::Source(source) = pipeline_node
-        {
-            return PipelineSourceDriver::new(source).execute(selection);
-        }
-
         let vector =
             <V::OperatorVTable as OperatorVTable<V>>::execute_batch(&self.0, selection, ctx)?;
 
@@ -104,6 +104,10 @@ impl<V: VTable> ArrayOperator for ArrayAdapter<V> {
         <V::OperatorVTable as OperatorVTable<V>>::reduce_parent(&self.0, parent, child_idx)
     }
 
+    fn as_pipelined(&self) -> Option<&dyn PipelinedNode> {
+        <V::OperatorVTable as OperatorVTable<V>>::pipeline_node(&self.0)
+    }
+
     fn bind(
         &self,
         selection: Option<&ArrayRef>,
@@ -126,11 +130,17 @@ impl BindCtx for () {
 
 impl dyn Array + '_ {
     pub fn execute(&self) -> VortexResult<Vector> {
-        self.execute_batch(&Mask::new_true(self.len()), &mut DummyExecutionCtx)
+        self.execute_with_selection(&Mask::new_true(self.len()))
    }
 
-    pub fn execute_with_selection(&self, mask: &Mask) -> VortexResult<Vector> {
-        assert_eq!(self.len(), mask.len());
-        self.execute_batch(mask, &mut DummyExecutionCtx)
+    pub fn execute_with_selection(&self, selection: &Mask) -> VortexResult<Vector> {
+        assert_eq!(self.len(), selection.len());
+
+        // Check if the array is a pipeline node
+        if self.as_pipelined().is_some() {
+            return PipelineDriver::new(self.to_array()).execute(selection);
+        }
+
+        self.execute_batch(selection, &mut DummyExecutionCtx)
     }
 }
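
Note on the dispatch change above: pipeline routing moves out of `ArrayAdapter::execute_batch` and up into the `dyn Array` entry point, where any array that answers `as_pipelined()` with `Some` is handed to `PipelineDriver` instead of the batch path. Below is a minimal, self-contained sketch of that dispatch shape; the trait and type names are simplified stand-ins, not the Vortex API.

// Standalone sketch (not the Vortex API): toy types standing in for
// ArrayOperator, PipelinedNode, and PipelineDriver to show the new dispatch.

/// Stand-in for the pipelined capability exposed via `as_pipelined`.
trait Pipelined {
    fn drive(&self, selection: &[bool]) -> Vec<i64>;
}

/// Stand-in for `ArrayOperator`: batch execution plus an optional pipelined view.
trait Operator {
    fn execute_batch(&self, selection: &[bool]) -> Vec<i64>;
    fn as_pipelined(&self) -> Option<&dyn Pipelined>;
}

/// Mirrors `execute_with_selection`: route to the pipeline driver whenever the
/// array advertises pipelined support, otherwise fall back to batch execution.
fn execute_with_selection(op: &dyn Operator, selection: &[bool]) -> Vec<i64> {
    if let Some(pipelined) = op.as_pipelined() {
        return pipelined.drive(selection);
    }
    op.execute_batch(selection)
}

/// A toy constant array that only supports the batch path.
struct ConstArray {
    value: i64,
}

impl Operator for ConstArray {
    fn execute_batch(&self, selection: &[bool]) -> Vec<i64> {
        selection.iter().filter(|s| **s).map(|_| self.value).collect()
    }

    fn as_pipelined(&self) -> Option<&dyn Pipelined> {
        None // no pipelined support, so the driver path is never taken
    }
}

fn main() {
    let arr = ConstArray { value: 7 };
    assert_eq!(execute_with_selection(&arr, &[true, false, true]), vec![7, 7]);
}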

vortex-array/src/pipeline/operator/buffers.rs renamed to vortex-array/src/pipeline/driver/allocation.rs

Lines changed: 18 additions & 30 deletions
@@ -3,22 +3,19 @@
 
 //! Vector allocation strategy for pipelines
 
-#![allow(dead_code)]
-
-use std::cell::RefCell;
-
 use vortex_error::{VortexExpect, VortexResult};
+use vortex_vector::VectorMut;
 
-use crate::pipeline::operator::{NodeId, PipelineNode};
-use crate::pipeline::vec::Vector;
-use crate::pipeline::{VType, VectorId};
+use crate::Array;
+use crate::pipeline::driver::{Node, NodeId};
+use crate::pipeline::{N, PipelineVector, VectorId};
 
 #[derive(Debug)]
-pub struct VectorAllocationPlan {
+pub struct VectorAllocation {
     /// Where each node writes its output
     pub(crate) output_targets: Vec<OutputTarget>,
     /// The actual allocated vectors
-    pub(crate) vectors: Vec<RefCell<Vector>>,
+    pub(crate) vectors: Vec<PipelineVector>,
 }
 
 // TODO(joe): support in-place view operations
@@ -30,41 +27,34 @@ pub(crate) enum OutputTarget {
     /// Node writes to the top-level provided output
     ExternalOutput,
     /// Node writes to an allocated intermediate vector
-    IntermediateVector(usize), // vector idx
+    IntermediateVector(VectorId), // vector idx
 }
 
 impl OutputTarget {
     pub fn vector_id(&self) -> Option<VectorId> {
         match self {
-            OutputTarget::IntermediateVector(idx) => Some(*idx),
+            OutputTarget::IntermediateVector(vector_id) => Some(*vector_id),
             OutputTarget::ExternalOutput => None,
         }
     }
 }
 
-/// Represents an allocated vector that can be reused
-#[derive(Debug, Clone)]
-struct VectorAllocation {
-    /// Type of elements in this vector
-    element_type: VType,
-}
-
 // ============================================================================
 // Improved Pipeline with vector allocation
 // ============================================================================
 
 /// Allocate vectors with lifetime analysis and zero-copy optimization
 pub(super) fn allocate_vectors(
-    dag: &[PipelineNode],
+    dag: &[Node],
     execution_order: &[NodeId],
-) -> VortexResult<VectorAllocationPlan> {
+) -> VortexResult<VectorAllocation> {
     let mut output_targets: Vec<Option<OutputTarget>> = vec![None; dag.len()];
-    let mut allocations = Vec::new();
+    let mut allocation_types = Vec::new();
 
     // Process nodes in reverse execution order (top-down for output propagation)
     for &node_idx in execution_order.iter().rev() {
         let node = &dag[node_idx];
-        let operator = &node.operator;
+        let array = &node.array;
 
         // Determine output target
         let output_target = if node.parents.is_empty() {
@@ -81,24 +71,22 @@ pub(super) fn allocate_vectors(
             // 3. Verify no conflicts with parallel execution paths
            // 4. Ensure proper vector lifetime management
 
-            let alloc_id = allocations.len();
-            allocations.push(VectorAllocation {
-                element_type: operator.dtype().into(),
-            });
-            OutputTarget::IntermediateVector(alloc_id)
+            let vector_id = VectorId::new(allocation_types.len());
+            allocation_types.push(array.dtype());
+            OutputTarget::IntermediateVector(vector_id)
         };
 
         output_targets[node_idx] = Some(output_target);
     }
 
-    Ok(VectorAllocationPlan {
+    Ok(VectorAllocation {
         output_targets: output_targets
             .into_iter()
            .map(|target| target.vortex_expect("missing target"))
            .collect(),
-        vectors: allocations
+        vectors: allocation_types
            .into_iter()
-            .map(|alloc| RefCell::new(Vector::new_with_vtype(alloc.element_type)))
+            .map(|dtype| PipelineVector::Compact(VectorMut::with_capacity(dtype, 2 * N)))
            .collect(),
     })
 }
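
For orientation, `allocate_vectors` walks the DAG in reverse execution order, sends root nodes (those with no parents) to the external output, and gives every other node its own freshly allocated intermediate vector; vector reuse is still a TODO. A simplified, self-contained sketch of that assignment logic follows; `Node` and `OutputTarget` here are toy stand-ins rather than the real pipeline types.

// Simplified sketch of the output-target assignment in `allocate_vectors`.
// All types are toy stand-ins, not the Vortex pipeline types.

#[derive(Debug, Clone, Copy, PartialEq)]
enum OutputTarget {
    /// Root nodes write straight into the caller-provided output vector.
    ExternalOutput,
    /// Other nodes write into a freshly allocated intermediate vector.
    IntermediateVector(usize),
}

struct Node {
    parents: Vec<usize>,
}

fn assign_output_targets(dag: &[Node], execution_order: &[usize]) -> Vec<OutputTarget> {
    let mut targets = vec![None; dag.len()];
    let mut next_vector_id = 0;

    // Reverse execution order is top-down, so parents are visited before children.
    for &node_idx in execution_order.iter().rev() {
        let target = if dag[node_idx].parents.is_empty() {
            OutputTarget::ExternalOutput
        } else {
            let id = next_vector_id;
            next_vector_id += 1;
            OutputTarget::IntermediateVector(id)
        };
        targets[node_idx] = Some(target);
    }

    targets.into_iter().map(|t| t.expect("missing target")).collect()
}

fn main() {
    // A two-node chain: node 1 feeds node 0 (the root).
    let dag = vec![Node { parents: vec![] }, Node { parents: vec![0] }];
    let targets = assign_output_targets(&dag, &[1, 0]);
    assert_eq!(targets[0], OutputTarget::ExternalOutput);
    assert_eq!(targets[1], OutputTarget::IntermediateVector(0));
}
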
Lines changed: 63 additions & 0 deletions
@@ -0,0 +1,63 @@
+// SPDX-License-Identifier: Apache-2.0
+// SPDX-FileCopyrightText: Copyright the Vortex contributors
+
+use vortex_error::{VortexExpect, VortexResult};
+use vortex_vector::Vector;
+
+use crate::array::ArrayOperator;
+use crate::pipeline::driver::Node;
+use crate::pipeline::driver::allocation::VectorAllocation;
+use crate::pipeline::{BindContext, Kernel, VectorId};
+
+pub(crate) fn bind_kernels(
+    dag: &[Node],
+    allocation_plan: &VectorAllocation,
+    all_batch_inputs: &[Vector],
+) -> VortexResult<Vec<Box<dyn Kernel>>> {
+    let mut kernels = Vec::with_capacity(dag.len());
+    for node in dag {
+        let input_ids = node
+            .children
+            .iter()
+            .map(|node_id| {
+                allocation_plan.output_targets[*node_id]
+                    .vector_id()
+                    .vortex_expect("Input node must have an output vector ID")
+            })
+            .collect::<Vec<_>>();
+
+        let batch_inputs: Vec<_> = node
+            .batch_inputs
+            .iter()
+            .map(|idx| all_batch_inputs[*idx].clone())
+            .collect();
+
+        let bind_context = PipelineBindContext {
+            children: &input_ids,
+            batch_inputs: &batch_inputs,
+        };
+
+        let pipelined = node
+            .array
+            .as_pipelined()
+            .vortex_expect("Array in pipeline DAG does not support pipelined execution");
+
+        kernels.push(pipelined.bind(&bind_context)?);
+    }
+    Ok(kernels)
+}
+
+struct PipelineBindContext<'a> {
+    children: &'a [VectorId],
+    batch_inputs: &'a [Vector],
+}
+
+impl BindContext for PipelineBindContext<'_> {
+    fn pipelined_input(&self, pipelined_child_idx: usize) -> VectorId {
+        self.children[pipelined_child_idx]
+    }
+
+    fn batch_input(&self, batch_child_idx: usize) -> Vector {
+        self.batch_inputs[batch_child_idx].clone()
+    }
+}
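
Once `bind_kernels` has produced one kernel per DAG node, a driver presumably evaluates them in execution order, with each kernel reading the vectors named by its children's output targets and writing into its own target slot. The real `PipelineDriver` is not part of this file, so the following is only a rough, self-contained sketch of such a loop with toy kernel and vector types.

// Rough sketch of a driver loop over bound kernels. The `Kernel` trait, the
// vector representation, and the driver shape here are simplified assumptions,
// not the actual Vortex `PipelineDriver`.

/// Toy kernel: reads already-populated vectors and produces an output vector.
trait Kernel {
    fn evaluate(&self, vectors: &[Vec<i64>]) -> Vec<i64>;
}

/// Kernel that adds its two input vectors element-wise.
struct AddKernel {
    lhs: usize, // vector id of the left input
    rhs: usize, // vector id of the right input
}

impl Kernel for AddKernel {
    fn evaluate(&self, vectors: &[Vec<i64>]) -> Vec<i64> {
        vectors[self.lhs]
            .iter()
            .zip(&vectors[self.rhs])
            .map(|(a, b)| a + b)
            .collect()
    }
}

/// Evaluate kernels in execution order. A target of `None` models the
/// external output; `Some(id)` writes into an intermediate vector slot.
fn drive(kernels: &[(Option<usize>, Box<dyn Kernel>)], vectors: &mut Vec<Vec<i64>>) -> Vec<i64> {
    let mut output = Vec::new();
    for (target, kernel) in kernels {
        let result = kernel.evaluate(vectors.as_slice());
        match target {
            None => output = result,
            Some(id) => vectors[*id] = result,
        }
    }
    output
}

fn main() {
    // Two source vectors already materialized into intermediate slots 0 and 1.
    let mut vectors = vec![vec![1, 2, 3], vec![10, 20, 30]];
    let kernels: Vec<(Option<usize>, Box<dyn Kernel>)> =
        vec![(None, Box::new(AddKernel { lhs: 0, rhs: 1 }))];
    assert_eq!(drive(&kernels, &mut vectors), vec![11, 22, 33]);
}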
