Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

48 changes: 18 additions & 30 deletions encodings/fastlanes/benches/pipeline_bitpacking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,33 +66,21 @@ pub fn decompress_bitpacking_late_filter<T: NativePType>(bencher: Bencher, fract
.bench_values(|mask| filter(array.to_canonical().as_ref(), &mask).unwrap());
}

// TODO(ngates): bring back benchmarks once operator API is stable.
// #[divan::bench(types = [i8, i16, i32, i64], args = TRUE_COUNT)]
// pub fn decompress_bitpacking_pipeline_filter<T: Element + NativePType>(
// bencher: Bencher,
// fraction_kept: f64,
// ) {
// let mut rng = StdRng::seed_from_u64(0);
// let values = (0..LENGTH)
// .map(|_| T::from(rng.random_range(0..100)).unwrap())
// .collect::<BufferMut<T>>()
// .into_array()
// .to_primitive();
// let array = bitpack_to_best_bit_width(&values).unwrap();
//
// let mask = (0..LENGTH)
// .map(|_| rng.random_bool(fraction_kept))
// .collect::<BooleanBuffer>();
//
// bencher
// .with_inputs(|| Mask::from_buffer(mask.clone()))
// .bench_local_values(|mask| {
// export_canonical_pipeline_expr(
// array.dtype(),
// array.len(),
// array.to_operator().unwrap().unwrap().as_ref(),
// &mask,
// )
// .unwrap()
// });
// }
/// Benchmark: decompress a bit-packed array through the pipeline path while
/// applying a selection mask, for several integer widths and selectivities.
///
/// `TRUE_COUNT` supplies the `fraction_kept` values (density of the selection
/// mask); `types` runs the benchmark once per listed integer element type.
#[divan::bench(types = [i8, i16, i32, i64], args = TRUE_COUNT)]
pub fn decompress_bitpacking_pipeline_filter<T: NativePType>(bencher: Bencher, fraction_kept: f64) {
// Deterministic RNG so every run (and every element type) sees the same data.
let mut rng = StdRng::seed_from_u64(0);
// LENGTH random values in 0..100, collected into a primitive array.
// NOTE(review): assumes T::from(0..100) never fails for all benched types — holds for i8..i64.
let values = (0..LENGTH)
.map(|_| T::from(rng.random_range(0..100)).unwrap())
.collect::<BufferMut<T>>()
.into_array()
.to_primitive();
// Bit-pack the values at the tightest bit width that can represent them.
let array = bitpack_to_best_bit_width(&values).unwrap();

// Random selection mask where each position is kept with probability `fraction_kept`.
let mask = (0..LENGTH)
.map(|_| rng.random_bool(fraction_kept))
.collect::<BitBuffer>();

// The mask is cloned per iteration in `with_inputs` so setup cost stays out of
// the measured region; only `execute_with_selection` is timed.
bencher
.with_inputs(|| Mask::from(mask.clone()))
.bench_local_values(|mask| array.execute_with_selection(&mask).unwrap());
}
126 changes: 52 additions & 74 deletions encodings/fastlanes/src/bitpacking/array/bitpack_pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,16 @@

use fastlanes::{BitPacking, FastLanes};
use static_assertions::const_assert_eq;
use std::mem::{transmute, transmute_copy};
use vortex_array::pipeline::bit_view::BitView;
use vortex_array::pipeline::{BindContext, Kernel, KernelCtx, N, PipelineInputs, PipelinedNode};
use vortex_array::pipeline::{BindContext, Kernel, KernelCtx, PipelineInputs, PipelinedNode, N};
use vortex_buffer::Buffer;
use vortex_dtype::{PTypeDowncastExt, PhysicalPType, match_each_integer_ptype};
use vortex_dtype::{match_each_integer_ptype, PTypeDowncastExt, PhysicalPType};
use vortex_error::VortexResult;
use vortex_mask::Mask;
use vortex_mask::MaskMut;
use vortex_vector::primitive::PVectorMut;
use vortex_vector::{VectorMut, VectorMutOps};
use vortex_vector::VectorMut;
use vortex_vector::VectorMutOps;

use crate::BitPackedArray;

Expand Down Expand Up @@ -55,7 +57,7 @@ impl PipelinedNode for BitPackedArray {
Ok(Box::new(AlignedBitPackedKernel::<T>::new(
packed_bit_width,
packed_buffer,
self.validity.to_mask(self.len()),
self.validity.to_mask(self.len()).into_mut(),
)) as Box<dyn Kernel>)
})
}
Expand Down Expand Up @@ -84,7 +86,7 @@ pub struct AlignedBitPackedKernel<BP: PhysicalPType<Physical: BitPacking>> {
packed_buffer: Buffer<BP::Physical>,

/// The validity mask for the bitpacked array.
validity: Mask,
validity: MaskMut,

/// The total number of bitpacked chunks we have unpacked.
num_chunks_unpacked: usize,
Expand All @@ -93,8 +95,9 @@ pub struct AlignedBitPackedKernel<BP: PhysicalPType<Physical: BitPacking>> {
impl<BP: PhysicalPType<Physical: BitPacking>> AlignedBitPackedKernel<BP> {
pub fn new(
packed_bit_width: usize,
// TODO(ngates): hold an iterator over chunks instead of the full buffer?
packed_buffer: Buffer<BP::Physical>,
validity: Mask,
validity: MaskMut,
) -> Self {
let packed_stride =
packed_bit_width * <<BP as PhysicalPType>::Physical as FastLanes>::LANES;
Expand All @@ -118,30 +121,28 @@ impl<BP: PhysicalPType<Physical: BitPacking>> AlignedBitPackedKernel<BP> {
impl<BP: PhysicalPType<Physical: BitPacking>> Kernel for AlignedBitPackedKernel<BP> {
fn step(
&mut self,
_ctx: &KernelCtx,
_ctx: &mut KernelCtx,
selection: &BitView,
out: &mut VectorMut,
) -> VortexResult<()> {
let output_vector: &mut PVectorMut<BP::Physical> = out.as_primitive_mut().downcast();
debug_assert!(output_vector.is_empty());
out: VectorMut,
) -> VortexResult<VectorMut> {
if selection.true_count() == 0 {
debug_assert!(out.is_empty());
return Ok(out);
}

let packed_offset = self.num_chunks_unpacked * self.packed_stride;
let not_yet_unpacked_values = &self.packed_buffer.as_slice()[packed_offset..];
let mut output: PVectorMut<BP> = out.into_primitive().downcast();
debug_assert!(output.is_empty());

let true_count = selection.true_count();
let chunk_offset = self.num_chunks_unpacked * N;
let array_len = self.validity.len();
debug_assert!(chunk_offset < array_len);
let packed_offset = self.num_chunks_unpacked * self.packed_stride;
let packed_bytes = &self.packed_buffer[packed_offset..][..self.packed_stride];

// If the true count is very small (the selection is sparse), we can unpack individual
// elements directly into the output vector.
if true_count < SCALAR_UNPACK_THRESHOLD {
output_vector.reserve(true_count);
debug_assert!(true_count <= output_vector.capacity());
if selection.true_count() < SCALAR_UNPACK_THRESHOLD {
output.reserve(selection.true_count());

selection.iter_ones(|idx| {
let absolute_idx = chunk_offset + idx;
if self.validity.value(absolute_idx) {
if self.validity.value(idx) {
// SAFETY:
// - The documentation for `packed_bit_width` explains that the size is valid.
// - We know that the size of the `next_packed_chunk` we provide is equal to
Expand All @@ -150,84 +151,61 @@ impl<BP: PhysicalPType<Physical: BitPacking>> Kernel for AlignedBitPackedKernel<
let unpacked_value = unsafe {
BitPacking::unchecked_unpack_single(
self.packed_bit_width,
not_yet_unpacked_values,
packed_bytes,
idx,
)
};

// SAFETY: We just reserved enough capacity to push these values.
unsafe { output_vector.push_unchecked(unpacked_value) };
unsafe { output.push_unchecked(transmute_copy(&unpacked_value)) };
} else {
output_vector.append_nulls(1);
output.append_nulls(1);
}
});
} else {
// Otherwise if the mask is dense, it is faster to fully unpack the entire 1024
// element lane with SIMD / FastLanes and let other nodes in the pipeline decide if they
// want to perform the selection filter themselves.
output_vector.reserve(N);
debug_assert!(N <= output_vector.capacity());

let next_packed_chunk = &not_yet_unpacked_values[..self.packed_stride];
debug_assert_eq!(
next_packed_chunk.len(),
FL_VECTOR_SIZE * self.packed_bit_width / BP::Physical::T
);

// SAFETY: We have just reserved enough capacity for the elements buffer to set the
// length, and we are about to initialize all of the values **without** reading the
// memory.
unsafe { output_vector.elements_mut().set_len(N) };

// SAFETY:
// - The documentation for `packed_bit_width` explains that the size is valid.
// - We know that the size of the `next_packed_chunk` we provide is equal to
// `self.packed_stride`, and we explain why this is correct in its documentation.
// - It is clear that the output buffer has length 1024.
unsafe {
BitPacking::unchecked_unpack(
self.packed_bit_width,
next_packed_chunk,
output_vector.as_mut(),
);
}
self.num_chunks_unpacked += 1;
return Ok(output.into());
}

if array_len < chunk_offset + N {
let vector_len = array_len - chunk_offset;
debug_assert!(vector_len < N, "math is broken");
// Otherwise if the mask is dense, it is faster to fully unpack the entire 1024
// element lane with SIMD / FastLanes and let other nodes in the pipeline decide if they
// want to perform the selection filter themselves.
let (mut elements, _validity) = output.into_parts();

// SAFETY: This must be less than `N` so this is just a truncate.
unsafe { output_vector.elements_mut().set_len(vector_len) };
elements.reserve(N);
// SAFETY: we just reserved enough capacity.
unsafe { elements.set_len(N) };

let chunk_mask = self.validity.slice(chunk_offset..array_len);
unsafe {
BitPacking::unchecked_unpack(
self.packed_bit_width,
packed_bytes,
transmute(elements.as_mut()),
);
}

// SAFETY: We have just set the elements length to N, and the validity buffer has
// capacity for N elements.
unsafe { output_vector.validity_mut() }.append_mask(&chunk_mask);
} else {
let chunk_mask = self.validity.slice(chunk_offset..chunk_offset + N);
// Prepare the output validity mask for this chunk.
let mut chunk_validity = self.validity.split_off(N.min(self.validity.capacity()));
std::mem::swap(&mut self.validity, &mut chunk_validity);

// SAFETY: We have just set the elements length to N, and the validity buffer has
// capacity for N elements.
unsafe { output_vector.validity_mut() }.append_mask(&chunk_mask);
}
}
// For the final chunk, we may have fewer than N elements to unpack.
// So we just set the length of the output to the correct value.
unsafe { elements.set_len(chunk_validity.len()) };

self.num_chunks_unpacked += 1;

Ok(())
Ok(PVectorMut::new(elements, chunk_validity).into())
}
}

#[cfg(test)]
mod tests {
use crate::BitPackedArray;
use vortex_array::arrays::PrimitiveArray;
use vortex_dtype::PTypeDowncast;
use vortex_mask::Mask;
use vortex_vector::VectorOps;

use crate::BitPackedArray;

#[test]
fn test_bitpack_pipeline_basic() {
// Create exactly 1024 elements (0 to 1023).
Expand Down
1 change: 1 addition & 0 deletions vortex-array/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ vortex-scalar = { workspace = true }
vortex-session = { workspace = true }
vortex-utils = { workspace = true }
vortex-vector = { workspace = true }
bytes = "1.10.1"

[features]
arbitrary = [
Expand Down
14 changes: 9 additions & 5 deletions vortex-array/src/pipeline/bit_view.rs
Original file line number Diff line number Diff line change
Expand Up @@ -323,21 +323,23 @@ pub trait BitViewExt {
/// # Panics
///
/// If the bit buffer's bit-offset is not zero.
fn iter_bit_views(&self) -> impl Iterator<Item = BitView<'_>> + '_;
fn iter_bit_views(&self) -> impl Iterator<Item = (BitView<'_>, usize)> + '_;
}

impl BitViewExt for BitBuffer {
fn iter_bit_views(&self) -> impl Iterator<Item = BitView<'_>> + '_ {
fn iter_bit_views(&self) -> impl Iterator<Item = (BitView<'_>, usize)> + '_ {
assert_eq!(
self.offset(),
0,
"BitView iteration requires zero bit offset"
);
let n_views = self.len().div_ceil(N);
let final_view_len = self.len() % N;
BitViewIterator {
bits: self.inner().as_ref(),
view_idx: 0,
n_views,
final_view_len,
}
}
}
Expand All @@ -348,10 +350,12 @@ struct BitViewIterator<'a> {
view_idx: usize,
// The total number of views
n_views: usize,
// Final view len
final_view_len: usize,
}

impl<'a> Iterator for BitViewIterator<'a> {
type Item = BitView<'a>;
type Item = (BitView<'a>, usize);

fn next(&mut self) -> Option<Self::Item> {
if self.view_idx == self.n_views {
Expand All @@ -363,13 +367,13 @@ impl<'a> Iterator for BitViewIterator<'a> {

let bits = if end_byte <= self.bits.len() {
// Full view from the original bits
BitView::from_slice(&self.bits[start_byte..end_byte])
(BitView::from_slice(&self.bits[start_byte..end_byte]), N)
} else {
// Partial view, copy to scratch
let remaining_bytes = self.bits.len() - start_byte;
let mut remaining = [0u8; N_BYTES];
remaining[..remaining_bytes].copy_from_slice(&self.bits[start_byte..]);
BitView::new_owned(remaining)
(BitView::new_owned(remaining), self.final_view_len)
};

self.view_idx += 1;
Expand Down
4 changes: 2 additions & 2 deletions vortex-array/src/pipeline/driver/allocation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@
use vortex_error::{VortexExpect, VortexResult};
use vortex_vector::VectorMut;

use crate::Array;
use crate::pipeline::driver::{Node, NodeId};
use crate::pipeline::{N, VectorId};
use crate::pipeline::{VectorId, N};
use crate::Array;

#[derive(Debug)]
pub struct VectorAllocation {
Expand Down
12 changes: 6 additions & 6 deletions vortex-array/src/pipeline/driver/input.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@ impl InputKernel {
impl Kernel for InputKernel {
fn step(
&mut self,
_ctx: &KernelCtx,
_ctx: &mut KernelCtx,
selection: &BitView,
out: &mut VectorMut,
) -> VortexResult<()> {
mut out: VectorMut,
) -> VortexResult<VectorMut> {
let mut batch = self
.batch
.take()
Expand All @@ -48,7 +48,7 @@ impl Kernel for InputKernel {
selection.iter_ones(|idx| {
out.extend_from_vector(&immutable.slice(idx..idx + 1));
});
return Ok(());
return Ok(out);
}

// We split off from our owned batch vector in chunks of size N, and then unsplit onto the
Expand All @@ -67,13 +67,13 @@ impl Kernel for InputKernel {

self.batch = Some(batch);

Ok(())
Ok(out)
}
}

#[cfg(test)]
mod test {
use vortex_buffer::{BitBuffer, bitbuffer, buffer};
use vortex_buffer::{bitbuffer, buffer, BitBuffer};
use vortex_dtype::PTypeDowncastExt;
use vortex_mask::Mask;

Expand Down
Loading
Loading