Skip to content

Commit 4b871b6

Browse files
committed
feat: fuse alp decode with applying patches
Signed-off-by: Alexander Droste <[email protected]>
1 parent 28f5b3d commit 4b871b6

File tree

3 files changed

+319
-4
lines changed

3 files changed

+319
-4
lines changed

encodings/alp/src/alp/compress.rs

Lines changed: 251 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use vortex_array::validity::Validity;
88
use vortex_array::vtable::ValidityHelper;
99
use vortex_array::{ArrayRef, IntoArray, ToCanonical};
1010
use vortex_buffer::{Buffer, BufferMut};
11-
use vortex_dtype::PType;
11+
use vortex_dtype::{PType, match_each_unsigned_integer_ptype};
1212
use vortex_error::{VortexResult, vortex_bail};
1313
use vortex_mask::Mask;
1414

@@ -118,12 +118,91 @@ where
118118
Ok((exponents, encoded_array, patches))
119119
}
120120

121+
/// Decompresses an ALP-encoded array.
122+
///
123+
/// # Arguments
124+
///
125+
/// * `array` - The ALP-encoded array to decompress
126+
///
127+
/// # Returns
128+
///
129+
/// A `PrimitiveArray` containing the decompressed floating-point values with all patches applied.
121130
pub fn decompress(array: &ALPArray) -> PrimitiveArray {
131+
if let Some(patches) = array.patches()
132+
&& let Some(chunk_offsets) = patches.chunk_offsets()
133+
{
134+
return decompress_chunked(array, patches, &chunk_offsets.as_ref().to_primitive());
135+
}
136+
137+
decompress_unchunked(array)
138+
}
139+
140+
/// Decompresses an ALP-encoded array in 1024-element chunks.
141+
///
142+
/// Decoding and applying patches is done in chunks of 1024 elements for better L1 cache locality.
143+
///
144+
/// # Arguments
145+
///
146+
/// * `array` - The ALP-encoded array to decompress
147+
/// * `patches` - The patches containing exceptional values and their positions
148+
/// * `patches_chunk_offsets` - Offsets into the patches array for each chunk
149+
///
150+
/// # Returns
151+
///
152+
/// A `PrimitiveArray` containing the decompressed values with all patches applied.
153+
#[allow(clippy::cognitive_complexity)]
154+
pub fn decompress_chunked(
155+
array: &ALPArray,
156+
patches: &Patches,
157+
patches_chunk_offsets: &PrimitiveArray,
158+
) -> PrimitiveArray {
159+
let alp_encoded = array.encoded().to_primitive();
160+
let validity = alp_encoded.validity().clone();
161+
162+
let patches_indices = patches.indices().as_ref().to_primitive();
163+
let patches_values = patches.values().as_ref().to_primitive();
164+
165+
match_each_alp_float_ptype!(array.dtype().as_ptype(), |T| {
166+
match_each_unsigned_integer_ptype!(patches_chunk_offsets.ptype(), |C| {
167+
match_each_unsigned_integer_ptype!(patches_indices.ptype(), |I| {
168+
let patches_indices = patches_indices.as_slice::<I>();
169+
let patches_values = patches_values.as_slice::<T>();
170+
let patches_chunk_offsets = patches_chunk_offsets.as_slice::<C>();
171+
172+
let alp_buffer = alp_encoded.into_buffer();
173+
let array_len = array.len();
174+
let mut decoded_values = BufferMut::<T>::with_capacity(array_len);
175+
let patches_offset = patches.offset();
176+
177+
for (chunk_idx, chunk_start) in (0..array_len).step_by(1024).enumerate() {
178+
let chunk_end = (chunk_start + 1024).min(array_len);
179+
let chunk_slice = &alp_buffer.as_slice()[chunk_start..chunk_end];
180+
<T>::decode_into_buffer(chunk_slice, array.exponents(), &mut decoded_values);
181+
182+
PrimitiveArray::patch_chunk(
183+
&mut decoded_values,
184+
patches_indices,
185+
patches_values,
186+
patches_offset,
187+
patches_chunk_offsets,
188+
chunk_idx,
189+
);
190+
}
191+
192+
PrimitiveArray::new::<T>(decoded_values, validity)
193+
})
194+
})
195+
})
196+
}
197+
198+
/// Decompresses an ALP-encoded array without chunk offsets.
199+
///
200+
/// This function decodes the complete array at once and then applies any patches after.
201+
fn decompress_unchunked(array: &ALPArray) -> PrimitiveArray {
122202
let encoded = array.encoded().to_primitive();
123203
let validity = encoded.validity().clone();
124-
let ptype = array.dtype().as_ptype();
125204

126-
let decoded = match_each_alp_float_ptype!(ptype, |T| {
205+
let decoded = match_each_alp_float_ptype!(array.dtype().as_ptype(), |T| {
127206
PrimitiveArray::new::<T>(
128207
<T>::decode_buffer(encoded.into_buffer_mut(), array.exponents()),
129208
validity,
@@ -362,4 +441,173 @@ mod tests {
362441
let expected_values = PrimitiveArray::from_iter(vec![PI, E]);
363442
assert_arrays_eq!(patch_values, expected_values);
364443
}
444+
445+
#[test]
fn test_slice_half_chunk_f32_roundtrip() {
    // Create 1024 elements, encode, slice to the second half (512..1024), then decode.
    let values = vec![1.234f32; 1024];
    let original = PrimitiveArray::new(Buffer::from(values), Validity::NonNullable);
    let encoded = alp_encode(&original, None).unwrap();

    let sliced_alp = encoded.slice(512..1024);
    let decoded = sliced_alp.to_primitive();

    // The decoded slice must match the same range taken from the unencoded input.
    let expected_slice = original.slice(512..1024).to_primitive();
    assert_eq!(expected_slice.as_slice::<f32>(), decoded.as_slice::<f32>());
}
458+
459+
#[test]
fn test_slice_half_chunk_f64_roundtrip() {
    // Round-trip the range 512..1024 of a 1024-element f64 array through ALP.
    let source = PrimitiveArray::new(Buffer::from(vec![5.678f64; 1024]), Validity::NonNullable);
    let alp = alp_encode(&source, None).unwrap();

    let actual = alp.slice(512..1024).to_primitive();
    let expected = source.slice(512..1024).to_primitive();

    assert_eq!(expected.as_slice::<f64>(), actual.as_slice::<f64>());
}
471+
472+
#[test]
fn test_slice_half_chunk_with_patches_roundtrip() {
    // Irregular values at 100 and 200 fall outside the kept range; the one at 600 falls inside.
    let mut raw = vec![1.0f64; 1024];
    for (position, irregular) in [(100, PI), (200, E), (600, 42.42)] {
        raw[position] = irregular;
    }

    let source = PrimitiveArray::new(Buffer::from(raw), Validity::NonNullable);
    let alp = alp_encode(&source, None).unwrap();

    let actual = alp.slice(512..1024).to_primitive();
    let expected = source.slice(512..1024).to_primitive();

    assert_eq!(expected.as_slice::<f64>(), actual.as_slice::<f64>());
    // The unsliced encoding is expected to carry patches.
    assert!(alp.patches().is_some());
}
489+
490+
#[test]
fn test_slice_half_chunk_nullable_roundtrip() {
    // Every third element is null; all others hold the same value.
    let values: Vec<_> = (0..1024)
        .map(|i| (i % 3 != 0).then_some(2.5f32))
        .collect();

    let source = PrimitiveArray::from_option_iter(values);
    let alp = alp_encode(&source, None).unwrap();

    let actual = alp.slice(512..1024).to_primitive();
    let expected = source.slice(512..1024);

    assert_arrays_eq!(actual, expected);
}
505+
506+
#[test]
fn test_large_f32_array_uniform_values() {
    // 10_000 identical f32 values: no patches expected.
    let len = 10_000;
    let input = PrimitiveArray::new(buffer![42.125f32; len], Validity::NonNullable);
    let alp = alp_encode(&input, None).unwrap();
    assert!(alp.patches().is_none());

    let output = decompress(&alp);
    assert_eq!(input.as_slice::<f32>(), output.as_slice::<f32>());
}
516+
517+
#[test]
fn test_large_f64_array_uniform_values() {
    // 50_000 identical f64 values: no patches expected.
    let len = 50_000;
    let input = PrimitiveArray::new(buffer![123.456789f64; len], Validity::NonNullable);
    let alp = alp_encode(&input, None).unwrap();
    assert!(alp.patches().is_none());

    let output = decompress(&alp);
    assert_eq!(input.as_slice::<f64>(), output.as_slice::<f64>());
}
527+
528+
#[test]
fn test_large_f32_array_with_patches() {
    // One irregular value in each of four different 1024-element chunks.
    let mut raw = vec![1.5f32; 5_000];
    let irregulars = [
        (100, std::f32::consts::PI),
        (1500, std::f32::consts::E),
        (3000, f32::NEG_INFINITY),
        (4500, f32::INFINITY),
    ];
    for (position, irregular) in irregulars {
        raw[position] = irregular;
    }

    let input = PrimitiveArray::new(Buffer::from(raw.clone()), Validity::NonNullable);
    let alp = alp_encode(&input, None).unwrap();
    assert!(alp.patches().is_some());

    let output = decompress(&alp);
    assert_eq!(raw.as_slice(), output.as_slice::<f32>());
}
544+
545+
#[test]
fn test_large_f64_array_with_patches() {
    // One special value at the start of (almost) every 1000-element stride, including NaN,
    // both infinities and both signed zeros.
    let mut values = vec![2.2184f64; 8_000];
    let specials = [
        (0, PI),
        (1000, E),
        (2000, f64::NAN),
        (3000, f64::INFINITY),
        (4000, f64::NEG_INFINITY),
        (5000, 0.0),
        (6000, -0.0),
        (7000, 999.999999999),
    ];
    for (position, special) in specials {
        values[position] = special;
    }

    let array = PrimitiveArray::new(Buffer::from(values.clone()), Validity::NonNullable);
    let encoded = alp_encode(&array, None).unwrap();
    assert!(encoded.patches().is_some());

    let decoded = decompress(&encoded);
    let decoded_slice = decoded.as_slice::<f64>();

    // Compare element-wise with `is_eq` rather than `==` because the input contains NaN.
    for (idx, &original_val) in values.iter().enumerate() {
        let decoded_val = decoded_slice[idx];
        assert!(
            decoded_val.is_eq(original_val),
            "At index {idx}: Expected {original_val} but got {decoded_val}"
        );
    }
}
573+
574+
#[test]
fn test_large_nullable_array() {
    // Every seventh element is null; the rest are distinct values derived from the index.
    let size = 12_000;
    let values: Vec<Option<f32>> = (0..size)
        .map(|i| (i % 7 != 0).then(|| (i as f32) * 0.1))
        .collect();

    let input = PrimitiveArray::from_option_iter(values);
    let alp = alp_encode(&input, None).unwrap();
    let output = decompress(&alp);

    assert_arrays_eq!(output, input);
}
593+
594+
#[test]
fn test_large_mixed_validity_with_patches() {
    let size = 6_000;
    let mut raw = vec![10.125f64; size];

    let specials = [
        (500, PI),
        (1500, E),
        (2500, f64::INFINITY),
        (3500, f64::NEG_INFINITY),
        (4500, f64::NAN),
    ];
    for (position, special) in specials {
        raw[position] = special;
    }

    // Positions 500 and 2500 hold special values but are marked null.
    let validity = Validity::from_iter((0..size).map(|i| i != 500 && i != 2500));

    let input = PrimitiveArray::new(Buffer::from(raw), validity);
    let alp = alp_encode(&input, None).unwrap();
    let output = decompress(&alp);

    assert_arrays_eq!(output, input);
}
365613
}

encodings/alp/src/alp/mod.rs

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -192,6 +192,33 @@ pub trait ALPFloat: private::Sealed + Float + Display + NativePType {
192192
values
193193
}
194194

195+
/// Decodes a slice of encoded ALP values into the output buffer.
196+
///
197+
/// ## Preconditions
198+
///
199+
/// The `output` buffer must have sufficient spare capacity for `encoded.len()` elements
200+
fn decode_into_buffer(
201+
encoded: &[Self::ALPInt],
202+
exponents: Exponents,
203+
output: &mut BufferMut<Self>,
204+
) {
205+
let input_len = encoded.len();
206+
let current_len = output.len();
207+
let buffer_uninit = output.spare_capacity_mut();
208+
209+
// SAFETY: `MaybeUninit<Self>` and `Self` have the same layout.
210+
let buffer_values: &mut [Self] =
211+
unsafe { std::mem::transmute(&mut buffer_uninit[..input_len]) };
212+
213+
for (idx, &encoded_val) in encoded.iter().enumerate() {
214+
buffer_values[idx] = Self::decode_single(encoded_val, exponents);
215+
}
216+
217+
unsafe {
218+
output.set_len(current_len + input_len);
219+
}
220+
}
221+
195222
/// Decodes every element of `encoded` with `decode_single`, consuming the input buffer and
/// returning the buffer produced by `map_each_in_place`.
fn decode_buffer(encoded: BufferMut<Self::ALPInt>, exponents: Exponents) -> BufferMut<Self> {
    let decode_one = move |value| Self::decode_single(value, exponents);
    encoded.map_each_in_place(decode_one)
}

vortex-array/src/arrays/primitive/array/patch.rs

Lines changed: 41 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
11
// SPDX-License-Identifier: Apache-2.0
22
// SPDX-FileCopyrightText: Copyright the Vortex contributors
33

4-
use vortex_dtype::{IntegerPType, NativePType, match_each_integer_ptype, match_each_native_ptype};
4+
use vortex_buffer::BufferMut;
5+
use vortex_dtype::{
6+
IntegerPType, NativePType, UnsignedPType, match_each_integer_ptype, match_each_native_ptype,
7+
};
58

69
use crate::ToCanonical;
710
use crate::arrays::PrimitiveArray;
@@ -53,6 +56,43 @@ impl PrimitiveArray {
5356
}
5457
Self::new(own_values, patched_validity)
5558
}
59+
60+
/// Patches a chunk of decoded values with the given patches.
61+
///
62+
/// # Arguments
63+
///
64+
/// * `decoded_values` - Mutable buffer of decoded values to be patched
65+
/// * `patches_indices` - Indices indicating which positions to patch
66+
/// * `patches_values` - Values to apply at the patched indices
67+
/// * `patches_offset` - Offset to subtract from patch indices
68+
/// * `chunk_offsets_slice` - Slice containing offsets for each chunk
69+
/// * `chunk_idx` - Index of the chunk to patch
70+
#[inline]
71+
pub fn patch_chunk<T, I, C>(
72+
decoded_values: &mut BufferMut<T>,
73+
patches_indices: &[I],
74+
patches_values: &[T],
75+
patches_offset: usize,
76+
chunk_offsets_slice: &[C],
77+
chunk_idx: usize,
78+
) where
79+
T: NativePType,
80+
I: UnsignedPType,
81+
C: UnsignedPType,
82+
{
83+
let patches_start_idx = chunk_offsets_slice[chunk_idx].as_();
84+
let patches_end_idx = if chunk_idx + 1 < chunk_offsets_slice.len() {
85+
chunk_offsets_slice[chunk_idx + 1].as_()
86+
} else {
87+
patches_indices.len()
88+
};
89+
90+
for patches_idx in patches_start_idx..patches_end_idx {
91+
let patched_value = patches_values[patches_idx];
92+
let patches_idx_without_offset: usize = patches_indices[patches_idx].as_();
93+
decoded_values[patches_idx_without_offset - patches_offset] = patched_value;
94+
}
95+
}
5696
}
5797

5898
#[cfg(test)]

0 commit comments

Comments
 (0)