Skip to content

Commit 67a4457

Browse files
committed
batch execute for delta array
Signed-off-by: Connor Tsui <[email protected]>
1 parent a8ad74f commit 67a4457

File tree

2 files changed

+75
-13
lines changed

2 files changed

+75
-13
lines changed

encodings/fastlanes/src/delta/array/delta_decompress.rs

Lines changed: 68 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -10,37 +10,92 @@ use num_traits::WrappingAdd;
1010
use vortex_array::Array;
1111
use vortex_array::ToCanonical;
1212
use vortex_array::arrays::PrimitiveArray;
13+
use vortex_array::execution::ExecutionCtx;
1314
use vortex_array::validity::Validity;
1415
use vortex_buffer::Buffer;
1516
use vortex_buffer::BufferMut;
1617
use vortex_dtype::NativePType;
18+
use vortex_dtype::PTypeDowncastExt;
1719
use vortex_dtype::match_each_unsigned_integer_ptype;
20+
use vortex_error::VortexResult;
21+
use vortex_error::vortex_panic;
22+
use vortex_vector::Vector;
23+
use vortex_vector::primitive::PVector;
24+
use vortex_vector::primitive::PrimitiveVector;
1825

1926
use crate::DeltaArray;
2027

28+
/// Decompresses the [`DeltaArray`] into a vector.
29+
pub fn delta_decompress_into_primitive_vector(
30+
array: &DeltaArray,
31+
ctx: &mut ExecutionCtx,
32+
) -> VortexResult<Vector> {
33+
let bases = array.bases().batch_execute(ctx)?.into_primitive();
34+
let deltas = array.deltas().batch_execute(ctx)?.into_primitive();
35+
36+
let start = array.offset();
37+
let end = start + array.len();
38+
39+
let validity = array.deltas().validity_mask().slice(start..end);
40+
41+
macro_rules! decompress_case {
42+
($pv:expr, $type:ty, $deltas:expr, $start:expr, $end:expr, $validity:expr) => {{
43+
const LANES: usize = <$type>::LANES;
44+
let buffer = decompress_primitive::<$type, LANES>(
45+
$pv.as_ref(),
46+
$deltas.downcast::<$type>().as_ref(),
47+
);
48+
let buffer = buffer.slice($start..$end);
49+
50+
// SAFETY: We slice the buffer and the validity by the same range.
51+
unsafe { PVector::<$type>::new_unchecked(buffer, $validity) }.into()
52+
}};
53+
}
54+
55+
Ok(match bases {
56+
PrimitiveVector::U8(pv) => decompress_case!(pv, u8, deltas, start, end, validity),
57+
PrimitiveVector::U16(pv) => decompress_case!(pv, u16, deltas, start, end, validity),
58+
PrimitiveVector::U32(pv) => decompress_case!(pv, u32, deltas, start, end, validity),
59+
PrimitiveVector::U64(pv) => decompress_case!(pv, u64, deltas, start, end, validity),
60+
PrimitiveVector::I8(_)
61+
| PrimitiveVector::I16(_)
62+
| PrimitiveVector::I32(_)
63+
| PrimitiveVector::I64(_)
64+
| PrimitiveVector::F16(_)
65+
| PrimitiveVector::F32(_)
66+
| PrimitiveVector::F64(_) => {
67+
vortex_panic!("Tried to match a non-unsigned vector in an unsigned match statement")
68+
}
69+
})
70+
}
71+
2172
pub fn delta_decompress(array: &DeltaArray) -> PrimitiveArray {
2273
let bases = array.bases().to_primitive();
2374
let deltas = array.deltas().to_primitive();
24-
let decoded = match_each_unsigned_integer_ptype!(deltas.ptype(), |T| {
75+
76+
let start = array.offset();
77+
let end = start + array.len();
78+
79+
let validity = Validity::from_mask(array.deltas().validity_mask(), array.dtype().nullability());
80+
let validity = validity.slice(start..end);
81+
82+
match_each_unsigned_integer_ptype!(deltas.ptype(), |T| {
2583
const LANES: usize = T::LANES;
2684

27-
PrimitiveArray::new(
28-
decompress_primitive::<T, LANES>(bases.as_slice(), deltas.as_slice()),
29-
Validity::from_mask(array.deltas().validity_mask(), array.dtype().nullability()),
30-
)
31-
});
85+
let buffer = decompress_primitive::<T, LANES>(bases.as_slice(), deltas.as_slice());
86+
let buffer = buffer.slice(start..end);
3287

33-
decoded
34-
.slice(array.offset()..array.offset() + array.len())
35-
.to_primitive()
88+
PrimitiveArray::new(buffer, validity)
89+
})
3690
}
3791

3892
// TODO(ngates): can we re-use the deltas buffer for the result? Might be tricky given the
3993
// traversal ordering, but possibly doable.
40-
fn decompress_primitive<T: NativePType + Delta + Transpose + WrappingAdd, const LANES: usize>(
41-
bases: &[T],
42-
deltas: &[T],
43-
) -> Buffer<T> {
94+
/// Performs the low-level delta decompression on primitive values.
95+
pub(super) fn decompress_primitive<T, const LANES: usize>(bases: &[T], deltas: &[T]) -> Buffer<T>
96+
where
97+
T: NativePType + Delta + Transpose + WrappingAdd,
98+
{
4499
// How many fastlanes vectors we will process.
45100
let num_chunks = deltas.len() / 1024;
46101

encodings/fastlanes/src/delta/vtable/mod.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
use fastlanes::FastLanes;
55
use prost::Message;
66
use vortex_array::ProstMetadata;
7+
use vortex_array::execution::ExecutionCtx;
78
use vortex_array::serde::ArrayChildren;
89
use vortex_array::vtable;
910
use vortex_array::vtable::ArrayId;
@@ -18,8 +19,10 @@ use vortex_dtype::PType;
1819
use vortex_dtype::match_each_unsigned_integer_ptype;
1920
use vortex_error::VortexResult;
2021
use vortex_error::vortex_err;
22+
use vortex_vector::Vector;
2123

2224
use crate::DeltaArray;
25+
use crate::delta::array::delta_decompress::delta_decompress_into_primitive_vector;
2326

2427
mod array;
2528
mod canonical;
@@ -98,6 +101,10 @@ impl VTable for DeltaVTable {
98101

99102
DeltaArray::try_new(bases, deltas, metadata.0.offset as usize, len)
100103
}
104+
105+
fn batch_execute(array: &DeltaArray, ctx: &mut ExecutionCtx) -> VortexResult<Vector> {
106+
delta_decompress_into_primitive_vector(array, ctx)
107+
}
101108
}
102109

103110
#[derive(Debug)]

0 commit comments

Comments
 (0)