Skip to content

Commit d12c41b

Browse files
authored
Feature: batch execute for DeltaArray (#5667)
Tracking Issue: #5652 Implements `batch_execute` for `DeltaArray`. Signed-off-by: Connor Tsui <[email protected]>
1 parent ad1de1e commit d12c41b

File tree

3 files changed

+144
-14
lines changed

3 files changed

+144
-14
lines changed

encodings/fastlanes/src/delta/array/delta_decompress.rs

Lines changed: 19 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ use fastlanes::Delta;
77
use fastlanes::FastLanes;
88
use fastlanes::Transpose;
99
use num_traits::WrappingAdd;
10-
use vortex_array::Array;
1110
use vortex_array::ToCanonical;
1211
use vortex_array::arrays::PrimitiveArray;
1312
use vortex_array::validity::Validity;
@@ -21,26 +20,32 @@ use crate::DeltaArray;
2120
pub fn delta_decompress(array: &DeltaArray) -> PrimitiveArray {
2221
let bases = array.bases().to_primitive();
2322
let deltas = array.deltas().to_primitive();
24-
let decoded = match_each_unsigned_integer_ptype!(deltas.ptype(), |T| {
23+
24+
let start = array.offset();
25+
let end = start + array.len();
26+
27+
// TODO(connor): This is incorrect, we need to untranspose the validity!!!
28+
29+
let validity = Validity::from_mask(array.deltas().validity_mask(), array.dtype().nullability());
30+
let validity = validity.slice(start..end);
31+
32+
match_each_unsigned_integer_ptype!(deltas.ptype(), |T| {
2533
const LANES: usize = T::LANES;
2634

27-
PrimitiveArray::new(
28-
decompress_primitive::<T, LANES>(bases.as_slice(), deltas.as_slice()),
29-
Validity::from_mask(array.deltas().validity_mask(), array.dtype().nullability()),
30-
)
31-
});
35+
let buffer = decompress_primitive::<T, LANES>(bases.as_slice(), deltas.as_slice());
36+
let buffer = buffer.slice(start..end);
3237

33-
decoded
34-
.slice(array.offset()..array.offset() + array.len())
35-
.to_primitive()
38+
PrimitiveArray::new(buffer, validity)
39+
})
3640
}
3741

3842
// TODO(ngates): can we re-use the deltas buffer for the result? Might be tricky given the
3943
// traversal ordering, but possibly doable.
40-
fn decompress_primitive<T: NativePType + Delta + Transpose + WrappingAdd, const LANES: usize>(
41-
bases: &[T],
42-
deltas: &[T],
43-
) -> Buffer<T> {
44+
/// Performs the low-level delta decompression on primitive values.
45+
pub(crate) fn decompress_primitive<T, const LANES: usize>(bases: &[T], deltas: &[T]) -> Buffer<T>
46+
where
47+
T: NativePType + Delta + Transpose + WrappingAdd,
48+
{
4449
// How many fastlanes vectors we will process.
4550
let num_chunks = deltas.len() / 1024;
4651

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
// SPDX-License-Identifier: Apache-2.0
2+
// SPDX-FileCopyrightText: Copyright the Vortex contributors
3+
4+
use std::fmt::Debug;
5+
use std::fmt::Formatter;
6+
7+
use fastlanes::Delta;
8+
use fastlanes::FastLanes;
9+
use fastlanes::Transpose;
10+
use num_traits::WrappingAdd;
11+
use vortex_array::kernel::Kernel;
12+
use vortex_array::kernel::KernelRef;
13+
use vortex_array::kernel::PushDownResult;
14+
use vortex_dtype::NativePType;
15+
use vortex_dtype::PTypeDowncastExt;
16+
use vortex_error::VortexResult;
17+
use vortex_error::vortex_panic;
18+
use vortex_mask::Mask;
19+
use vortex_vector::Vector;
20+
use vortex_vector::primitive::PVector;
21+
use vortex_vector::primitive::PrimitiveVector;
22+
23+
use crate::delta::array::delta_decompress::decompress_primitive;
24+
25+
/// Holds the bound kernels and metadata needed to execute delta decompression.
26+
pub struct DeltaKernel {
27+
pub(super) bases_kernel: KernelRef,
28+
pub(super) deltas_kernel: KernelRef,
29+
pub(super) start: usize,
30+
pub(super) end: usize,
31+
pub(super) validity: Mask,
32+
}
33+
34+
impl Kernel for DeltaKernel {
35+
fn execute(self: Box<Self>) -> VortexResult<Vector> {
36+
// Extract all fields to avoid borrow issues.
37+
let DeltaKernel {
38+
bases_kernel,
39+
deltas_kernel,
40+
start,
41+
end,
42+
validity,
43+
} = *self;
44+
45+
let bases = bases_kernel.execute()?.into_primitive();
46+
let deltas = deltas_kernel.execute()?.into_primitive();
47+
48+
Ok(match bases {
49+
PrimitiveVector::U8(pv) => {
50+
decompress::<u8, { u8::LANES }>(&pv, &deltas, start, end, validity)
51+
}
52+
PrimitiveVector::U16(pv) => {
53+
decompress::<u16, { u16::LANES }>(&pv, &deltas, start, end, validity)
54+
}
55+
PrimitiveVector::U32(pv) => {
56+
decompress::<u32, { u32::LANES }>(&pv, &deltas, start, end, validity)
57+
}
58+
PrimitiveVector::U64(pv) => {
59+
decompress::<u64, { u64::LANES }>(&pv, &deltas, start, end, validity)
60+
}
61+
PrimitiveVector::I8(_)
62+
| PrimitiveVector::I16(_)
63+
| PrimitiveVector::I32(_)
64+
| PrimitiveVector::I64(_)
65+
| PrimitiveVector::F16(_)
66+
| PrimitiveVector::F32(_)
67+
| PrimitiveVector::F64(_) => {
68+
vortex_panic!("Tried to match a non-unsigned vector in an unsigned match statement")
69+
}
70+
})
71+
}
72+
73+
fn push_down_filter(self: Box<Self>, _selection: &Mask) -> VortexResult<PushDownResult> {
74+
Ok(PushDownResult::NotPushed(self))
75+
}
76+
}
77+
78+
/// Decompresses delta-encoded data for a specific primitive type.
79+
fn decompress<T, const LANES: usize>(
80+
bases: &PVector<T>,
81+
deltas: &PrimitiveVector,
82+
start: usize,
83+
end: usize,
84+
validity: Mask,
85+
) -> Vector
86+
where
87+
T: NativePType + Delta + Transpose + WrappingAdd,
88+
{
89+
let buffer = decompress_primitive::<T, LANES>(bases.as_ref(), deltas.downcast::<T>().as_ref());
90+
let buffer = buffer.slice(start..end);
91+
92+
// SAFETY: We slice the buffer and the validity by the same range.
93+
unsafe { PVector::<T>::new_unchecked(buffer, validity) }.into()
94+
}
95+
96+
impl Debug for DeltaKernel {
97+
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
98+
f.debug_struct("DeltaKernel")
99+
.field("start", &self.start)
100+
.field("end", &self.end)
101+
.finish_non_exhaustive()
102+
}
103+
}

encodings/fastlanes/src/delta/vtable/mod.rs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ use fastlanes::FastLanes;
55
use prost::Message;
66
use vortex_array::ArrayRef;
77
use vortex_array::ProstMetadata;
8+
use vortex_array::kernel::BindCtx;
9+
use vortex_array::kernel::KernelRef;
810
use vortex_array::serde::ArrayChildren;
911
use vortex_array::vtable;
1012
use vortex_array::vtable::ArrayId;
@@ -21,10 +23,12 @@ use vortex_error::VortexResult;
2123
use vortex_error::vortex_ensure;
2224
use vortex_error::vortex_err;
2325

26+
use self::kernel::DeltaKernel;
2427
use crate::DeltaArray;
2528

2629
mod array;
2730
mod canonical;
31+
mod kernel;
2832
mod operations;
2933
mod validity;
3034
mod visitor;
@@ -117,6 +121,24 @@ impl VTable for DeltaVTable {
117121

118122
DeltaArray::try_new(bases, deltas, metadata.0.offset as usize, len)
119123
}
124+
125+
fn bind_kernel(array: &DeltaArray, ctx: &mut BindCtx) -> VortexResult<KernelRef> {
126+
let bases_kernel = array.bases().bind_kernel(ctx)?;
127+
let deltas_kernel = array.deltas().bind_kernel(ctx)?;
128+
129+
let start = array.offset();
130+
let end = start + array.len();
131+
132+
let validity = array.deltas().validity_mask().slice(start..end);
133+
134+
Ok(Box::new(DeltaKernel {
135+
bases_kernel,
136+
deltas_kernel,
137+
start,
138+
end,
139+
validity,
140+
}))
141+
}
120142
}
121143

122144
#[derive(Debug)]

0 commit comments

Comments
 (0)