Commit 222cd71

add fused bitpack filter kernel

Signed-off-by: Connor Tsui <[email protected]>
1 parent 47d6ba7 commit 222cd71

File tree

8 files changed: +308 -23 lines changed
encodings/fastlanes/src/bitpacking/compute/mod.rs

Lines changed: 1 addition & 0 deletions

@@ -7,6 +7,7 @@ mod filter;
 mod is_constant;
 mod take;
 
+// TODO(connor): This is duplicated in `encodings/fastlanes/src/bitpacking/kernels/mod.rs`.
 fn chunked_indices<F: FnMut(usize, &[usize])>(
     mut indices: impl Iterator<Item = usize>,
     offset: usize,

encodings/fastlanes/src/bitpacking/compute/take.rs

Lines changed: 1 addition & 0 deletions

@@ -31,6 +31,7 @@ use crate::BitPackedArray;
 use crate::BitPackedVTable;
 use crate::bitpack_decompress;
 
+// TODO(connor): This is duplicated in `encodings/fastlanes/src/bitpacking/kernels/mod.rs`.
 /// assuming the buffer is already allocated (which will happen at most once) then unpacking
 /// all 1024 elements takes ~8.8x as long as unpacking a single element on an M2 Macbook Air.
 /// see https://github.com/vortex-data/vortex/pull/190#issue-2223752833

encodings/fastlanes/src/bitpacking/kernels/filter.rs (new file)

Lines changed: 213 additions & 0 deletions

@@ -0,0 +1,213 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

use std::mem::MaybeUninit;

use fastlanes::BitPacking;
use vortex_array::ExecutionCtx;
use vortex_array::IntoArray;
use vortex_array::VectorExecutor;
use vortex_array::arrays::FilterArray;
use vortex_array::arrays::FilterVTable;
use vortex_array::kernel::ExecuteParentKernel;
use vortex_array::matchers::Exact;
use vortex_array::patches::patch_pvector;
use vortex_buffer::Buffer;
use vortex_buffer::BufferMut;
use vortex_compute::filter::Filter;
use vortex_dtype::NativePType;
use vortex_dtype::PType;
use vortex_dtype::UnsignedPType;
use vortex_dtype::match_each_integer_ptype;
use vortex_error::VortexExpect;
use vortex_error::VortexResult;
use vortex_mask::Mask;
use vortex_vector::Vector;
use vortex_vector::VectorMut;
use vortex_vector::VectorMutOps;
use vortex_vector::primitive::PVector;
use vortex_vector::primitive::PrimitiveVector;

use crate::BitPackedArray;
use crate::BitPackedVTable;
use crate::bitpacking::kernels::UNPACK_CHUNK_THRESHOLD;
use crate::bitpacking::kernels::chunked_indices;

/// The selection-density threshold above which it is faster to fully unpack the entire
/// [`BitPackedArray`] and then filter the result than to unpack only the selected bitpacked
/// values into the output buffer.
pub const fn unpack_then_filter_threshold<T>() -> f64 {
    // TODO(connor): Where did these numbers come from? Add a public link after validating them.
    // These numbers probably don't work for in-place filtering either.
    match size_of::<T>() {
        1 => 0.03,
        2 => 0.03,
        4 => 0.075,
        _ => 0.09,
        // >8 bytes may have a higher threshold. These numbers are derived from a GCP c2-standard-4
        // with a "Cascade Lake" CPU.
    }
}
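
// For intuition, with the numbers above: a bit-packed `u32` array has a threshold of 0.075,
// so a selection mask with density 0.05 (5% of rows selected) stays on the sparse per-index
// path below, while one with density 0.30 falls back to fully unpacking the array and then
// filtering the result:
//
//     assert!(0.05 < unpack_then_filter_threshold::<u32>()); // sparse per-index path
//     assert!(0.30 > unpack_then_filter_threshold::<u32>()); // full unpack, then filter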

#[derive(Debug)]
struct BitPackingFilterKernel;

impl ExecuteParentKernel<BitPackedVTable> for BitPackingFilterKernel {
    type Parent = Exact<FilterVTable>;

    fn parent(&self) -> Self::Parent {
        Exact::from(&FilterVTable)
    }

    fn execute_parent(
        &self,
        array: &BitPackedArray,
        parent: &FilterArray,
        _child_idx: usize,
        ctx: &mut ExecutionCtx,
    ) -> VortexResult<Option<Vector>> {
        let selection = parent.filter_mask();

        let true_count = selection.true_count();
        if true_count == 0 {
            // Fast path for an empty mask.
            return Ok(Some(VectorMut::with_capacity(array.dtype(), 0).freeze()));
        } else if true_count == selection.len() {
            // Fast path for a full mask.
            return Ok(Some(array.to_array().execute(ctx)?));
        }

        match_each_integer_ptype!(array.ptype(), |I| {
            // If the density is high enough, we would rather decompress the whole array and
            // filter the result than decompress values one by one, so defer to the fallback.
            if selection.density() > unpack_then_filter_threshold::<I>() {
                return Ok(None);
            }
        });

        let primitive_vector: PrimitiveVector = match array.ptype() {
            PType::U8 => filter_primitive::<u8>(array, selection)?.into(),
            PType::U16 => filter_primitive::<u16>(array, selection)?.into(),
            PType::U32 => filter_primitive::<u32>(array, selection)?.into(),
            PType::U64 => filter_primitive::<u64>(array, selection)?.into(),

            // Since the fastlanes crate only supports unsigned integers, and since we know that
            // all numbers are going to be non-negative, we can safely "cast" to unsigned and back.
            PType::I8 => {
                let pvector = filter_primitive::<u8>(array, selection)?;
                pvector.cast_into::<i8>().into()
            }
            PType::I16 => {
                let pvector = filter_primitive::<u16>(array, selection)?;
                pvector.cast_into::<i16>().into()
            }
            PType::I32 => {
                let pvector = filter_primitive::<u32>(array, selection)?;
                pvector.cast_into::<i32>().into()
            }
            PType::I64 => {
                let pvector = filter_primitive::<u64>(array, selection)?;
                pvector.cast_into::<i64>().into()
            }
            other => {
                unreachable!("Unsupported ptype {other} for bitpacking; this was checked above")
            }
        };

        Ok(Some(primitive_vector.into()))
    }
}

/// Specialized filter kernel for primitive bit-packed arrays.
///
/// Because the FastLanes bit-packing kernels are only implemented for unsigned types, the
/// provided `U` must be the unsigned variant of the array's type at the same bit width.
/// For example, if the array is bit-packed `i16`, this function should be called with `U = u16`.
///
/// This function fully decompresses whole chunks for all but the most selective masks because
/// FastLanes decompression is so fast and the bookkeeping necessary to decompress individual
/// elements is relatively slow.
fn filter_primitive<U: UnsignedPType + BitPacking>(
    array: &BitPackedArray,
    selection: &Mask,
) -> VortexResult<PVector<U>> {
    let values = filter_with_indices(
        array,
        selection
            .values()
            .vortex_expect("AllTrue and AllFalse handled by filter fn")
            .indices(),
    );
    let validity = array.validity_mask().filter(selection);

    debug_assert_eq!(
        values.len(),
        validity.len(),
        "`filter_with_indices` was somehow incorrect"
    );

    let mut pvector = unsafe { PVector::new_unchecked(values, validity) };

    // TODO(connor): We want a `PatchesArray` or patching compute functions instead of this.
    let patches = array
        .patches()
        .map(|patches| patches.filter(selection))
        .transpose()?
        .flatten();
    if let Some(patches) = patches {
        pvector = patch_pvector(pvector, &patches);
    }

    Ok(pvector)
}

fn filter_with_indices<T: NativePType + BitPacking>(
    array: &BitPackedArray,
    indices: &[usize],
) -> Buffer<T> {
    let offset = array.offset() as usize;
    let bit_width = array.bit_width() as usize;
    let mut values = BufferMut::with_capacity(indices.len());

    // Some reusable memory to store per-chunk unpacked values.
    let mut unpacked = [const { MaybeUninit::<T>::uninit() }; 1024];
    let packed_bytes = array.packed_slice::<T>();

    // Group the indices by the FastLanes chunk they belong to.
    let chunk_size = 128 * bit_width / size_of::<T>();
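    // Arithmetic note: each FastLanes chunk packs 1024 elements into `1024 * bit_width` bits,
    // i.e. `128 * bit_width` bytes, so dividing by `size_of::<T>()` yields the packed chunk
    // length in `T` words. For example, `u32` packed at 3 bits per value occupies
    // `128 * 3 / 4 = 96` words per 1024-element chunk.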

    chunked_indices(indices, offset, |chunk_idx, indices_within_chunk| {
        let packed = &packed_bytes[chunk_idx * chunk_size..][..chunk_size];

        if indices_within_chunk.len() == 1024 {
            // Unpack the entire chunk.
            unsafe {
                let values_len = values.len();
                values.set_len(values_len + 1024);
                BitPacking::unchecked_unpack(
                    bit_width,
                    packed,
                    &mut values.as_mut_slice()[values_len..],
                );
            }
        } else if indices_within_chunk.len() > UNPACK_CHUNK_THRESHOLD {
            // Unpack into a temporary chunk and then copy the values.
            unsafe {
                let dst: &mut [MaybeUninit<T>] = &mut unpacked;
                let dst: &mut [T] = std::mem::transmute(dst);
                BitPacking::unchecked_unpack(bit_width, packed, dst);
            }
            values.extend_trusted(
                indices_within_chunk
                    .iter()
                    .map(|&idx| unsafe { unpacked.get_unchecked(idx).assume_init() }),
            );
        } else {
            // Otherwise, unpack each element individually.
            values.extend_trusted(indices_within_chunk.iter().map(|&idx| unsafe {
                BitPacking::unchecked_unpack_single(bit_width, packed, idx)
            }));
        }
    });

    values.freeze()
}
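
The closure above makes a three-tier decision per 1024-element chunk. A minimal standalone sketch of the same strategy, with the FastLanes intrinsics replaced by hypothetical identity stand-ins (`unpack_chunk` and `unpack_single` are illustrations, not the real `BitPacking` API):

const CHUNK: usize = 1024;
const UNPACK_CHUNK_THRESHOLD: usize = 8;

// Stand-in for `BitPacking::unchecked_unpack`: a real codec would decode `packed` here.
fn unpack_chunk(packed: &[u32], out: &mut [u32]) {
    out.copy_from_slice(packed);
}

// Stand-in for `BitPacking::unchecked_unpack_single`.
fn unpack_single(packed: &[u32], idx: usize) -> u32 {
    packed[idx]
}

fn filter_chunk(packed: &[u32], indices: &[usize], values: &mut Vec<u32>) {
    if indices.len() == CHUNK {
        // Fully selected chunk: decode straight into the output.
        let start = values.len();
        values.resize(start + CHUNK, 0);
        unpack_chunk(packed, &mut values[start..]);
    } else if indices.len() > UNPACK_CHUNK_THRESHOLD {
        // Moderately selected chunk: decode into scratch space, then gather.
        let mut scratch = [0u32; CHUNK];
        unpack_chunk(packed, &mut scratch);
        values.extend(indices.iter().map(|&i| scratch[i]));
    } else {
        // Sparse chunk: decode each selected element individually.
        values.extend(indices.iter().map(|&i| unpack_single(packed, i)));
    }
}

fn main() {
    let packed: Vec<u32> = (0..CHUNK as u32).collect();
    let mut values = Vec::new();
    filter_chunk(&packed, &[3, 700, 901], &mut values); // three indices: sparse path
    assert_eq!(values, vec![3, 700, 901]);
}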

encodings/fastlanes/src/bitpacking/kernels/mod.rs (new file)

Lines changed: 38 additions & 0 deletions

@@ -0,0 +1,38 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

mod filter;

/// Assuming the buffer is already allocated (which will happen at most once), unpacking all
/// 1024 elements takes ~8.8x as long as unpacking a single element on an M2 MacBook Air.
///
/// See https://github.com/vortex-data/vortex/pull/190#issue-2223752833
const UNPACK_CHUNK_THRESHOLD: usize = 8;

fn chunked_indices<F: FnMut(usize, &[usize])>(indices: &[usize], offset: usize, mut chunk_fn: F) {
    if indices.is_empty() {
        return;
    }

    let mut indices_within_chunk: Vec<usize> = Vec::with_capacity(1024);

    let first_idx = indices[0];
    let mut current_chunk_idx = (first_idx + offset) / 1024;
    indices_within_chunk.push((first_idx + offset) % 1024);

    for idx in &indices[1..] {
        let new_chunk_idx = (idx + offset) / 1024;

        if new_chunk_idx != current_chunk_idx {
            chunk_fn(current_chunk_idx, &indices_within_chunk);
            indices_within_chunk.clear();
        }

        current_chunk_idx = new_chunk_idx;
        indices_within_chunk.push((idx + offset) % 1024);
    }

    if !indices_within_chunk.is_empty() {
        chunk_fn(current_chunk_idx, &indices_within_chunk);
    }
}
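
Paired with the definition above, a quick usage sketch shows how indices get rebased into per-chunk positions once the array's offset is applied:

fn main() {
    // With offset 8, indices 1023 and 2000 land in the second 1024-element chunk.
    let indices = [0, 5, 1000, 1023, 2000];

    let mut groups: Vec<(usize, Vec<usize>)> = Vec::new();
    chunked_indices(&indices, 8, |chunk_idx, within| {
        groups.push((chunk_idx, within.to_vec()));
    });

    // (0+8)/1024 = 0, ..., (1023+8)/1024 = 1, (2000+8)/1024 = 1, positions taken mod 1024.
    assert_eq!(groups, vec![(0, vec![8, 13, 1008]), (1, vec![7, 984])]);
}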

encodings/fastlanes/src/bitpacking/mod.rs

Lines changed: 1 addition & 0 deletions

@@ -8,6 +8,7 @@ pub use array::bitpack_decompress;
 pub use array::unpack_iter;
 
 mod compute;
+mod kernels;
 
 mod vtable;
 pub use vtable::BitPackedVTable;

vortex-array/src/patches.rs

Lines changed: 16 additions & 0 deletions

@@ -30,6 +30,8 @@ use vortex_mask::MaskMut;
 use vortex_scalar::PValue;
 use vortex_scalar::Scalar;
 use vortex_utils::aliases::hash_map::HashMap;
+use vortex_vector::VectorOps;
+use vortex_vector::primitive::PVector;
 
 use crate::Array;
 use crate::ArrayRef;

@@ -883,6 +885,20 @@
     }
 }
 
+/// Applies patches to a [`PVector<T>`], returning the patched vector.
+///
+/// This function modifies the elements buffer in place at the positions specified by the patch
+/// indices. It also updates the validity mask to reflect the nullability of patch values.
+pub fn patch_pvector<T: NativePType>(pvector: PVector<T>, patches: &Patches) -> PVector<T> {
+    let (mut elements, mut validity) = pvector.into_mut().into_parts();
+
+    // SAFETY: We maintain the invariant that elements and validity have the same length, and all
+    // patch indices are valid after offset adjustment (guaranteed by `Patches`).
+    unsafe { patches.apply_to_buffer(elements.as_mut_slice(), &mut validity) };
+
+    PVector::new(elements.freeze(), validity.freeze())
+}
+
 /// Helper function to apply patches to a buffer.
 ///
 /// # Safety
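
The patching semantics are easiest to see on plain slices. A minimal sketch, assuming patches boil down to (index, optional value) pairs; `Patch` here is a hypothetical stand-in for the real `Patches` type, and the validity mask is modeled as a `Vec<bool>`:

struct Patch<T> {
    index: usize,
    value: Option<T>, // `None` patches in a null
}

fn apply_patches<T: Copy>(elements: &mut [T], validity: &mut [bool], patches: &[Patch<T>]) {
    for patch in patches {
        match patch.value {
            Some(v) => {
                // A present patch value overwrites the element and marks it valid.
                elements[patch.index] = v;
                validity[patch.index] = true;
            }
            // A null patch value only flips the validity bit.
            None => validity[patch.index] = false,
        }
    }
}

fn main() {
    let mut elements = vec![0u32; 4];
    let mut validity = vec![true; 4];
    let patches = [
        Patch { index: 1, value: Some(7) },
        Patch { index: 3, value: None },
    ];
    apply_patches(&mut elements, &mut validity, &patches);
    assert_eq!(elements, vec![0, 7, 0, 0]);
    assert_eq!(validity, vec![true, true, true, false]);
}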

vortex-buffer/src/buffer.rs

Lines changed: 23 additions & 21 deletions

@@ -409,27 +409,6 @@ impl<T> Buffer<T> {
         }
     }
 
-    /// Cast a `Buffer<T>` into a `Buffer<U>`.
-    ///
-    /// # Panics
-    ///
-    /// Panics if the type `U` does not have the same size and alignment as `T`.
-    pub fn cast_into<U>(self) -> Buffer<U> {
-        assert_eq!(size_of::<T>(), size_of::<U>(), "Buffer type size mismatch");
-        assert_eq!(
-            align_of::<T>(),
-            align_of::<U>(),
-            "Buffer type alignment mismatch"
-        );
-
-        Buffer {
-            bytes: self.bytes,
-            length: self.length,
-            alignment: self.alignment,
-            _marker: PhantomData,
-        }
-    }
-
     /// Try to convert self into `BufferMut<T>` if there is only a single strong reference.
     pub fn try_into_mut(self) -> Result<BufferMut<T>, Self> {
         self.bytes

@@ -487,6 +466,29 @@
     }
 }
 
+impl<T: Copy> Buffer<T> {
+    /// Cast a `Buffer<T>` into a `Buffer<U>`.
+    ///
+    /// # Panics
+    ///
+    /// Panics if the type `U` does not have the same size and alignment as `T`.
+    pub fn cast_into<U>(self) -> Buffer<U> {
+        assert_eq!(size_of::<T>(), size_of::<U>(), "Buffer type size mismatch");
+        assert_eq!(
+            align_of::<T>(),
+            align_of::<U>(),
+            "Buffer type alignment mismatch"
+        );
+
+        Buffer {
+            bytes: self.bytes,
+            length: self.length,
+            alignment: self.alignment,
+            _marker: PhantomData,
+        }
+    }
+}
+
 /// An iterator over Buffer elements.
 ///
 /// This is an analog to the `std::slice::Iter` type.
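
The net change here is that `cast_into` moves behind a `T: Copy` bound (it was previously available for any `T`). The same-size reinterpretation it performs can be sketched on a plain `Vec`; `cast_vec` below is a standalone analog, not the vortex-buffer API, and a fully general version would additionally require that every bit pattern of `T` is a valid `U`:

use std::mem::{ManuallyDrop, align_of, size_of};

fn cast_vec<T: Copy, U>(v: Vec<T>) -> Vec<U> {
    assert_eq!(size_of::<T>(), size_of::<U>(), "type size mismatch");
    assert_eq!(align_of::<T>(), align_of::<U>(), "type alignment mismatch");

    let mut v = ManuallyDrop::new(v);
    let (ptr, len, cap) = (v.as_mut_ptr(), v.len(), v.capacity());
    // SAFETY: size and alignment match (checked above), so the allocation layout is
    // unchanged; this is sound for the u32 -> i32 use below because every 32-bit
    // pattern is a valid i32.
    unsafe { Vec::from_raw_parts(ptr.cast::<U>(), len, cap) }
}

fn main() {
    let unsigned: Vec<u32> = vec![1, 2, u32::MAX];
    let signed: Vec<i32> = cast_vec(unsigned);
    assert_eq!(signed, vec![1, 2, -1]); // u32::MAX reinterprets as -1
}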
