Skip to content

Commit 2fd6412

Browse files
committed
wip
Signed-off-by: Alexander Droste <[email protected]>
1 parent 965dcdc commit 2fd6412

File tree

2 files changed

+119
-147
lines changed

2 files changed

+119
-147
lines changed

encodings/alp/src/alp/array.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -360,6 +360,12 @@ impl ALPArray {
360360
pub fn patches(&self) -> Option<&Patches> {
361361
self.patches.as_ref()
362362
}
363+
364+
/// Consumes the array and returns its parts as `(encoded, exponents, patches, dtype)`.
365+
#[inline]
366+
pub fn into_parts(self) -> (ArrayRef, Exponents, Option<Patches>, DType) {
367+
(self.encoded, self.exponents, self.patches, self.dtype)
368+
}
363369
}
364370

365371
impl ValidityChild<ALPVTable> for ALPVTable {

encodings/alp/src/alp/decompress.rs

Lines changed: 113 additions & 147 deletions
Original file line numberDiff line numberDiff line change
@@ -4,15 +4,15 @@
44
use std::mem::transmute;
55

66
use num_traits::AsPrimitive;
7+
use vortex_array::ArrayRef;
78
use vortex_array::ToCanonical;
89
use vortex_array::arrays::PrimitiveArray;
910
use vortex_array::arrays::patch_chunk;
1011
use vortex_array::patches::Patches;
11-
use vortex_array::validity::Validity;
1212
use vortex_array::vtable::ValidityHelper;
1313
use vortex_buffer::BufferMut;
14+
use vortex_dtype::DType;
1415
use vortex_dtype::match_each_unsigned_integer_ptype;
15-
use vortex_mask::MaskMut;
1616
use vortex_vector::primitive::PVectorMut;
1717

1818
use crate::ALPArray;
@@ -31,40 +31,26 @@ pub(crate) fn decompress_to_pvector<T: ALPFloat>(array: ALPArray) -> PVectorMut<
3131
return PVectorMut::with_capacity(0);
3232
}
3333

34-
let (buffer, validity) = decompress_to_components::<T>(array);
35-
// SAFETY: buffer and validity have same length from decompress_to_components
36-
unsafe { PVectorMut::new_unchecked(buffer, validity) }
37-
}
38-
39-
/// Decompresses an ALP-encoded array to a buffer and mask.
40-
///
41-
/// Returns `(BufferMut<T>, MaskMut)` with matching lengths.
42-
pub(crate) fn decompress_to_components<T: ALPFloat>(array: ALPArray) -> (BufferMut<T>, MaskMut) {
43-
let encoded = array.encoded().to_primitive();
44-
let validity = encoded.validity().clone();
45-
let exponents = array.exponents();
46-
let patches = array.patches().cloned();
47-
48-
// We need to drop ALPArray here in case converting encoded buffer into
49-
// primitive didn't create a copy. In that case both alp_encoded and array
50-
// will hold a reference to the buffer we want to mutate.
51-
drop(array);
52-
53-
let encoded_buffer = encoded.into_buffer_mut();
54-
55-
// Extract chunk offsets if patches exist
56-
let chunk_offsets = patches
57-
.as_ref()
58-
.and_then(|p| p.chunk_offsets().as_ref())
59-
.map(|c| c.to_primitive());
60-
61-
decompress_core::<T>(
62-
encoded_buffer,
63-
exponents,
64-
validity,
65-
patches.as_ref(),
66-
chunk_offsets.as_ref(),
67-
)
34+
let (encoded, exponents, patches, dtype) = array.into_parts();
35+
let decompressed = if let Some(ref patches) = patches
36+
&& let Some(chunk_offsets) = patches.chunk_offsets()
37+
{
38+
decompress_chunked(
39+
encoded,
40+
exponents,
41+
patches,
42+
&chunk_offsets.as_ref().to_primitive(),
43+
dtype,
44+
)
45+
} else {
46+
decompress_unchunked(encoded, exponents, patches, dtype)
47+
};
48+
49+
let validity = decompressed.validity().clone();
50+
let buffer = decompressed.into_buffer_mut::<T>();
51+
let validity_mask = validity.to_mask(buffer.len()).into_mut();
52+
// SAFETY: buffer and validity_mask have same length.
53+
unsafe { PVectorMut::new_unchecked(buffer, validity_mask) }
6854
}
6955

7056
/// Decompresses an ALP-encoded array.
@@ -77,34 +63,19 @@ pub(crate) fn decompress_to_components<T: ALPFloat>(array: ALPArray) -> (BufferM
7763
///
7864
/// A `PrimitiveArray` containing the decompressed floating-point values with all patches applied.
7965
pub fn decompress(array: ALPArray) -> PrimitiveArray {
80-
let dtype = array.dtype().clone();
81-
82-
match_each_alp_float_ptype!(dtype.as_ptype(), |T| {
83-
let (buffer, validity_mask) = decompress_to_components::<T>(array);
84-
PrimitiveArray::new(
85-
buffer.freeze(),
86-
Validity::from_mask(validity_mask.freeze(), dtype.nullability()),
87-
)
88-
})
89-
}
90-
91-
/// Core decompression logic that all public APIs use.
92-
/// Returns raw components that can be assembled into different array types.
93-
fn decompress_core<T: ALPFloat>(
94-
encoded_buffer: BufferMut<T::ALPInt>,
95-
exponents: Exponents,
96-
validity: Validity,
97-
patches: Option<&Patches>,
98-
patches_chunk_offsets: Option<&PrimitiveArray>,
99-
) -> (BufferMut<T>, MaskMut) {
100-
// Only use chunked decompression if we have both patches AND chunk offsets
101-
if let Some(patches) = patches
102-
&& let Some(chunk_offsets) = patches_chunk_offsets
66+
let (encoded, exponents, patches, dtype) = array.into_parts();
67+
if let Some(ref patches) = patches
68+
&& let Some(chunk_offsets) = patches.chunk_offsets()
10369
{
104-
decompress_chunked_core::<T>(encoded_buffer, exponents, validity, patches, chunk_offsets)
70+
decompress_chunked(
71+
encoded,
72+
exponents,
73+
patches,
74+
&chunk_offsets.as_ref().to_primitive(),
75+
dtype,
76+
)
10577
} else {
106-
// Handle both: no patches, or patches without chunk offsets
107-
decompress_unchunked_core::<T>(encoded_buffer, exponents, validity, patches)
78+
decompress_unchunked(encoded, exponents, patches, dtype)
10879
}
10980
}
11081

@@ -114,103 +85,113 @@ fn decompress_core<T: ALPFloat>(
11485
///
11586
/// # Arguments
11687
///
117-
/// * `encoded_buffer` - The encoded buffer containing ALP-compressed data
118-
/// * `exponents` - The ALP exponents used for encoding
119-
/// * `validity` - The validity mask for the array
88+
/// * `encoded_ref` - The encoded child array of the ALP array to decompress
12089
/// * `patches` - The patches containing exceptional values and their positions
12190
/// * `patches_chunk_offsets` - Offsets into the patches array for each chunk
12291
///
12392
/// # Returns
12493
///
125-
/// A tuple containing the decompressed buffer and validity mask
94+
/// A `PrimitiveArray` containing the decompressed values with all patches applied.
12695
#[expect(
12796
clippy::cognitive_complexity,
12897
reason = "complexity is from nested match_each_* macros"
12998
)]
130-
fn decompress_chunked_core<T: ALPFloat>(
131-
mut encoded_buffer: BufferMut<T::ALPInt>,
99+
fn decompress_chunked(
100+
encoded_ref: ArrayRef,
132101
exponents: Exponents,
133-
validity: Validity,
134102
patches: &Patches,
135103
patches_chunk_offsets: &PrimitiveArray,
136-
) -> (BufferMut<T>, MaskMut) {
137-
let array_len = encoded_buffer.len();
138-
let validity_mask = validity.to_mask(array_len).into_mut();
139-
let patches_offset = patches.offset();
104+
dtype: DType,
105+
) -> PrimitiveArray {
106+
let encoded = encoded_ref.to_primitive();
107+
108+
let validity = encoded.validity().clone();
140109

141110
let patches_indices = patches.indices().as_ref().to_primitive();
142111
let patches_values = patches.values().as_ref().to_primitive();
143-
let patches_values_slice = patches_values.as_slice::<T>();
144-
145-
match_each_unsigned_integer_ptype!(patches_chunk_offsets.ptype(), |C| {
146-
let patches_chunk_offsets_slice = patches_chunk_offsets.as_slice::<C>();
147-
// Base offset is the first chunk offset
148-
let base_offset = if !patches_chunk_offsets_slice.is_empty() {
149-
AsPrimitive::<usize>::as_(patches_chunk_offsets_slice[0])
150-
} else {
151-
0
152-
};
153-
154-
match_each_unsigned_integer_ptype!(patches_indices.ptype(), |I| {
155-
let patches_indices_slice = patches_indices.as_slice::<I>();
156-
157-
// Process in chunks of 1024 for cache efficiency
158-
for (chunk_idx, chunk_start) in (0..array_len).step_by(1024).enumerate() {
159-
let chunk_end = (chunk_start + 1024).min(array_len);
160-
let chunk_slice = &mut encoded_buffer.as_mut_slice()[chunk_start..chunk_end];
161-
162-
T::decode_slice_inplace(chunk_slice, exponents);
163-
164-
// SAFETY: After decoding, T::ALPInt values are valid T values
165-
let decoded_chunk: &mut [T] = unsafe { transmute(chunk_slice) };
166-
167-
// Apply patches for this chunk using the original patch_chunk function
168-
let offset_within_chunk = patches.offset_within_chunk().unwrap_or(0);
169-
patch_chunk(
170-
decoded_chunk,
171-
patches_indices_slice,
172-
patches_values_slice,
173-
patches_offset,
174-
patches_chunk_offsets_slice,
175-
chunk_idx,
176-
base_offset,
177-
offset_within_chunk,
178-
);
179-
}
180-
})
181-
});
112+
let ptype = dtype.as_ptype();
113+
let array_len = encoded_ref.len();
114+
let patches_offset = patches.offset();
182115

183-
// SAFETY: After decoding, T::ALPInt buffer contains valid T values
184-
let decoded_buffer = unsafe { transmute::<BufferMut<T::ALPInt>, BufferMut<T>>(encoded_buffer) };
185-
(decoded_buffer, validity_mask)
116+
// We need to drop ALPArray here in case converting encoded buffer into
117+
// primitive didn't create a copy. In that case both alp_encoded and array
118+
// will hold a reference to the buffer we want to mutate.
119+
drop(encoded_ref);
120+
121+
match_each_alp_float_ptype!(ptype, |T| {
122+
let patches_values = patches_values.as_slice::<T>();
123+
let mut alp_buffer = encoded.into_buffer_mut();
124+
match_each_unsigned_integer_ptype!(patches_chunk_offsets.ptype(), |C| {
125+
let patches_chunk_offsets = patches_chunk_offsets.as_slice::<C>();
126+
// There is always at least one chunk offset.
127+
let base_offset = patches_chunk_offsets[0];
128+
let offset_within_chunk = patches.offset_within_chunk().unwrap_or(0);
129+
130+
match_each_unsigned_integer_ptype!(patches_indices.ptype(), |I| {
131+
let patches_indices = patches_indices.as_slice::<I>();
132+
133+
for (chunk_idx, chunk_start) in (0..array_len).step_by(1024).enumerate() {
134+
let chunk_end = (chunk_start + 1024).min(array_len);
135+
let chunk_slice = &mut alp_buffer.as_mut_slice()[chunk_start..chunk_end];
136+
137+
<T>::decode_slice_inplace(chunk_slice, exponents);
138+
139+
let decoded_chunk: &mut [T] = unsafe { transmute(chunk_slice) };
140+
patch_chunk(
141+
decoded_chunk,
142+
patches_indices,
143+
patches_values,
144+
patches_offset,
145+
patches_chunk_offsets,
146+
chunk_idx,
147+
base_offset.as_(),
148+
offset_within_chunk,
149+
);
150+
}
151+
152+
let decoded_buffer: BufferMut<T> = unsafe { transmute(alp_buffer) };
153+
PrimitiveArray::new::<T>(decoded_buffer.freeze(), validity)
154+
})
155+
})
156+
})
186157
}
187158

188159
/// Decompresses an ALP-encoded array without chunk offsets.
189160
///
190-
/// This function decodes the complete buffer at once and applies any patches at the end in one go.
191-
fn decompress_unchunked_core<T: ALPFloat>(
192-
encoded_buffer: BufferMut<T::ALPInt>,
161+
/// This function decodes the complete array at once and then applies any patches.
162+
fn decompress_unchunked(
163+
encoded_ref: ArrayRef,
193164
exponents: Exponents,
194-
validity: Validity,
195-
patches: Option<&Patches>,
196-
) -> (BufferMut<T>, MaskMut) {
197-
// Decode entire buffer.
198-
let mut decoded_buffer = T::decode_buffer(encoded_buffer, exponents);
199-
let mut validity_mask = validity.to_mask(decoded_buffer.len()).into_mut();
200-
201-
// Apply patches if present
165+
patches: Option<Patches>,
166+
dtype: DType,
167+
) -> PrimitiveArray {
168+
let encoded = encoded_ref.to_primitive();
169+
170+
// We need to drop ALPArray here in case converting encoded buffer into
171+
// primitive didn't create a copy. In that case both alp_encoded and array
172+
// will hold a reference to the buffer we want to mutate.
173+
drop(encoded_ref);
174+
175+
let validity = encoded.validity().clone();
176+
let ptype = dtype.as_ptype();
177+
178+
let decoded = match_each_alp_float_ptype!(ptype, |T| {
179+
PrimitiveArray::new::<T>(
180+
<T>::decode_buffer(encoded.into_buffer_mut(), exponents),
181+
validity,
182+
)
183+
});
184+
202185
if let Some(patches) = patches {
203-
// SAFETY: Patches invariants guarantee indices are valid.
204-
unsafe {
205-
patches.apply_to_buffer(decoded_buffer.as_mut_slice(), &mut validity_mask);
206-
}
186+
decoded.patch(&patches)
187+
} else {
188+
decoded
207189
}
208-
209-
(decoded_buffer, validity_mask)
210190
}
211191

212192
#[cfg(test)]
213193
mod tests {
194+
use vortex_array::validity::Validity;
214195
use vortex_buffer::buffer;
215196
use vortex_vector::VectorMutOps;
216197

@@ -355,19 +336,4 @@ mod tests {
355336
let expected_f64 = decompress(f64_encoded);
356337
assert_eq!(expected_f64.as_slice::<f64>(), f64_vector.as_ref());
357338
}
358-
359-
#[test]
360-
fn test_components_api() {
361-
let array = PrimitiveArray::from_iter([1.5f32, 2.5, 3.5]);
362-
let encoded = alp_encode(&array, None).unwrap();
363-
364-
let (buffer, validity) = decompress_to_components::<f32>(encoded);
365-
assert_eq!(buffer.len(), 3);
366-
assert_eq!(validity.len(), 3);
367-
368-
// All should be valid
369-
for i in 0..3 {
370-
assert!(validity.value(i));
371-
}
372-
}
373339
}

0 commit comments

Comments
 (0)