Skip to content

Commit 5b4679d

Browse files
authored
Fix RunEndArray filter (#1380)
fix #1368
1 parent 22552b3 commit 5b4679d

File tree

5 files changed

+98
-62
lines changed

5 files changed

+98
-62
lines changed

encodings/runend/src/compress.rs

Lines changed: 13 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,13 @@
1-
use std::cmp::min;
2-
31
use arrow_buffer::BooleanBufferBuilder;
42
use itertools::Itertools;
5-
use num_traits::{AsPrimitive, FromPrimitive};
63
use vortex_array::array::{BoolArray, BooleanBuffer, PrimitiveArray};
74
use vortex_array::validity::Validity;
85
use vortex_array::variants::PrimitiveArrayTrait;
96
use vortex_array::ArrayDType;
107
use vortex_dtype::{match_each_integer_ptype, match_each_native_ptype, NativePType, Nullability};
11-
use vortex_error::{vortex_panic, VortexResult};
8+
use vortex_error::VortexResult;
9+
10+
use crate::iter::trimmed_ends_iter;
1211

1312
pub fn runend_encode(array: &PrimitiveArray) -> (PrimitiveArray, PrimitiveArray) {
1413
let validity = if array.dtype().nullability() == Nullability::NonNullable {
@@ -61,9 +60,8 @@ pub fn runend_decode_primitive(
6160
match_each_native_ptype!(values.ptype(), |$P| {
6261
match_each_integer_ptype!(ends.ptype(), |$E| {
6362
Ok(PrimitiveArray::from_vec(runend_decode_typed_primitive(
64-
ends.maybe_null_slice::<$E>(),
63+
trimmed_ends_iter(ends.maybe_null_slice::<$E>(), offset, length),
6564
values.maybe_null_slice::<$P>(),
66-
offset,
6765
length,
6866
), validity))
6967
})
@@ -79,67 +77,33 @@ pub fn runend_decode_bools(
7977
) -> VortexResult<BoolArray> {
8078
match_each_integer_ptype!(ends.ptype(), |$E| {
8179
BoolArray::try_new(runend_decode_typed_bool(
82-
ends.maybe_null_slice::<$E>(),
80+
trimmed_ends_iter(ends.maybe_null_slice::<$E>(), offset, length),
8381
values.boolean_buffer(),
84-
offset,
8582
length,
8683
), validity)
8784
})
8885
}
8986

90-
#[inline]
91-
fn trimmed_run_ends<E: NativePType + AsPrimitive<usize> + FromPrimitive + Ord>(
92-
run_ends: &[E],
93-
offset: usize,
94-
length: usize,
95-
) -> impl Iterator<Item = E> + use<'_, E> {
96-
let offset_e = E::from_usize(offset).unwrap_or_else(|| {
97-
vortex_panic!(
98-
"offset {} cannot be converted to {}",
99-
offset,
100-
std::any::type_name::<E>()
101-
)
102-
});
103-
let length_e = E::from_usize(length).unwrap_or_else(|| {
104-
vortex_panic!(
105-
"length {} cannot be converted to {}",
106-
length,
107-
std::any::type_name::<E>()
108-
)
109-
});
110-
run_ends
111-
.iter()
112-
.map(move |&v| v - offset_e)
113-
.map(move |v| min(v, length_e))
114-
}
115-
116-
pub fn runend_decode_typed_primitive<
117-
E: NativePType + AsPrimitive<usize> + FromPrimitive + Ord,
118-
T: NativePType,
119-
>(
120-
run_ends: &[E],
87+
pub fn runend_decode_typed_primitive<T: NativePType>(
88+
run_ends: impl Iterator<Item = usize>,
12189
values: &[T],
122-
offset: usize,
12390
length: usize,
12491
) -> Vec<T> {
125-
let trimmed_ends = trimmed_run_ends(run_ends, offset, length);
12692
let mut decoded = Vec::with_capacity(length);
127-
for (end, value) in trimmed_ends.zip_eq(values) {
128-
decoded.extend(std::iter::repeat_n(value, end.as_() - decoded.len()));
93+
for (end, value) in run_ends.zip_eq(values) {
94+
decoded.extend(std::iter::repeat_n(value, end - decoded.len()));
12995
}
13096
decoded
13197
}
13298

133-
pub fn runend_decode_typed_bool<E: NativePType + AsPrimitive<usize> + FromPrimitive + Ord>(
134-
run_ends: &[E],
99+
pub fn runend_decode_typed_bool(
100+
run_ends: impl Iterator<Item = usize>,
135101
values: BooleanBuffer,
136-
offset: usize,
137102
length: usize,
138103
) -> BooleanBuffer {
139-
let trimmed_ends = trimmed_run_ends(run_ends, offset, length);
140104
let mut decoded = BooleanBufferBuilder::new(length);
141-
for (end, value) in trimmed_ends.zip_eq(values.iter()) {
142-
decoded.append_n(end.as_() - decoded.len(), value);
105+
for (end, value) in run_ends.zip_eq(values.iter()) {
106+
decoded.append_n(end - decoded.len(), value);
143107
}
144108
decoded.finish()
145109
}

encodings/runend/src/compute.rs

Lines changed: 42 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use std::cmp::min;
12
use std::ops::AddAssign;
23

34
use num_traits::AsPrimitive;
@@ -76,9 +77,8 @@ impl TakeFn for RunEndArray {
7677
let primitive_indices = indices.clone().into_primitive()?;
7778
let u64_indices = match_each_integer_ptype!(primitive_indices.ptype(), |$P| {
7879
primitive_indices
79-
.maybe_null_slice::<$P>()
80-
.iter()
81-
.copied()
80+
.into_maybe_null_slice::<$P>()
81+
.into_iter()
8282
.map(|idx| {
8383
let usize_idx = idx as usize;
8484
if usize_idx >= self.len() {
@@ -89,11 +89,11 @@ impl TakeFn for RunEndArray {
8989
})
9090
.collect::<VortexResult<Vec<u64>>>()?
9191
});
92-
let physical_indices: Vec<u64> = self
92+
let physical_indices = self
9393
.find_physical_indices(&u64_indices)?
94-
.iter()
95-
.map(|idx| *idx as u64)
96-
.collect();
94+
.into_iter()
95+
.map(|idx| idx as u64)
96+
.collect::<Vec<_>>();
9797
let physical_indices_array = PrimitiveArray::from(physical_indices).into_array();
9898
let dense_values = take(self.values(), &physical_indices_array, options)?;
9999

@@ -146,12 +146,12 @@ impl SliceFn for RunEndArray {
146146

147147
impl FilterFn for RunEndArray {
148148
fn filter(&self, mask: FilterMask) -> VortexResult<ArrayData> {
149+
let validity = self.validity().filter(&mask)?;
149150
let primitive_run_ends = self.ends().into_primitive()?;
150-
let (run_ends, mask) = match_each_unsigned_integer_ptype!(primitive_run_ends.ptype(), |$P| {
151-
filter_run_ends(primitive_run_ends.maybe_null_slice::<$P>(), mask)?
151+
let (run_ends, values_mask) = match_each_unsigned_integer_ptype!(primitive_run_ends.ptype(), |$P| {
152+
filter_run_ends(primitive_run_ends.maybe_null_slice::<$P>(), self.offset() as u64, self.len() as u64, mask)?
152153
});
153-
let validity = self.validity().filter(&mask)?;
154-
let values = filter(&self.values(), mask)?;
154+
let values = filter(&self.values(), values_mask)?;
155155

156156
RunEndArray::try_new(run_ends.into_array(), values, validity).map(|a| a.into_array())
157157
}
@@ -160,6 +160,8 @@ impl FilterFn for RunEndArray {
160160
// Code adapted from apache arrow-rs https://github.com/apache/arrow-rs/blob/b1f5c250ebb6c1252b4e7c51d15b8e77f4c361fa/arrow-select/src/filter.rs#L425
161161
fn filter_run_ends<R: NativePType + AddAssign + From<bool> + AsPrimitive<u64>>(
162162
run_ends: &[R],
163+
offset: u64,
164+
length: u64,
163165
mask: FilterMask,
164166
) -> VortexResult<(PrimitiveArray, FilterMask)> {
165167
let mut new_run_ends = vec![R::zero(); run_ends.len()];
@@ -171,7 +173,7 @@ fn filter_run_ends<R: NativePType + AddAssign + From<bool> + AsPrimitive<u64>>(
171173

172174
let new_mask: FilterMask = BooleanBuffer::collect_bool(run_ends.len(), |i| {
173175
let mut keep = false;
174-
let end = run_ends[i].as_();
176+
let end = min(run_ends[i].as_() - offset, length);
175177

176178
// Safety: predicate must be the same length as the array the ends have been taken from
177179
for pred in (start..end).map(|i| unsafe { filter_values.value_unchecked(i as usize) }) {
@@ -464,6 +466,34 @@ mod test {
464466
);
465467
}
466468

469+
#[test]
470+
fn filter_sliced_run_end() {
471+
let arr = slice(ree_array(), 2, 7).unwrap();
472+
let filtered = filter(
473+
&arr,
474+
FilterMask::from_iter([true, false, false, true, true]),
475+
)
476+
.unwrap();
477+
let filtered_run_end = RunEndArray::try_from(filtered).unwrap();
478+
479+
assert_eq!(
480+
filtered_run_end
481+
.ends()
482+
.into_primitive()
483+
.unwrap()
484+
.maybe_null_slice::<u64>(),
485+
[1, 2, 3]
486+
);
487+
assert_eq!(
488+
filtered_run_end
489+
.values()
490+
.into_primitive()
491+
.unwrap()
492+
.maybe_null_slice::<i32>(),
493+
[1, 4, 2]
494+
);
495+
}
496+
467497
#[test]
468498
fn compare_run_end() {
469499
let arr = ree_array();

encodings/runend/src/iter.rs

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
use std::cmp::min;
2+
3+
use num_traits::{AsPrimitive, FromPrimitive};
4+
use vortex_dtype::NativePType;
5+
use vortex_error::vortex_panic;
6+
7+
#[inline]
8+
pub fn trimmed_ends_iter<E: NativePType + FromPrimitive + AsPrimitive<usize> + Ord>(
9+
run_ends: &[E],
10+
offset: usize,
11+
length: usize,
12+
) -> impl Iterator<Item = usize> + use<'_, E> {
13+
let offset_e = E::from_usize(offset).unwrap_or_else(|| {
14+
vortex_panic!(
15+
"offset {} cannot be converted to {}",
16+
offset,
17+
std::any::type_name::<E>()
18+
)
19+
});
20+
let length_e = E::from_usize(length).unwrap_or_else(|| {
21+
vortex_panic!(
22+
"length {} cannot be converted to {}",
23+
length,
24+
std::any::type_name::<E>()
25+
)
26+
});
27+
run_ends
28+
.iter()
29+
.copied()
30+
.map(move |v| v - offset_e)
31+
.map(move |v| min(v, length_e))
32+
.map(|v| v.as_())
33+
}

encodings/runend/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,3 +3,4 @@ pub use array::*;
33
mod array;
44
pub mod compress;
55
mod compute;
6+
mod iter;

fuzz/fuzz_targets/array_ops.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,15 @@ fn assert_search_sorted(
9393
}
9494

9595
fn assert_array_eq(lhs: &ArrayData, rhs: &ArrayData, step: usize) {
96-
assert_eq!(lhs.len(), rhs.len());
96+
assert_eq!(
97+
lhs.len(),
98+
rhs.len(),
99+
"LHS len {} != RHS len {}, lhs is {} rhs is {} in step {step}",
100+
lhs.len(),
101+
rhs.len(),
102+
lhs.encoding().id(),
103+
rhs.encoding().id()
104+
);
97105
for idx in 0..lhs.len() {
98106
let l = scalar_at(lhs, idx).unwrap();
99107
let r = scalar_at(rhs, idx).unwrap();

0 commit comments

Comments
 (0)