Skip to content

Commit 7c0977a

Browse files
authored
perf: fuse alp decode with applying patches (#5065)
Signed-off-by: Alexander Droste <[email protected]>
1 parent 72bd594 commit 7c0977a

File tree

7 files changed

+343
-21
lines changed

7 files changed

+343
-21
lines changed

encodings/alp/benches/alp_compress.rs

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,11 @@
66
use divan::Bencher;
77
use rand::rngs::StdRng;
88
use rand::{Rng, SeedableRng as _};
9-
use vortex_alp::{ALPFloat, ALPRDFloat, RDEncoder, alp_encode};
9+
use vortex_alp::{ALPFloat, ALPRDFloat, RDEncoder, alp_encode, decompress};
1010
use vortex_array::arrays::PrimitiveArray;
1111
use vortex_array::compute::warm_up_vtables;
1212
use vortex_array::validity::Validity;
13-
use vortex_buffer::buffer;
13+
use vortex_buffer::{Buffer, buffer};
1414
use vortex_dtype::NativePType;
1515

1616
fn main() {
@@ -84,10 +84,15 @@ fn decompress_alp<T: ALPFloat + NativePType>(bencher: Bencher, args: (usize, f64
8484
Validity::NonNullable
8585
};
8686
let values = values.freeze();
87-
let array = alp_encode(&PrimitiveArray::new(values, validity), None).unwrap();
8887
bencher
89-
.with_inputs(|| array.clone())
90-
.bench_values(|array| array.to_canonical());
88+
.with_inputs(|| {
89+
alp_encode(
90+
&PrimitiveArray::new(Buffer::copy_from(&values), validity.clone()),
91+
None,
92+
)
93+
.unwrap()
94+
})
95+
.bench_values(decompress);
9196
}
9297

9398
#[divan::bench(types = [f32, f64], args = [10_000, 100_000])]

encodings/alp/src/alp/array.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -282,6 +282,6 @@ impl ArrayVTable<ALPVTable> for ALPVTable {
282282

283283
impl CanonicalVTable<ALPVTable> for ALPVTable {
284284
fn canonicalize(array: &ALPArray) -> Canonical {
285-
Canonical::Primitive(decompress(array))
285+
Canonical::Primitive(decompress(array.clone()))
286286
}
287287
}

encodings/alp/src/alp/compress.rs

Lines changed: 276 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,16 @@
11
// SPDX-License-Identifier: Apache-2.0
22
// SPDX-FileCopyrightText: Copyright the Vortex contributors
33

4+
use std::mem::transmute;
5+
46
use itertools::Itertools;
5-
use vortex_array::arrays::PrimitiveArray;
7+
use vortex_array::arrays::{PrimitiveArray, patch_chunk};
68
use vortex_array::patches::Patches;
79
use vortex_array::validity::Validity;
810
use vortex_array::vtable::ValidityHelper;
911
use vortex_array::{ArrayRef, IntoArray, ToCanonical};
1012
use vortex_buffer::{Buffer, BufferMut};
11-
use vortex_dtype::PType;
13+
use vortex_dtype::{PType, match_each_unsigned_integer_ptype};
1214
use vortex_error::{VortexResult, vortex_bail};
1315
use vortex_mask::Mask;
1416

@@ -118,20 +120,113 @@ where
118120
Ok((exponents, encoded_array, patches))
119121
}
120122

121-
pub fn decompress(array: &ALPArray) -> PrimitiveArray {
123+
/// Decompresses an ALP-encoded array.
124+
///
125+
/// # Arguments
126+
///
127+
/// * `array` - The ALP-encoded array to decompress
128+
///
129+
/// # Returns
130+
///
131+
/// A `PrimitiveArray` containing the decompressed floating-point values with all patches applied.
132+
pub fn decompress(array: ALPArray) -> PrimitiveArray {
133+
let patches = array.patches().cloned();
134+
if let Some(patches) = patches
135+
&& let Some(chunk_offsets) = patches.chunk_offsets()
136+
{
137+
return decompress_chunked(array, &patches, &chunk_offsets.as_ref().to_primitive());
138+
}
139+
140+
decompress_unchunked(array)
141+
}
142+
143+
/// Decompresses an ALP-encoded array in 1024-element chunks.
144+
///
145+
/// Decoding and applying patches is done in chunks of 1024 elements for better L1 cache locality.
146+
///
147+
/// # Arguments
148+
///
149+
/// * `array` - The ALP-encoded array to decompress
150+
/// * `patches` - The patches containing exceptional values and their positions
151+
/// * `patches_chunk_offsets` - Offsets into the patches array for each chunk
152+
///
153+
/// # Returns
154+
///
155+
/// A `PrimitiveArray` containing the decompressed values with all patches applied.
156+
#[allow(clippy::cognitive_complexity)]
157+
pub fn decompress_chunked(
158+
array: ALPArray,
159+
patches: &Patches,
160+
patches_chunk_offsets: &PrimitiveArray,
161+
) -> PrimitiveArray {
162+
let encoded = array.encoded().to_primitive();
163+
let validity = encoded.validity().clone();
164+
165+
let patches_indices = patches.indices().as_ref().to_primitive();
166+
let patches_values = patches.values().as_ref().to_primitive();
167+
let ptype = array.dtype().as_ptype();
168+
let array_len = array.len();
169+
let exponents = array.exponents();
170+
let patches_offset = patches.offset();
171+
172+
// We need to drop ALPArray here in case converting encoded buffer into primitive didn't create a copy. In that case
173+
// both alp_encoded and array will hold a reference to the buffer we want to mutate.
174+
drop(array);
175+
176+
match_each_alp_float_ptype!(ptype, |T| {
177+
let patches_values = patches_values.as_slice::<T>();
178+
let mut alp_buffer = encoded.into_buffer_mut();
179+
match_each_unsigned_integer_ptype!(patches_chunk_offsets.ptype(), |C| {
180+
let patches_chunk_offsets = patches_chunk_offsets.as_slice::<C>();
181+
match_each_unsigned_integer_ptype!(patches_indices.ptype(), |I| {
182+
let patches_indices = patches_indices.as_slice::<I>();
183+
184+
for (chunk_idx, chunk_start) in (0..array_len).step_by(1024).enumerate() {
185+
let chunk_end = (chunk_start + 1024).min(array_len);
186+
let chunk_slice = &mut alp_buffer.as_mut_slice()[chunk_start..chunk_end];
187+
<T>::decode_slice_inplace(chunk_slice, exponents);
188+
189+
let decoded_chunk: &mut [T] = unsafe { transmute(chunk_slice) };
190+
patch_chunk(
191+
decoded_chunk,
192+
patches_indices,
193+
patches_values,
194+
patches_offset,
195+
patches_chunk_offsets,
196+
chunk_idx,
197+
);
198+
}
199+
200+
let decoded_buffer: BufferMut<T> = unsafe { transmute(alp_buffer) };
201+
PrimitiveArray::new::<T>(decoded_buffer.freeze(), validity)
202+
})
203+
})
204+
})
205+
}
206+
207+
/// Decompresses an ALP-encoded array without chunk offsets.
208+
///
209+
/// This function decodes the complete array at once and then applies any patches after.
210+
fn decompress_unchunked(array: ALPArray) -> PrimitiveArray {
211+
let patches = array.patches().cloned();
122212
let encoded = array.encoded().to_primitive();
123213
let validity = encoded.validity().clone();
214+
let exponents = array.exponents();
124215
let ptype = array.dtype().as_ptype();
125216

217+
// We need to drop ALPArray here in case converting encoded buffer into primitive didn't create a copy. In that case
218+
// both alp_encoded and array will hold a reference to the buffer we want to mutate.
219+
drop(array);
220+
126221
let decoded = match_each_alp_float_ptype!(ptype, |T| {
127222
PrimitiveArray::new::<T>(
128-
<T>::decode_buffer(encoded.into_buffer_mut(), array.exponents()),
223+
<T>::decode_buffer(encoded.into_buffer_mut(), exponents),
129224
validity,
130225
)
131226
});
132227

133-
if let Some(patches) = array.patches() {
134-
decoded.patch(patches)
228+
if let Some(patches) = patches {
229+
decoded.patch(&patches)
135230
} else {
136231
decoded
137232
}
@@ -158,7 +253,7 @@ mod tests {
158253
assert_arrays_eq!(encoded.encoded(), expected_encoded);
159254
assert_eq!(encoded.exponents(), Exponents { e: 9, f: 6 });
160255

161-
let decoded = decompress(&encoded);
256+
let decoded = decompress(encoded);
162257
assert_arrays_eq!(decoded, array);
163258
}
164259

@@ -171,7 +266,7 @@ mod tests {
171266
assert_arrays_eq!(encoded.encoded(), expected_encoded);
172267
assert_eq!(encoded.exponents(), Exponents { e: 9, f: 6 });
173268

174-
let decoded = decompress(&encoded);
269+
let decoded = decompress(encoded);
175270
let expected = PrimitiveArray::from_option_iter(vec![None, Some(1.234f32), None]);
176271
assert_arrays_eq!(decoded, expected);
177272
}
@@ -187,7 +282,7 @@ mod tests {
187282
assert_arrays_eq!(encoded.encoded(), expected_encoded);
188283
assert_eq!(encoded.exponents(), Exponents { e: 16, f: 13 });
189284

190-
let decoded = decompress(&encoded);
285+
let decoded = decompress(encoded);
191286
let expected_decoded = PrimitiveArray::new(values, Validity::NonNullable);
192287
assert_arrays_eq!(decoded, expected_decoded);
193288
}
@@ -204,7 +299,7 @@ mod tests {
204299
assert_arrays_eq!(encoded.encoded(), expected_encoded);
205300
assert_eq!(encoded.exponents(), Exponents { e: 16, f: 13 });
206301

207-
let decoded = decompress(&encoded);
302+
let decoded = decompress(encoded);
208303
assert_arrays_eq!(decoded, array);
209304
}
210305

@@ -223,9 +318,9 @@ mod tests {
223318

224319
assert_eq!(encoded.exponents(), Exponents { e: 16, f: 13 });
225320

226-
assert_arrays_eq!(&encoded, array);
321+
assert_arrays_eq!(encoded, array);
227322

228-
let _decoded = decompress(&encoded);
323+
let _decoded = decompress(encoded);
229324
}
230325

231326
#[test]
@@ -362,4 +457,173 @@ mod tests {
362457
let expected_values = PrimitiveArray::from_iter(vec![PI, E]);
363458
assert_arrays_eq!(patch_values, expected_values);
364459
}
460+
461+
#[test]
462+
fn test_slice_half_chunk_f32_roundtrip() {
463+
// Create 1024 elements, encode, slice to first 512, then decode
464+
let values = vec![1.234f32; 1024];
465+
let original = PrimitiveArray::new(Buffer::from(values), Validity::NonNullable);
466+
let encoded = alp_encode(&original, None).unwrap();
467+
468+
let sliced_alp = encoded.slice(512..1024);
469+
let decoded = sliced_alp.to_primitive();
470+
471+
let expected_slice = original.slice(512..1024).to_primitive();
472+
assert_eq!(expected_slice.as_slice::<f32>(), decoded.as_slice::<f32>());
473+
}
474+
475+
#[test]
476+
fn test_slice_half_chunk_f64_roundtrip() {
477+
let values = vec![5.678f64; 1024];
478+
let original = PrimitiveArray::new(Buffer::from(values), Validity::NonNullable);
479+
let encoded = alp_encode(&original, None).unwrap();
480+
481+
let sliced_alp = encoded.slice(512..1024);
482+
let decoded = sliced_alp.to_primitive();
483+
484+
let expected_slice = original.slice(512..1024).to_primitive();
485+
assert_eq!(expected_slice.as_slice::<f64>(), decoded.as_slice::<f64>());
486+
}
487+
488+
#[test]
489+
fn test_slice_half_chunk_with_patches_roundtrip() {
490+
let mut values = vec![1.0f64; 1024];
491+
values[100] = PI;
492+
values[200] = E;
493+
values[600] = 42.42;
494+
495+
let original = PrimitiveArray::new(Buffer::from(values), Validity::NonNullable);
496+
let encoded = alp_encode(&original, None).unwrap();
497+
498+
let sliced_alp = encoded.slice(512..1024);
499+
let decoded = sliced_alp.to_primitive();
500+
501+
let expected_slice = original.slice(512..1024).to_primitive();
502+
assert_eq!(expected_slice.as_slice::<f64>(), decoded.as_slice::<f64>());
503+
assert!(encoded.patches().is_some());
504+
}
505+
506+
#[test]
507+
fn test_slice_half_chunk_nullable_roundtrip() {
508+
let values = (0..1024)
509+
.map(|i| if i % 3 == 0 { None } else { Some(2.5f32) })
510+
.collect::<Vec<_>>();
511+
512+
let original = PrimitiveArray::from_option_iter(values);
513+
let encoded = alp_encode(&original, None).unwrap();
514+
515+
let sliced_alp = encoded.slice(512..1024);
516+
let decoded = sliced_alp.to_primitive();
517+
518+
let expected_slice = original.slice(512..1024);
519+
assert_arrays_eq!(decoded, expected_slice);
520+
}
521+
522+
#[test]
523+
fn test_large_f32_array_uniform_values() {
524+
let size = 10_000;
525+
let array = PrimitiveArray::new(buffer![42.125f32; size], Validity::NonNullable);
526+
let encoded = alp_encode(&array, None).unwrap();
527+
528+
assert!(encoded.patches().is_none());
529+
let decoded = decompress(encoded);
530+
assert_eq!(array.as_slice::<f32>(), decoded.as_slice::<f32>());
531+
}
532+
533+
#[test]
534+
fn test_large_f64_array_uniform_values() {
535+
let size = 50_000;
536+
let array = PrimitiveArray::new(buffer![123.456789f64; size], Validity::NonNullable);
537+
let encoded = alp_encode(&array, None).unwrap();
538+
539+
assert!(encoded.patches().is_none());
540+
let decoded = decompress(encoded);
541+
assert_eq!(array.as_slice::<f64>(), decoded.as_slice::<f64>());
542+
}
543+
544+
#[test]
545+
fn test_large_f32_array_with_patches() {
546+
let size = 5_000;
547+
let mut values = vec![1.5f32; size];
548+
values[100] = std::f32::consts::PI;
549+
values[1500] = std::f32::consts::E;
550+
values[3000] = f32::NEG_INFINITY;
551+
values[4500] = f32::INFINITY;
552+
553+
let array = PrimitiveArray::new(Buffer::from(values.clone()), Validity::NonNullable);
554+
let encoded = alp_encode(&array, None).unwrap();
555+
556+
assert!(encoded.patches().is_some());
557+
let decoded = decompress(encoded);
558+
assert_eq!(values.as_slice(), decoded.as_slice::<f32>());
559+
}
560+
561+
#[test]
562+
fn test_large_f64_array_with_patches() {
563+
let size = 8_000;
564+
let mut values = vec![2.2184f64; size];
565+
values[0] = PI;
566+
values[1000] = E;
567+
values[2000] = f64::NAN;
568+
values[3000] = f64::INFINITY;
569+
values[4000] = f64::NEG_INFINITY;
570+
values[5000] = 0.0;
571+
values[6000] = -0.0;
572+
values[7000] = 999.999999999;
573+
574+
let array = PrimitiveArray::new(Buffer::from(values.clone()), Validity::NonNullable);
575+
let encoded = alp_encode(&array, None).unwrap();
576+
577+
assert!(encoded.patches().is_some());
578+
let decoded = decompress(encoded);
579+
580+
for idx in 0..size {
581+
let decoded_val = decoded.as_slice::<f64>()[idx];
582+
let original_val = values[idx];
583+
assert!(
584+
decoded_val.is_eq(original_val),
585+
"At index {idx}: Expected {original_val} but got {decoded_val}"
586+
);
587+
}
588+
}
589+
590+
#[test]
591+
fn test_large_nullable_array() {
592+
let size = 12_000;
593+
let values: Vec<Option<f32>> = (0..size)
594+
.map(|i| {
595+
if i % 7 == 0 {
596+
None
597+
} else {
598+
Some((i as f32) * 0.1)
599+
}
600+
})
601+
.collect();
602+
603+
let array = PrimitiveArray::from_option_iter(values);
604+
let encoded = alp_encode(&array, None).unwrap();
605+
let decoded = decompress(encoded);
606+
607+
assert_arrays_eq!(decoded, array);
608+
}
609+
610+
#[test]
611+
fn test_large_mixed_validity_with_patches() {
612+
let size = 6_000;
613+
let mut values = vec![10.125f64; size];
614+
615+
values[500] = PI;
616+
values[1500] = E;
617+
values[2500] = f64::INFINITY;
618+
values[3500] = f64::NEG_INFINITY;
619+
values[4500] = f64::NAN;
620+
621+
let validity = Validity::from_iter((0..size).map(|i| !matches!(i, 500 | 2500)));
622+
623+
let array = PrimitiveArray::new(Buffer::from(values), validity);
624+
let encoded = alp_encode(&array, None).unwrap();
625+
let decoded = decompress(encoded);
626+
627+
assert_arrays_eq!(decoded, array);
628+
}
365629
}

0 commit comments

Comments
 (0)