Skip to content

Commit f5785f9

Browse files
committed
batch execute for delta array
Signed-off-by: Connor Tsui <connor.tsui20@gmail.com>
1 parent a8ad74f commit f5785f9

File tree

2 files changed

+86
-9
lines changed

2 files changed

+86
-9
lines changed

encodings/fastlanes/src/delta/array/delta_decompress.rs

Lines changed: 79 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -15,28 +15,55 @@ use vortex_buffer::Buffer;
1515
use vortex_buffer::BufferMut;
1616
use vortex_dtype::NativePType;
1717
use vortex_dtype::match_each_unsigned_integer_ptype;
18+
use vortex_vector::primitive::PVector;
19+
use vortex_vector::primitive::PrimitiveVector;
1820

1921
use crate::DeltaArray;
2022

23+
/// Decompresses the [`DeltaArray`] into a vector.
///
/// The `bases` and `deltas` children are converted to primitive arrays, the whole encoded
/// buffer is decoded with `decompress_primitive`, and the result is then restricted to the
/// logical window `array.offset()..array.offset() + array.len()`.
///
/// The validity of the returned vector is the validity mask of the `deltas` child, sliced by
/// that same window.
24+
pub fn delta_decompress_into_primitive_vector(array: &DeltaArray) -> PrimitiveVector {
25+
let bases = array.bases().to_primitive();
26+
let deltas = array.deltas().to_primitive();
27+
28+
// Logical window of this array within the decoded values.
let start = array.offset();
29+
let end = start + array.len();
30+
31+
// Validity is taken from the deltas child and cut down to the logical window.
let validity = array.deltas().validity_mask().slice(start..end);
32+
33+
// Dispatch on the unsigned integer ptype of the deltas child; `T` is the concrete native type.
match_each_unsigned_integer_ptype!(deltas.ptype(), |T| {
34+
// Bound to a `const` so it can be passed as a const generic argument below.
const LANES: usize = T::LANES;
35+
36+
// Decode everything first, then slice: the slice is applied to the decoded output, not to
// the encoded inputs.
let buffer = decompress_primitive::<T, LANES>(bases.as_slice(), deltas.as_slice());
37+
let buffer = buffer.slice(start..end);
38+
39+
// SAFETY: We slice the buffer and the validity by the same range.
// NOTE(review): this presumes the decoded buffer and the deltas validity mask had the same
// length before slicing — confirm against `decompress_primitive` and `DeltaArray` invariants.
40+
unsafe { PVector::<T>::new_unchecked(buffer, validity) }.into()
41+
})
42+
}
43+
2144
/// Decompresses the [`DeltaArray`] into a [`PrimitiveArray`].
///
/// In the added (`+`) lines, the validity is built from the validity mask of the `deltas` child
/// together with the nullability of the array's dtype, and both the validity and the decoded
/// buffer are sliced to `array.offset()..array.offset() + array.len()` before the
/// [`PrimitiveArray`] is constructed.
pub fn delta_decompress(array: &DeltaArray) -> PrimitiveArray {
2245
let bases = array.bases().to_primitive();
2346
let deltas = array.deltas().to_primitive();
24-
let decoded = match_each_unsigned_integer_ptype!(deltas.ptype(), |T| {
47+
48+
// Logical window of this array within the decoded values.
let start = array.offset();
49+
let end = start + array.len();
50+
51+
// The validity does not depend on the element type, so it is computed once, outside the
// ptype dispatch.
let validity = Validity::from_mask(array.deltas().validity_mask(), array.dtype().nullability());
52+
let validity = validity.slice(start..end);
53+
54+
match_each_unsigned_integer_ptype!(deltas.ptype(), |T| {
2555
const LANES: usize = T::LANES;
2656

27-
PrimitiveArray::new(
28-
decompress_primitive::<T, LANES>(bases.as_slice(), deltas.as_slice()),
29-
Validity::from_mask(array.deltas().validity_mask(), array.dtype().nullability()),
30-
)
31-
});
57+
// Decode the whole buffer, then slice the decoded output to the logical window.
let buffer = decompress_primitive::<T, LANES>(bases.as_slice(), deltas.as_slice());
58+
let buffer = buffer.slice(start..end);
3259

33-
decoded
34-
.slice(array.offset()..array.offset() + array.len())
35-
.to_primitive()
60+
PrimitiveArray::new(buffer, validity)
61+
})
3662
}
3763

3864
// TODO(ngates): can we re-use the deltas buffer for the result? Might be tricky given the
3965
// traversal ordering, but possibly doable.
66+
/// Performs the low-level delta decompression on primitive values.
4067
fn decompress_primitive<T: NativePType + Delta + Transpose + WrappingAdd, const LANES: usize>(
4168
bases: &[T],
4269
deltas: &[T],
@@ -84,3 +111,46 @@ fn decompress_primitive<T: NativePType + Delta + Transpose + WrappingAdd, const
84111

85112
output.freeze()
86113
}
114+
115+
// Round-trip tests for `delta_decompress_into_primitive_vector`: encode a primitive array as a
// `DeltaArray`, decompress it into a vector, and compare against the original input.
#[cfg(test)]
116+
mod tests {
117+
use vortex_array::arrays::PrimitiveArray;
118+
use vortex_dtype::PTypeDowncast;
119+
use vortex_vector::VectorOps;
120+
121+
use super::*;
122+
use crate::DeltaArray;
123+
124+
// Non-nullable case: the values 0..10_000 as `u32`.
#[test]
125+
fn test_decompress_into_vector() {
126+
let input: PrimitiveArray = (0u32..10_000).collect();
127+
let delta = DeltaArray::try_from_primitive_array(&input).unwrap();
128+
assert_eq!(delta.len(), input.len());
129+
130+
let decompressed = delta_decompress_into_primitive_vector(&delta);
131+
let decompressed = decompressed.into_u32();
132+
133+
// Lengths, every element, and the validity must all match the input.
assert_eq!(decompressed.len(), input.len());
134+
for (actual, expected) in decompressed.as_ref().iter().zip(input.as_slice::<u32>()) {
135+
assert_eq!(actual, expected);
136+
}
137+
assert_eq!(decompressed.validity(), &input.validity_mask());
138+
}
139+
140+
// Nullable case: even values are present, odd positions are `None`.
#[test]
141+
fn test_decompress_into_vector_nullable() {
142+
let input =
143+
PrimitiveArray::from_option_iter((0u32..10_000).map(|i| (i % 2 == 0).then_some(i)));
144+
let delta = DeltaArray::try_from_primitive_array(&input).unwrap();
145+
assert_eq!(delta.len(), input.len());
146+
147+
let decompressed = delta_decompress_into_primitive_vector(&delta);
148+
let decompressed = decompressed.into_u32();
149+
150+
assert_eq!(decompressed.len(), input.len());
// NOTE(review): this loop compares the raw values at every position, including the null ones,
// so it relies on the underlying values at null slots round-tripping unchanged — confirm that
// is an intended guarantee of the encoding.
151+
for (actual, expected) in decompressed.as_ref().iter().zip(input.as_slice::<u32>()) {
152+
assert_eq!(actual, expected);
153+
}
154+
assert_eq!(decompressed.validity(), &input.validity_mask());
155+
}
156+
}

encodings/fastlanes/src/delta/vtable/mod.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
use fastlanes::FastLanes;
55
use prost::Message;
66
use vortex_array::ProstMetadata;
7+
use vortex_array::execution::ExecutionCtx;
78
use vortex_array::serde::ArrayChildren;
89
use vortex_array::vtable;
910
use vortex_array::vtable::ArrayId;
@@ -18,8 +19,10 @@ use vortex_dtype::PType;
1819
use vortex_dtype::match_each_unsigned_integer_ptype;
1920
use vortex_error::VortexResult;
2021
use vortex_error::vortex_err;
22+
use vortex_vector::Vector;
2123

2224
use crate::DeltaArray;
25+
use crate::delta::array::delta_decompress::delta_decompress_into_primitive_vector;
2326

2427
mod array;
2528
mod canonical;
@@ -98,6 +101,10 @@ impl VTable for DeltaVTable {
98101

99102
DeltaArray::try_new(bases, deltas, metadata.0.offset as usize, len)
100103
}
104+
105+
/// Executes the array by decompressing it with `delta_decompress_into_primitive_vector` and
/// converting the resulting primitive vector into a [`Vector`].
///
/// The execution context is not used, and the visible body always returns `Ok`.
fn batch_execute(array: &DeltaArray, _ctx: &mut ExecutionCtx) -> VortexResult<Vector> {
106+
Ok(delta_decompress_into_primitive_vector(array).into())
107+
}
101108
}
102109

103110
#[derive(Debug)]

0 commit comments

Comments
 (0)