Skip to content

Commit 365308d

Browse files
authored
Merge branch 'develop' into operators-bench
2 parents 86b482d + f5ee1a0 commit 365308d

File tree

26 files changed

+667
-60
lines changed

26 files changed

+667
-60
lines changed

encodings/decimal-byte-parts/src/decimal_byte_parts/rules.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
// SPDX-License-Identifier: Apache-2.0
22
// SPDX-FileCopyrightText: Copyright the Vortex contributors
33

4+
use vortex_array::Array;
45
use vortex_array::ArrayRef;
56
use vortex_array::IntoArray;
67
use vortex_array::arrays::FilterArray;
@@ -38,8 +39,7 @@ impl ArrayParentReduceRule<DecimalBytePartsVTable> for DecimalBytePartsFilterPus
3839
return Ok(None);
3940
}
4041

41-
let new_msp =
42-
FilterArray::new(child.msp.clone(), parent.filter_mask().clone()).into_array();
42+
let new_msp = child.msp.filter(parent.filter_mask().clone())?;
4343
let new_child =
4444
DecimalBytePartsArray::try_new(new_msp, *child.decimal_dtype())?.into_array();
4545
Ok(Some(new_child))

encodings/fastlanes/src/bitpacking/compute/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ mod filter;
77
mod is_constant;
88
mod take;
99

10+
// TODO(connor): This is duplicated in `encodings/fastlanes/src/bitpacking/vtable/kernels/mod.rs`.
1011
fn chunked_indices<F: FnMut(usize, &[usize])>(
1112
mut indices: impl Iterator<Item = usize>,
1213
offset: usize,

encodings/fastlanes/src/bitpacking/compute/take.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ use crate::BitPackedArray;
3131
use crate::BitPackedVTable;
3232
use crate::bitpack_decompress;
3333

34+
// TODO(connor): This is duplicated in `encodings/fastlanes/src/bitpacking/vtable/kernels/mod.rs`.
3435
/// assuming the buffer is already allocated (which will happen at most once) then unpacking
3536
/// all 1024 elements takes ~8.8x as long as unpacking a single element on an M2 Macbook Air.
3637
/// see https://github.com/vortex-data/vortex/pull/190#issue-2223752833
Lines changed: 206 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,206 @@
1+
// SPDX-License-Identifier: Apache-2.0
2+
// SPDX-FileCopyrightText: Copyright the Vortex contributors
3+
4+
use std::mem::MaybeUninit;
5+
use std::sync::Arc;
6+
7+
use fastlanes::BitPacking;
8+
use vortex_array::ExecutionCtx;
9+
use vortex_array::arrays::FilterArray;
10+
use vortex_array::arrays::FilterVTable;
11+
use vortex_array::kernel::ExecuteParentKernel;
12+
use vortex_array::kernel::ParentKernelSet;
13+
use vortex_array::matchers::Exact;
14+
use vortex_buffer::BufferMut;
15+
use vortex_compute::filter::Filter;
16+
use vortex_dtype::NativePType;
17+
use vortex_dtype::PType;
18+
use vortex_dtype::UnsignedPType;
19+
use vortex_dtype::match_each_integer_ptype;
20+
use vortex_error::VortexResult;
21+
use vortex_mask::Mask;
22+
use vortex_mask::MaskValues;
23+
use vortex_vector::Vector;
24+
use vortex_vector::VectorMutOps;
25+
use vortex_vector::primitive::PVectorMut;
26+
use vortex_vector::primitive::PrimitiveVectorMut;
27+
28+
use crate::BitPackedArray;
29+
use crate::BitPackedVTable;
30+
use crate::bitpacking::vtable::kernels::UNPACK_CHUNK_THRESHOLD;
31+
use crate::bitpacking::vtable::kernels::chunked_indices;
32+
33+
pub(crate) const PARENT_KERNELS: ParentKernelSet<BitPackedVTable> =
34+
ParentKernelSet::new(&[ParentKernelSet::lift(&BitPackingFilterKernel)]);
35+
36+
/// The threshold over which it is faster to fully unpack the entire [`BitPackedArray`] and then
37+
/// filter the result than to unpack only specific bitpacked values into the output buffer.
38+
pub const fn unpack_then_filter_threshold<T>() -> f64 {
39+
// TODO(connor): Where did these numbers come from? Add a public link after validating them.
40+
// These numbers probably don't work for in-place filtering either.
41+
match size_of::<T>() {
42+
1 => 0.03,
43+
2 => 0.03,
44+
4 => 0.075,
45+
_ => 0.09,
46+
// >8 bytes may have a higher threshold. These numbers are derived from a GCP c2-standard-4
47+
// with a "Cascade Lake" CPU.
48+
}
49+
}
50+
51+
/// Kernel to execute filtering directly on a bit-packed array.
52+
#[derive(Debug)]
53+
struct BitPackingFilterKernel;
54+
55+
impl ExecuteParentKernel<BitPackedVTable> for BitPackingFilterKernel {
56+
type Parent = Exact<FilterVTable>;
57+
58+
fn parent(&self) -> Self::Parent {
59+
Exact::from(&FilterVTable)
60+
}
61+
62+
fn execute_parent(
63+
&self,
64+
array: &BitPackedArray,
65+
parent: &FilterArray,
66+
_child_idx: usize,
67+
_ctx: &mut ExecutionCtx,
68+
) -> VortexResult<Option<Vector>> {
69+
let values = match parent.filter_mask() {
70+
Mask::AllTrue(_) | Mask::AllFalse(_) => {
71+
// No optimization for full or empty mask
72+
return Ok(None);
73+
}
74+
Mask::Values(values) => values,
75+
};
76+
77+
match_each_integer_ptype!(array.ptype(), |I| {
78+
// If the density is high enough, then we would rather decompress the whole array and then apply
79+
// a filter than decompress values one by one.
80+
if values.density() > unpack_then_filter_threshold::<I>() {
81+
return Ok(None);
82+
}
83+
});
84+
85+
let mut primitive_vector: PrimitiveVectorMut = match array.ptype() {
86+
PType::U8 => filter_primitive_without_patches::<u8>(array, values)?.into(),
87+
PType::U16 => filter_primitive_without_patches::<u16>(array, values)?.into(),
88+
PType::U32 => filter_primitive_without_patches::<u32>(array, values)?.into(),
89+
PType::U64 => filter_primitive_without_patches::<u64>(array, values)?.into(),
90+
91+
// Since the fastlanes crate only supports unsigned integers, and since we know that all
92+
// numbers are going to be non-negative, we can safely "cast" to unsigned and back.
93+
PType::I8 => {
94+
let pvector = filter_primitive_without_patches::<u8>(array, values)?;
95+
unsafe { pvector.transmute::<i8>() }.into()
96+
}
97+
PType::I16 => {
98+
let pvector = filter_primitive_without_patches::<u16>(array, values)?;
99+
unsafe { pvector.transmute::<i16>() }.into()
100+
}
101+
PType::I32 => {
102+
let pvector = filter_primitive_without_patches::<u32>(array, values)?;
103+
unsafe { pvector.transmute::<i32>() }.into()
104+
}
105+
PType::I64 => {
106+
let pvector = filter_primitive_without_patches::<u64>(array, values)?;
107+
unsafe { pvector.transmute::<i64>() }.into()
108+
}
109+
other => {
110+
unreachable!("Unsupported ptype {other} for bitpacking, we also checked this above")
111+
}
112+
};
113+
114+
// TODO(connor): We want a `PatchesArray` or patching compute functions instead of this.
115+
let patches = array
116+
.patches()
117+
.map(|patches| patches.filter(&Mask::Values(values.clone())))
118+
.transpose()?
119+
.flatten();
120+
if let Some(patches) = patches {
121+
primitive_vector = patches.apply_to_primitive_vector(primitive_vector);
122+
}
123+
124+
Ok(Some(primitive_vector.freeze().into()))
125+
}
126+
}
127+
128+
/// Specialized filter kernel for primitive bit-packed arrays.
129+
///
130+
/// Because the FastLanes bit-packing kernels are only implemented for unsigned types, the provided
131+
/// `U` should be promoted to the unsigned variant for any target bit width.
132+
/// For example, if the array is bit-packed `i16`, this function should be called with `U = u16`.
133+
///
134+
/// This function fully decompresses the array for all but the most selective masks because the
135+
/// FastLanes decompression is so fast and the bookkeeping necessary to decompress individual
136+
/// elements is relatively slow.
137+
fn filter_primitive_without_patches<U: UnsignedPType + BitPacking>(
138+
array: &BitPackedArray,
139+
selection: &Arc<MaskValues>,
140+
) -> VortexResult<PVectorMut<U>> {
141+
let values = filter_with_indices(array, selection.indices());
142+
let validity = array
143+
.validity_mask()
144+
.filter(&Mask::Values(selection.clone()))
145+
.into_mut();
146+
147+
debug_assert_eq!(
148+
values.len(),
149+
validity.len(),
150+
"`filter_with_indices` was somehow incorrect"
151+
);
152+
153+
Ok(unsafe { PVectorMut::new_unchecked(values, validity) })
154+
}
155+
156+
fn filter_with_indices<T: NativePType + BitPacking>(
157+
array: &BitPackedArray,
158+
indices: &[usize],
159+
) -> BufferMut<T> {
160+
let offset = array.offset() as usize;
161+
let bit_width = array.bit_width() as usize;
162+
let mut values = BufferMut::with_capacity(indices.len());
163+
164+
// Some re-usable memory to store the unpacked values of a single chunk.
165+
let mut unpacked = [const { MaybeUninit::<T>::uninit() }; 1024];
166+
let packed_bytes = array.packed_slice::<T>();
167+
168+
// Group the indices by the FastLanes chunk they belong to.
169+
let chunk_size = 128 * bit_width / size_of::<T>();
170+
171+
chunked_indices(indices, offset, |chunk_idx, indices_within_chunk| {
172+
let packed = &packed_bytes[chunk_idx * chunk_size..][..chunk_size];
173+
174+
if indices_within_chunk.len() == 1024 {
175+
// Unpack the entire chunk.
176+
unsafe {
177+
let values_len = values.len();
178+
values.set_len(values_len + 1024);
179+
BitPacking::unchecked_unpack(
180+
bit_width,
181+
packed,
182+
&mut values.as_mut_slice()[values_len..],
183+
);
184+
}
185+
} else if indices_within_chunk.len() > UNPACK_CHUNK_THRESHOLD {
186+
// Unpack into a temporary chunk and then copy the values.
187+
unsafe {
188+
let dst: &mut [MaybeUninit<T>] = &mut unpacked;
189+
let dst: &mut [T] = std::mem::transmute(dst);
190+
BitPacking::unchecked_unpack(bit_width, packed, dst);
191+
}
192+
values.extend_trusted(
193+
indices_within_chunk
194+
.iter()
195+
.map(|&idx| unsafe { unpacked.get_unchecked(idx).assume_init() }),
196+
);
197+
} else {
198+
// Otherwise, unpack each element individually.
199+
values.extend_trusted(indices_within_chunk.iter().map(|&idx| unsafe {
200+
BitPacking::unchecked_unpack_single(bit_width, packed, idx)
201+
}));
202+
}
203+
});
204+
205+
values
206+
}
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
// SPDX-License-Identifier: Apache-2.0
2+
// SPDX-FileCopyrightText: Copyright the Vortex contributors
3+
4+
pub(crate) mod filter;
5+
6+
/// Assuming the buffer is already allocated (which will happen at most once), then unpacking all
7+
/// 1024 elements takes ~8.8x as long as unpacking a single element on an M2 Macbook Air.
8+
///
9+
/// See https://github.com/vortex-data/vortex/pull/190#issue-2223752833
10+
const UNPACK_CHUNK_THRESHOLD: usize = 8;
11+
12+
fn chunked_indices<F: FnMut(usize, &[usize])>(indices: &[usize], offset: usize, mut chunk_fn: F) {
13+
if indices.is_empty() {
14+
return;
15+
}
16+
17+
let mut indices_within_chunk: Vec<usize> = Vec::with_capacity(1024);
18+
19+
let first_idx = indices[0];
20+
let mut current_chunk_idx = (first_idx + offset) / 1024;
21+
indices_within_chunk.push((first_idx + offset) % 1024);
22+
23+
for idx in &indices[1..] {
24+
let new_chunk_idx = (idx + offset) / 1024;
25+
26+
if new_chunk_idx != current_chunk_idx {
27+
chunk_fn(current_chunk_idx, &indices_within_chunk);
28+
indices_within_chunk.clear();
29+
}
30+
31+
current_chunk_idx = new_chunk_idx;
32+
indices_within_chunk.push((idx + offset) % 1024);
33+
}
34+
35+
if !indices_within_chunk.is_empty() {
36+
chunk_fn(current_chunk_idx, &indices_within_chunk);
37+
}
38+
}

encodings/fastlanes/src/bitpacking/vtable/mod.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,10 +30,12 @@ use vortex_vector::VectorMutOps;
3030

3131
use crate::BitPackedArray;
3232
use crate::bitpack_decompress::unpack_to_primitive_vector;
33+
use crate::bitpacking::vtable::kernels::filter::PARENT_KERNELS;
3334

3435
mod array;
3536
mod canonical;
3637
mod encode;
38+
mod kernels;
3739
mod operations;
3840
mod validity;
3941
mod visitor;
@@ -246,6 +248,15 @@ impl VTable for BitPackedVTable {
246248
fn execute(array: &Self::Array, _ctx: &mut ExecutionCtx) -> VortexResult<Vector> {
247249
Ok(unpack_to_primitive_vector(array).freeze().into())
248250
}
251+
252+
fn execute_parent(
253+
array: &Self::Array,
254+
parent: &ArrayRef,
255+
child_idx: usize,
256+
ctx: &mut ExecutionCtx,
257+
) -> VortexResult<Option<Vector>> {
258+
PARENT_KERNELS.execute(array, parent, child_idx, ctx)
259+
}
249260
}
250261

251262
#[derive(Debug)]

encodings/fastlanes/src/for/vtable/rules.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
// SPDX-License-Identifier: Apache-2.0
22
// SPDX-FileCopyrightText: Copyright the Vortex contributors
33

4+
use vortex_array::Array;
45
use vortex_array::ArrayRef;
56
use vortex_array::IntoArray;
67
use vortex_array::arrays::FilterArray;
@@ -34,8 +35,7 @@ impl ArrayParentReduceRule<FoRVTable> for FoRFilterPushDownRule {
3435
) -> VortexResult<Option<ArrayRef>> {
3536
let new_array = unsafe {
3637
FoRArray::new_unchecked(
37-
FilterArray::new(child.encoded().clone(), parent.filter_mask().clone())
38-
.into_array(),
38+
child.encoded.filter(parent.filter_mask().clone())?,
3939
child.reference.clone(),
4040
)
4141
};

encodings/zstd/src/array.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -330,6 +330,7 @@ impl ZstdArray {
330330
.get(i + 1)
331331
.copied()
332332
.unwrap_or(value_bytes.len());
333+
333334
let uncompressed = &value_bytes.slice(frame_byte_starts[i]..frame_byte_end);
334335
let compressed = compressor
335336
.compress(uncompressed)
@@ -366,8 +367,12 @@ impl ZstdArray {
366367
};
367368

368369
let value_bytes = values.byte_buffer();
370+
// Align frames to buffer alignment. This is necessary for overaligned buffers.
371+
let alignment = *value_bytes.alignment();
372+
let step_width = (values_per_frame * byte_width).div_ceil(alignment) * alignment;
373+
369374
let frame_byte_starts = (0..n_values * byte_width)
370-
.step_by(values_per_frame * byte_width)
375+
.step_by(step_width)
371376
.collect::<Vec<_>>();
372377
let Frames {
373378
dictionary,

encodings/zstd/src/test.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ use vortex_array::arrays::VarBinViewArray;
99
use vortex_array::assert_arrays_eq;
1010
use vortex_array::validity::Validity;
1111
use vortex_array::vtable::ValidityHelper;
12+
use vortex_buffer::Alignment;
1213
use vortex_buffer::Buffer;
1314
use vortex_dtype::DType;
1415
use vortex_dtype::Nullability;
@@ -202,3 +203,16 @@ fn test_sliced_array_children() {
202203
let sliced = compressed.slice(0..4);
203204
sliced.children();
204205
}
206+
207+
/// Tests that each beginning of a frame in ZSTD matches
208+
/// the buffer alignment when compressing primitive arrays.
209+
#[test]
210+
fn test_zstd_frame_start_buffer_alignment() {
211+
let data = vec![0u8; 2];
212+
let aligned_buffer = Buffer::copy_from_aligned(&data, Alignment::new(8));
213+
// u8 array now has an 8-byte alignment.
214+
let array = PrimitiveArray::new(aligned_buffer, Validity::NonNullable);
215+
let compressed = ZstdArray::from_primitive(&array, 0, 1);
216+
217+
assert!(compressed.is_ok());
218+
}

0 commit comments

Comments
 (0)