Skip to content

Commit 8ebf37a

Browse files
connortsui20gatesn
andauthored
Feature: add fused bitpack filter kernel (#5741)
Adds a fused bitpacked and filter kernel by mostly porting over the existing filter compute function on `BitPackedArray`. Very much WIP Not really sure how to actually integrate this kernel into the system either so everything is unused. --------- Signed-off-by: Connor Tsui <[email protected]> Signed-off-by: Nicholas Gates <[email protected]> Co-authored-by: Nicholas Gates <[email protected]>
1 parent bdcba32 commit 8ebf37a

File tree

13 files changed

+338
-29
lines changed

13 files changed

+338
-29
lines changed

encodings/decimal-byte-parts/src/decimal_byte_parts/rules.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
// SPDX-License-Identifier: Apache-2.0
22
// SPDX-FileCopyrightText: Copyright the Vortex contributors
33

4+
use vortex_array::Array;
45
use vortex_array::ArrayRef;
56
use vortex_array::IntoArray;
67
use vortex_array::arrays::FilterArray;
@@ -38,8 +39,7 @@ impl ArrayParentReduceRule<DecimalBytePartsVTable> for DecimalBytePartsFilterPus
3839
return Ok(None);
3940
}
4041

41-
let new_msp =
42-
FilterArray::new(child.msp.clone(), parent.filter_mask().clone()).into_array();
42+
let new_msp = child.msp.filter(parent.filter_mask().clone())?;
4343
let new_child =
4444
DecimalBytePartsArray::try_new(new_msp, *child.decimal_dtype())?.into_array();
4545
Ok(Some(new_child))

encodings/fastlanes/src/bitpacking/compute/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ mod filter;
77
mod is_constant;
88
mod take;
99

10+
// TODO(connor): This is duplicated in `encodings/fastlanes/src/bitpacking/kernels/mod.rs`.
1011
fn chunked_indices<F: FnMut(usize, &[usize])>(
1112
mut indices: impl Iterator<Item = usize>,
1213
offset: usize,

encodings/fastlanes/src/bitpacking/compute/take.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ use crate::BitPackedArray;
3131
use crate::BitPackedVTable;
3232
use crate::bitpack_decompress;
3333

34+
// TODO(connor): This is duplicated in `encodings/fastlanes/src/bitpacking/kernels/mod.rs`.
3435
/// assuming the buffer is already allocated (which will happen at most once) then unpacking
3536
/// all 1024 elements takes ~8.8x as long as unpacking a single element on an M2 Macbook Air.
3637
/// see https://github.com/vortex-data/vortex/pull/190#issue-2223752833
Lines changed: 209 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,209 @@
1+
// SPDX-License-Identifier: Apache-2.0
2+
// SPDX-FileCopyrightText: Copyright the Vortex contributors
3+
4+
use std::mem::MaybeUninit;
5+
use std::sync::Arc;
6+
7+
use fastlanes::BitPacking;
8+
use vortex_array::ExecutionCtx;
9+
use vortex_array::arrays::FilterArray;
10+
use vortex_array::arrays::FilterVTable;
11+
use vortex_array::kernel::ExecuteParentKernel;
12+
use vortex_array::kernel::ParentKernelSet;
13+
use vortex_array::matchers::Exact;
14+
use vortex_buffer::BufferMut;
15+
use vortex_compute::filter::Filter;
16+
use vortex_dtype::NativePType;
17+
use vortex_dtype::PType;
18+
use vortex_dtype::UnsignedPType;
19+
use vortex_dtype::match_each_integer_ptype;
20+
use vortex_error::VortexResult;
21+
use vortex_mask::Mask;
22+
use vortex_mask::MaskValues;
23+
use vortex_vector::Vector;
24+
use vortex_vector::VectorMutOps;
25+
use vortex_vector::primitive::PVector;
26+
use vortex_vector::primitive::PVectorMut;
27+
use vortex_vector::primitive::PrimitiveVector;
28+
29+
use crate::BitPackedArray;
30+
use crate::BitPackedVTable;
31+
use crate::bitpacking::vtable::kernels::UNPACK_CHUNK_THRESHOLD;
32+
use crate::bitpacking::vtable::kernels::chunked_indices;
33+
34+
pub(crate) const PARENT_KERNELS: ParentKernelSet<BitPackedVTable> =
35+
ParentKernelSet::new(&[ParentKernelSet::lift(&BitPackingFilterKernel)]);
36+
37+
/// The threshold over which it is faster to fully unpack the entire [`BitPackedArray`] and then
38+
/// filter the result than to unpack only specific bitpacked values into the output buffer.
39+
pub const fn unpack_then_filter_threshold<T>() -> f64 {
40+
// TODO(connor): Where did these numbers come from? Add a public link after validating them.
41+
// These numbers probably don't work for in-place filtering either.
42+
match size_of::<T>() {
43+
1 => 0.03,
44+
2 => 0.03,
45+
4 => 0.075,
46+
_ => 0.09,
47+
// >8 bytes may have a higher threshold. These numbers are derived from a GCP c2-standard-4
48+
// with a "Cascade Lake" CPU.
49+
}
50+
}
51+
52+
/// Kernel to execute filtering directly on a bit-packed array.
53+
#[derive(Debug)]
54+
struct BitPackingFilterKernel;
55+
56+
impl ExecuteParentKernel<BitPackedVTable> for BitPackingFilterKernel {
57+
type Parent = Exact<FilterVTable>;
58+
59+
fn parent(&self) -> Self::Parent {
60+
Exact::from(&FilterVTable)
61+
}
62+
63+
fn execute_parent(
64+
&self,
65+
array: &BitPackedArray,
66+
parent: &FilterArray,
67+
_child_idx: usize,
68+
_ctx: &mut ExecutionCtx,
69+
) -> VortexResult<Option<Vector>> {
70+
let values = match parent.filter_mask() {
71+
Mask::AllTrue(_) | Mask::AllFalse(_) => {
72+
// No optimization for full or empty mask
73+
return Ok(None);
74+
}
75+
Mask::Values(values) => values,
76+
};
77+
78+
match_each_integer_ptype!(array.ptype(), |I| {
79+
// If the density is high enough, then we would rather decompress the whole array and then apply
80+
// a filter over decompressing values one by one.
81+
if values.density() > unpack_then_filter_threshold::<I>() {
82+
return Ok(None);
83+
}
84+
});
85+
86+
let primitive_vector: PrimitiveVector = match array.ptype() {
87+
PType::U8 => filter_primitive::<u8>(array, values)?.into(),
88+
PType::U16 => filter_primitive::<u16>(array, values)?.into(),
89+
PType::U32 => filter_primitive::<u32>(array, values)?.into(),
90+
PType::U64 => filter_primitive::<u64>(array, values)?.into(),
91+
92+
// Since the fastlanes crate only supports unsigned integers, and since we know that all
93+
// numbers are going to be non-negative, we can safely "cast" to unsigned and back.
94+
PType::I8 => {
95+
let pvector = filter_primitive::<u8>(array, values)?;
96+
unsafe { pvector.transmute::<i8>() }.into()
97+
}
98+
PType::I16 => {
99+
let pvector = filter_primitive::<u16>(array, values)?;
100+
unsafe { pvector.transmute::<i16>() }.into()
101+
}
102+
PType::I32 => {
103+
let pvector = filter_primitive::<u32>(array, values)?;
104+
unsafe { pvector.transmute::<i32>() }.into()
105+
}
106+
PType::I64 => {
107+
let pvector = filter_primitive::<u64>(array, values)?;
108+
unsafe { pvector.transmute::<i64>() }.into()
109+
}
110+
other => {
111+
unreachable!("Unsupported ptype {other} for bitpacking, we also checked this above")
112+
}
113+
};
114+
115+
Ok(Some(primitive_vector.into()))
116+
}
117+
}
118+
119+
/// Specialized filter kernel for primitive bit-packed arrays.
120+
///
121+
/// Because the FastLanes bit-packing kernels are only implemented for unsigned types, the provided
122+
/// `U` should be promoted to the unsigned variant for any target bit width.
123+
/// For example, if the array is bit-packed `i16`, this function should be called with `U = u16`.
124+
///
125+
/// This function fully decompresses the array for all but the most selective masks because the
126+
/// FastLanes decompression is so fast and the bookkeepping necessary to decompress individual
127+
/// elements is relatively slow.
128+
fn filter_primitive<U: UnsignedPType + BitPacking>(
129+
array: &BitPackedArray,
130+
selection: &Arc<MaskValues>,
131+
) -> VortexResult<PVector<U>> {
132+
let values = filter_with_indices(array, selection.indices());
133+
let validity = array
134+
.validity_mask()
135+
.filter(&Mask::Values(selection.clone()))
136+
.into_mut();
137+
138+
debug_assert_eq!(
139+
values.len(),
140+
validity.len(),
141+
"`filter_with_indices` was somehow incorrect"
142+
);
143+
144+
let mut pvector = unsafe { PVectorMut::new_unchecked(values, validity) };
145+
146+
// TODO(connor): We want a `PatchesArray` or patching compute functions instead of this.
147+
let patches = array
148+
.patches()
149+
.map(|patches| patches.filter(&Mask::Values(selection.clone())))
150+
.transpose()?
151+
.flatten();
152+
if let Some(patches) = patches {
153+
pvector = patches.apply_to_pvector(pvector);
154+
}
155+
156+
Ok(pvector.freeze())
157+
}
158+
159+
fn filter_with_indices<T: NativePType + BitPacking>(
160+
array: &BitPackedArray,
161+
indices: &[usize],
162+
) -> BufferMut<T> {
163+
let offset = array.offset() as usize;
164+
let bit_width = array.bit_width() as usize;
165+
let mut values = BufferMut::with_capacity(indices.len());
166+
167+
// Some re-usable memory to store per-chunk indices.
168+
let mut unpacked = [const { MaybeUninit::<T>::uninit() }; 1024];
169+
let packed_bytes = array.packed_slice::<T>();
170+
171+
// Group the indices by the FastLanes chunk they belong to.
172+
let chunk_size = 128 * bit_width / size_of::<T>();
173+
174+
chunked_indices(indices, offset, |chunk_idx, indices_within_chunk| {
175+
let packed = &packed_bytes[chunk_idx * chunk_size..][..chunk_size];
176+
177+
if indices_within_chunk.len() == 1024 {
178+
// Unpack the entire chunk.
179+
unsafe {
180+
let values_len = values.len();
181+
values.set_len(values_len + 1024);
182+
BitPacking::unchecked_unpack(
183+
bit_width,
184+
packed,
185+
&mut values.as_mut_slice()[values_len..],
186+
);
187+
}
188+
} else if indices_within_chunk.len() > UNPACK_CHUNK_THRESHOLD {
189+
// Unpack into a temporary chunk and then copy the values.
190+
unsafe {
191+
let dst: &mut [MaybeUninit<T>] = &mut unpacked;
192+
let dst: &mut [T] = std::mem::transmute(dst);
193+
BitPacking::unchecked_unpack(bit_width, packed, dst);
194+
}
195+
values.extend_trusted(
196+
indices_within_chunk
197+
.iter()
198+
.map(|&idx| unsafe { unpacked.get_unchecked(idx).assume_init() }),
199+
);
200+
} else {
201+
// Otherwise, unpack each element individually.
202+
values.extend_trusted(indices_within_chunk.iter().map(|&idx| unsafe {
203+
BitPacking::unchecked_unpack_single(bit_width, packed, idx)
204+
}));
205+
}
206+
});
207+
208+
values
209+
}
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
// SPDX-License-Identifier: Apache-2.0
2+
// SPDX-FileCopyrightText: Copyright the Vortex contributors
3+
4+
pub(crate) mod filter;
5+
6+
/// Assuming the buffer is already allocated (which will happen at most once), then unpacking all
7+
/// 1024 elements takes ~8.8x as long as unpacking a single element on an M2 Macbook Air.
8+
///
9+
/// See https://github.com/vortex-data/vortex/pull/190#issue-2223752833
10+
const UNPACK_CHUNK_THRESHOLD: usize = 8;
11+
12+
fn chunked_indices<F: FnMut(usize, &[usize])>(indices: &[usize], offset: usize, mut chunk_fn: F) {
13+
if indices.is_empty() {
14+
return;
15+
}
16+
17+
let mut indices_within_chunk: Vec<usize> = Vec::with_capacity(1024);
18+
19+
let first_idx = indices[0];
20+
let mut current_chunk_idx = (first_idx + offset) / 1024;
21+
indices_within_chunk.push((first_idx + offset) % 1024);
22+
23+
for idx in &indices[1..] {
24+
let new_chunk_idx = (idx + offset) / 1024;
25+
26+
if new_chunk_idx != current_chunk_idx {
27+
chunk_fn(current_chunk_idx, &indices_within_chunk);
28+
indices_within_chunk.clear();
29+
}
30+
31+
current_chunk_idx = new_chunk_idx;
32+
indices_within_chunk.push((idx + offset) % 1024);
33+
}
34+
35+
if !indices_within_chunk.is_empty() {
36+
chunk_fn(current_chunk_idx, &indices_within_chunk);
37+
}
38+
}

encodings/fastlanes/src/bitpacking/vtable/mod.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,10 +30,12 @@ use vortex_vector::VectorMutOps;
3030

3131
use crate::BitPackedArray;
3232
use crate::bitpack_decompress::unpack_to_primitive_vector;
33+
use crate::bitpacking::vtable::kernels::filter::PARENT_KERNELS;
3334

3435
mod array;
3536
mod canonical;
3637
mod encode;
38+
mod kernels;
3739
mod operations;
3840
mod validity;
3941
mod visitor;
@@ -246,6 +248,15 @@ impl VTable for BitPackedVTable {
246248
fn execute(array: &Self::Array, _ctx: &mut ExecutionCtx) -> VortexResult<Vector> {
247249
Ok(unpack_to_primitive_vector(array).freeze().into())
248250
}
251+
252+
fn execute_parent(
253+
array: &Self::Array,
254+
parent: &ArrayRef,
255+
child_idx: usize,
256+
ctx: &mut ExecutionCtx,
257+
) -> VortexResult<Option<Vector>> {
258+
PARENT_KERNELS.execute(array, parent, child_idx, ctx)
259+
}
249260
}
250261

251262
#[derive(Debug)]

encodings/fastlanes/src/for/vtable/rules.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
// SPDX-License-Identifier: Apache-2.0
22
// SPDX-FileCopyrightText: Copyright the Vortex contributors
33

4+
use vortex_array::Array;
45
use vortex_array::ArrayRef;
56
use vortex_array::IntoArray;
67
use vortex_array::arrays::FilterArray;
@@ -34,8 +35,7 @@ impl ArrayParentReduceRule<FoRVTable> for FoRFilterPushDownRule {
3435
) -> VortexResult<Option<ArrayRef>> {
3536
let new_array = unsafe {
3637
FoRArray::new_unchecked(
37-
FilterArray::new(child.encoded().clone(), parent.filter_mask().clone())
38-
.into_array(),
38+
child.encoded.filter(parent.filter_mask().clone())?,
3939
child.reference.clone(),
4040
)
4141
};

vortex-array/src/patches.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ use vortex_mask::MaskMut;
3030
use vortex_scalar::PValue;
3131
use vortex_scalar::Scalar;
3232
use vortex_utils::aliases::hash_map::HashMap;
33+
use vortex_vector::primitive::PVectorMut;
3334

3435
use crate::Array;
3536
use crate::ArrayRef;
@@ -822,6 +823,21 @@ impl Patches {
822823
}))
823824
}
824825

826+
/// Applies patches to a [`PVectorMut<T>`], returning the patched vector.
827+
///
828+
/// This function modifies the elements buffer in-place at the positions specified by the patch
829+
/// indices. It also updates the validity mask to reflect the nullability of patch values.
830+
pub fn apply_to_pvector<T: NativePType>(&self, pvector: PVectorMut<T>) -> PVectorMut<T> {
831+
let (mut elements, mut validity) = pvector.into_parts();
832+
833+
// SAFETY: We maintain the invariant that elements and validity have the same length, and all
834+
// patch indices are valid after offset adjustment (guaranteed by `Patches`).
835+
unsafe { self.apply_to_buffer(elements.as_mut_slice(), &mut validity) };
836+
837+
// SAFETY: We have not modified the length of elements or validity.
838+
unsafe { PVectorMut::new_unchecked(elements, validity) }
839+
}
840+
825841
/// Apply patches to a mutable buffer and validity mask.
826842
///
827843
/// This method applies the patch values to the given buffer at the positions specified by the

0 commit comments

Comments
 (0)