Skip to content

Commit 07e35c8

Browse files
committed
add aligned bitpacked pipeline kernel
Signed-off-by: Connor Tsui <[email protected]>
1 parent 1cb7a39 commit 07e35c8

File tree

7 files changed

+201
-4
lines changed

7 files changed

+201
-4
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

encodings/fastlanes/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ log = { workspace = true }
2626
num-traits = { workspace = true }
2727
prost = { workspace = true }
2828
rand = { workspace = true, optional = true }
29+
static_assertions = { workspace = true }
2930
vortex-array = { workspace = true }
3031
vortex-buffer = { workspace = true }
3132
vortex-compute = { workspace = true }
Lines changed: 186 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,186 @@
1+
// SPDX-License-Identifier: Apache-2.0
2+
// SPDX-FileCopyrightText: Copyright the Vortex contributors
3+
4+
use fastlanes::{BitPacking, FastLanes};
5+
use static_assertions::const_assert_eq;
6+
use vortex_array::pipeline::bit_view::BitView;
7+
use vortex_array::pipeline::{BindContext, KernelCtx, N, PipelinedNode};
8+
use vortex_array::pipeline::{Kernel, PipelineInputs};
9+
use vortex_buffer::Buffer;
10+
use vortex_dtype::{PTypeDowncastExt, PhysicalPType, match_each_integer_ptype};
11+
use vortex_error::VortexResult;
12+
use vortex_vector::primitive::PVectorMut;
13+
use vortex_vector::{VectorMut, VectorMutOps};
14+
15+
use crate::BitPackedArray;
16+
17+
/// The size of a FastLanes vector of elements.
18+
const FL_VECTOR_SIZE: usize = 1024;
19+
20+
// Bitpacking uses FastLanes decompression, which expects a multiple of 1024 elements.
21+
const_assert_eq!(N, FL_VECTOR_SIZE);
22+
23+
impl PipelinedNode for BitPackedArray {
24+
fn inputs(&self) -> PipelineInputs {
25+
PipelineInputs::Source
26+
}
27+
28+
fn bind(&self, _ctx: &dyn BindContext) -> VortexResult<Box<dyn Kernel>> {
29+
debug_assert!(self.bit_width > 0);
30+
31+
if self.patches.is_some() {
32+
unimplemented!(
33+
"We do not handle patches for bitpacked right now, as this will become a parent patch array"
34+
);
35+
}
36+
37+
match_each_integer_ptype!(self.ptype(), |T| {
38+
let packed_bit_width = self.bit_width as usize;
39+
let packed_buffer = Buffer::<<T as PhysicalPType>::Physical>::from_byte_buffer(
40+
self.packed.clone().into_byte_buffer(),
41+
);
42+
43+
// See the documentation for `AlignedBitPackedKernel` for more info on why we need this.
44+
let packed_stride =
45+
self.bit_width as usize * <<T as PhysicalPType>::Physical as FastLanes>::LANES;
46+
47+
if self.offset != 0 {
48+
// TODO(ngates): the unaligned kernel needs fixing for the non-masked API
49+
unimplemented!(
50+
"Unaligned `BitPackedArray` as a `PipelineSource` is not yet implemented"
51+
)
52+
}
53+
54+
Ok(Box::new(AlignedBitPackedKernel::<T>::new(
55+
packed_bit_width,
56+
packed_stride,
57+
packed_buffer,
58+
)) as Box<dyn Kernel>)
59+
})
60+
}
61+
}
62+
63+
pub struct AlignedBitPackedKernel<BP: PhysicalPType<Physical: BitPacking>> {
64+
/// The bit width of each bitpacked value.
65+
///
66+
/// This is guaranteed to be less than or equal to the (unpacked) bit-width of `BP`.
67+
packed_bit_width: usize,
68+
69+
/// The stride of the bitpacked values, which when fully unpacked will occupy exactly 1024 bits.
70+
/// This is equal to `1024 * bit_width / size_of::<T>()`
71+
///
72+
/// We store this here so that we do not have to keep calculating this in [`step()`].
73+
///
74+
/// For example, if the `bit_width` is 10 and the physical type is `u16` (which will fill up
75+
/// `1024 / 16 = 64` lanes), the `packed_stride` will be `10 * 64 = 640`. This ensures we pass
76+
/// a slice with the correct length to [`BitPacking::unchecked_unpack`].
77+
///
78+
/// [`step()`]: SourceKernel::step
79+
/// [`BitPacking::unchecked_unpack()`]: BitPacking::unchecked_unpack
80+
packed_stride: usize,
81+
82+
/// The buffer containing the bitpacked values.
83+
packed_buffer: Buffer<BP::Physical>,
84+
85+
/// The total number of bitpacked chunks we have unpacked.
86+
num_chunks_unpacked: usize,
87+
}
88+
89+
impl<BP: PhysicalPType<Physical: BitPacking>> AlignedBitPackedKernel<BP> {
90+
pub fn new(
91+
packed_bit_width: usize,
92+
packed_stride: usize,
93+
packed_buffer: Buffer<BP::Physical>,
94+
) -> Self {
95+
assert_eq!(
96+
packed_stride,
97+
FL_VECTOR_SIZE * packed_bit_width / size_of::<BP>()
98+
);
99+
assert!(packed_bit_width <= BP::Physical::T);
100+
101+
Self {
102+
packed_bit_width,
103+
packed_stride,
104+
packed_buffer,
105+
num_chunks_unpacked: 0,
106+
}
107+
}
108+
}
109+
110+
impl<BP: PhysicalPType<Physical: BitPacking>> Kernel for AlignedBitPackedKernel<BP> {
111+
fn step(
112+
&mut self,
113+
_ctx: &KernelCtx,
114+
selection: &BitView,
115+
out: &mut VectorMut,
116+
) -> VortexResult<()> {
117+
let output_vector: &mut PVectorMut<BP::Physical> = out.as_primitive_mut().downcast();
118+
debug_assert!(output_vector.is_empty());
119+
120+
let packed_offset = self.num_chunks_unpacked * self.packed_stride;
121+
let not_yet_unpacked_values = &self.packed_buffer.as_slice()[packed_offset..];
122+
123+
let true_count = selection.true_count();
124+
125+
// If the true count is very small, we can unpack individual elements directly into the
126+
// output vector.
127+
if true_count < 7 {
128+
output_vector.reserve(true_count);
129+
debug_assert!(true_count <= output_vector.capacity());
130+
131+
// SAFETY: We have just reserved enough capacity for the vector to set the length, and
132+
// we also are about to initialize all of the values **without** reading the memory.
133+
unsafe { output_vector.set_len(true_count) };
134+
135+
selection.iter_ones(|idx| {
136+
// SAFETY:
137+
// - The documentation for `packed_bit_width` explains that the size is valid.
138+
// - We know that the size of the `next_packed_chunk` we provide is equal to
139+
// `self.packed_stride`, and we explain why this is correct in its documentation.
140+
let unpacked_value = unsafe {
141+
BitPacking::unchecked_unpack_single(
142+
self.packed_bit_width,
143+
not_yet_unpacked_values,
144+
idx,
145+
)
146+
};
147+
148+
// SAFETY: We just reserved enough capacity to push these values.
149+
unsafe { output_vector.push_unchecked(unpacked_value) };
150+
});
151+
} else {
152+
// Otherwise, it is faster to fully unpack the entire 1024 element lane with SIMD /
153+
// FastLanes and let other nodes in the pipeline decide if they want to perform the
154+
// selection filter themselves.
155+
output_vector.reserve(N);
156+
debug_assert!(N <= output_vector.capacity());
157+
158+
// SAFETY: We have just reserved enough capacity for the vector to set the length, and
159+
// we also are about to initialize all of the values **without** reading the memory.
160+
unsafe { output_vector.set_len(N) };
161+
162+
let next_packed_chunk = &not_yet_unpacked_values[..self.packed_stride];
163+
debug_assert_eq!(
164+
next_packed_chunk.len(),
165+
FL_VECTOR_SIZE * self.packed_bit_width / size_of::<BP::Physical>()
166+
);
167+
168+
// SAFETY:
169+
// - The documentation for `packed_bit_width` explains that the size is valid.
170+
// - We know that the size of the `next_packed_chunk` we provide is equal to
171+
// `self.packed_stride`, and we explain why this is correct in its documentation.
172+
// - It is clear that the output buffer has length 1024.
173+
unsafe {
174+
BitPacking::unchecked_unpack(
175+
self.packed_bit_width,
176+
next_packed_chunk,
177+
&mut output_vector.as_mut()[..FL_VECTOR_SIZE],
178+
);
179+
}
180+
}
181+
182+
self.num_chunks_unpacked += 1;
183+
184+
Ok(())
185+
}
186+
}

encodings/fastlanes/src/bitpacking/array/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ use vortex_error::{VortexResult, vortex_bail, vortex_ensure};
1414
pub mod bitpack_compress;
1515
pub mod bitpack_decompress;
1616
pub mod unpack_iter;
17+
pub mod bitpack_pipeline;
1718

1819
use crate::bitpack_compress::bitpack_encode;
1920
use crate::unpack_iter::{BitPacked, BitUnpackedChunks};

vortex-buffer/src/buffer_mut.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -245,8 +245,10 @@ impl<T> BufferMut<T> {
245245
///
246246
/// # Safety
247247
///
248-
/// The caller must ensure that there is sufficient capacity in the buffer and that the values
249-
/// are valid up to `len`.
248+
/// - `new_len` must be less than or equal to [`capacity()`].
249+
/// - The elements at `old_len..new_len` must be initialized.
250+
///
251+
/// [`capacity()`]: Self::capacity
250252
#[inline]
251253
pub unsafe fn set_len(&mut self, len: usize) {
252254
debug_assert!(len <= self.capacity());

vortex-mask/src/mask_mut.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,10 @@ impl MaskMut {
9898
///
9999
/// # Safety
100100
///
101-
/// The caller must ensure that `new_len` is less than the capacity of the mask.
101+
/// - `new_len` must be less than or equal to [`capacity()`].
102+
/// - The elements at `old_len..new_len` must be initialized.
103+
///
104+
/// [`capacity()`]: Self::capacity
102105
pub unsafe fn set_len(&mut self, new_len: usize) {
103106
debug_assert!(new_len < self.capacity());
104107
match &mut self.0 {

vortex-vector/src/primitive/generic_mut.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,10 @@ impl<T> PVectorMut<T> {
7777
///
7878
/// # Safety
7979
///
80-
/// The caller must ensure that the new length does not exceed the capacity of the vector.
80+
/// - `new_len` must be less than or equal to [`capacity()`].
81+
/// - The elements at `old_len..new_len` must be initialized.
82+
///
83+
/// [`capacity()`]: Self::capacity
8184
pub unsafe fn set_len(&mut self, new_len: usize) {
8285
debug_assert!(new_len < self.elements.capacity());
8386
debug_assert!(new_len < self.validity.capacity());

0 commit comments

Comments
 (0)