Skip to content

Commit e1fee19

Browse files
authored
Chore: refactor delta module (#5452)
This is a purely cosmetic change. Moves all vtable implementations to their own files and separates decompression functions out into a delta_decompress file Signed-off-by: Connor Tsui <[email protected]>
1 parent d96a3f3 commit e1fee19

File tree

10 files changed

+504
-414
lines changed

10 files changed

+504
-414
lines changed

encodings/fastlanes/src/delta/compress.rs renamed to encodings/fastlanes/src/delta/array/delta_compress.rs

Lines changed: 7 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -3,17 +3,13 @@
33

44
use arrayref::{array_mut_ref, array_ref};
55
use fastlanes::{Delta, FastLanes, Transpose};
6-
use num_traits::{WrappingAdd, WrappingSub};
6+
use num_traits::WrappingSub;
77
use vortex_array::arrays::PrimitiveArray;
8-
use vortex_array::validity::Validity;
98
use vortex_array::vtable::ValidityHelper;
10-
use vortex_array::{Array, ToCanonical};
119
use vortex_buffer::{Buffer, BufferMut};
1210
use vortex_dtype::{NativePType, match_each_unsigned_integer_ptype};
1311
use vortex_error::VortexResult;
1412

15-
use crate::DeltaArray;
16-
1713
pub fn delta_compress(array: &PrimitiveArray) -> VortexResult<(PrimitiveArray, PrimitiveArray)> {
1814
// TODO(ngates): fill forward nulls?
1915
// let filled = fill_forward(array)?.to_primitive()?;
@@ -89,76 +85,14 @@ fn compress_primitive<T: NativePType + Delta + Transpose + WrappingSub, const LA
8985
(bases.freeze(), deltas.freeze())
9086
}
9187

92-
pub fn delta_decompress(array: &DeltaArray) -> PrimitiveArray {
93-
let bases = array.bases().to_primitive();
94-
let deltas = array.deltas().to_primitive();
95-
let decoded = match_each_unsigned_integer_ptype!(deltas.ptype(), |T| {
96-
const LANES: usize = T::LANES;
97-
98-
PrimitiveArray::new(
99-
decompress_primitive::<T, LANES>(bases.as_slice(), deltas.as_slice()),
100-
Validity::from_mask(array.deltas().validity_mask(), array.dtype().nullability()),
101-
)
102-
});
103-
104-
decoded
105-
.slice(array.offset()..array.offset() + array.len())
106-
.to_primitive()
107-
}
108-
109-
// TODO(ngates): can we re-use the deltas buffer for the result? Might be tricky given the
110-
// traversal ordering, but possibly doable.
111-
fn decompress_primitive<T: NativePType + Delta + Transpose + WrappingAdd, const LANES: usize>(
112-
bases: &[T],
113-
deltas: &[T],
114-
) -> Buffer<T> {
115-
// How many fastlanes vectors we will process.
116-
let num_chunks = deltas.len() / 1024;
117-
118-
// Allocate a result array.
119-
let mut output = BufferMut::with_capacity(deltas.len());
120-
121-
// Loop over all the chunks
122-
if num_chunks > 0 {
123-
let mut transposed: [T; 1024] = [T::default(); 1024];
124-
125-
for i in 0..num_chunks {
126-
let start_elem = i * 1024;
127-
let chunk: &[T; 1024] = array_ref![deltas, start_elem, 1024];
128-
129-
// Initialize the base vector for this chunk
130-
Delta::undelta::<LANES>(
131-
chunk,
132-
unsafe { &*(bases[i * LANES..(i + 1) * LANES].as_ptr().cast()) },
133-
&mut transposed,
134-
);
135-
136-
let output_len = output.len();
137-
unsafe { output.set_len(output_len + 1024) }
138-
Transpose::untranspose(&transposed, array_mut_ref![output[output_len..], 0, 1024]);
139-
}
140-
}
141-
assert_eq!(output.len() % 1024, 0);
142-
143-
// The remainder was encoded with scalar logic, so we need to scalar decode it.
144-
let remainder_size = deltas.len() % 1024;
145-
if remainder_size > 0 {
146-
let chunk = &deltas[num_chunks * 1024..];
147-
assert_eq!(bases.len(), num_chunks * LANES + 1);
148-
let mut base_scalar = bases[num_chunks * LANES];
149-
for next_diff in chunk {
150-
let next = next_diff.wrapping_add(&base_scalar);
151-
output.push(next);
152-
base_scalar = next;
153-
}
154-
}
155-
156-
output.freeze()
157-
}
158-
15988
#[cfg(test)]
160-
mod test {
89+
mod tests {
90+
use vortex_array::arrays::PrimitiveArray;
91+
use vortex_dtype::NativePType;
92+
16193
use super::*;
94+
use crate::DeltaArray;
95+
use crate::delta::array::delta_decompress::delta_decompress;
16296

16397
#[test]
16498
fn test_compress() {
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
// SPDX-License-Identifier: Apache-2.0
2+
// SPDX-FileCopyrightText: Copyright the Vortex contributors
3+
4+
use arrayref::{array_mut_ref, array_ref};
5+
use fastlanes::{Delta, FastLanes, Transpose};
6+
use num_traits::WrappingAdd;
7+
use vortex_array::arrays::PrimitiveArray;
8+
use vortex_array::validity::Validity;
9+
use vortex_array::{Array, ToCanonical};
10+
use vortex_buffer::{Buffer, BufferMut};
11+
use vortex_dtype::{NativePType, match_each_unsigned_integer_ptype};
12+
13+
use crate::DeltaArray;
14+
15+
pub fn delta_decompress(array: &DeltaArray) -> PrimitiveArray {
16+
let bases = array.bases().to_primitive();
17+
let deltas = array.deltas().to_primitive();
18+
let decoded = match_each_unsigned_integer_ptype!(deltas.ptype(), |T| {
19+
const LANES: usize = T::LANES;
20+
21+
PrimitiveArray::new(
22+
decompress_primitive::<T, LANES>(bases.as_slice(), deltas.as_slice()),
23+
Validity::from_mask(array.deltas().validity_mask(), array.dtype().nullability()),
24+
)
25+
});
26+
27+
decoded
28+
.slice(array.offset()..array.offset() + array.len())
29+
.to_primitive()
30+
}
31+
32+
// TODO(ngates): can we re-use the deltas buffer for the result? Might be tricky given the
33+
// traversal ordering, but possibly doable.
34+
fn decompress_primitive<T: NativePType + Delta + Transpose + WrappingAdd, const LANES: usize>(
35+
bases: &[T],
36+
deltas: &[T],
37+
) -> Buffer<T> {
38+
// How many fastlanes vectors we will process.
39+
let num_chunks = deltas.len() / 1024;
40+
41+
// Allocate a result array.
42+
let mut output = BufferMut::with_capacity(deltas.len());
43+
44+
// Loop over all the chunks
45+
if num_chunks > 0 {
46+
let mut transposed: [T; 1024] = [T::default(); 1024];
47+
48+
for i in 0..num_chunks {
49+
let start_elem = i * 1024;
50+
let chunk: &[T; 1024] = array_ref![deltas, start_elem, 1024];
51+
52+
// Initialize the base vector for this chunk
53+
Delta::undelta::<LANES>(
54+
chunk,
55+
unsafe { &*(bases[i * LANES..(i + 1) * LANES].as_ptr().cast()) },
56+
&mut transposed,
57+
);
58+
59+
let output_len = output.len();
60+
unsafe { output.set_len(output_len + 1024) }
61+
Transpose::untranspose(&transposed, array_mut_ref![output[output_len..], 0, 1024]);
62+
}
63+
}
64+
assert_eq!(output.len() % 1024, 0);
65+
66+
// The remainder was encoded with scalar logic, so we need to scalar decode it.
67+
let remainder_size = deltas.len() % 1024;
68+
if remainder_size > 0 {
69+
let chunk = &deltas[num_chunks * 1024..];
70+
assert_eq!(bases.len(), num_chunks * LANES + 1);
71+
let mut base_scalar = bases[num_chunks * LANES];
72+
for next_diff in chunk {
73+
let next = next_diff.wrapping_add(&base_scalar);
74+
output.push(next);
75+
base_scalar = next;
76+
}
77+
}
78+
79+
output.freeze()
80+
}
Lines changed: 204 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,204 @@
1+
// SPDX-License-Identifier: Apache-2.0
2+
// SPDX-FileCopyrightText: Copyright the Vortex contributors
3+
4+
use fastlanes::FastLanes;
5+
use vortex_array::arrays::PrimitiveArray;
6+
use vortex_array::stats::ArrayStats;
7+
use vortex_array::validity::Validity;
8+
use vortex_array::{ArrayRef, IntoArray};
9+
use vortex_buffer::Buffer;
10+
use vortex_dtype::{DType, NativePType, PType, match_each_unsigned_integer_ptype};
11+
use vortex_error::{VortexExpect as _, VortexResult, vortex_bail};
12+
13+
pub mod delta_compress;
14+
pub mod delta_decompress;
15+
16+
/// A FastLanes-style delta-encoded array of primitive values.
17+
///
18+
/// A [`DeltaArray`] comprises a sequence of _chunks_ each representing 1,024 delta-encoded values,
19+
/// except the last chunk which may represent from one to 1,024 values.
20+
///
21+
/// # Examples
22+
///
23+
/// ```
24+
/// use vortex_fastlanes::DeltaArray;
25+
/// let array = DeltaArray::try_from_vec(vec![1_u32, 2, 3, 5, 10, 11]).unwrap();
26+
/// ```
27+
///
28+
/// # Details
29+
///
30+
/// To facilitate slicing, this array accepts an `offset` and `logical_len`. The offset must be
31+
/// strictly less than 1,024 and the sum of `offset` and `logical_len` must not exceed the length of
32+
/// the `deltas` array. These values permit logical slicing without modifying any chunk containing a
33+
/// kept value. In particular, we may defer decompresison until the array is canonicalized or
34+
/// indexed. The `offset` is a physical offset into the first chunk, which necessarily contains
35+
/// 1,024 values. The `logical_len` is the number of logical values following the `offset`, which
36+
/// may be less than the number of physically stored values.
37+
///
38+
/// Each chunk is stored as a vector of bases and a vector of deltas. If the chunk physically
39+
/// contains 1,024 values, then there are as many bases as there are _lanes_ of this type in a
40+
/// 1024-bit register. For example, for 64-bit values, there are 16 bases because there are 16
41+
/// _lanes_. Each lane is a [delta-encoding](https://en.wikipedia.org/wiki/Delta_encoding) `1024 /
42+
/// bit_width` long vector of values. The deltas are stored in the
43+
/// [FastLanes](https://www.vldb.org/pvldb/vol16/p2132-afroozeh.pdf) order which splits the 1,024
44+
/// values into one contiguous sub-sequence per-lane, thus permitting delta encoding.
45+
///
46+
/// If the chunk physically has fewer than 1,024 values, then it is stored as a traditional,
47+
/// non-SIMD-amenable, delta-encoded vector.
48+
///
49+
/// Note the validity is stored in the deltas array.
50+
#[derive(Clone, Debug)]
51+
pub struct DeltaArray {
52+
offset: usize,
53+
len: usize,
54+
dtype: DType,
55+
bases: ArrayRef,
56+
deltas: ArrayRef,
57+
stats_set: ArrayStats,
58+
}
59+
60+
impl DeltaArray {
61+
// TODO(ngates): remove constructing from vec
62+
pub fn try_from_vec<T: NativePType>(vec: Vec<T>) -> VortexResult<Self> {
63+
Self::try_from_primitive_array(&PrimitiveArray::new(
64+
Buffer::copy_from(vec),
65+
Validity::NonNullable,
66+
))
67+
}
68+
69+
pub fn try_from_primitive_array(array: &PrimitiveArray) -> VortexResult<Self> {
70+
let (bases, deltas) = delta_compress::delta_compress(array)?;
71+
72+
Self::try_from_delta_compress_parts(bases.into_array(), deltas.into_array())
73+
}
74+
75+
/// Create a [`DeltaArray`] from the given `bases` and `deltas` arrays.
76+
/// Note the `deltas` might be nullable
77+
pub fn try_from_delta_compress_parts(bases: ArrayRef, deltas: ArrayRef) -> VortexResult<Self> {
78+
let logical_len = deltas.len();
79+
Self::try_new(bases, deltas, 0, logical_len)
80+
}
81+
82+
pub fn try_new(
83+
bases: ArrayRef,
84+
deltas: ArrayRef,
85+
offset: usize,
86+
logical_len: usize,
87+
) -> VortexResult<Self> {
88+
if offset >= 1024 {
89+
vortex_bail!("offset must be less than 1024: {}", offset);
90+
}
91+
if offset + logical_len > deltas.len() {
92+
vortex_bail!(
93+
"offset + logical_len, {} + {}, must be less than or equal to the size of deltas: {}",
94+
offset,
95+
logical_len,
96+
deltas.len()
97+
)
98+
}
99+
if !bases.dtype().eq_ignore_nullability(deltas.dtype()) {
100+
vortex_bail!(
101+
"DeltaArray: bases and deltas must have the same dtype, got {:?} and {:?}",
102+
bases.dtype(),
103+
deltas.dtype()
104+
);
105+
}
106+
let DType::Primitive(ptype, _) = bases.dtype().clone() else {
107+
vortex_bail!(
108+
"DeltaArray: dtype must be an integer, got {}",
109+
bases.dtype()
110+
);
111+
};
112+
113+
if !ptype.is_int() {
114+
vortex_bail!("DeltaArray: ptype must be an integer, got {}", ptype);
115+
}
116+
117+
let lanes = lane_count(ptype);
118+
119+
if (deltas.len() % 1024 == 0) != (bases.len() % lanes == 0) {
120+
vortex_bail!(
121+
"deltas length ({}) is a multiple of 1024 iff bases length ({}) is a multiple of LANES ({})",
122+
deltas.len(),
123+
bases.len(),
124+
lanes,
125+
);
126+
}
127+
128+
// SAFETY: validation done above
129+
Ok(unsafe { Self::new_unchecked(bases, deltas, offset, logical_len) })
130+
}
131+
132+
pub(crate) unsafe fn new_unchecked(
133+
bases: ArrayRef,
134+
deltas: ArrayRef,
135+
offset: usize,
136+
logical_len: usize,
137+
) -> Self {
138+
Self {
139+
offset,
140+
len: logical_len,
141+
dtype: bases.dtype().with_nullability(deltas.dtype().nullability()),
142+
bases,
143+
deltas,
144+
stats_set: Default::default(),
145+
}
146+
}
147+
148+
#[inline]
149+
pub fn bases(&self) -> &ArrayRef {
150+
&self.bases
151+
}
152+
153+
#[inline]
154+
pub fn deltas(&self) -> &ArrayRef {
155+
&self.deltas
156+
}
157+
158+
#[inline]
159+
pub(crate) fn lanes(&self) -> usize {
160+
let ptype =
161+
PType::try_from(self.dtype()).vortex_expect("DeltaArray DType must be primitive");
162+
lane_count(ptype)
163+
}
164+
165+
#[inline]
166+
pub fn len(&self) -> usize {
167+
self.len
168+
}
169+
170+
#[inline]
171+
pub fn is_empty(&self) -> bool {
172+
self.len == 0
173+
}
174+
175+
#[inline]
176+
pub fn dtype(&self) -> &DType {
177+
&self.dtype
178+
}
179+
180+
#[inline]
181+
/// The logical offset into the first chunk of [`Self::deltas`].
182+
pub fn offset(&self) -> usize {
183+
self.offset
184+
}
185+
186+
#[inline]
187+
pub(crate) fn bases_len(&self) -> usize {
188+
self.bases.len()
189+
}
190+
191+
#[inline]
192+
pub(crate) fn deltas_len(&self) -> usize {
193+
self.deltas.len()
194+
}
195+
196+
#[inline]
197+
pub(crate) fn stats_set(&self) -> &ArrayStats {
198+
&self.stats_set
199+
}
200+
}
201+
202+
pub(crate) fn lane_count(ptype: PType) -> usize {
203+
match_each_unsigned_integer_ptype!(ptype, |T| { T::LANES })
204+
}

0 commit comments

Comments
 (0)