Skip to content

Commit 7873c9d

Browse files
robert3005 and a10y authored
feat: Faster bitpacked filter & take (#1667)
Instead of using Itertools::chunk_by we do our own iteration that is aware of the magic 1024-element threshold --------- Co-authored-by: Andrew Duffy <[email protected]>
1 parent b8d3ff2 commit 7873c9d

File tree

3 files changed

+49
-31
lines changed

3 files changed

+49
-31
lines changed

encodings/fastlanes/src/bitpacking/compute/filter.rs

Lines changed: 5 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,13 @@
11
use arrow_buffer::ArrowNativeType;
22
use fastlanes::BitPacking;
3-
use itertools::Itertools;
43
use vortex_array::array::PrimitiveArray;
54
use vortex_array::compute::{filter, FilterFn, FilterIter, FilterMask};
65
use vortex_array::variants::PrimitiveArrayTrait;
76
use vortex_array::{ArrayData, IntoArrayData, IntoArrayVariant};
87
use vortex_dtype::{match_each_unsigned_integer_ptype, NativePType};
98
use vortex_error::VortexResult;
109

10+
use super::chunked_indices;
1111
use crate::bitpacking::compute::take::UNPACK_CHUNK_THRESHOLD;
1212
use crate::{BitPackedArray, BitPackedEncoding};
1313

@@ -66,19 +66,14 @@ fn filter_indices<T: NativePType + BitPacking + ArrowNativeType>(
6666
let mut values = Vec::with_capacity(indices_len);
6767

6868
// Some re-usable memory to store per-chunk indices.
69-
let mut indices_within_chunk: Vec<usize> = Vec::with_capacity(1024);
7069
let mut unpacked = [T::zero(); 1024];
70+
let packed_bytes = array.packed_slice::<T>();
7171

7272
// Group the indices by the FastLanes chunk they belong to.
73-
let chunked = indices.chunk_by(|&idx| (idx + offset) / 1024);
74-
let chunk_len = 128 * bit_width / size_of::<T>();
73+
let chunk_size = 128 * bit_width / size_of::<T>();
7574

76-
chunked.into_iter().for_each(|(chunk_idx, indices)| {
77-
let packed = &array.packed_slice::<T>()[chunk_idx * chunk_len..(chunk_idx + 1) * chunk_len];
78-
79-
// Re-use the indices buffer to store the indices within the current chunk.
80-
indices_within_chunk.clear();
81-
indices_within_chunk.extend(indices.map(|idx| (idx + offset) % 1024));
75+
chunked_indices(indices, offset, |chunk_idx, indices_within_chunk| {
76+
let packed = &packed_bytes[chunk_idx * chunk_size..][..chunk_size];
8277

8378
if indices_within_chunk.len() == 1024 {
8479
// Unpack the entire chunk.

encodings/fastlanes/src/bitpacking/compute/mod.rs

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,3 +30,33 @@ impl ComputeVTable for BitPackedEncoding {
3030
Some(self)
3131
}
3232
}
33+
34+
fn chunked_indices<F: FnMut(usize, &[usize])>(
35+
mut indices: impl Iterator<Item = usize>,
36+
offset: usize,
37+
mut chunk_fn: F,
38+
) {
39+
let mut indices_within_chunk: Vec<usize> = Vec::with_capacity(1024);
40+
41+
let Some(first_idx) = indices.next() else {
42+
return;
43+
};
44+
45+
let mut current_chunk_idx = (first_idx + offset) / 1024;
46+
indices_within_chunk.push((first_idx + offset) % 1024);
47+
for idx in indices {
48+
let new_chunk_idx = (idx + offset) / 1024;
49+
50+
if new_chunk_idx != current_chunk_idx {
51+
chunk_fn(current_chunk_idx, &indices_within_chunk);
52+
indices_within_chunk.clear();
53+
}
54+
55+
current_chunk_idx = new_chunk_idx;
56+
indices_within_chunk.push((idx + offset) % 1024);
57+
}
58+
59+
if !indices_within_chunk.is_empty() {
60+
chunk_fn(current_chunk_idx, &indices_within_chunk);
61+
}
62+
}

encodings/fastlanes/src/bitpacking/compute/take.rs

Lines changed: 14 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
use fastlanes::BitPacking;
2-
use itertools::Itertools;
32
use vortex_array::array::PrimitiveArray;
43
use vortex_array::compute::{take, try_cast, TakeFn};
54
use vortex_array::variants::PrimitiveArrayTrait;
@@ -12,6 +11,7 @@ use vortex_dtype::{
1211
};
1312
use vortex_error::{VortexExpect as _, VortexResult};
1413

14+
use super::chunked_indices;
1515
use crate::{unpack_single_primitive, BitPackedArray, BitPackedEncoding};
1616

1717
// assuming the buffer is already allocated (which will happen at most once) then unpacking
@@ -54,35 +54,30 @@ fn take_primitive<T: NativePType + BitPacking, I: NativePType>(
5454
let packed = array.packed_slice::<T>();
5555

5656
// Group indices by 1024-element chunk, *without* allocating on the heap
57-
let chunked_indices = &indices
58-
.maybe_null_slice::<I>()
59-
.iter()
60-
.map(|i| {
61-
i.to_usize()
62-
.vortex_expect("index must be expressible as usize")
63-
+ offset
64-
})
65-
.chunk_by(|idx| idx / 1024);
57+
let indices_iter = indices.maybe_null_slice::<I>().iter().map(|i| {
58+
i.to_usize()
59+
.vortex_expect("index must be expressible as usize")
60+
});
6661

6762
let mut output = Vec::with_capacity(indices.len());
6863
let mut unpacked = [T::zero(); 1024];
64+
let chunk_len = 128 * bit_width / size_of::<T>();
6965

70-
for (chunk, offsets) in chunked_indices {
71-
let chunk_size = 128 * bit_width / size_of::<T>();
72-
let packed_chunk = &packed[chunk * chunk_size..][..chunk_size];
66+
chunked_indices(indices_iter, offset, |chunk_idx, indices_within_chunk| {
67+
let packed = &packed[chunk_idx * chunk_len..][..chunk_len];
7368

7469
// array_chunks produced a fixed size array, doesn't heap allocate
7570
let mut have_unpacked = false;
76-
let mut offset_chunk_iter = offsets
77-
// relativize indices to the start of the chunk
78-
.map(|i| i % 1024)
71+
let mut offset_chunk_iter = indices_within_chunk
72+
.iter()
73+
.copied()
7974
.array_chunks::<UNPACK_CHUNK_THRESHOLD>();
8075

8176
// this loop only runs if we have at least UNPACK_CHUNK_THRESHOLD offsets
8277
for offset_chunk in &mut offset_chunk_iter {
8378
if !have_unpacked {
8479
unsafe {
85-
BitPacking::unchecked_unpack(bit_width, packed_chunk, &mut unpacked);
80+
BitPacking::unchecked_unpack(bit_width, packed, &mut unpacked);
8681
}
8782
have_unpacked = true;
8883
}
@@ -103,13 +98,11 @@ fn take_primitive<T: NativePType + BitPacking, I: NativePType>(
10398
// we had fewer than UNPACK_CHUNK_THRESHOLD offsets in the first place,
10499
// so we need to unpack each one individually
105100
for index in remainder {
106-
output.push(unsafe {
107-
unpack_single_primitive::<T>(packed_chunk, bit_width, index)
108-
});
101+
output.push(unsafe { unpack_single_primitive::<T>(packed, bit_width, index) });
109102
}
110103
}
111104
}
112-
}
105+
});
113106

114107
if let Some(patches) = array
115108
.patches()

0 commit comments

Comments (0)