
Commit 4178feb

implement bind for BitPackedArray
Signed-off-by: Connor Tsui <[email protected]>
1 parent: 28c5ce5

File tree

5 files changed (+107, -199 lines changed)

Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default.

encodings/fastlanes/Cargo.toml

Lines changed: 2 additions & 0 deletions
@@ -28,11 +28,13 @@ prost = { workspace = true }
 rand = { workspace = true, optional = true }
 vortex-array = { workspace = true }
 vortex-buffer = { workspace = true }
+vortex-compute = { workspace = true }
 vortex-dtype = { workspace = true }
 vortex-error = { workspace = true }
 vortex-mask = { workspace = true }
 vortex-scalar = { workspace = true }
 vortex-utils = { workspace = true }
+vortex-vector = { workspace = true }
 
 [dev-dependencies]
 divan = { workspace = true }

encodings/fastlanes/src/bitpacking/compute/filter.rs

Lines changed: 9 additions & 4 deletions
@@ -20,10 +20,15 @@ use crate::{BitPackedArray, BitPackedVTable};
 
 impl FilterKernel for BitPackedVTable {
     fn filter(&self, array: &BitPackedArray, mask: &Mask) -> VortexResult<ArrayRef> {
-        let primitive = match_each_unsigned_integer_ptype!(array.ptype().to_unsigned(), |I| {
-            filter_primitive::<I>(array, mask)
-        });
-        Ok(primitive?.into_array())
+        // Since the fastlanes crate only supports unsigned integers, and since we know that all
+        // numbers are going to be non-negative, we can safely "cast" to unsigned.
+        let ptype = array.ptype().to_unsigned();
+
+        match_each_unsigned_integer_ptype!(ptype, |U| {
+            // Note that the `filter_primitive` function will reinterpret cast the array back to the
+            // correct `PType`, even if it was changed in `to_unsigned` above.
+            Ok(filter_primitive::<U>(array, mask)?.into_array())
+        })
     }
 }

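The cast-to-unsigned trick in this hunk works because a non-negative signed integer and its unsigned counterpart share the same bit pattern, so reinterpreting in either direction is lossless. A minimal standalone sketch of that invariant, using plain `as` casts and no Vortex or fastlanes APIs:

```rust
/// Demonstrates that non-negative signed values survive a round trip through
/// their unsigned counterpart unchanged, which is what lets the filter kernel
/// dispatch on `to_unsigned()` and reinterpret back afterwards.
fn main() {
    let signed: Vec<i32> = vec![0, 1, 42, i32::MAX];

    // "Cast" to unsigned: for non-negative values this is a pure
    // reinterpretation of the same bit pattern.
    let unsigned: Vec<u32> = signed.iter().map(|&x| x as u32).collect();

    // ... bitpacked compute would operate on the unsigned values here ...

    // Reinterpret back to signed; every value is recovered exactly.
    let round_trip: Vec<i32> = unsigned.iter().map(|&x| x as i32).collect();
    assert_eq!(signed, round_trip);
}
```
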
Lines changed: 72 additions & 195 deletions
@@ -1,208 +1,85 @@
 // SPDX-License-Identifier: Apache-2.0
 // SPDX-FileCopyrightText: Copyright the Vortex contributors
 
-// TODO(connor): Refactor this entire module!
-
-use std::any::Any;
-use std::cmp::min;
-use std::hash::{Hash, Hasher};
-use std::sync::Arc;
-
-use fastlanes::{BitPacking, FastLanes};
-use vortex_array::operator::{
-    LengthBounds, Operator, OperatorEq, OperatorHash, OperatorId, OperatorRef,
-};
-use vortex_array::pipeline::bits::BitView;
-use vortex_array::pipeline::view::ViewMut;
-use vortex_array::pipeline::{
-    BindContext, Element, Kernel, KernelContext, N, PipelinedOperator, RowSelection,
-};
-use vortex_array::vtable::OperatorVTable;
-use vortex_buffer::Buffer;
-use vortex_dtype::{DType, PhysicalPType, match_each_integer_ptype};
+use fastlanes::BitPacking;
+use vortex_array::execution::{BatchKernelRef, BindCtx, MaskExecution, kernel};
+use vortex_array::vtable::{OperatorVTable, ValidityHelper};
+use vortex_array::{ArrayRef, IntoArray, ToCanonical, compute};
+use vortex_buffer::{Buffer, byte_buffer_to_buffer};
+use vortex_compute::filter::Filter;
+use vortex_dtype::{NativePType, match_each_unsigned_integer_ptype};
 use vortex_error::VortexResult;
+use vortex_vector::PVector;
 
 use crate::{BitPackedArray, BitPackedVTable};
 
 impl OperatorVTable<BitPackedVTable> for BitPackedVTable {
-    fn to_operator(array: &BitPackedArray) -> VortexResult<Option<OperatorRef>> {
-        if array.dtype.is_nullable() {
-            log::trace!("BitPackedVTable does not support nullable arrays");
-            return Ok(None);
-        }
-        if array.patches.is_some() {
-            log::trace!("BitPackedVTable does not support nullable arrays");
-            return Ok(None);
-        }
-        if array.offset != 0 {
-            log::trace!("BitPackedVTable does not support non-zero offsets");
-            return Ok(None);
-        }
-
-        Ok(Some(Arc::new(array.clone())))
-    }
-}
-
-impl OperatorHash for BitPackedArray {
-    fn operator_hash<H: Hasher>(&self, state: &mut H) {
-        self.offset.hash(state);
-        self.len.hash(state);
-        self.dtype.hash(state);
-        self.bit_width.hash(state);
-        self.packed.operator_hash(state);
-        // We don't care about patches because they're not yet supported by the operator.
-        // OperatorHash(&self.patches).hash(state);
-        self.validity.operator_hash(state);
-    }
-}
-
-impl OperatorEq for BitPackedArray {
-    fn operator_eq(&self, other: &Self) -> bool {
-        self.offset == other.offset
-            && self.len == other.len
-            && self.dtype == other.dtype
-            && self.bit_width == other.bit_width
-            && self.packed.operator_eq(&other.packed)
-            && self.validity.operator_eq(&other.validity)
-    }
-}
-
-impl Operator for BitPackedArray {
-    fn id(&self) -> OperatorId {
-        self.encoding_id()
-    }
-
-    fn as_any(&self) -> &dyn Any {
-        self
-    }
-
-    fn dtype(&self) -> &DType {
-        &self.dtype
-    }
-
-    fn bounds(&self) -> LengthBounds {
-        self.len.into()
-    }
-
-    fn children(&self) -> &[OperatorRef] {
-        &[]
-    }
-
-    fn with_children(self: Arc<Self>, _children: Vec<OperatorRef>) -> VortexResult<OperatorRef> {
-        Ok(self)
-    }
-}
-
-impl PipelinedOperator for BitPackedArray {
-    fn row_selection(&self) -> RowSelection {
-        RowSelection::Domain(self.len)
-    }
-
-    fn bind(&self, _ctx: &dyn BindContext) -> VortexResult<Box<dyn Kernel>> {
-        assert!(self.bit_width > 0);
-        match_each_integer_ptype!(self.ptype(), |T| {
-            let packed_stride =
-                self.bit_width as usize * <<T as PhysicalPType>::Physical as FastLanes>::LANES;
-            let buffer = Buffer::<<T as PhysicalPType>::Physical>::from_byte_buffer(
-                self.packed.clone().into_byte_buffer(),
-            );
-
-            if self.offset == 0 {
-                Ok(Box::new(BitPackedKernel::<T>::new(
-                    self.bit_width as usize,
-                    packed_stride,
-                    buffer,
-                )) as Box<dyn Kernel>)
-            } else {
-                // TODO(ngates): the unaligned kernel needs fixing for the non-masked API
-                // Ok(Box::new(BitPackedUnalignedKernel::<T>::new(
-                //     self.bit_width as usize,
-                //     packed_stride,
-                //     buffer,
-                //     0,
-                //     self.offset,
-                // )) as Box<dyn Kernel>)
-                unreachable!("Offset must be zero")
-            }
+    fn bind(
+        array: &BitPackedArray,
+        selection: Option<&ArrayRef>,
+        ctx: &mut dyn BindCtx,
+    ) -> VortexResult<BatchKernelRef> {
+        let selection_mask = ctx.bind_selection(array.len(), selection)?;
+        let validity = ctx.bind_validity(array.validity(), array.len(), selection)?;
+
+        // Since the fastlanes crate only supports unsigned integers, and since we know that all
+        // numbers are going to be non-negative, we can safely "cast" to unsigned.
+        let ptype = array.ptype().to_unsigned();
+
+        match_each_unsigned_integer_ptype!(ptype, |U| {
+            Ok(bitpack_filter_kernel::<U>(array, selection_mask, validity))
         })
     }
-
-    fn vector_children(&self) -> Vec<usize> {
-        vec![]
-    }
-
-    fn batch_children(&self) -> Vec<usize> {
-        vec![]
-    }
-}
-
-// TODO(ngates): we should try putting the const bit width as a generic here, to avoid
-// a switch in the fastlanes library on every invocation of `unchecked_unpack`.
-#[derive(Clone)]
-pub struct BitPackedKernel<T: PhysicalPType<Physical: BitPacking>> {
-    width: usize,
-    packed_stride: usize,
-    buffer: Buffer<<T as PhysicalPType>::Physical>,
-}
-
-impl<T: PhysicalPType<Physical: BitPacking>> BitPackedKernel<T> {
-    pub fn new(
-        width: usize,
-        packed_stride: usize,
-        buffer: Buffer<<T as PhysicalPType>::Physical>,
-    ) -> Self {
-        Self {
-            width,
-            packed_stride,
-            buffer,
-        }
-    }
 }
 
-impl<T> Kernel for BitPackedKernel<T>
-where
-    T: PhysicalPType<Physical: BitPacking>,
-    T: Element,
-    <T as PhysicalPType>::Physical: Element,
-{
-    fn step(
-        &self,
-        _ctx: &KernelContext,
-        chunk_idx: usize,
-        _selection: &BitView,
-        out: &mut ViewMut,
-    ) -> VortexResult<()> {
-        assert_eq!(
-            N % 1024,
-            0,
-            "BitPackedKernel assumes N is a multiple of 1024"
-        );
-
-        // We re-interpret the output view as the unsigned bitpacked type.
-        out.reinterpret_as::<<T as PhysicalPType>::Physical>();
-
-        let elements = out.as_array_mut::<<T as PhysicalPType>::Physical>();
-
-        let packed_offset = ((chunk_idx * N) / 1024) * self.packed_stride;
-        let packed = &self.buffer.as_slice()[packed_offset..];
-
-        // We compute the number of FastLanes vectors for this chunk.
-        let nvecs = min(N / 1024, packed.len() / self.packed_stride);
-
-        for i in 0..nvecs {
-            // TODO(ngates): decide if the selection mask is sufficiently sparse to warrant
-            // unpacking only the selected elements.
-            unsafe {
-                BitPacking::unchecked_unpack(
-                    self.width,
-                    &packed[(i * self.packed_stride)..][..self.packed_stride],
-                    &mut elements[(i * 1024)..],
-                );
-            }
-        }
-
-        out.reinterpret_as::<T>();
-
-        Ok(())
-    }
+/// Creates the [`BitPackedArray`] filter kernel.
+///
+/// Note that the generic type parameter `U` may be the unsigned version of the signed [`PType`] of
+/// the input array. This is fine because we know that all values in bitpacked arrays are
+/// non-negative.
+fn bitpack_filter_kernel<U: NativePType + BitPacking>(
+    array: &BitPackedArray,
+    selection_mask: MaskExecution,
+    validity: MaskExecution,
+) -> BatchKernelRef {
+    let array = array.clone(); // Cheap clone due to many internal `Arc`s.
+    kernel(move || {
+        let selection_mask = selection_mask.execute()?;
+        let filtered_validity = validity.execute()?;
+
+        // TODO(connor): This function is implemented in a very roundabout way where we use the
+        // existing `BitPackedArray` `filter` implementation that gives us an array, and then we
+        // extract out the underlying buffer of the `PrimitiveArray` to create a `PrimitiveVector`.
+        //
+        // Ideally, we should take the underlying `ByteBuffer` of the `BitPackedArray` and unpack
+        // that directly into a `Buffer<T>` via a `BufferMut<T>`. This is a much more general
+        // solution that does not force everyone to use `PrimitiveBuilder`.
+        //
+        // However, the current decompression implementation for `BitPackedArray` is heavily tied
+        // to the `PrimitiveBuilder` and `UninitRange` API. What we really need to do is _replace_
+        // the `PrimitiveBuilder` with `PrimitiveVectorMut`, where instead of `UninitRange` we can
+        // write directly to a `PVectorMut`.
+        //
+        // For the sake of time to get this working, we have implemented this like so.
+        // When we eventually replace our builders with vectors, we can revisit this.
+
+        // Use the existing `filter` implementation over `PrimitiveArray` and extract the underlying
+        // `ByteBuffer`.
+        let filtered_array = compute::filter(&array.into_array(), &selection_mask)?.to_primitive();
+        debug_assert_eq!(filtered_array.ptype().byte_width(), size_of::<U>());
+
+        let byte_buffer = filtered_array.into_byte_buffer();
+
+        // SAFETY: The `filter` compute function maintains the type of the bitpacked array, which
+        // must have the same byte representation and alignment as `U`, so it is safe to reinterpret
+        // this buffer.
+        let buffer: Buffer<U> = unsafe { byte_buffer_to_buffer(byte_buffer) };
+        let filtered_buffer = buffer.filter(&selection_mask);
+
+        debug_assert_eq!(filtered_buffer.len(), filtered_validity.len());
+
+        // SAFETY: The buffer and validity (which should have started with the same length) were
+        // filtered by the same mask, which means their new lengths should also be the same.
+        Ok(unsafe { PVector::new_unchecked(filtered_buffer, filtered_validity) }.into())
+    })
 }

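The rewritten `bind` above does only cheap work up front (binding the selection and validity masks) and defers all decoding into a closure handed to `kernel`, which the engine invokes later. The sketch below models that two-phase shape with a plain boxed closure; `Kernel`, `PackedData`, and this `bind` signature are simplified stand-ins for illustration, not the actual vortex-array `BatchKernelRef`/`BindCtx` API:

```rust
// A simplified stand-in for `BatchKernelRef`: a boxed closure that runs the
// deferred decode work when the engine chooses to execute the kernel.
type Kernel = Box<dyn FnOnce() -> Result<Vec<u32>, String>>;

struct PackedData {
    values: Vec<u32>,
}

// Bind phase: capture everything the kernel will need (a cheap clone of the
// data plus the bound selection), but perform no decoding yet.
fn bind(data: &PackedData, selection: Vec<bool>) -> Kernel {
    let values = data.values.clone();
    Box::new(move || {
        // Execute phase: apply the selection and materialize the output.
        Ok(values
            .into_iter()
            .zip(selection)
            .filter_map(|(v, keep)| keep.then_some(v))
            .collect())
    })
}

fn main() {
    let data = PackedData { values: vec![10, 20, 30, 40] };
    let kernel = bind(&data, vec![true, false, true, false]);
    // No work has happened yet; decoding runs only when the kernel is invoked.
    assert_eq!(kernel().unwrap(), vec![10, 30]);
}
```
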
vortex-buffer/src/buffer.rs

Lines changed: 22 additions & 0 deletions
@@ -641,6 +641,28 @@ impl<T> From<BufferMut<T>> for Buffer<T> {
     }
 }
 
+/// Unsafely reinterprets a `Buffer<u8>` into a `Buffer<T>`.
+///
+/// # Safety
+///
+/// The caller must ensure:
+/// - The alignment of the [`ByteBuffer`] is the same as the alignment of `T`.
+/// - The number of bytes is a multiple of `size_of::<T>()`.
+/// - The bytes of the [`ByteBuffer`] make up valid `T` values.
+pub unsafe fn byte_buffer_to_buffer<T: Copy>(byte_buffer: ByteBuffer) -> Buffer<T> {
+    debug_assert_eq!(byte_buffer.alignment, Alignment::of::<T>());
+
+    debug_assert!(byte_buffer.length.is_multiple_of(size_of::<T>()));
+    let new_length = byte_buffer.length / size_of::<T>();
+
+    Buffer {
+        bytes: byte_buffer.bytes,
+        length: new_length,
+        alignment: byte_buffer.alignment,
+        _marker: PhantomData,
+    }
+}
+
 #[cfg(test)]
 mod test {
     use bytes::Buf;