Skip to content

Commit a2b63ac

Browse files
committed
Vector pipeline
Signed-off-by: Nicholas Gates <[email protected]>
1 parent 546ede2 commit a2b63ac

File tree

4 files changed

+139
-7
lines changed

4 files changed

+139
-7
lines changed

vortex-array/src/pipeline/driver/output.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@ impl Sink for OutputSink {
3737
_ => {
3838
// Otherwise, we know that the vector has not yet been filtered.
3939
assert_eq!(vector.len(), N, "it must therefore be len = N");
40+
// TODO(ngates): it would be great to have a `filter_into` that avoids the extra
41+
// copy here.
4042
let vector = vector.filter(selection);
4143
self.vector.extend_from_vector(&vector);
4244
}

vortex-compute/src/filter/buffer.rs

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -56,13 +56,7 @@ impl<const NB: usize, T: Copy> Filter<BitView<'_, NB>> for &Buffer<T> {
5656
type Output = Buffer<T>;
5757

5858
fn filter(self, selection: &BitView<'_, NB>) -> Self::Output {
59-
// TODO(ngates): this is very very slow!
60-
let elems = self.as_slice();
61-
let mut out = BufferMut::<T>::with_capacity(selection.true_count());
62-
selection.iter_ones(|idx| {
63-
unsafe { out.push_unchecked(elems[idx]) };
64-
});
65-
out.freeze()
59+
self.as_slice().filter(selection)
6660
}
6761
}
6862

vortex-compute/src/filter/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
mod bitbuffer;
77
mod buffer;
88
mod mask;
9+
mod slice;
910
mod slice_mut;
1011
mod vector;
1112

vortex-compute/src/filter/slice.rs

Lines changed: 135 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
// SPDX-License-Identifier: Apache-2.0
2+
// SPDX-FileCopyrightText: Copyright the Vortex contributors
3+
4+
//! Filter implementations over immutable slices, copying selected elements into a new `Buffer`.
5+
//!
6+
//! Note that there is no `slice` module in `vortex-buffer` because slices always require a copy
7+
//! to filter them. Therefore, it's likely better to implement the filter against the actual
8+
//! zero-copy container type, e.g. `Buffer`.
9+
10+
// This is modeled after the constant with the equivalent name in arrow-rs.
11+
pub(super) const FILTER_SLICES_SELECTIVITY_THRESHOLD: f64 = 0.8;
12+
13+
use std::ptr;
14+
15+
use vortex_buffer::{BitView, Buffer, BufferMut};
16+
use vortex_mask::{Mask, MaskIter};
17+
18+
use crate::filter::Filter;
19+
20+
impl<T: Copy> Filter<Mask> for &[T] {
21+
type Output = Buffer<T>;
22+
23+
fn filter(self, selection_mask: &Mask) -> Buffer<T> {
24+
assert_eq!(
25+
selection_mask.len(),
26+
self.len(),
27+
"Selection mask length must equal the buffer length"
28+
);
29+
30+
match selection_mask {
31+
Mask::AllTrue(_) => Buffer::<T>::copy_from(self),
32+
Mask::AllFalse(_) => Buffer::empty(),
33+
Mask::Values(v) => match v.threshold_iter(FILTER_SLICES_SELECTIVITY_THRESHOLD) {
34+
MaskIter::Indices(indices) => filter_indices(self, indices),
35+
MaskIter::Slices(slices) => {
36+
filter_slices(self, selection_mask.true_count(), slices).freeze()
37+
}
38+
},
39+
}
40+
}
41+
}
42+
43+
pub(super) fn filter_indices<T: Copy>(values: &[T], indices: &[usize]) -> Buffer<T> {
44+
Buffer::<T>::from_trusted_len_iter(indices.iter().map(|&idx| values[idx]))
45+
}
46+
47+
pub(super) fn filter_slices<T>(
48+
values: &[T],
49+
output_len: usize,
50+
slices: &[(usize, usize)],
51+
) -> BufferMut<T> {
52+
let mut out = BufferMut::<T>::with_capacity(output_len);
53+
for (start, end) in slices {
54+
out.extend_from_slice(&values[*start..*end]);
55+
}
56+
out
57+
}
58+
59+
impl<const NB: usize, T: Copy> Filter<BitView<'_, NB>> for &[T] {
60+
type Output = Buffer<T>;
61+
62+
fn filter(self, mask: &BitView<NB>) -> Self::Output {
63+
assert_eq!(
64+
self.len(),
65+
BitView::<NB>::N,
66+
"Mask length must equal the slice length"
67+
);
68+
69+
let mut read_ptr = self.as_ptr();
70+
71+
let mut write = BufferMut::<T>::with_capacity(mask.true_count());
72+
unsafe { write.set_len(mask.true_count()) };
73+
74+
let mut write_ptr = write.as_mut_ptr();
75+
76+
// Process the mask one word at a time, i.e. usize::BITS elements per iteration.
77+
for mut word in mask.iter_words() {
78+
match word {
79+
0usize => {
80+
// No bits set => skip usize::BITS slice.
81+
unsafe {
82+
read_ptr = read_ptr.add(usize::BITS as usize);
83+
}
84+
}
85+
usize::MAX => {
86+
// All bits set => copy the whole usize::BITS slice.
87+
unsafe {
88+
ptr::copy_nonoverlapping(read_ptr, write_ptr, usize::BITS as usize);
89+
read_ptr = read_ptr.add(usize::BITS as usize);
90+
write_ptr = write_ptr.add(usize::BITS as usize);
91+
}
92+
}
93+
_ => {
94+
// Iterate the bits in a word, attempting to copy contiguous runs of values.
95+
let mut read_pos = 0;
96+
let mut write_pos = 0;
97+
while word != 0 {
98+
let tz = word.trailing_zeros();
99+
if tz > 0 {
100+
// shift off the trailing zeros since they are unselected.
101+
// this advances the read head, but not the write head.
102+
read_pos += tz;
103+
word >>= tz;
104+
continue;
105+
}
106+
107+
// copy the next several values to our out pointer.
108+
let extent = word.trailing_ones();
109+
unsafe {
110+
ptr::copy_nonoverlapping(
111+
read_ptr.add(read_pos as usize),
112+
write_ptr.add(write_pos as usize),
113+
extent as usize,
114+
);
115+
}
116+
// Advance the reader and writer by the number of values
117+
// we just copied.
118+
read_pos += extent;
119+
write_pos += extent;
120+
121+
// shift off the low bits of the word so we can copy the next run.
122+
word >>= extent;
123+
}
124+
125+
unsafe {
126+
read_ptr = read_ptr.add(usize::BITS as usize);
127+
write_ptr = write_ptr.add(write_pos as usize);
128+
};
129+
}
130+
}
131+
}
132+
133+
write.freeze()
134+
}
135+
}

0 commit comments

Comments
 (0)