Skip to content

Commit 4e78fc3

Browse files
committed
handle validity in bitpack pipeline kernel
Signed-off-by: Connor Tsui <[email protected]>
1 parent 31b5671 commit 4e78fc3

File tree

2 files changed

+165
-23
lines changed

2 files changed

+165
-23
lines changed

encodings/fastlanes/src/bitpacking/array/bitpack_pipeline.rs

Lines changed: 160 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ use vortex_array::pipeline::{BindContext, Kernel, KernelCtx, N, PipelineInputs,
88
use vortex_buffer::Buffer;
99
use vortex_dtype::{PTypeDowncastExt, PhysicalPType, match_each_integer_ptype};
1010
use vortex_error::VortexResult;
11+
use vortex_mask::Mask;
1112
use vortex_vector::primitive::PVectorMut;
1213
use vortex_vector::{VectorMut, VectorMutOps};
1314

@@ -54,6 +55,7 @@ impl PipelinedNode for BitPackedArray {
5455
Ok(Box::new(AlignedBitPackedKernel::<T>::new(
5556
packed_bit_width,
5657
packed_buffer,
58+
self.validity.to_mask(self.len()),
5759
)) as Box<dyn Kernel>)
5860
})
5961
}
@@ -81,12 +83,19 @@ pub struct AlignedBitPackedKernel<BP: PhysicalPType<Physical: BitPacking>> {
8183
/// The buffer containing the bitpacked values.
8284
packed_buffer: Buffer<BP::Physical>,
8385

86+
/// The validity mask for the bitpacked array.
87+
validity: Mask,
88+
8489
/// The total number of bitpacked chunks we have unpacked.
8590
num_chunks_unpacked: usize,
8691
}
8792

8893
impl<BP: PhysicalPType<Physical: BitPacking>> AlignedBitPackedKernel<BP> {
89-
pub fn new(packed_bit_width: usize, packed_buffer: Buffer<BP::Physical>) -> Self {
94+
pub fn new(
95+
packed_bit_width: usize,
96+
packed_buffer: Buffer<BP::Physical>,
97+
validity: Mask,
98+
) -> Self {
9099
let packed_stride =
91100
packed_bit_width * <<BP as PhysicalPType>::Physical as FastLanes>::LANES;
92101

@@ -100,6 +109,7 @@ impl<BP: PhysicalPType<Physical: BitPacking>> AlignedBitPackedKernel<BP> {
100109
packed_bit_width,
101110
packed_stride,
102111
packed_buffer,
112+
validity,
103113
num_chunks_unpacked: 0,
104114
}
105115
}
@@ -119,6 +129,9 @@ impl<BP: PhysicalPType<Physical: BitPacking>> Kernel for AlignedBitPackedKernel<
119129
let not_yet_unpacked_values = &self.packed_buffer.as_slice()[packed_offset..];
120130

121131
let true_count = selection.true_count();
132+
let chunk_offset = self.num_chunks_unpacked * N;
133+
let array_len = self.validity.len();
134+
debug_assert!(chunk_offset < array_len);
122135

123136
// If the true count is very small (the selection is sparse), we can unpack individual
124137
// elements directly into the output vector.
@@ -127,20 +140,26 @@ impl<BP: PhysicalPType<Physical: BitPacking>> Kernel for AlignedBitPackedKernel<
127140
debug_assert!(true_count <= output_vector.capacity());
128141

129142
selection.iter_ones(|idx| {
130-
// SAFETY:
131-
// - The documentation for `packed_bit_width` explains that the size is valid.
132-
// - We know that the size of the `next_packed_chunk` we provide is equal to
133-
// `self.packed_stride`, and we explain why this is correct in its documentation.
134-
let unpacked_value = unsafe {
135-
BitPacking::unchecked_unpack_single(
136-
self.packed_bit_width,
137-
not_yet_unpacked_values,
138-
idx,
139-
)
140-
};
141-
142-
// SAFETY: We just reserved enough capacity to push these values.
143-
unsafe { output_vector.push_unchecked(unpacked_value) };
143+
let absolute_idx = chunk_offset + idx;
144+
if self.validity.value(absolute_idx) {
145+
// SAFETY:
146+
// - The documentation for `packed_bit_width` explains that the size is valid.
147+
// - We know that the size of the `next_packed_chunk` we provide is equal to
148+
// `self.packed_stride`, and we explain why this is correct in its
149+
// documentation.
150+
let unpacked_value = unsafe {
151+
BitPacking::unchecked_unpack_single(
152+
self.packed_bit_width,
153+
not_yet_unpacked_values,
154+
idx,
155+
)
156+
};
157+
158+
// SAFETY: We just reserved enough capacity to push these values.
159+
unsafe { output_vector.push_unchecked(unpacked_value) };
160+
} else {
161+
output_vector.append_nulls(1);
162+
}
144163
});
145164
} else {
146165
// Otherwise if the mask is dense, it is faster to fully unpack the entire 1024
@@ -149,16 +168,17 @@ impl<BP: PhysicalPType<Physical: BitPacking>> Kernel for AlignedBitPackedKernel<
149168
output_vector.reserve(N);
150169
debug_assert!(N <= output_vector.capacity());
151170

152-
// SAFETY: We have just reserved enough capacity for the vector to set the length, and
153-
// we also are about to initialize all of the values **without** reading the memory.
154-
unsafe { output_vector.set_len(N) };
155-
156171
let next_packed_chunk = &not_yet_unpacked_values[..self.packed_stride];
157172
debug_assert_eq!(
158173
next_packed_chunk.len(),
159174
FL_VECTOR_SIZE * self.packed_bit_width / BP::Physical::T
160175
);
161176

177+
// SAFETY: We have just reserved enough capacity for the elements buffer to set the
178+
// length, and we are about to initialize all of the values **without** reading the
179+
// memory.
180+
unsafe { output_vector.elements_mut().set_len(N) };
181+
162182
// SAFETY:
163183
// - The documentation for `packed_bit_width` explains that the size is valid.
164184
// - We know that the size of the `next_packed_chunk` we provide is equal to
@@ -168,9 +188,29 @@ impl<BP: PhysicalPType<Physical: BitPacking>> Kernel for AlignedBitPackedKernel<
168188
BitPacking::unchecked_unpack(
169189
self.packed_bit_width,
170190
next_packed_chunk,
171-
&mut output_vector.as_mut()[..FL_VECTOR_SIZE],
191+
output_vector.as_mut(),
172192
);
173193
}
194+
195+
if array_len < chunk_offset + N {
196+
let vector_len = array_len - chunk_offset;
197+
debug_assert!(vector_len < N, "math is broken");
198+
199+
// SAFETY: This must be less than `N` so this is just a truncate.
200+
unsafe { output_vector.elements_mut().set_len(vector_len) };
201+
202+
let chunk_mask = self.validity.slice(chunk_offset..array_len);
203+
204+
// SAFETY: We have just set the elements length to N, and the validity buffer has
205+
// capacity for N elements.
206+
unsafe { output_vector.validity_mut() }.append_mask(&chunk_mask);
207+
} else {
208+
let chunk_mask = self.validity.slice(chunk_offset..chunk_offset + N);
209+
210+
// SAFETY: We have just set the elements length to N, and the validity buffer has
211+
// capacity for N elements.
212+
unsafe { output_vector.validity_mut() }.append_mask(&chunk_mask);
213+
}
174214
}
175215

176216
self.num_chunks_unpacked += 1;
@@ -285,4 +325,104 @@ mod tests {
285325
assert_eq!(elements[3], 512);
286326
assert_eq!(elements[4], 1000);
287327
}
328+
329+
#[test]
330+
fn test_bitpack_pipeline_sparse_with_nulls() {
331+
// Create 1024 elements with some nulls.
332+
let values: Vec<Option<u32>> = (0..1024)
333+
.map(|i| if i % 100 == 0 { None } else { Some(i as u32) })
334+
.collect();
335+
let primitive = PrimitiveArray::from_option_iter(values).to_array();
336+
337+
// Encode with 10-bit width.
338+
let bitpacked = BitPackedArray::encode(&primitive, 10).unwrap();
339+
assert_eq!(bitpacked.bit_width(), 10, "Bit width should be 10");
340+
341+
// Select only 5 elements at specific indices, including a null value at index 100.
342+
let indices = vec![10, 100, 256, 512, 1000];
343+
let mask = Mask::from_indices(1024, indices);
344+
345+
// This should use the sparse path since true_count < 7.
346+
let result = bitpacked.to_array().execute_with_selection(&mask).unwrap();
347+
assert_eq!(result.len(), 5, "Result should have 5 elements");
348+
349+
let pvector_u32 = result.as_primitive().into_u32();
350+
let elements = pvector_u32.elements().as_slice();
351+
352+
// Verify the values and validity.
353+
assert_eq!(elements[0], 10);
354+
assert!(
355+
pvector_u32.validity().value(0),
356+
"Element at index 0 should be valid"
357+
);
358+
359+
// Index 100 should be null.
360+
assert!(
361+
!pvector_u32.validity().value(1),
362+
"Element at index 1 (original index 100) should be null"
363+
);
364+
365+
assert_eq!(elements[2], 256);
366+
assert!(
367+
pvector_u32.validity().value(2),
368+
"Element at index 2 should be valid"
369+
);
370+
371+
assert_eq!(elements[3], 512);
372+
assert!(
373+
pvector_u32.validity().value(3),
374+
"Element at index 3 should be valid"
375+
);
376+
377+
// Index 1000 should be null.
378+
assert!(
379+
!pvector_u32.validity().value(4),
380+
"Element at index 4 (original index 1000) should be null"
381+
);
382+
}
383+
384+
#[test]
385+
fn test_bitpack_pipeline_dense_with_nulls() {
386+
// Create 1024 elements with some nulls.
387+
let values: Vec<Option<u32>> = (0..1024)
388+
.map(|i| if i % 100 == 0 { None } else { Some(i as u32) })
389+
.collect();
390+
let primitive = PrimitiveArray::from_option_iter(values).to_array();
391+
392+
// Encode with 10-bit width.
393+
let bitpacked = BitPackedArray::encode(&primitive, 10).unwrap();
394+
assert_eq!(bitpacked.bit_width(), 10, "Bit width should be 10");
395+
396+
// Select all elements (dense path).
397+
let mask = Mask::new_true(1024);
398+
399+
// This should use the dense path since true_count >= 7.
400+
let result = bitpacked.to_array().execute_with_selection(&mask).unwrap();
401+
assert_eq!(result.len(), 1024, "Result should have 1024 elements");
402+
403+
let pvector_u32 = result.as_primitive().into_u32();
404+
let elements = pvector_u32.elements().as_slice();
405+
406+
// Verify the values and validity.
407+
for i in 0..1024 {
408+
if i % 100 == 0 {
409+
assert!(
410+
!pvector_u32.validity().value(i),
411+
"Element at index {} should be null",
412+
i
413+
);
414+
} else {
415+
assert_eq!(
416+
elements[i], i as u32,
417+
"Element at index {} should be {}",
418+
i, i
419+
);
420+
assert!(
421+
pvector_u32.validity().value(i),
422+
"Element at index {} should be valid",
423+
i
424+
);
425+
}
426+
}
427+
}
288428
}

vortex-vector/src/primitive/generic_mut_impl.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,15 +27,17 @@ impl<T: NativePType> PVectorMut<T> {
2727
self.validity.value(index).then(|| self.elements[index])
2828
}
2929

30-
/// Appends an element to the back of the vector.
30+
/// Pushes an element to the back of the vector.
3131
///
32-
/// The element is treated as valid.
32+
/// The element is treated as non-null.
3333
pub fn push(&mut self, value: T) {
3434
self.elements.push(value);
3535
self.validity.append_n(true, 1);
3636
}
3737

38-
/// Pushes a value without bounds checking or validity updates.
38+
/// Pushes an element without bounds checking.
39+
///
40+
/// The element is treated as non-null.
3941
///
4042
/// # Safety
4143
///

0 commit comments

Comments
 (0)