Skip to content

Commit df2f2a9

Browse files
committed
Zero-copy pipeline input
Signed-off-by: Nicholas Gates <[email protected]>
1 parent 26fe604 commit df2f2a9

File tree

12 files changed

+126
-82
lines changed

12 files changed

+126
-82
lines changed

encodings/alp/benches/alp_compress.rs

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,12 @@
66
use divan::Bencher;
77
use rand::rngs::StdRng;
88
use rand::{Rng, SeedableRng as _};
9-
use vortex_alp::{alp_encode, decompress, ALPFloat, ALPRDFloat, RDEncoder};
9+
use vortex_alp::{ALPFloat, ALPRDFloat, RDEncoder, alp_encode, decompress};
10+
use vortex_array::IntoArray;
1011
use vortex_array::arrays::PrimitiveArray;
1112
use vortex_array::compute::warm_up_vtables;
1213
use vortex_array::validity::Validity;
13-
use vortex_array::IntoArray;
14-
use vortex_buffer::{buffer, Buffer};
14+
use vortex_buffer::{Buffer, buffer};
1515
use vortex_dtype::NativePType;
1616

1717
fn main() {
@@ -118,7 +118,13 @@ fn decompress_alp_pipeline<T: ALPFloat + NativePType>(bencher: Bencher, args: (u
118118
bencher
119119
.with_inputs(|| {
120120
alp_encode(
121-
&PrimitiveArray::new(Buffer::copy_from(&values), validity.clone()),
121+
&PrimitiveArray::new(
122+
Buffer::copy_from(&values),
123+
validity
124+
.as_array()
125+
.map(|a| Validity::copy_from_array(a))
126+
.unwrap_or(validity.clone()),
127+
),
122128
None,
123129
)
124130
.unwrap()

encodings/alp/src/alp/operator.rs

Lines changed: 35 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,15 +5,15 @@ use std::marker::PhantomData;
55

66
use vortex_array::pipeline::bit_view::BitView;
77
use vortex_array::pipeline::{
8-
BindContext, Kernel, KernelCtx, PipelineInputs, PipelinedNode, VectorId, N,
8+
BindContext, Kernel, KernelCtx, N, PipelineInputs, PipelinedNode, VectorId,
99
};
1010
use vortex_array::vtable::OperatorVTable;
1111
use vortex_dtype::PTypeDowncastExt;
12-
use vortex_error::{vortex_bail, VortexResult};
12+
use vortex_error::{VortexResult, vortex_bail};
1313
use vortex_vector::primitive::PVectorMut;
1414
use vortex_vector::{VectorMut, VectorMutOps};
1515

16-
use crate::{match_each_alp_float_ptype, ALPArray, ALPFloat, ALPVTable, Exponents};
16+
use crate::{ALPArray, ALPFloat, ALPVTable, Exponents, match_each_alp_float_ptype};
1717

1818
impl OperatorVTable<ALPVTable> for ALPVTable {
1919
fn pipeline_node(array: &ALPArray) -> Option<&dyn PipelinedNode> {
@@ -139,11 +139,14 @@ fn sparse_alp<A: ALPFloat>(
139139

140140
#[cfg(test)]
141141
mod test {
142+
use rand::prelude::StdRng;
143+
use rand::{Rng, SeedableRng};
144+
use vortex_array::IntoArray;
142145
use vortex_array::arrays::PrimitiveArray;
143146
use vortex_array::validity::Validity;
144-
use vortex_array::IntoArray;
145-
use vortex_buffer::buffer;
147+
use vortex_buffer::{Buffer, buffer, buffer_mut};
146148
use vortex_dtype::PTypeDowncastExt;
149+
use vortex_vector::VectorOps;
147150

148151
use crate::alp_encode;
149152

@@ -161,4 +164,31 @@ mod test {
161164

162165
assert_eq!(decoded.elements(), &buffer);
163166
}
167+
168+
#[test]
169+
fn test_alp_into_mut() {
170+
let (n, fraction_patch, fraction_valid) = (10_000, 0.0, 1.0);
171+
let mut rng = StdRng::seed_from_u64(0);
172+
let mut values = buffer_mut![1.234f32; n];
173+
if fraction_patch > 0.0 {
174+
for index in 0..values.len() {
175+
if rng.random_bool(fraction_patch) {
176+
values[index] = 1000.0f32
177+
}
178+
}
179+
}
180+
let validity = if fraction_valid < 1.0 {
181+
Validity::from_iter((0..values.len()).map(|_| rng.random_bool(fraction_valid)))
182+
} else {
183+
Validity::NonNullable
184+
};
185+
let values = values.freeze();
186+
187+
// We take a copy of the values to ensure we can call into_mut
188+
let array = PrimitiveArray::new(Buffer::copy_from(&values), validity.clone());
189+
let array = alp_encode(&array, None).unwrap().into_array();
190+
191+
let vector = array.execute().unwrap();
192+
assert_eq!(vector.len(), values.len());
193+
}
164194
}

vortex-array/src/array/operator.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -129,16 +129,17 @@ impl BindCtx for () {
129129
}
130130

131131
impl dyn Array + '_ {
132-
pub fn execute(&self) -> VortexResult<Vector> {
133-
self.execute_with_selection(&Mask::new_true(self.len()))
132+
pub fn execute(self: Arc<Self>) -> VortexResult<Vector> {
133+
let selection = Mask::new_true(self.len());
134+
self.execute_with_selection(&selection)
134135
}
135136

136-
pub fn execute_with_selection(&self, selection: &Mask) -> VortexResult<Vector> {
137+
pub fn execute_with_selection(self: Arc<Self>, selection: &Mask) -> VortexResult<Vector> {
137138
assert_eq!(self.len(), selection.len());
138139

139140
// Check if the array is a pipeline node
140141
if self.as_pipelined().is_some() {
141-
return PipelineDriver::new(self.to_array()).execute(selection);
142+
return PipelineDriver::new(self).execute(selection);
142143
}
143144

144145
self.execute_batch(selection, &mut DummyExecutionCtx)

vortex-array/src/arrays/listview/vtable/operator.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ mod tests {
5858
fn test_listview_operator_basic() {
5959
// Test basic ListView execution without selection.
6060
// ListView: [[0,1,2], [3,4], [5,6], [7,8,9]]
61-
let listview = create_basic_listview();
61+
let listview = create_basic_listview().into_array();
6262

6363
// Execute without selection.
6464
let result = listview.execute().unwrap();
@@ -94,7 +94,8 @@ mod tests {
9494
.into_array();
9595
let offsets = PrimitiveArray::from_iter([0u32, 2, 4, 6, 8, 10]).into_array();
9696
let sizes = PrimitiveArray::from_iter([2u32, 2, 2, 2, 2, 2]).into_array();
97-
let listview = ListViewArray::new(elements, offsets, sizes, Validity::AllValid);
97+
let listview =
98+
ListViewArray::new(elements, offsets, sizes, Validity::AllValid).into_array();
9899

99100
// Create selection mask: [true, false, true, false, true, false].
100101
let selection = Mask::from_iter([true, false, true, false, true, false]);
@@ -126,7 +127,7 @@ mod tests {
126127
#[test]
127128
fn test_listview_operator_with_nulls_and_selection() {
128129
// Use the nullable listview: [[10,20], null, [50]]
129-
let listview = create_nullable_listview();
130+
let listview = create_nullable_listview().into_array();
130131

131132
// Create selection mask: [true, true, false].
132133
let selection = Mask::from_iter([true, true, false]);
@@ -159,7 +160,7 @@ mod tests {
159160
#[test]
160161
fn test_listview_operator_overlapping_with_selection() {
161162
// Use the overlapping listview: [[5,6,7], [2,3], [8,9], [0,1], [1,2,3,4]]
162-
let listview = create_overlapping_listview();
163+
let listview = create_overlapping_listview().into_array();
163164

164165
// Create selection mask: [true, false, true, true, false].
165166
let selection = Mask::from_iter([true, false, true, true, false]);

vortex-array/src/arrays/primitive/vtable/operator.rs

Lines changed: 10 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -5,34 +5,25 @@ use vortex_buffer::Buffer;
55
use vortex_compute::filter::Filter;
66
use vortex_dtype::match_each_native_ptype;
77
use vortex_error::VortexResult;
8+
use vortex_mask::Mask;
9+
use vortex_vector::Vector;
810
use vortex_vector::primitive::PVector;
911

1012
use crate::arrays::{MaskedVTable, PrimitiveArray, PrimitiveVTable};
11-
use crate::execution::{BatchKernelRef, BindCtx, kernel};
13+
use crate::execution::ExecutionCtx;
1214
use crate::vtable::{OperatorVTable, ValidityHelper};
1315
use crate::{ArrayRef, IntoArray};
1416

1517
impl OperatorVTable<PrimitiveVTable> for PrimitiveVTable {
16-
fn bind(
18+
fn execute_batch(
1719
array: &PrimitiveArray,
18-
selection: Option<&ArrayRef>,
19-
ctx: &mut dyn BindCtx,
20-
) -> VortexResult<BatchKernelRef> {
21-
let mask = ctx.bind_selection(array.len(), selection)?;
22-
let validity = ctx.bind_validity(array.validity(), array.len(), selection)?;
23-
20+
selection: &Mask,
21+
_ctx: &mut dyn ExecutionCtx,
22+
) -> VortexResult<Vector> {
23+
let validity = array.validity_mask().filter(selection);
2424
match_each_native_ptype!(array.ptype(), |P| {
25-
let elements = array.buffer::<P>();
26-
Ok(kernel(move || {
27-
let mask = mask.execute()?;
28-
let validity = validity.execute()?;
29-
30-
// Note that validity already has the mask applied so we only need to apply it to
31-
// the elements.
32-
let elements = elements.filter(&mask);
33-
34-
Ok(PVector::<P>::try_new(elements, validity)?.into())
35-
}))
25+
let elements = array.buffer::<P>().filter(selection);
26+
Ok(PVector::<P>::try_new(elements, validity)?.into())
3627
})
3728
}
3829

vortex-array/src/arrays/struct_/vtable/operator.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,8 @@ mod tests {
6161
5,
6262
Validity::AllValid,
6363
)
64-
.unwrap();
64+
.unwrap()
65+
.into_array();
6566

6667
// Execute without selection.
6768
let result = struct_array.execute().unwrap();
@@ -94,7 +95,8 @@ mod tests {
9495
6,
9596
Validity::AllValid,
9697
)
97-
.unwrap();
98+
.unwrap()
99+
.into_array();
98100

99101
// Create a selection mask that selects indices 0, 2, 4 (alternating pattern).
100102
let selection = Mask::from_iter([true, false, true, false, true, false]);
@@ -146,7 +148,8 @@ mod tests {
146148
6,
147149
struct_validity,
148150
)
149-
.unwrap();
151+
.unwrap()
152+
.into_array();
150153

151154
// Create a selection mask that selects indices 0, 1, 2, 4, 5.
152155
let selection = Mask::from_iter([true, true, true, false, true, true]);

vortex-array/src/compute/arrays/is_not_null.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,10 @@ mod tests {
128128
)
129129
.into_array();
130130

131-
let result = IsNotNullArray::new(array).execute()?.into_bool();
131+
let result = IsNotNullArray::new(array)
132+
.into_array()
133+
.execute()?
134+
.into_bool();
132135
assert!(result.validity().all_true());
133136
assert_eq!(result.bits(), &validity);
134137

vortex-array/src/compute/arrays/is_null.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,7 @@ mod tests {
131131
)
132132
.into_array();
133133

134-
let result = IsNullArray::new(array).execute()?.into_bool();
134+
let result = IsNullArray::new(array).into_array().execute()?.into_bool();
135135
assert!(result.validity().all_true());
136136
assert_eq!(result.bits(), &validity.not());
137137

vortex-array/src/pipeline/driver/allocation.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,13 @@
66
use vortex_error::{VortexExpect, VortexResult};
77
use vortex_vector::VectorMut;
88

9-
use crate::pipeline::driver::{Node, NodeId};
10-
use crate::pipeline::{VectorId, N};
119
use crate::Array;
10+
use crate::pipeline::driver::{Node, NodeId};
11+
use crate::pipeline::{N, VectorId};
1212

1313
#[derive(Debug)]
1414
pub struct VectorAllocation {
15-
/// Where each node writes its output
15+
/// Where each node writes its output, indexed by [`NodeId`].
1616
pub(crate) output_targets: Vec<OutputTarget>,
1717
/// The actual allocated vectors
1818
pub(crate) vectors: Vec<VectorMut>,
@@ -22,7 +22,7 @@ pub struct VectorAllocation {
2222
// Node mutates its input in-place (input node index, vector idx)
2323
// add variant InPlace.
2424
/// Tracks which vector a node outputs to
25-
#[derive(Debug, Clone)]
25+
#[derive(Debug, Clone, Copy)]
2626
pub(crate) enum OutputTarget {
2727
/// Node writes to the top-level provided output
2828
ExternalOutput,
@@ -52,8 +52,8 @@ pub(super) fn allocate_vectors(
5252
let mut allocation_types = Vec::new();
5353

5454
// Process nodes in reverse execution order (top-down for output propagation)
55-
for &node_idx in execution_order.iter().rev() {
56-
let node = &dag[node_idx];
55+
for &node_id in execution_order.iter().rev() {
56+
let node = &dag[node_id];
5757
let array = &node.array;
5858

5959
// Determine output target
@@ -76,7 +76,7 @@ pub(super) fn allocate_vectors(
7676
OutputTarget::IntermediateVector(vector_id)
7777
};
7878

79-
output_targets[node_idx] = Some(output_target);
79+
output_targets[node_id] = Some(output_target);
8080
}
8181

8282
Ok(VectorAllocation {

vortex-array/src/pipeline/driver/bind.rs

Lines changed: 26 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,31 +1,41 @@
11
// SPDX-License-Identifier: Apache-2.0
22
// SPDX-FileCopyrightText: Copyright the Vortex contributors
33

4+
use std::iter;
5+
46
use vortex_error::{VortexExpect, VortexResult};
57
use vortex_vector::{Vector, VectorOps};
68

79
use crate::array::ArrayOperator;
810
use crate::pipeline::driver::allocation::VectorAllocation;
911
use crate::pipeline::driver::input::InputKernel;
10-
use crate::pipeline::driver::{Node, NodeKind};
12+
use crate::pipeline::driver::{Node, NodeId, NodeKind};
1113
use crate::pipeline::{BindContext, Kernel, VectorId};
1214

1315
// We consume the DAG and the batch inputs such that our into_mut calls are safe
1416
pub(crate) fn bind_kernels(
1517
dag: Vec<Node>,
18+
execution_order: &[NodeId],
1619
allocation_plan: &VectorAllocation,
1720
mut all_batch_inputs: Vec<Option<Vector>>,
1821
) -> VortexResult<Vec<Box<dyn Kernel>>> {
19-
let mut kernels = Vec::with_capacity(dag.len());
20-
for node in dag {
22+
// We construct kernels in top-down order (i.e. inverse execution order) such that any arrays
23+
// in parent nodes are dropped prior to constructing their children. This gives us the best
24+
// chance of performing zero-copy into_mut calls on the pipeline input arrays.
25+
26+
let mut dag: Vec<_> = dag.into_iter().map(Some).collect();
27+
let mut kernels: Vec<Option<Box<dyn Kernel>>> =
28+
iter::repeat_with(|| None).take(dag.len()).collect();
29+
30+
// Note that the execution order is bottom-up, so to make sure we consume an array's parent
31+
// before we construct the array's own kernel, we must construct kernels in reverse order.
32+
for node_id in execution_order.iter().copied().rev() {
33+
let node = dag[node_id].take().vortex_expect("already processed");
34+
2135
let input_ids = node
2236
.children
2337
.iter()
24-
.map(|node_id| {
25-
allocation_plan.output_targets[*node_id]
26-
.vector_id()
27-
.vortex_expect("Input node must have an output vector ID")
28-
})
38+
.map(|child_node_id| allocation_plan.output_targets[*child_node_id].vector_id())
2939
.collect::<Vec<_>>();
3040

3141
let mut batch_inputs: Vec<_> = node
@@ -34,7 +44,7 @@ pub(crate) fn bind_kernels(
3444
.map(|idx| all_batch_inputs[*idx].take())
3545
.collect();
3646

37-
kernels.push(match node.array.as_pipelined() {
47+
kernels[node_id] = Some(match node.array.as_pipelined() {
3848
None => {
3949
// If the node cannot be pipelined, it must be an input node
4050
assert_eq!(node.kind, NodeKind::Input);
@@ -61,17 +71,22 @@ pub(crate) fn bind_kernels(
6171
}
6272
});
6373
}
64-
Ok(kernels)
74+
75+
Ok(kernels
76+
.into_iter()
77+
.map(|k| k.vortex_expect("missing kernel"))
78+
.collect())
6579
}
6680

6781
struct PipelineBindContext<'a> {
68-
children: &'a [VectorId],
82+
children: &'a [Option<VectorId>],
6983
batch_inputs: &'a mut [Option<Vector>],
7084
}
7185

7286
impl BindContext for PipelineBindContext<'_> {
7387
fn pipelined_input(&self, pipelined_child_idx: usize) -> VectorId {
7488
self.children[pipelined_child_idx]
89+
.vortex_expect("In-place transforms do not have VectorIDs for their zero'th child.")
7590
}
7691

7792
fn batch_input(&mut self, batch_child_idx: usize) -> Vector {

0 commit comments

Comments
 (0)