Skip to content

Commit 5c2b9b0

Browse files
committed
handle validity in bitpack pipeline kernel
Signed-off-by: Connor Tsui <[email protected]>
1 parent 956f5c2 commit 5c2b9b0

File tree

2 files changed

+162
-22
lines changed

2 files changed

+162
-22
lines changed

encodings/fastlanes/src/bitpacking/array/bitpack_pipeline.rs

Lines changed: 157 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ use vortex_array::pipeline::{BindContext, Kernel, KernelCtx, N, PipelineInputs,
88
use vortex_buffer::Buffer;
99
use vortex_dtype::{PTypeDowncastExt, PhysicalPType, match_each_integer_ptype};
1010
use vortex_error::VortexResult;
11+
use vortex_mask::Mask;
1112
use vortex_vector::primitive::PVectorMut;
1213
use vortex_vector::{VectorMut, VectorMutOps};
1314

@@ -54,6 +55,7 @@ impl PipelinedNode for BitPackedArray {
5455
Ok(Box::new(AlignedBitPackedKernel::<T>::new(
5556
packed_bit_width,
5657
packed_buffer,
58+
self.validity.to_mask(self.len()),
5759
)) as Box<dyn Kernel>)
5860
})
5961
}
@@ -81,12 +83,19 @@ pub struct AlignedBitPackedKernel<BP: PhysicalPType<Physical: BitPacking>> {
8183
/// The buffer containing the bitpacked values.
8284
packed_buffer: Buffer<BP::Physical>,
8385

86+
/// The validity mask for the bitpacked array.
87+
validity: Mask,
88+
8489
/// The total number of bitpacked chunks we have unpacked.
8590
num_chunks_unpacked: usize,
8691
}
8792

8893
impl<BP: PhysicalPType<Physical: BitPacking>> AlignedBitPackedKernel<BP> {
89-
pub fn new(packed_bit_width: usize, packed_buffer: Buffer<BP::Physical>) -> Self {
94+
pub fn new(
95+
packed_bit_width: usize,
96+
packed_buffer: Buffer<BP::Physical>,
97+
validity: Mask,
98+
) -> Self {
9099
let packed_stride =
91100
packed_bit_width * <<BP as PhysicalPType>::Physical as FastLanes>::LANES;
92101

@@ -100,6 +109,7 @@ impl<BP: PhysicalPType<Physical: BitPacking>> AlignedBitPackedKernel<BP> {
100109
packed_bit_width,
101110
packed_stride,
102111
packed_buffer,
112+
validity,
103113
num_chunks_unpacked: 0,
104114
}
105115
}
@@ -119,6 +129,7 @@ impl<BP: PhysicalPType<Physical: BitPacking>> Kernel for AlignedBitPackedKernel<
119129
let not_yet_unpacked_values = &self.packed_buffer.as_slice()[packed_offset..];
120130

121131
let true_count = selection.true_count();
132+
let chunk_offset = self.num_chunks_unpacked * N;
122133

123134
// If the true count is very small (the selection is sparse), we can unpack individual
124135
// elements directly into the output vector.
@@ -127,20 +138,26 @@ impl<BP: PhysicalPType<Physical: BitPacking>> Kernel for AlignedBitPackedKernel<
127138
debug_assert!(true_count <= output_vector.capacity());
128139

129140
selection.iter_ones(|idx| {
130-
// SAFETY:
131-
// - The documentation for `packed_bit_width` explains that the size is valid.
132-
// - We know that the size of the `next_packed_chunk` we provide is equal to
133-
// `self.packed_stride`, and we explain why this is correct in its documentation.
134-
let unpacked_value = unsafe {
135-
BitPacking::unchecked_unpack_single(
136-
self.packed_bit_width,
137-
not_yet_unpacked_values,
138-
idx,
139-
)
140-
};
141-
142-
// SAFETY: We just reserved enough capacity to push these values.
143-
unsafe { output_vector.push_unchecked(unpacked_value) };
141+
let absolute_idx = chunk_offset + idx;
142+
if self.validity.value(absolute_idx) {
143+
// SAFETY:
144+
// - The documentation for `packed_bit_width` explains that the size is valid.
145+
// - We know that the size of the `next_packed_chunk` we provide is equal to
146+
// `self.packed_stride`, and we explain why this is correct in its
147+
// documentation.
148+
let unpacked_value = unsafe {
149+
BitPacking::unchecked_unpack_single(
150+
self.packed_bit_width,
151+
not_yet_unpacked_values,
152+
idx,
153+
)
154+
};
155+
156+
// SAFETY: We just reserved enough capacity to push these values.
157+
unsafe { output_vector.push_unchecked(unpacked_value) };
158+
} else {
159+
output_vector.push_opt(None);
160+
}
144161
});
145162
} else {
146163
// Otherwise if the mask is dense, it is faster to fully unpack the entire 1024
@@ -149,16 +166,17 @@ impl<BP: PhysicalPType<Physical: BitPacking>> Kernel for AlignedBitPackedKernel<
149166
output_vector.reserve(N);
150167
debug_assert!(N <= output_vector.capacity());
151168

152-
// SAFETY: We have just reserved enough capacity for the vector to set the length, and
153-
// we also are about to initialize all of the values **without** reading the memory.
154-
unsafe { output_vector.set_len(N) };
155-
156169
let next_packed_chunk = &not_yet_unpacked_values[..self.packed_stride];
157170
debug_assert_eq!(
158171
next_packed_chunk.len(),
159172
FL_VECTOR_SIZE * self.packed_bit_width / BP::Physical::T
160173
);
161174

175+
// SAFETY: We have just reserved enough capacity for the elements buffer to set the
176+
// length, and we are about to initialize all of the values **without** reading the
177+
// memory.
178+
unsafe { output_vector.elements_mut().set_len(N) };
179+
162180
// SAFETY:
163181
// - The documentation for `packed_bit_width` explains that the size is valid.
164182
// - We know that the size of the `next_packed_chunk` we provide is equal to
@@ -171,6 +189,26 @@ impl<BP: PhysicalPType<Physical: BitPacking>> Kernel for AlignedBitPackedKernel<
171189
&mut output_vector.as_mut()[..FL_VECTOR_SIZE],
172190
);
173191
}
192+
193+
if self.validity.len() < chunk_offset + N {
194+
let vector_len = self.validity.len() - chunk_offset;
195+
debug_assert!(vector_len < N, "math is broken");
196+
197+
// SAFETY: `vector_len` is strictly less than `N` (the length set above), so this only truncates.
198+
unsafe { output_vector.elements_mut().set_len(vector_len) };
199+
200+
let chunk_mask = self.validity.slice(chunk_offset..self.validity.len());
201+
202+
// SAFETY: We have just truncated the elements length to `vector_len`, which is also the
203+
// length of `chunk_mask`, and the validity buffer has capacity for N elements.
204+
unsafe { output_vector.validity_mut() }.append_mask(&chunk_mask);
205+
} else {
206+
let chunk_mask = self.validity.slice(chunk_offset..chunk_offset + N);
207+
208+
// SAFETY: We have just set the elements length to N, and the validity buffer has
209+
// capacity for N elements.
210+
unsafe { output_vector.validity_mut() }.append_mask(&chunk_mask);
211+
}
174212
}
175213

176214
self.num_chunks_unpacked += 1;
@@ -285,4 +323,104 @@ mod tests {
285323
assert_eq!(elements[3], 512);
286324
assert_eq!(elements[4], 1000);
287325
}
326+
327+
#[test]
328+
fn test_bitpack_pipeline_sparse_with_nulls() {
329+
// Create 1024 elements with some nulls.
330+
let values: Vec<Option<u32>> = (0..1024)
331+
.map(|i| if i % 100 == 0 { None } else { Some(i as u32) })
332+
.collect();
333+
let primitive = PrimitiveArray::from_option_iter(values).to_array();
334+
335+
// Encode with 10-bit width.
336+
let bitpacked = BitPackedArray::encode(&primitive, 10).unwrap();
337+
assert_eq!(bitpacked.bit_width(), 10, "Bit width should be 10");
338+
339+
// Select only 5 elements at specific indices, including null values at indices 100 and 1000.
340+
let indices = vec![10, 100, 256, 512, 1000];
341+
let mask = Mask::from_indices(1024, indices);
342+
343+
// This should use the sparse path since true_count < 7.
344+
let result = bitpacked.to_array().execute_with_selection(&mask).unwrap();
345+
assert_eq!(result.len(), 5, "Result should have 5 elements");
346+
347+
let pvector_u32 = result.as_primitive().into_u32();
348+
let elements = pvector_u32.elements().as_slice();
349+
350+
// Verify the values and validity.
351+
assert_eq!(elements[0], 10);
352+
assert!(
353+
pvector_u32.validity().value(0),
354+
"Element at index 0 should be valid"
355+
);
356+
357+
// Index 100 should be null.
358+
assert!(
359+
!pvector_u32.validity().value(1),
360+
"Element at index 1 (original index 100) should be null"
361+
);
362+
363+
assert_eq!(elements[2], 256);
364+
assert!(
365+
pvector_u32.validity().value(2),
366+
"Element at index 2 should be valid"
367+
);
368+
369+
assert_eq!(elements[3], 512);
370+
assert!(
371+
pvector_u32.validity().value(3),
372+
"Element at index 3 should be valid"
373+
);
374+
375+
// Index 1000 should be null.
376+
assert!(
377+
!pvector_u32.validity().value(4),
378+
"Element at index 4 (original index 1000) should be null"
379+
);
380+
}
381+
382+
#[test]
383+
fn test_bitpack_pipeline_dense_with_nulls() {
384+
// Create 1024 elements with some nulls.
385+
let values: Vec<Option<u32>> = (0..1024)
386+
.map(|i| if i % 100 == 0 { None } else { Some(i as u32) })
387+
.collect();
388+
let primitive = PrimitiveArray::from_option_iter(values).to_array();
389+
390+
// Encode with 10-bit width.
391+
let bitpacked = BitPackedArray::encode(&primitive, 10).unwrap();
392+
assert_eq!(bitpacked.bit_width(), 10, "Bit width should be 10");
393+
394+
// Select all elements (dense path).
395+
let mask = Mask::new_true(1024);
396+
397+
// This should use the dense path since true_count >= 7.
398+
let result = bitpacked.to_array().execute_with_selection(&mask).unwrap();
399+
assert_eq!(result.len(), 1024, "Result should have 1024 elements");
400+
401+
let pvector_u32 = result.as_primitive().into_u32();
402+
let elements = pvector_u32.elements().as_slice();
403+
404+
// Verify the values and validity.
405+
for i in 0..1024 {
406+
if i % 100 == 0 {
407+
assert!(
408+
!pvector_u32.validity().value(i),
409+
"Element at index {} should be null",
410+
i
411+
);
412+
} else {
413+
assert_eq!(
414+
elements[i], i as u32,
415+
"Element at index {} should be {}",
416+
i, i
417+
);
418+
assert!(
419+
pvector_u32.validity().value(i),
420+
"Element at index {} should be valid",
421+
i
422+
);
423+
}
424+
}
425+
}
288426
}

vortex-vector/src/primitive/generic_mut_impl.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,15 +27,17 @@ impl<T: NativePType> PVectorMut<T> {
2727
self.validity.value(index).then(|| self.elements[index])
2828
}
2929

30-
/// Appends an element to the back of the vector.
30+
/// Pushes an element to the back of the vector.
3131
///
32-
/// The element is treated as valid.
32+
/// The element is treated as non-null.
3333
pub fn push(&mut self, value: T) {
3434
self.elements.push(value);
3535
self.validity.append_n(true, 1);
3636
}
3737

38-
/// Pushes a value without bounds checking or validity updates.
38+
/// Pushes an element without bounds checking.
39+
///
40+
/// The element is treated as non-null.
3941
///
4042
/// # Safety
4143
///

0 commit comments

Comments
 (0)