Skip to content

Commit 50f814d

Browse files
authored
feat: wire in fastlanes rle to btrblocks (#4789)
Signed-off-by: Alexander Droste <alexander.droste@protonmail.com>
1 parent 244f488 commit 50f814d

File tree

18 files changed

+788
-243
lines changed

18 files changed

+788
-243
lines changed

Cargo.lock

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ divan = { package = "codspeed-divan-compat", version = "3.0" }
110110
dyn-hash = "0.2.0"
111111
enum-iterator = "2.0.0"
112112
erased-serde = "0.4"
113-
fastlanes = "0.2.2"
113+
fastlanes = "0.4.0"
114114
flatbuffers = "25.2.10"
115115
fsst-rs = "0.5.2"
116116
futures = { version = "0.3.31", default-features = false }
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
���������������������������
1+
���������������������� ���������(����

encodings/fastlanes/src/rle/compress.rs

Lines changed: 106 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -1,48 +1,61 @@
11
// SPDX-License-Identifier: Apache-2.0
22
// SPDX-FileCopyrightText: Copyright the Vortex contributors
33

4-
use std::hash::Hash;
5-
64
use arrayref::{array_mut_ref, array_ref};
75
use fastlanes::RLE;
6+
use num_traits::AsPrimitive;
87
use vortex_array::arrays::PrimitiveArray;
9-
use vortex_array::builders::PrimitiveBuilder;
8+
use vortex_array::validity::Validity;
109
use vortex_array::vtable::ValidityHelper;
1110
use vortex_array::{IntoArray, ToCanonical};
1211
use vortex_buffer::BufferMut;
13-
use vortex_dtype::{NativePType, match_each_unsigned_integer_ptype};
14-
use vortex_error::VortexResult;
12+
use vortex_dtype::{NativePType, match_each_native_ptype, match_each_unsigned_integer_ptype};
13+
use vortex_error::{VortexResult, vortex_panic};
1514

1615
use crate::{FL_CHUNK_SIZE, RLEArray};
1716

1817
impl RLEArray {
1918
/// Encodes a primitive array of unsigned integers using FastLanes RLE.
2019
pub fn encode(array: &PrimitiveArray) -> VortexResult<Self> {
21-
match_each_unsigned_integer_ptype!(array.ptype(), |T| { rle_encode_typed::<T>(array) })
20+
match_each_native_ptype!(array.ptype(), |T| { rle_encode_typed::<T>(array) })
2221
}
2322
}
2423

2524
/// Decompresses an RLE array back into a primitive array.
25+
#[allow(clippy::cognitive_complexity)]
2626
pub fn rle_decompress(array: &RLEArray) -> PrimitiveArray {
27-
match_each_unsigned_integer_ptype!(array.ptype(), |T| { rle_decode_typed::<T>(array) })
27+
match_each_native_ptype!(array.values().dtype().as_ptype(), |V| {
28+
match_each_unsigned_integer_ptype!(array.values_idx_offsets().dtype().as_ptype(), |O| {
29+
// RLE indices are always u16 (or u8 if downcast).
30+
match array.indices().dtype().as_ptype() {
31+
PType::U8 => rle_decode_typed::<V, u8, O>(array),
32+
PType::U16 => rle_decode_typed::<V, u16, O>(array),
33+
_ => vortex_panic!(
34+
"Unsupported index type for RLE decoding: {}",
35+
array.indices().dtype().as_ptype()
36+
),
37+
}
38+
})
39+
})
2840
}
2941

3042
/// Encodes a primitive array of unsigned integers using FastLanes RLE.
3143
///
3244
/// If the input array length is not a multiple of 1024, the last chunk is padded.
3345
fn rle_encode_typed<T>(array: &PrimitiveArray) -> VortexResult<RLEArray>
3446
where
35-
T: NativePType + RLE + Clone + Hash + Eq,
47+
T: NativePType + RLE,
3648
{
3749
let values = array.as_slice::<T>();
3850
let len = values.len();
51+
let padded_len = len.next_multiple_of(FL_CHUNK_SIZE);
3952

4053
// Allocate capacity up to the next multiple of chunk size.
41-
let mut values_buf = BufferMut::<T>::with_capacity(len.next_multiple_of(FL_CHUNK_SIZE));
42-
let mut indices_buf = BufferMut::<u16>::with_capacity(len.next_multiple_of(FL_CHUNK_SIZE));
54+
let mut values_buf = BufferMut::<T>::with_capacity(padded_len);
55+
let mut indices_buf = BufferMut::<u16>::with_capacity(padded_len);
4356

4457
// Pre-allocate for one offset per chunk.
45-
let mut value_chunk_offsets = BufferMut::<u64>::with_capacity(len.div_ceil(FL_CHUNK_SIZE));
58+
let mut values_idx_offsets = BufferMut::<u64>::with_capacity(len.div_ceil(FL_CHUNK_SIZE));
4659

4760
let values_uninit = values_buf.spare_capacity_mut();
4861
let indices_uninit = indices_buf.spare_capacity_mut();
@@ -61,7 +74,7 @@ where
6174

6275
// Capture chunk start indices. This is necessary as indices
6376
// returned from `T::encode` are relative to the chunk.
64-
value_chunk_offsets.push(value_count_acc as u64);
77+
values_idx_offsets.push(value_count_acc as u64);
6578

6679
let value_count = T::encode(
6780
input,
@@ -90,78 +103,110 @@ where
90103

91104
unsafe {
92105
values_buf.set_len(value_count_acc);
93-
indices_buf.set_len(array.len().next_multiple_of(FL_CHUNK_SIZE));
106+
indices_buf.set_len(padded_len);
94107
}
95108

96109
RLEArray::try_new(
97110
values_buf.into_array(),
98-
indices_buf.into_array(),
99-
value_chunk_offsets.into_array(),
100-
array.validity().clone(),
111+
PrimitiveArray::new(indices_buf.freeze(), padded_validity(array)).into_array(),
112+
values_idx_offsets.into_array(),
101113
array.len(),
102114
)
103115
}
104116

117+
/// Returns the validity of the given array, padded to the next multiple of 1024.
118+
fn padded_validity(array: &PrimitiveArray) -> Validity {
119+
match array.validity() {
120+
Validity::NonNullable => Validity::NonNullable,
121+
Validity::AllValid => Validity::AllValid,
122+
Validity::AllInvalid => Validity::AllInvalid,
123+
Validity::Array(validity_array) => {
124+
let len = array.len();
125+
let padded_len = len.next_multiple_of(FL_CHUNK_SIZE);
126+
127+
if len == padded_len {
128+
return Validity::Array(validity_array.clone());
129+
}
130+
131+
let mut builder = arrow_buffer::BooleanBufferBuilder::new(padded_len);
132+
133+
let bool_array = validity_array.to_bool();
134+
let bool_buffer = bool_array.boolean_buffer();
135+
builder.append_buffer(&bool_buffer.slice(0, len));
136+
builder.append_n(padded_len - len, false);
137+
138+
Validity::from(builder.finish())
139+
}
140+
}
141+
}
142+
105143
/// Decompresses an `RLEArray` into a primitive array of unsigned integers.
106-
fn rle_decode_typed<T>(array: &RLEArray) -> PrimitiveArray
144+
#[allow(clippy::cognitive_complexity)]
145+
fn rle_decode_typed<V, I, O>(array: &RLEArray) -> PrimitiveArray
107146
where
108-
T: NativePType + RLE + Clone + Copy,
147+
V: NativePType + RLE + Clone + Copy,
148+
I: NativePType + Into<usize>,
149+
O: NativePType + AsPrimitive<u64>,
109150
{
110151
let values = array.values().to_primitive();
111-
let values = values.as_slice::<T>();
152+
let values = values.as_slice::<V>();
112153

113154
let indices = array.indices().to_primitive();
114-
let indices = indices.as_slice::<u16>();
155+
let indices = indices.as_slice::<I>();
156+
assert_eq!(indices.len() % FL_CHUNK_SIZE, 0);
115157

116158
let chunk_start_idx = array.offset / FL_CHUNK_SIZE;
117159
let chunk_end_idx = (array.offset() + array.len()).div_ceil(FL_CHUNK_SIZE);
118160
let num_chunks = chunk_end_idx - chunk_start_idx;
119161

120-
let mut builder = PrimitiveBuilder::<T>::with_capacity(
121-
array.validity().nullability(),
122-
num_chunks * FL_CHUNK_SIZE,
123-
);
162+
let mut buffer = BufferMut::<V>::with_capacity(num_chunks * FL_CHUNK_SIZE);
163+
let buffer_uninit = buffer.spare_capacity_mut();
124164

125-
let mut range = builder.uninit_range(num_chunks * FL_CHUNK_SIZE);
165+
let values_idx_offsets = array.values_idx_offsets().to_primitive();
166+
let values_idx_offsets = values_idx_offsets.as_slice::<O>();
126167

127-
for (iter_idx, chunk_idx) in (chunk_start_idx..chunk_end_idx).enumerate() {
128-
let chunk_values = &values[array.value_chunk_offset(chunk_idx)..];
168+
for chunk_idx in 0..num_chunks {
169+
// Offsets in `values_idx_offsets` are absolute and need to be shifted
170+
// by the offset of the first chunk of the current slice in order to
171+
// make them relative to that slice.
172+
let value_idx_offset =
173+
(values_idx_offsets[chunk_idx].as_() - values_idx_offsets[0].as_()) as usize;
174+
175+
let chunk_values = &values[value_idx_offset..];
129176
let chunk_indices = &indices[chunk_idx * FL_CHUNK_SIZE..];
130177

131178
// SAFETY: `MaybeUninit<T>` and `T` have the same layout.
132-
let builder_values: &mut [T] = unsafe {
133-
std::mem::transmute(range.slice_uninit_mut(iter_idx * FL_CHUNK_SIZE, FL_CHUNK_SIZE))
179+
let buffer_values: &mut [V] = unsafe {
180+
std::mem::transmute(&mut buffer_uninit[chunk_idx * FL_CHUNK_SIZE..][..FL_CHUNK_SIZE])
134181
};
135182

136-
T::decode(
183+
V::decode(
137184
chunk_values,
138185
array_ref![chunk_indices, 0, FL_CHUNK_SIZE],
139-
array_mut_ref![builder_values, 0, FL_CHUNK_SIZE],
186+
array_mut_ref![buffer_values, 0, FL_CHUNK_SIZE],
140187
);
141188
}
142189

143190
unsafe {
144-
range.finish();
191+
buffer.set_len(num_chunks * FL_CHUNK_SIZE);
145192
}
146193

147-
let offset_within_chunk = array.offset_in_chunk(array.offset);
148-
let primitive_array = builder.finish_into_primitive();
194+
let offset_within_chunk = array.offset();
149195

150196
PrimitiveArray::new(
151-
primitive_array
152-
.buffer::<T>()
197+
buffer
198+
.freeze()
153199
.slice(offset_within_chunk..(offset_within_chunk + array.len())),
154-
// Validity needs to be set on the sliced array. After decoding but
155-
// before slicing the length of the validity array can be smaller than
156-
// the primitive array, as RLE decodes in 1024 chunks.
157-
array.validity().clone(),
200+
Validity::copy_from_array(array.as_ref()),
158201
)
159202
}
160203

161204
#[cfg(test)]
162205
mod test {
206+
use rstest::rstest;
163207
use vortex_array::{IntoArray, ToCanonical};
164208
use vortex_buffer::Buffer;
209+
use vortex_dtype::half::f16;
165210

166211
use super::*;
167212

@@ -244,7 +289,7 @@ mod test {
244289
assert_eq!(encoded.len(), 1500);
245290
assert_eq!(decoded.as_slice::<u32>(), expected.as_slice());
246291
// 2 chunks: 1024 + 476 elements
247-
assert_eq!(encoded.value_chunk_offsets().len(), 2);
292+
assert_eq!(encoded.values_idx_offsets().len(), 2);
248293
}
249294

250295
#[test]
@@ -259,6 +304,25 @@ mod test {
259304

260305
assert_eq!(encoded.len(), 2048);
261306
assert_eq!(decoded.as_slice::<u32>(), expected.as_slice());
262-
assert_eq!(encoded.value_chunk_offsets().len(), 2);
307+
assert_eq!(encoded.values_idx_offsets().len(), 2);
308+
}
309+
310+
#[rstest]
311+
#[case::u8((0u8..100).collect::<Buffer<u8>>())]
312+
#[case::u16((0u16..2000).collect::<Buffer<u16>>())]
313+
#[case::u32((0u32..2000).collect::<Buffer<u32>>())]
314+
#[case::u64((0u64..2000).collect::<Buffer<u64>>())]
315+
#[case::i8((-100i8..100).collect::<Buffer<i8>>())]
316+
#[case::i16((-2000i16..2000).collect::<Buffer<i16>>())]
317+
#[case::i32((-2000i32..2000).collect::<Buffer<i32>>())]
318+
#[case::i64((-2000i64..2000).collect::<Buffer<i64>>())]
319+
#[case::f16((-2000..2000).map(|i| f16::from_f32(i as f32)).collect::<Buffer<f16>>())]
320+
#[case::f32((-2000..2000).map(|i| i as f32).collect::<Buffer<f32>>())]
321+
#[case::f64((-2000..2000).map(|i| i as f64).collect::<Buffer<f64>>())]
322+
fn test_roundtrip_primitive_types<T: NativePType>(#[case] values: Buffer<T>) {
323+
let primitive = values.clone().into_array().to_primitive();
324+
let result = RLEArray::encode(&primitive).unwrap();
325+
let decoded = result.to_primitive();
326+
assert_eq!(decoded.as_slice::<T>(), values.as_slice());
263327
}
264328
}

0 commit comments

Comments
 (0)