Skip to content

Commit 0dcefb6

Browse files
committed
add aligned bitpacked pipeline kernel
Also adds some tests. Signed-off-by: Connor Tsui <[email protected]>
1 parent 1cb7a39 commit 0dcefb6

File tree

9 files changed

+314
-203
lines changed

9 files changed

+314
-203
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

encodings/fastlanes/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ log = { workspace = true }
2626
num-traits = { workspace = true }
2727
prost = { workspace = true }
2828
rand = { workspace = true, optional = true }
29+
static_assertions = { workspace = true }
2930
vortex-array = { workspace = true }
3031
vortex-buffer = { workspace = true }
3132
vortex-compute = { workspace = true }
Lines changed: 294 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,294 @@
1+
// SPDX-License-Identifier: Apache-2.0
2+
// SPDX-FileCopyrightText: Copyright the Vortex contributors
3+
4+
use fastlanes::{BitPacking, FastLanes};
5+
use static_assertions::const_assert_eq;
6+
use vortex_array::pipeline::bit_view::BitView;
7+
use vortex_array::pipeline::{BindContext, Kernel, KernelCtx, N, PipelineInputs, PipelinedNode};
8+
use vortex_buffer::Buffer;
9+
use vortex_dtype::{PTypeDowncastExt, PhysicalPType, match_each_integer_ptype};
10+
use vortex_error::VortexResult;
11+
use vortex_vector::primitive::PVectorMut;
12+
use vortex_vector::{VectorMut, VectorMutOps};
13+
14+
use crate::BitPackedArray;
15+
16+
/// The size of a FastLanes vector of elements.
17+
const FL_VECTOR_SIZE: usize = 1024;
18+
19+
// Bitpacking uses FastLanes decompression, which expects a multiple of 1024 elements.
20+
const_assert_eq!(N, FL_VECTOR_SIZE);
21+
22+
// TODO(connor): Run some benchmarks to actually get a good value.
23+
/// The true count threshold at which it is faster to unpack individual bitpacked values one at a
24+
/// time instead of unpack entire vectors and then filter later.
25+
const SCALAR_UNPACK_THRESHOLD: usize = 7;
26+
27+
impl PipelinedNode for BitPackedArray {
28+
fn inputs(&self) -> PipelineInputs {
29+
PipelineInputs::Source
30+
}
31+
32+
fn bind(&self, _ctx: &dyn BindContext) -> VortexResult<Box<dyn Kernel>> {
33+
debug_assert!(self.bit_width > 0);
34+
35+
if self.patches.is_some() {
36+
unimplemented!(
37+
"We do not handle patches for bitpacked right now, as this will become a parent patch array"
38+
);
39+
}
40+
41+
match_each_integer_ptype!(self.ptype(), |T| {
42+
let packed_bit_width = self.bit_width as usize;
43+
let packed_buffer = Buffer::<<T as PhysicalPType>::Physical>::from_byte_buffer(
44+
self.packed.clone().into_byte_buffer(),
45+
);
46+
47+
// See the documentation for `AlignedBitPackedKernel` for more info on why we need this.
48+
let packed_stride =
49+
self.bit_width as usize * <<T as PhysicalPType>::Physical as FastLanes>::LANES;
50+
51+
if self.offset != 0 {
52+
// TODO(ngates): the unaligned kernel needs fixing for the non-masked API
53+
unimplemented!(
54+
"Unaligned `BitPackedArray` as a `PipelineSource` is not yet implemented"
55+
)
56+
}
57+
58+
Ok(Box::new(AlignedBitPackedKernel::<T>::new(
59+
packed_bit_width,
60+
packed_stride,
61+
packed_buffer,
62+
)) as Box<dyn Kernel>)
63+
})
64+
}
65+
}
66+
67+
pub struct AlignedBitPackedKernel<BP: PhysicalPType<Physical: BitPacking>> {
68+
/// The bit width of each bitpacked value.
69+
///
70+
/// This is guaranteed to be less than or equal to the (unpacked) bit-width of `BP`.
71+
packed_bit_width: usize,
72+
73+
/// The stride of the bitpacked values, which when fully unpacked will occupy exactly 1024 bits.
74+
/// This is equal to `1024 * bit_width / BP::Physical::T`
75+
///
76+
/// We store this here so that we do not have to keep calculating this in [`step()`].
77+
///
78+
/// For example, if the `bit_width` is 10 and the physical type is `u16` (which will fill up
79+
/// `1024 / 16 = 64` lanes), the `packed_stride` will be `10 * 64 = 640`. This ensures we pass
80+
/// a slice with the correct length to [`BitPacking::unchecked_unpack`].
81+
///
82+
/// [`step()`]: SourceKernel::step
83+
/// [`BitPacking::unchecked_unpack()`]: BitPacking::unchecked_unpack
84+
packed_stride: usize,
85+
86+
/// The buffer containing the bitpacked values.
87+
packed_buffer: Buffer<BP::Physical>,
88+
89+
/// The total number of bitpacked chunks we have unpacked.
90+
num_chunks_unpacked: usize,
91+
}
92+
93+
impl<BP: PhysicalPType<Physical: BitPacking>> AlignedBitPackedKernel<BP> {
94+
pub fn new(
95+
packed_bit_width: usize,
96+
packed_stride: usize,
97+
packed_buffer: Buffer<BP::Physical>,
98+
) -> Self {
99+
assert_eq!(
100+
packed_stride,
101+
FL_VECTOR_SIZE * packed_bit_width / BP::Physical::T
102+
);
103+
assert!(packed_bit_width <= BP::Physical::T);
104+
105+
Self {
106+
packed_bit_width,
107+
packed_stride,
108+
packed_buffer,
109+
num_chunks_unpacked: 0,
110+
}
111+
}
112+
}
113+
114+
impl<BP: PhysicalPType<Physical: BitPacking>> Kernel for AlignedBitPackedKernel<BP> {
115+
fn step(
116+
&mut self,
117+
_ctx: &KernelCtx,
118+
selection: &BitView,
119+
out: &mut VectorMut,
120+
) -> VortexResult<()> {
121+
let output_vector: &mut PVectorMut<BP::Physical> = out.as_primitive_mut().downcast();
122+
debug_assert!(output_vector.is_empty());
123+
124+
let packed_offset = self.num_chunks_unpacked * self.packed_stride;
125+
let not_yet_unpacked_values = &self.packed_buffer.as_slice()[packed_offset..];
126+
127+
let true_count = selection.true_count();
128+
129+
// If the true count is very small (the selection is sparse), we can unpack individual
130+
// elements directly into the output vector.
131+
if true_count < SCALAR_UNPACK_THRESHOLD {
132+
output_vector.reserve(true_count);
133+
debug_assert!(true_count <= output_vector.capacity());
134+
135+
selection.iter_ones(|idx| {
136+
// SAFETY:
137+
// - The documentation for `packed_bit_width` explains that the size is valid.
138+
// - We know that the size of the `next_packed_chunk` we provide is equal to
139+
// `self.packed_stride`, and we explain why this is correct in its documentation.
140+
let unpacked_value = unsafe {
141+
BitPacking::unchecked_unpack_single(
142+
self.packed_bit_width,
143+
not_yet_unpacked_values,
144+
idx,
145+
)
146+
};
147+
148+
// SAFETY: We just reserved enough capacity to push these values.
149+
unsafe { output_vector.push_unchecked(unpacked_value) };
150+
});
151+
} else {
152+
// Otherwise if the mask is dense, it is faster to fully unpack the entire 1024
153+
// element lane with SIMD / FastLanes and let other nodes in the pipeline decide if they
154+
// want to perform the selection filter themselves.
155+
output_vector.reserve(N);
156+
debug_assert!(N <= output_vector.capacity());
157+
158+
// SAFETY: We have just reserved enough capacity for the vector to set the length, and
159+
// we also are about to initialize all of the values **without** reading the memory.
160+
unsafe { output_vector.set_len(N) };
161+
162+
let next_packed_chunk = &not_yet_unpacked_values[..self.packed_stride];
163+
debug_assert_eq!(
164+
next_packed_chunk.len(),
165+
FL_VECTOR_SIZE * self.packed_bit_width / BP::Physical::T
166+
);
167+
168+
// SAFETY:
169+
// - The documentation for `packed_bit_width` explains that the size is valid.
170+
// - We know that the size of the `next_packed_chunk` we provide is equal to
171+
// `self.packed_stride`, and we explain why this is correct in its documentation.
172+
// - It is clear that the output buffer has length 1024.
173+
unsafe {
174+
BitPacking::unchecked_unpack(
175+
self.packed_bit_width,
176+
next_packed_chunk,
177+
&mut output_vector.as_mut()[..FL_VECTOR_SIZE],
178+
);
179+
}
180+
}
181+
182+
self.num_chunks_unpacked += 1;
183+
184+
Ok(())
185+
}
186+
}
187+
188+
#[cfg(test)]
189+
mod tests {
190+
use vortex_array::arrays::PrimitiveArray;
191+
use vortex_dtype::PTypeDowncast;
192+
use vortex_mask::Mask;
193+
use vortex_vector::VectorOps;
194+
195+
use crate::BitPackedArray;
196+
197+
#[test]
198+
fn test_bitpack_pipeline_basic() {
199+
// Create exactly 1024 elements (0 to 1023).
200+
let values = (0..1024).map(|i| i as u32);
201+
let primitive = PrimitiveArray::from_iter(values).to_array();
202+
203+
// Encode with 10-bit width (max value 1023 fits in 10 bits).
204+
let bitpacked = BitPackedArray::encode(&primitive, 10).unwrap();
205+
assert_eq!(bitpacked.bit_width(), 10, "Bit width should be 10");
206+
207+
// Select all elements.
208+
let mask = Mask::new_true(1024);
209+
210+
// This should trigger the pipeline since `BitPackedArray` implements `PipelinedNode`.
211+
let result = bitpacked.to_array().execute_with_selection(&mask).unwrap();
212+
assert_eq!(result.len(), 1024, "Result should have 1024 elements");
213+
214+
let pvector_u32 = result.as_primitive().into_u32();
215+
let elements = pvector_u32.elements().as_slice();
216+
217+
for i in 0..1024 {
218+
assert_eq!(
219+
elements[i], i as u32,
220+
"Value at index {} should be {}",
221+
i, i
222+
);
223+
}
224+
}
225+
226+
#[test]
227+
fn test_bitpack_pipeline_dense_75_percent() {
228+
// Create exactly 1024 elements (0 to 1023).
229+
let values = (0..1024).map(|i| i as u32);
230+
let primitive = PrimitiveArray::from_iter(values).to_array();
231+
232+
// Encode with 10-bit width.
233+
let bitpacked = BitPackedArray::encode(&primitive, 10).unwrap();
234+
assert_eq!(bitpacked.bit_width(), 10, "Bit width should be 10");
235+
236+
// Select 75% of elements (768 out of 1024) - every element where index % 4 != 0.
237+
let indices: Vec<usize> = (0..1024).filter(|i| i % 4 != 0).collect();
238+
assert_eq!(indices.len(), 768, "Should select exactly 768 elements");
239+
let mask = Mask::from_indices(1024, indices);
240+
241+
// This should still use the dense path since true_count >= 7.
242+
let result = bitpacked.to_array().execute_with_selection(&mask).unwrap();
243+
assert_eq!(
244+
result.len(),
245+
1024,
246+
"Result should have 1024 elements (dense path outputs all N elements)"
247+
);
248+
249+
let pvector_u32 = result.as_primitive().into_u32();
250+
let elements = pvector_u32.elements().as_slice();
251+
252+
// Check that selected elements have correct values.
253+
// Elements where index % 4 != 0 should have their original values.
254+
for i in 0..1024 {
255+
if i % 4 != 0 {
256+
assert_eq!(
257+
elements[i], i as u32,
258+
"Selected element at {} should be {}",
259+
i, i
260+
);
261+
}
262+
// Note: Unselected elements (where i % 4 == 0) may have undefined values.
263+
}
264+
}
265+
266+
#[test]
267+
fn test_bitpack_pipeline_sparse_5_elements() {
268+
// Create exactly 1024 elements (0 to 1023).
269+
let values = (0..1024).map(|i| i as u32);
270+
let primitive = PrimitiveArray::from_iter(values).to_array();
271+
272+
// Encode with 10-bit width.
273+
let bitpacked = BitPackedArray::encode(&primitive, 10).unwrap();
274+
assert_eq!(bitpacked.bit_width(), 10, "Bit width should be 10");
275+
276+
// Select only 5 elements at specific indices.
277+
let indices = vec![10, 100, 256, 512, 1000];
278+
let mask = Mask::from_indices(1024, indices);
279+
280+
// This should use the sparse path since true_count < 7.
281+
let result = bitpacked.to_array().execute_with_selection(&mask).unwrap();
282+
assert_eq!(result.len(), 5, "Result should have 5 elements");
283+
284+
let pvector_u32 = result.as_primitive().into_u32();
285+
let elements = pvector_u32.elements().as_slice();
286+
287+
// Verify the values match the selected indices.
288+
assert_eq!(elements[0], 10);
289+
assert_eq!(elements[1], 100);
290+
assert_eq!(elements[2], 256);
291+
assert_eq!(elements[3], 512);
292+
assert_eq!(elements[4], 1000);
293+
}
294+
}

encodings/fastlanes/src/bitpacking/array/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ use vortex_error::{VortexResult, vortex_bail, vortex_ensure};
1313

1414
pub mod bitpack_compress;
1515
pub mod bitpack_decompress;
16+
pub mod bitpack_pipeline;
1617
pub mod unpack_iter;
1718

1819
use crate::bitpack_compress::bitpack_encode;

encodings/fastlanes/src/bitpacking/vtable/mod.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ mod array;
1010
mod canonical;
1111
mod encode;
1212
mod operations;
13+
mod operator;
1314
mod serde;
1415
mod validity;
1516
mod visitor;
@@ -28,7 +29,7 @@ impl VTable for BitPackedVTable {
2829
type ComputeVTable = NotSupported;
2930
type EncodeVTable = Self;
3031
type SerdeVTable = Self;
31-
type OperatorVTable = NotSupported;
32+
type OperatorVTable = Self;
3233

3334
fn id(_encoding: &Self::Encoding) -> EncodingId {
3435
EncodingId::new_ref("fastlanes.bitpacked")

0 commit comments

Comments
 (0)