Skip to content

Commit 6c214de

Browse files
committed
ALP Kernel
Signed-off-by: Nicholas Gates <[email protected]>
1 parent 5c5f7d1 commit 6c214de

File tree

4 files changed

+107
-32
lines changed

4 files changed

+107
-32
lines changed

encodings/alp/src/alp/array.rs

Lines changed: 13 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@ use vortex_array::ProstMetadata;
1717
use vortex_array::SerializeMetadata;
1818
use vortex_array::kernel::BindCtx;
1919
use vortex_array::kernel::KernelRef;
20-
use vortex_array::kernel::kernel;
2120
use vortex_array::patches::Patches;
2221
use vortex_array::patches::PatchesMetadata;
2322
use vortex_array::serde::ArrayChildren;
@@ -49,7 +48,8 @@ use crate::ALPFloat;
4948
use crate::alp::Exponents;
5049
use crate::alp::alp_encode;
5150
use crate::alp::decompress::decompress_into_array;
52-
use crate::alp::decompress::decompress_into_vector;
51+
use crate::alp::kernel::ALPKernel;
52+
use crate::alp::kernel::PatchKernels;
5353
use crate::match_each_alp_float_ptype;
5454

5555
vtable!(ALP);
@@ -190,15 +190,15 @@ impl VTable for ALPVTable {
190190
fn bind_kernel(array: &ALPArray, ctx: &mut BindCtx) -> VortexResult<KernelRef> {
191191
let encoded = array.encoded().bind_kernel(ctx)?;
192192
let patches_kernels = if let Some(patches) = array.patches() {
193-
Some((
194-
patches.indices().bind_kernel(ctx)?,
195-
patches.values().bind_kernel(ctx)?,
196-
patches
193+
Some(PatchKernels {
194+
indices: patches.indices().bind_kernel(ctx)?,
195+
values: patches.values().bind_kernel(ctx)?,
196+
chunk_offsets: patches
197197
.chunk_offsets()
198198
.as_ref()
199199
.map(|co| co.bind_kernel(ctx))
200200
.transpose()?,
201-
))
201+
})
202202
} else {
203203
None
204204
};
@@ -207,24 +207,12 @@ impl VTable for ALPVTable {
207207
let exponents = array.exponents();
208208

209209
match_each_alp_float_ptype!(array.dtype().as_ptype(), |T| {
210-
Ok(kernel(move || {
211-
let encoded_vector = encoded.execute()?;
212-
let patches_vectors = match patches_kernels {
213-
Some((idx_kernel, val_kernel, co_kernel)) => Some((
214-
idx_kernel.execute()?,
215-
val_kernel.execute()?,
216-
co_kernel.map(|k| k.execute()).transpose()?,
217-
)),
218-
None => None,
219-
};
220-
221-
decompress_into_vector::<T>(
222-
encoded_vector,
223-
exponents,
224-
patches_vectors,
225-
patches_offset,
226-
)
227-
}))
210+
Ok(Box::new(ALPKernel::<T>::new(
211+
exponents,
212+
encoded,
213+
patches_kernels,
214+
patches_offset,
215+
)))
228216
})
229217
}
230218
}

encodings/alp/src/alp/decompress.rs

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,9 @@ use vortex_error::VortexResult;
1818
use vortex_vector::Vector;
1919
use vortex_vector::VectorMutOps;
2020
use vortex_vector::VectorOps;
21+
use vortex_vector::primitive::PVector;
2122
use vortex_vector::primitive::PVectorMut;
23+
use vortex_vector::primitive::PrimitiveVector;
2224

2325
use crate::ALPArray;
2426
use crate::ALPFloat;
@@ -53,23 +55,20 @@ pub fn decompress_into_array(array: ALPArray) -> PrimitiveArray {
5355
///
5456
/// A `Vector` containing the decompressed floating-point values with all patches applied.
5557
pub fn decompress_into_vector<T: ALPFloat>(
56-
encoded_vector: Vector,
58+
encoded_vector: PVector<T::ALPInt>,
5759
exponents: Exponents,
58-
patches_vectors: Option<(Vector, Vector, Option<Vector>)>,
60+
patches_vectors: Option<(PrimitiveVector, PrimitiveVector, Option<PrimitiveVector>)>,
5961
patches_offset: usize,
6062
) -> VortexResult<Vector> {
61-
let encoded_primitive = encoded_vector.into_primitive().into_mut();
62-
let (mut alp_buffer, mask) = T::ALPInt::downcast(encoded_primitive).into_parts();
63+
// TODO(ngates): we should try_into_mut and fallback to decode _while_ copying.
64+
let (mut alp_buffer, mask) = encoded_vector.into_mut().into_parts();
6365
<T>::decode_slice_inplace(alp_buffer.as_mut_slice(), exponents);
6466

6567
// SAFETY: `Buffer<T::ALPInt> and `BufferMut<T>` have the same layout.
6668
let mut decoded_buffer: BufferMut<T> = unsafe { transmute(alp_buffer) };
6769

6870
// Apply patches if they exist.
6971
if let Some((patches_indices, patches_values, _)) = patches_vectors {
70-
let patches_indices = patches_indices.into_primitive();
71-
let patches_values = patches_values.into_primitive();
72-
7372
let values_buffer = T::downcast(patches_values.into_mut()).into_parts().0;
7473
let values_slice = values_buffer.as_slice();
7574
let decoded_slice = decoded_buffer.as_mut_slice();

encodings/alp/src/alp/kernel.rs

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
// SPDX-License-Identifier: Apache-2.0
2+
// SPDX-FileCopyrightText: Copyright the Vortex contributors
3+
4+
use std::marker::PhantomData;
5+
6+
use vortex_array::kernel::Kernel;
7+
use vortex_array::kernel::KernelRef;
8+
use vortex_array::kernel::PushDownResult;
9+
use vortex_dtype::PTypeDowncastExt;
10+
use vortex_error::VortexResult;
11+
use vortex_mask::Mask;
12+
use vortex_vector::Vector;
13+
14+
use crate::ALPFloat;
15+
use crate::Exponents;
16+
use crate::alp::decompress::decompress_into_vector;
17+
18+
#[derive(Debug)]
19+
pub(super) struct ALPKernel<A: ALPFloat> {
20+
exponents: Exponents,
21+
encoded: KernelRef,
22+
patches: Option<PatchKernels>,
23+
patches_offset: usize,
24+
_phantom: PhantomData<A>,
25+
}
26+
27+
impl<A: ALPFloat> ALPKernel<A> {
28+
pub(super) fn new(
29+
exponents: Exponents,
30+
encoded: KernelRef,
31+
patches: Option<PatchKernels>,
32+
patches_offset: usize,
33+
) -> Self {
34+
Self {
35+
exponents,
36+
encoded,
37+
patches,
38+
patches_offset,
39+
_phantom: PhantomData,
40+
}
41+
}
42+
}
43+
44+
#[derive(Debug)]
45+
pub(super) struct PatchKernels {
46+
pub(super) indices: KernelRef,
47+
pub(super) values: KernelRef,
48+
pub(super) chunk_offsets: Option<KernelRef>,
49+
}
50+
51+
impl<A: ALPFloat> Kernel for ALPKernel<A> {
52+
fn execute(self: Box<Self>) -> VortexResult<Vector> {
53+
let encoded = self
54+
.encoded
55+
.execute()?
56+
.into_primitive()
57+
.downcast::<A::ALPInt>();
58+
59+
let patches_vectors = match self.patches {
60+
None => None,
61+
Some(PatchKernels {
62+
indices,
63+
values,
64+
chunk_offsets,
65+
}) => {
66+
let indices = indices.execute()?.into_primitive();
67+
let values = values.execute()?.into_primitive();
68+
let chunk_offsets = match chunk_offsets {
69+
None => None,
70+
Some(co) => Some(co.execute()?.into_primitive()),
71+
};
72+
Some((indices, values, chunk_offsets))
73+
}
74+
};
75+
76+
decompress_into_vector::<A>(
77+
encoded,
78+
self.exponents,
79+
patches_vectors,
80+
self.patches_offset,
81+
)
82+
}
83+
84+
fn push_down_filter(self: Box<Self>, _selection: &Mask) -> VortexResult<PushDownResult> {
85+
Ok(PushDownResult::NotPushed(self))
86+
}
87+
}

encodings/alp/src/alp/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ mod array;
1717
mod compress;
1818
mod compute;
1919
mod decompress;
20+
mod kernel;
2021
mod ops;
2122

2223
#[cfg(test)]

0 commit comments

Comments
 (0)