Skip to content

Commit 8c2ab3d

Browse files
authored
refactor: Extract RowMask creation and filtering to separate struct (#1272)
fix #1243
1 parent 42b7527 commit 8c2ab3d

File tree

7 files changed

+309
-99
lines changed

7 files changed

+309
-99
lines changed

vortex-array/src/array/sparse/mod.rs

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ pub struct SparseMetadata {
2727
indices_offset: usize,
2828
indices_len: usize,
2929
fill_value: ScalarValue,
30+
u64_indices: bool,
3031
}
3132

3233
impl Display for SparseMetadata {
@@ -52,7 +53,7 @@ impl SparseArray {
5253
indices_offset: usize,
5354
fill_value: ScalarValue,
5455
) -> VortexResult<Self> {
55-
if !matches!(indices.dtype(), &DType::IDX) {
56+
if !matches!(indices.dtype(), &DType::IDX | &DType::IDX_32) {
5657
vortex_bail!("Cannot use {} as indices", indices.dtype());
5758
}
5859
if !fill_value.is_instance_of(values.dtype()) {
@@ -85,6 +86,7 @@ impl SparseArray {
8586
indices_offset,
8687
indices_len: indices.len(),
8788
fill_value,
89+
u64_indices: matches!(indices.dtype(), &DType::IDX),
8890
},
8991
[indices, values].into(),
9092
StatsSet::new(),
@@ -106,7 +108,15 @@ impl SparseArray {
106108
#[inline]
107109
pub fn indices(&self) -> Array {
108110
self.as_ref()
109-
.child(0, &DType::IDX, self.metadata().indices_len)
111+
.child(
112+
0,
113+
if self.metadata().u64_indices {
114+
&DType::IDX
115+
} else {
116+
&DType::IDX_32
117+
},
118+
self.metadata().indices_len,
119+
)
110120
.vortex_expect("Missing indices array in SparseArray")
111121
}
112122

vortex-array/src/array/sparse/variants.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,8 @@ impl BoolArrayTrait for SparseArray {
6464
}
6565

6666
fn maybe_null_indices_iter(&self) -> Box<dyn Iterator<Item = usize>> {
67-
Box::new(self.resolved_indices().into_iter())
67+
// TODO(robert): Indices of the array can include true and false values, fill value can be true
68+
todo!()
6869
}
6970

7071
fn maybe_null_slices_iter(&self) -> Box<dyn Iterator<Item = (usize, usize)>> {

vortex-dtype/src/dtype.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,9 @@ impl DType {
4747
/// The default DType for indices
4848
pub const IDX: Self = Primitive(PType::U64, Nullability::NonNullable);
4949

50+
/// The DType for small indices (primarily created from bitmaps)
51+
pub const IDX_32: Self = Primitive(PType::U32, Nullability::NonNullable);
52+
5053
/// Get the nullability of the DType
5154
pub fn nullability(&self) -> Nullability {
5255
self.is_nullable().into()

vortex-serde/src/file/read/mask.rs

Lines changed: 31 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,10 @@ use std::fmt::{Display, Formatter};
33

44
use arrow_buffer::{BooleanBuffer, MutableBuffer};
55
use croaring::Bitmap;
6-
use vortex_array::array::{BoolArray, PrimitiveArray};
6+
use vortex_array::array::{BoolArray, PrimitiveArray, SparseArray};
77
use vortex_array::compute::{filter, slice, take};
88
use vortex_array::validity::{LogicalValidity, Validity};
9-
use vortex_array::{iterate_integer_array, Array, IntoArray};
9+
use vortex_array::{iterate_integer_array, Array, IntoArray, IntoArrayVariant};
1010
use vortex_dtype::PType;
1111
use vortex_error::{vortex_bail, vortex_err, VortexResult};
1212

@@ -95,7 +95,7 @@ impl RowMask {
9595

9696
/// Construct a RowMask from an integral array.
9797
///
98-
/// The array values are intepreted as indices and those indices are kept by the returned mask.
98+
/// The array values are interpreted as indices and those indices are kept by the returned mask.
9999
pub fn from_index_array(array: &Array, begin: usize, end: usize) -> VortexResult<Self> {
100100
array.with_dyn(|a| {
101101
let err = || vortex_err!(InvalidArgument: "index array must be integers in the range [0, 2^32)");
@@ -119,6 +119,16 @@ impl RowMask {
119119
})
120120
}
121121

122+
/// Combine the RowMask with bitmask values resulting in new RowMask containing only values true in the bitmask
123+
pub fn and_bitmask(self, bitmask: Array) -> VortexResult<Self> {
124+
// TODO(robert): Avoid densifying sparse values just to get true indices
125+
let sparse_mask =
126+
SparseArray::try_new(self.to_indices_array()?, bitmask, self.len(), false.into())?
127+
.into_array()
128+
.into_bool()?;
129+
Self::from_mask_array(sparse_mask.as_ref(), self.begin(), self.end())
130+
}
131+
122132
pub fn is_empty(&self) -> bool {
123133
self.values.is_empty()
124134
}
@@ -243,12 +253,24 @@ mod tests {
243253
use crate::file::read::mask::RowMask;
244254

245255
#[rstest]
246-
#[case(RowMask::try_new((0..2).chain(9..10).collect(), 0, 10).unwrap(), (0, 1), RowMask::try_new((0..1).collect(), 0, 1).unwrap())]
247-
#[case(RowMask::try_new((5..8).chain(9..10).collect(), 0, 10).unwrap(), (2, 5), RowMask::try_new(Bitmap::new(), 2, 5).unwrap())]
248-
#[case(RowMask::try_new((0..4).collect(), 0, 10).unwrap(), (2, 5), RowMask::try_new((0..2).collect(), 2, 5).unwrap())]
249-
#[case(RowMask::try_new((0..3).chain(5..6).collect(), 0, 10).unwrap(), (2, 6), RowMask::try_new((0..1).chain(3..4).collect(), 2, 6).unwrap())]
250-
#[case(RowMask::try_new((5..10).collect(), 0, 10).unwrap(), (7, 11), RowMask::try_new((0..3).collect(), 7, 10).unwrap())]
251-
#[case(RowMask::try_new((1..6).collect(), 3, 9).unwrap(), (0, 5), RowMask::try_new((1..2).collect(), 3, 5).unwrap())]
256+
#[case(
257+
RowMask::try_new((0..2).chain(9..10).collect(), 0, 10).unwrap(), (0, 1),
258+
RowMask::try_new((0..1).collect(), 0, 1).unwrap())]
259+
#[case(
260+
RowMask::try_new((5..8).chain(9..10).collect(), 0, 10).unwrap(), (2, 5),
261+
RowMask::try_new(Bitmap::new(), 2, 5).unwrap())]
262+
#[case(
263+
RowMask::try_new((0..4).collect(), 0, 10).unwrap(), (2, 5),
264+
RowMask::try_new((0..2).collect(), 2, 5).unwrap())]
265+
#[case(
266+
RowMask::try_new((0..3).chain(5..6).collect(), 0, 10).unwrap(), (2, 6),
267+
RowMask::try_new((0..1).chain(3..4).collect(), 2, 6).unwrap())]
268+
#[case(
269+
RowMask::try_new((5..10).collect(), 0, 10).unwrap(), (7, 11),
270+
RowMask::try_new((0..3).collect(), 7, 10).unwrap())]
271+
#[case(
272+
RowMask::try_new((1..6).collect(), 3, 9).unwrap(), (0, 5),
273+
RowMask::try_new((1..2).collect(), 3, 5).unwrap())]
252274
#[cfg_attr(miri, ignore)]
253275
fn slice(#[case] first: RowMask, #[case] range: (usize, usize), #[case] expected: RowMask) {
254276
assert_eq!(first.slice(range.0, range.1), expected);

vortex-serde/src/file/read/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ mod filtering;
1414
pub mod layouts;
1515
mod mask;
1616
mod recordbatchreader;
17+
mod splits;
1718
mod stream;
1819

1920
pub use builder::initial_read::*;
Lines changed: 223 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,223 @@
1+
use std::collections::{BTreeSet, VecDeque};
2+
use std::mem;
3+
use std::sync::atomic::{AtomicBool, Ordering};
4+
5+
use itertools::Itertools;
6+
use vortex_array::stats::ArrayStatistics;
7+
use vortex_error::{vortex_bail, VortexExpect, VortexResult};
8+
9+
use crate::file::{BatchRead, LayoutReader, MessageLocator, RowMask};
10+
11+
pub enum SplitMask {
12+
ReadMore(Vec<MessageLocator>),
13+
Mask(RowMask),
14+
}
15+
16+
/// Internal state of a [`FixedSplitIterator`].
enum SplitState {
    /// Row ranges are being produced; new split points can no longer be registered.
    Ranges(Box<dyn Iterator<Item = (usize, usize)> + Send>),
    /// Split points are still being collected and no range has been produced yet.
    Splits(BTreeSet<usize>),
}
20+
21+
/// Iterator over row ranges of a vortex file with bitmaps of valid values in those ranges
22+
pub trait MaskIterator: Iterator<Item = VortexResult<SplitMask>> + Send {
23+
/// Register additional horizontal row boundaries to split the generated layout on
24+
fn additional_splits(&mut self, splits: &mut BTreeSet<usize>) -> VortexResult<()>;
25+
}
26+
27+
/// MaskIterator that reads boolean arrays out of provided reader and further filters generated masks
28+
///
29+
/// Arrays returned by the reader must be of boolean dtype.
30+
pub struct FilteringRowSplitIterator {
31+
reader: Box<dyn LayoutReader>,
32+
static_splits: FixedSplitIterator,
33+
in_progress_masks: VecDeque<RowMask>,
34+
registered_splits: AtomicBool,
35+
}
36+
37+
impl FilteringRowSplitIterator {
38+
pub fn new(reader: Box<dyn LayoutReader>, row_count: u64, row_mask: Option<RowMask>) -> Self {
39+
let static_splits = FixedSplitIterator::new(row_count, row_mask);
40+
Self {
41+
reader,
42+
static_splits,
43+
in_progress_masks: VecDeque::new(),
44+
registered_splits: AtomicBool::new(false),
45+
}
46+
}
47+
48+
/// Read given mask out of the reader
49+
fn read_mask(&mut self, mask: RowMask) -> VortexResult<Option<SplitMask>> {
50+
if let Some(rs) = self.reader.read_selection(&mask)? {
51+
return match rs {
52+
BatchRead::ReadMore(rm) => {
53+
// If the reader needs more data we put the mask back into queue for to come back to it later
54+
self.in_progress_masks.push_back(mask);
55+
Ok(Some(SplitMask::ReadMore(rm)))
56+
}
57+
BatchRead::Batch(batch) => {
58+
// If the mask is all FALSE we can safely discard it
59+
if batch
60+
.statistics()
61+
.compute_true_count()
62+
.vortex_expect("must be a bool array if it's a result of a filter")
63+
== 0
64+
{
65+
return Ok(None);
66+
}
67+
// Combine requested mask with the result of filter read
68+
Ok(Some(SplitMask::Mask(mask.and_bitmask(batch)?)))
69+
}
70+
};
71+
}
72+
Ok(None)
73+
}
74+
75+
/// Return next not all false mask or request to read more data.
76+
fn next_mask(&mut self) -> VortexResult<Option<SplitMask>> {
77+
if !self.registered_splits.swap(true, Ordering::SeqCst) {
78+
let mut own_splits = BTreeSet::new();
79+
self.reader.add_splits(0, &mut own_splits)?;
80+
self.static_splits.additional_splits(&mut own_splits)?;
81+
}
82+
83+
// First consider masks we have previously started reading to return them in order
84+
while let Some(mask) = self.in_progress_masks.pop_front() {
85+
if let Some(read_mask) = self.read_mask(mask)? {
86+
return Ok(Some(read_mask));
87+
}
88+
}
89+
90+
// Lastly take next statically generated mask and perform read with it on our reader
91+
while let Some(mask) = self.static_splits.next() {
92+
match mask? {
93+
SplitMask::ReadMore(_) => {
94+
unreachable!("StaticSplitProducer never returns ReadMore")
95+
}
96+
SplitMask::Mask(m) => {
97+
if let Some(read_mask) = self.read_mask(m)? {
98+
return Ok(Some(read_mask));
99+
}
100+
}
101+
}
102+
}
103+
Ok(None)
104+
}
105+
}
106+
107+
impl MaskIterator for FilteringRowSplitIterator {
108+
fn additional_splits(&mut self, splits: &mut BTreeSet<usize>) -> VortexResult<()> {
109+
self.static_splits.additional_splits(splits)
110+
}
111+
}
112+
113+
impl Iterator for FilteringRowSplitIterator {
114+
type Item = VortexResult<SplitMask>;
115+
116+
fn next(&mut self) -> Option<Self::Item> {
117+
self.next_mask().transpose()
118+
}
119+
}
120+
121+
pub struct FixedSplitIterator {
122+
splits: SplitState,
123+
row_mask: Option<RowMask>,
124+
}
125+
126+
impl FixedSplitIterator {
127+
pub fn new(row_count: u64, row_mask: Option<RowMask>) -> Self {
128+
let mut splits = BTreeSet::new();
129+
splits.insert(row_count as usize);
130+
Self {
131+
splits: SplitState::Splits(splits),
132+
row_mask,
133+
}
134+
}
135+
}
136+
137+
impl MaskIterator for FixedSplitIterator {
138+
fn additional_splits(&mut self, splits: &mut BTreeSet<usize>) -> VortexResult<()> {
139+
match &mut self.splits {
140+
SplitState::Ranges(_) => {
141+
vortex_bail!("Can't insert additional splits if we started producing row ranges")
142+
}
143+
SplitState::Splits(s) => {
144+
s.append(splits);
145+
Ok(())
146+
}
147+
}
148+
}
149+
}
150+
151+
impl Iterator for FixedSplitIterator {
152+
type Item = VortexResult<SplitMask>;
153+
154+
fn next(&mut self) -> Option<Self::Item> {
155+
match &mut self.splits {
156+
SplitState::Ranges(ranges) => {
157+
// Find next range that's not filtered out by supplied row_mask
158+
for (begin, end) in ranges {
159+
return if let Some(ref row_mask) = self.row_mask {
160+
if row_mask.slice(begin, end).is_empty() {
161+
continue;
162+
}
163+
Some(Ok(SplitMask::Mask(row_mask.slice(begin, end))))
164+
} else {
165+
Some(Ok(SplitMask::Mask(RowMask::new_valid_between(begin, end))))
166+
};
167+
}
168+
None
169+
}
170+
SplitState::Splits(s) => {
171+
self.splits = SplitState::Ranges(Box::new(
172+
mem::take(s).into_iter().tuple_windows::<(usize, usize)>(),
173+
));
174+
self.next()
175+
}
176+
}
177+
}
178+
}
179+
180+
#[cfg(test)]
mod tests {
    use std::collections::BTreeSet;

    use vortex_error::VortexResult;

    use crate::file::read::splits::{FixedSplitIterator, MaskIterator, SplitMask};
    use crate::file::RowMask;

    /// Registering split points after iteration has started returns an error, so the second
    /// `unwrap` panics.
    #[test]
    #[should_panic]
    #[cfg_attr(miri, ignore)]
    fn register_after_start() {
        let mut mask_iter = FixedSplitIterator::new(10, None);

        let mut initial_points = BTreeSet::from([0, 1, 2]);
        mask_iter.additional_splits(&mut initial_points).unwrap();
        assert!(mask_iter.next().is_some());

        let mut late_points = BTreeSet::from([5]);
        mask_iter.additional_splits(&mut late_points).unwrap();
        mask_iter.next();
    }

    /// Ranges in which the row mask selects nothing are skipped.
    #[test]
    #[cfg_attr(miri, ignore)]
    fn filters_empty() {
        let row_mask = RowMask::try_new((4..6).collect(), 0, 10).unwrap();
        let mut mask_iter = FixedSplitIterator::new(10, Some(row_mask));

        let mut points = BTreeSet::from([2, 4, 6, 8, 10]);
        mask_iter.additional_splits(&mut points).unwrap();

        let produced = mask_iter
            .map(|split| {
                split.map(|mask| match mask {
                    SplitMask::ReadMore(_) => unreachable!("Will never read more"),
                    SplitMask::Mask(m) => m,
                })
            })
            .collect::<VortexResult<Vec<_>>>()
            .unwrap();

        assert_eq!(produced, vec![RowMask::new_valid_between(4, 6)]);
    }
}

0 commit comments

Comments
 (0)