|
| 1 | +// SPDX-License-Identifier: Apache-2.0 |
| 2 | +// SPDX-FileCopyrightText: Copyright the Vortex contributors |
| 3 | + |
| 4 | +use fastlanes::{BitPacking, FastLanes}; |
| 5 | +use static_assertions::const_assert_eq; |
| 6 | +use vortex_array::pipeline::bit_view::BitView; |
| 7 | +use vortex_array::pipeline::{BindContext, Kernel, KernelCtx, N, PipelineInputs, PipelinedNode}; |
| 8 | +use vortex_buffer::Buffer; |
| 9 | +use vortex_dtype::{PTypeDowncastExt, PhysicalPType, match_each_integer_ptype}; |
| 10 | +use vortex_error::VortexResult; |
| 11 | +use vortex_vector::primitive::PVectorMut; |
| 12 | +use vortex_vector::{VectorMut, VectorMutOps}; |
| 13 | + |
| 14 | +use crate::BitPackedArray; |
| 15 | + |
/// The size of a FastLanes vector of elements.
const FL_VECTOR_SIZE: usize = 1024;

// Bitpacking uses FastLanes decompression, which expects a multiple of 1024 elements.
// If the pipeline batch size `N` ever diverges from the FastLanes vector size, the kernels
// below would unpack the wrong number of elements per step, so fail the build instead.
const_assert_eq!(N, FL_VECTOR_SIZE);

// TODO(connor): Run some benchmarks to actually get a good value.
/// The true count threshold at which it is faster to unpack individual bitpacked values one at a
/// time instead of unpack entire vectors and then filter later.
const SCALAR_UNPACK_THRESHOLD: usize = 7;
| 26 | + |
impl PipelinedNode for BitPackedArray {
    fn inputs(&self) -> PipelineInputs {
        // A bitpacked array is a leaf of the pipeline: it produces values itself rather than
        // transforming the output of a child node.
        PipelineInputs::Source
    }

    fn bind(&self, _ctx: &dyn BindContext) -> VortexResult<Box<dyn Kernel>> {
        // A zero bit-width array is expected to be encoded differently (e.g. as a constant),
        // so a positive width is an invariant of this encoding.
        debug_assert!(self.bit_width > 0);

        if self.patches.is_some() {
            unimplemented!(
                "We do not handle patches for bitpacked right now, as this will become a parent patch array"
            );
        }

        // Dispatch on the logical integer type to monomorphize the kernel over the
        // corresponding physical (packed) integer type.
        match_each_integer_ptype!(self.ptype(), |T| {
            let packed_bit_width = self.bit_width as usize;
            // Reinterpret the raw byte buffer as the physical integer type that FastLanes
            // packs into.
            let packed_buffer = Buffer::<<T as PhysicalPType>::Physical>::from_byte_buffer(
                self.packed.clone().into_byte_buffer(),
            );

            if self.offset != 0 {
                // TODO(ngates): the unaligned kernel needs fixing for the non-masked API
                unimplemented!(
                    "Unaligned `BitPackedArray` as a `PipelineSource` is not yet implemented"
                )
            }

            Ok(Box::new(AlignedBitPackedKernel::<T>::new(
                packed_bit_width,
                packed_buffer,
            )) as Box<dyn Kernel>)
        })
    }
}
| 61 | + |
/// A source kernel that unpacks a FastLanes-bitpacked buffer one 1024-element chunk per step.
///
/// `BP` is the logical primitive type; its associated `Physical` type is the integer type that
/// FastLanes actually packs and unpacks.
pub struct AlignedBitPackedKernel<BP: PhysicalPType<Physical: BitPacking>> {
    /// The bit width of each bitpacked value.
    ///
    /// This is guaranteed to be less than or equal to the (unpacked) bit-width of `BP`.
    packed_bit_width: usize,

    /// The stride of the bitpacked values, which when fully unpacked will occupy exactly 1024
    /// elements. This is equal to `1024 * bit_width / BP::Physical::T`
    ///
    /// We store this here so that we do not have to keep calculating this in [`step()`].
    ///
    /// For example, if the `bit_width` is 10 and the physical type is `u16` (which will fill up
    /// `1024 / 16 = 64` lanes), the `packed_stride` will be `10 * 64 = 640`. This ensures we pass
    /// a slice with the correct length to [`BitPacking::unchecked_unpack`].
    ///
    /// [`step()`]: Kernel::step
    /// [`BitPacking::unchecked_unpack`]: BitPacking::unchecked_unpack
    packed_stride: usize,

    /// The buffer containing the bitpacked values.
    packed_buffer: Buffer<BP::Physical>,

    /// The total number of bitpacked chunks we have unpacked.
    num_chunks_unpacked: usize,
}
| 87 | + |
| 88 | +impl<BP: PhysicalPType<Physical: BitPacking>> AlignedBitPackedKernel<BP> { |
| 89 | + pub fn new(packed_bit_width: usize, packed_buffer: Buffer<BP::Physical>) -> Self { |
| 90 | + let packed_stride = |
| 91 | + packed_bit_width * <<BP as PhysicalPType>::Physical as FastLanes>::LANES; |
| 92 | + |
| 93 | + assert_eq!( |
| 94 | + packed_stride, |
| 95 | + FL_VECTOR_SIZE * packed_bit_width / BP::Physical::T |
| 96 | + ); |
| 97 | + assert!(packed_bit_width <= BP::Physical::T); |
| 98 | + |
| 99 | + Self { |
| 100 | + packed_bit_width, |
| 101 | + packed_stride, |
| 102 | + packed_buffer, |
| 103 | + num_chunks_unpacked: 0, |
| 104 | + } |
| 105 | + } |
| 106 | +} |
| 107 | + |
| 108 | +impl<BP: PhysicalPType<Physical: BitPacking>> Kernel for AlignedBitPackedKernel<BP> { |
| 109 | + fn step( |
| 110 | + &mut self, |
| 111 | + _ctx: &KernelCtx, |
| 112 | + selection: &BitView, |
| 113 | + out: &mut VectorMut, |
| 114 | + ) -> VortexResult<()> { |
| 115 | + let output_vector: &mut PVectorMut<BP::Physical> = out.as_primitive_mut().downcast(); |
| 116 | + debug_assert!(output_vector.is_empty()); |
| 117 | + |
| 118 | + let packed_offset = self.num_chunks_unpacked * self.packed_stride; |
| 119 | + let not_yet_unpacked_values = &self.packed_buffer.as_slice()[packed_offset..]; |
| 120 | + |
| 121 | + let true_count = selection.true_count(); |
| 122 | + |
| 123 | + // If the true count is very small (the selection is sparse), we can unpack individual |
| 124 | + // elements directly into the output vector. |
| 125 | + if true_count < SCALAR_UNPACK_THRESHOLD { |
| 126 | + output_vector.reserve(true_count); |
| 127 | + debug_assert!(true_count <= output_vector.capacity()); |
| 128 | + |
| 129 | + selection.iter_ones(|idx| { |
| 130 | + // SAFETY: |
| 131 | + // - The documentation for `packed_bit_width` explains that the size is valid. |
| 132 | + // - We know that the size of the `next_packed_chunk` we provide is equal to |
| 133 | + // `self.packed_stride`, and we explain why this is correct in its documentation. |
| 134 | + let unpacked_value = unsafe { |
| 135 | + BitPacking::unchecked_unpack_single( |
| 136 | + self.packed_bit_width, |
| 137 | + not_yet_unpacked_values, |
| 138 | + idx, |
| 139 | + ) |
| 140 | + }; |
| 141 | + |
| 142 | + // SAFETY: We just reserved enough capacity to push these values. |
| 143 | + unsafe { output_vector.push_unchecked(unpacked_value) }; |
| 144 | + }); |
| 145 | + } else { |
| 146 | + // Otherwise if the mask is dense, it is faster to fully unpack the entire 1024 |
| 147 | + // element lane with SIMD / FastLanes and let other nodes in the pipeline decide if they |
| 148 | + // want to perform the selection filter themselves. |
| 149 | + output_vector.reserve(N); |
| 150 | + debug_assert!(N <= output_vector.capacity()); |
| 151 | + |
| 152 | + // SAFETY: We have just reserved enough capacity for the vector to set the length, and |
| 153 | + // we also are about to initialize all of the values **without** reading the memory. |
| 154 | + unsafe { output_vector.set_len(N) }; |
| 155 | + |
| 156 | + let next_packed_chunk = ¬_yet_unpacked_values[..self.packed_stride]; |
| 157 | + debug_assert_eq!( |
| 158 | + next_packed_chunk.len(), |
| 159 | + FL_VECTOR_SIZE * self.packed_bit_width / BP::Physical::T |
| 160 | + ); |
| 161 | + |
| 162 | + // SAFETY: |
| 163 | + // - The documentation for `packed_bit_width` explains that the size is valid. |
| 164 | + // - We know that the size of the `next_packed_chunk` we provide is equal to |
| 165 | + // `self.packed_stride`, and we explain why this is correct in its documentation. |
| 166 | + // - It is clear that the output buffer has length 1024. |
| 167 | + unsafe { |
| 168 | + BitPacking::unchecked_unpack( |
| 169 | + self.packed_bit_width, |
| 170 | + next_packed_chunk, |
| 171 | + &mut output_vector.as_mut()[..FL_VECTOR_SIZE], |
| 172 | + ); |
| 173 | + } |
| 174 | + } |
| 175 | + |
| 176 | + self.num_chunks_unpacked += 1; |
| 177 | + |
| 178 | + Ok(()) |
| 179 | + } |
| 180 | +} |
| 181 | + |
#[cfg(test)]
mod tests {
    use vortex_array::arrays::PrimitiveArray;
    use vortex_dtype::PTypeDowncast;
    use vortex_mask::Mask;
    use vortex_vector::VectorOps;

    use crate::BitPackedArray;

    /// Encodes the values `0..1024` as `u32` with a 10-bit packed width (1023 fits in 10 bits).
    fn encode_0_to_1023() -> BitPackedArray {
        let primitive = PrimitiveArray::from_iter((0..1024).map(|i| i as u32)).to_array();
        let bitpacked = BitPackedArray::encode(&primitive, 10).unwrap();
        assert_eq!(bitpacked.bit_width(), 10, "Bit width should be 10");
        bitpacked
    }

    #[test]
    fn test_bitpack_pipeline_basic() {
        let bitpacked = encode_0_to_1023();

        // Select every element; this drives the pipeline since `BitPackedArray` implements
        // `PipelinedNode`.
        let mask = Mask::new_true(1024);
        let result = bitpacked.to_array().execute_with_selection(&mask).unwrap();
        assert_eq!(result.len(), 1024, "Result should have 1024 elements");

        let unpacked = result.as_primitive().into_u32();
        for (i, &element) in unpacked.elements().as_slice().iter().enumerate() {
            assert_eq!(element, i as u32, "Value at index {} should be {}", i, i);
        }
    }

    #[test]
    fn test_bitpack_pipeline_dense_75_percent() {
        let bitpacked = encode_0_to_1023();

        // Keep 75% of the rows (768 of 1024): every index not divisible by 4.
        let indices: Vec<usize> = (0..1024).filter(|i| i % 4 != 0).collect();
        assert_eq!(indices.len(), 768, "Should select exactly 768 elements");
        let mask = Mask::from_indices(1024, indices);

        // With true_count >= 7 the kernel takes the dense path and emits the whole chunk.
        let result = bitpacked.to_array().execute_with_selection(&mask).unwrap();
        assert_eq!(
            result.len(),
            1024,
            "Result should have 1024 elements (dense path outputs all N elements)"
        );

        let unpacked = result.as_primitive().into_u32();
        let elements = unpacked.elements().as_slice();

        // Only the selected positions are guaranteed to hold their original values; the
        // unselected slots (i % 4 == 0) may contain anything.
        for i in (0..1024).filter(|i| i % 4 != 0) {
            assert_eq!(
                elements[i], i as u32,
                "Selected element at {} should be {}",
                i, i
            );
        }
    }

    #[test]
    fn test_bitpack_pipeline_sparse_5_elements() {
        let bitpacked = encode_0_to_1023();

        // Only five selected rows, so true_count < 7 forces the scalar (sparse) path.
        let selected = vec![10, 100, 256, 512, 1000];
        let mask = Mask::from_indices(1024, selected.clone());

        let result = bitpacked.to_array().execute_with_selection(&mask).unwrap();
        assert_eq!(result.len(), 5, "Result should have 5 elements");

        let unpacked = result.as_primitive().into_u32();
        let elements = unpacked.elements().as_slice();

        // The sparse path compacts its output: slot k holds the value at the k-th selected
        // index, and the sequential encoding makes value == index.
        for (k, &idx) in selected.iter().enumerate() {
            assert_eq!(elements[k], idx as u32);
        }
    }
}
0 commit comments