
Commit 35d12a3

Faster Pipelines (#5352)
Removes a bunch of the pipeline calling overhead.

Signed-off-by: Nicholas Gates <[email protected]>
1 parent 1aac9ca commit 35d12a3
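The core change, visible in the kernel diffs below, is the shape of `Kernel::step`: the output vector is now moved in and handed back instead of being filled through a borrowed `&mut VectorMut`, and the kernel context is passed as `&mut KernelCtx`. The following is a minimal sketch of the reshaped signature; only the `step` signature itself is taken from the diff, while the surrounding types are placeholder stand-ins so the sketch is self-contained.

// Placeholder stand-ins for the real types from vortex-array, vortex-vector,
// and vortex-error; they exist only so this sketch compiles on its own.
pub struct KernelCtx;
pub struct BitView;
pub struct VectorMut;
pub type VortexResult<T> = Result<T, Box<dyn std::error::Error>>;

/// Sketch of the reshaped kernel step from this commit: the output vector is
/// moved in and returned, so the driver threads one allocation through the
/// pipeline instead of every kernel writing through a shared `&mut VectorMut`.
pub trait Kernel {
    fn step(
        &mut self,
        ctx: &mut KernelCtx,      // previously `&KernelCtx`
        selection: &BitView,
        out: VectorMut,           // previously `&mut VectorMut`
    ) -> VortexResult<VectorMut>; // previously `VortexResult<()>`
}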

File tree: 27 files changed (+1026, -1008 lines)


encodings/fastlanes/benches/pipeline_bitpacking.rs

Lines changed: 18 additions & 30 deletions

@@ -66,33 +66,21 @@ pub fn decompress_bitpacking_late_filter<T: NativePType>(bencher: Bencher, fract
         .bench_values(|mask| filter(array.to_canonical().as_ref(), &mask).unwrap());
 }
 
-// TODO(ngates): bring back benchmarks once operator API is stable.
-// #[divan::bench(types = [i8, i16, i32, i64], args = TRUE_COUNT)]
-// pub fn decompress_bitpacking_pipeline_filter<T: Element + NativePType>(
-//     bencher: Bencher,
-//     fraction_kept: f64,
-// ) {
-//     let mut rng = StdRng::seed_from_u64(0);
-//     let values = (0..LENGTH)
-//         .map(|_| T::from(rng.random_range(0..100)).unwrap())
-//         .collect::<BufferMut<T>>()
-//         .into_array()
-//         .to_primitive();
-//     let array = bitpack_to_best_bit_width(&values).unwrap();
-//
-//     let mask = (0..LENGTH)
-//         .map(|_| rng.random_bool(fraction_kept))
-//         .collect::<BooleanBuffer>();
-//
-//     bencher
-//         .with_inputs(|| Mask::from_buffer(mask.clone()))
-//         .bench_local_values(|mask| {
-//             export_canonical_pipeline_expr(
-//                 array.dtype(),
-//                 array.len(),
-//                 array.to_operator().unwrap().unwrap().as_ref(),
-//                 &mask,
-//             )
-//             .unwrap()
-//         });
-// }
+#[divan::bench(types = [i8, i16, i32, i64], args = TRUE_COUNT)]
+pub fn decompress_bitpacking_pipeline_filter<T: NativePType>(bencher: Bencher, fraction_kept: f64) {
+    let mut rng = StdRng::seed_from_u64(0);
+    let values = (0..LENGTH)
+        .map(|_| T::from(rng.random_range(0..100)).unwrap())
+        .collect::<BufferMut<T>>()
+        .into_array()
+        .to_primitive();
+    let array = bitpack_to_best_bit_width(&values).unwrap();
+
+    let mask = (0..LENGTH)
+        .map(|_| rng.random_bool(fraction_kept))
+        .collect::<BitBuffer>();
+
+    bencher
+        .with_inputs(|| Mask::from(mask.clone()))
+        .bench_local_values(|mask| array.execute_with_selection(&mask).unwrap());
+}

encodings/fastlanes/benches/pipeline_v2_bitpacking_basic.rs

Lines changed: 0 additions & 4 deletions

@@ -26,10 +26,6 @@ const BENCH_PARAMS: &[(usize, f64)] = &[
     (10_000, 1.0),
     (100_000, 0.5),
     (100_000, 1.0),
-    (1_000_000, 0.5),
-    (1_000_000, 1.0),
-    (10_000_000, 0.5),
-    (10_000_000, 1.0),
 ];
 
 #[divan::bench(args = BENCH_PARAMS)]

encodings/fastlanes/src/bitpacking/array/bitpack_pipeline.rs

Lines changed: 50 additions & 69 deletions

@@ -1,6 +1,8 @@
 // SPDX-License-Identifier: Apache-2.0
 // SPDX-FileCopyrightText: Copyright the Vortex contributors
 
+use std::mem::{transmute, transmute_copy};
+
 use fastlanes::{BitPacking, FastLanes};
 use static_assertions::const_assert_eq;
 use vortex_array::pipeline::{
@@ -9,7 +11,7 @@ use vortex_array::pipeline::{
 use vortex_buffer::Buffer;
 use vortex_dtype::{PTypeDowncastExt, PhysicalPType, match_each_integer_ptype};
 use vortex_error::VortexResult;
-use vortex_mask::Mask;
+use vortex_mask::MaskMut;
 use vortex_vector::primitive::PVectorMut;
 use vortex_vector::{VectorMut, VectorMutOps};
 
@@ -56,7 +58,9 @@ impl PipelinedNode for BitPackedArray {
             Ok(Box::new(AlignedBitPackedKernel::<T>::new(
                 packed_bit_width,
                 packed_buffer,
-                self.validity.to_mask(self.len()),
+                // FIXME(ngates): if we make sure the mask has offset zero, we know that split_off
+                // inside the kernel is free.
+                self.validity.to_mask(self.len()).into_mut(),
             )) as Box<dyn Kernel>)
         })
     }
@@ -85,7 +89,7 @@ pub struct AlignedBitPackedKernel<BP: PhysicalPType<Physical: BitPacking>> {
     packed_buffer: Buffer<BP::Physical>,
 
     /// The validity mask for the bitpacked array.
-    validity: Mask,
+    validity: MaskMut,
 
     /// The total number of bitpacked chunks we have unpacked.
     num_chunks_unpacked: usize,
@@ -94,8 +98,9 @@
 impl<BP: PhysicalPType<Physical: BitPacking>> AlignedBitPackedKernel<BP> {
     pub fn new(
         packed_bit_width: usize,
+        // TODO(ngates): hold an iterator over chunks instead of the full buffer?
         packed_buffer: Buffer<BP::Physical>,
-        validity: Mask,
+        validity: MaskMut,
     ) -> Self {
         let packed_stride =
             packed_bit_width * <<BP as PhysicalPType>::Physical as FastLanes>::LANES;
@@ -119,30 +124,28 @@
 impl<BP: PhysicalPType<Physical: BitPacking>> Kernel for AlignedBitPackedKernel<BP> {
     fn step(
         &mut self,
-        _ctx: &KernelCtx,
+        _ctx: &mut KernelCtx,
         selection: &BitView,
-        out: &mut VectorMut,
-    ) -> VortexResult<()> {
-        let output_vector: &mut PVectorMut<BP::Physical> = out.as_primitive_mut().downcast();
-        debug_assert!(output_vector.is_empty());
+        out: VectorMut,
+    ) -> VortexResult<VectorMut> {
+        if selection.true_count() == 0 {
+            debug_assert!(out.is_empty());
+            return Ok(out);
+        }
 
-        let packed_offset = self.num_chunks_unpacked * self.packed_stride;
-        let not_yet_unpacked_values = &self.packed_buffer.as_slice()[packed_offset..];
+        let mut output: PVectorMut<BP> = out.into_primitive().downcast();
+        debug_assert!(output.is_empty());
 
-        let true_count = selection.true_count();
-        let chunk_offset = self.num_chunks_unpacked * N;
-        let array_len = self.validity.len();
-        debug_assert!(chunk_offset < array_len);
+        let packed_offset = self.num_chunks_unpacked * self.packed_stride;
+        let packed_bytes = &self.packed_buffer[packed_offset..][..self.packed_stride];
 
         // If the true count is very small (the selection is sparse), we can unpack individual
        // elements directly into the output vector.
-        if true_count < SCALAR_UNPACK_THRESHOLD {
-            output_vector.reserve(true_count);
-            debug_assert!(true_count <= output_vector.capacity());
+        if selection.true_count() < SCALAR_UNPACK_THRESHOLD {
+            output.reserve(selection.true_count());
 
             selection.iter_ones(|idx| {
-                let absolute_idx = chunk_offset + idx;
-                if self.validity.value(absolute_idx) {
+                if self.validity.value(idx) {
                     // SAFETY:
                     // - The documentation for `packed_bit_width` explains that the size is valid.
                     // - We know that the size of the `next_packed_chunk` we provide is equal to
@@ -151,72 +154,50 @@ impl<BP: PhysicalPType<Physical: BitPacking>> Kernel for AlignedBitPackedKernel<
                     let unpacked_value = unsafe {
                         BitPacking::unchecked_unpack_single(
                             self.packed_bit_width,
-                            not_yet_unpacked_values,
+                            packed_bytes,
                             idx,
                         )
                     };
 
                     // SAFETY: We just reserved enough capacity to push these values.
-                    unsafe { output_vector.push_unchecked(unpacked_value) };
+                    unsafe { output.push_unchecked(transmute_copy(&unpacked_value)) };
                 } else {
-                    output_vector.append_nulls(1);
+                    output.append_nulls(1);
                 }
             });
-        } else {
-            // Otherwise if the mask is dense, it is faster to fully unpack the entire 1024
-            // element lane with SIMD / FastLanes and let other nodes in the pipeline decide if they
-            // want to perform the selection filter themselves.
-            output_vector.reserve(N);
-            debug_assert!(N <= output_vector.capacity());
-
-            let next_packed_chunk = &not_yet_unpacked_values[..self.packed_stride];
-            debug_assert_eq!(
-                next_packed_chunk.len(),
-                FL_VECTOR_SIZE * self.packed_bit_width / BP::Physical::T
-            );
 
-            // SAFETY: We have just reserved enough capacity for the elements buffer to set the
-            // length, and we are about to initialize all of the values **without** reading the
-            // memory.
-            unsafe { output_vector.elements_mut().set_len(N) };
-
-            // SAFETY:
-            // - The documentation for `packed_bit_width` explains that the size is valid.
-            // - We know that the size of the `next_packed_chunk` we provide is equal to
-            //   `self.packed_stride`, and we explain why this is correct in its documentation.
-            // - It is clear that the output buffer has length 1024.
-            unsafe {
-                BitPacking::unchecked_unpack(
-                    self.packed_bit_width,
-                    next_packed_chunk,
-                    output_vector.as_mut(),
-                );
-            }
+            self.num_chunks_unpacked += 1;
+            return Ok(output.into());
+        }
 
-            if array_len < chunk_offset + N {
-                let vector_len = array_len - chunk_offset;
-                debug_assert!(vector_len < N, "math is broken");
+        // Otherwise if the mask is dense, it is faster to fully unpack the entire 1024
+        // element lane with SIMD / FastLanes and let other nodes in the pipeline decide if they
+        // want to perform the selection filter themselves.
+        let (mut elements, _validity) = output.into_parts();
 
-                // SAFETY: This must be less than `N` so this is just a truncate.
-                unsafe { output_vector.elements_mut().set_len(vector_len) };
+        elements.reserve(N);
+        // SAFETY: we just reserved enough capacity.
+        unsafe { elements.set_len(N) };
 
-                let chunk_mask = self.validity.slice(chunk_offset..array_len);
+        unsafe {
+            BitPacking::unchecked_unpack(
+                self.packed_bit_width,
+                packed_bytes,
+                transmute::<&mut [BP], &mut [BP::Physical]>(elements.as_mut()),
+            );
+        }
 
-                // SAFETY: We have just set the elements length to N, and the validity buffer has
-                // capacity for N elements.
-                unsafe { output_vector.validity_mut() }.append_mask(&chunk_mask);
-            } else {
-                let chunk_mask = self.validity.slice(chunk_offset..chunk_offset + N);
+        // Prepare the output validity mask for this chunk.
+        let mut chunk_validity = self.validity.split_off(N.min(self.validity.capacity()));
+        std::mem::swap(&mut self.validity, &mut chunk_validity);
 
-                // SAFETY: We have just set the elements length to N, and the validity buffer has
-                // capacity for N elements.
-                unsafe { output_vector.validity_mut() }.append_mask(&chunk_mask);
-            }
-        }
+        // For the final chunk, we may have fewer than N elements to unpack.
+        // So we just set the length of the output to the correct value.
+        unsafe { elements.set_len(chunk_validity.len()) };
 
         self.num_chunks_unpacked += 1;
 
-        Ok(())
+        Ok(PVectorMut::new(elements, chunk_validity).into())
     }
 }
 
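The kernel's comments above describe a two-path heuristic: when the selection is sparse (below SCALAR_UNPACK_THRESHOLD set bits), only the selected positions are decoded one element at a time; when it is dense, the whole 1024-element FastLanes chunk is decoded with the vectorised routine and downstream nodes apply the filter. The following is a dependency-free sketch of that control flow with illustrative names and an illustrative threshold value; validity/null handling is omitted, and the real kernel uses BitPacking::unchecked_unpack_single and BitPacking::unchecked_unpack as shown in the diff.

/// Illustrative threshold standing in for the kernel's `SCALAR_UNPACK_THRESHOLD`.
const SCALAR_THRESHOLD: usize = 16;
/// Chunk size matching the kernel's `N` (one 1024-element FastLanes chunk).
const N: usize = 1024;

/// Sketch of the two decode paths. `unpack_one` stands in for the scalar
/// single-element unpack and `unpack_all` for the vectorised whole-chunk unpack.
fn decode_chunk<T: Copy + Default>(
    selection: &[bool],
    unpack_one: impl Fn(usize) -> T,
    unpack_all: impl Fn(&mut [T]),
) -> Vec<T> {
    let true_count = selection.iter().filter(|&&keep| keep).count();

    // Empty selection: skip the chunk entirely (the commit adds this early return).
    if true_count == 0 {
        return Vec::new();
    }

    if true_count < SCALAR_THRESHOLD {
        // Sparse selection: decode only the selected positions, element by element.
        selection
            .iter()
            .enumerate()
            .filter_map(|(idx, &keep)| keep.then(|| unpack_one(idx)))
            .collect()
    } else {
        // Dense selection: decode the full chunk with the vectorised routine and
        // let downstream pipeline nodes apply the selection themselves.
        let mut out = vec![T::default(); N];
        unpack_all(&mut out);
        // Mirror the kernel's final-chunk handling: keep only as many elements
        // as the chunk actually contains.
        out.truncate(selection.len());
        out
    }
}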

vortex-array/src/pipeline/driver/input.rs

Lines changed: 5 additions & 5 deletions

@@ -22,10 +22,10 @@ impl InputKernel {
 impl Kernel for InputKernel {
     fn step(
         &mut self,
-        _ctx: &KernelCtx,
+        _ctx: &mut KernelCtx,
         selection: &BitView,
-        out: &mut VectorMut,
-    ) -> VortexResult<()> {
+        mut out: VectorMut,
+    ) -> VortexResult<VectorMut> {
         let mut batch = self
             .batch
             .take()
@@ -47,7 +47,7 @@ impl Kernel for InputKernel {
            selection.iter_ones(|idx| {
                out.extend_from_vector(&immutable.slice(idx..idx + 1));
            });
-            return Ok(());
+            return Ok(out);
        }
 
        // We split off from our owned batch vector in chunks of size N, and then unsplit onto the
@@ -66,7 +66,7 @@ impl Kernel for InputKernel {
 
        self.batch = Some(batch);
 
-        Ok(())
+        Ok(out)
    }
 }
 
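Both kernels now follow the same calling convention: the caller hands over an output vector and takes the filled vector back from the result, and the kernels above debug-assert that the incoming vector is empty. A hypothetical call site illustrating that contract, reusing the placeholder types from the sketch near the top of this page (the real driver code is not part of this diff):

// Hypothetical helper built on the placeholder `Kernel`, `KernelCtx`,
// `BitView`, `VectorMut`, and `VortexResult` stand-ins defined earlier.
fn drive_one_step(
    kernel: &mut dyn Kernel,
    ctx: &mut KernelCtx,
    selection: &BitView,
    out: VectorMut,
) -> VortexResult<VectorMut> {
    // Before this commit the kernel wrote into `&mut out` and returned `()`;
    // now the (empty) vector is moved in and the filled vector is moved back out.
    kernel.step(ctx, selection, out)
}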