Skip to content

Commit 98ac114

Browse files
authored
Vortex Layouts - chunk pruning (#1824)
1 parent e568fab commit 98ac114

File tree

7 files changed

+135
-64
lines changed

7 files changed

+135
-64
lines changed

vortex-expr/src/pruning.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,7 @@ impl PruningPredicate {
127127
///
128128
/// Returns Ok(None) if any of the required statistics are not present in metadata.
129129
/// If it returns Ok(Some(array)), the array is a boolean array with the same length as the
130-
/// metadata, and the values indicate whether the corresponding chunk can be pruned.
130+
/// metadata, and a true value means the chunk _can_ be pruned.
131131
pub fn evaluate(&self, metadata: &ArrayData) -> VortexResult<Option<ArrayData>> {
132132
let known_stats = HashSet::from_iter(
133133
metadata

vortex-layout/src/layouts/chunked/scan.rs

Lines changed: 120 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,13 @@
11
use std::sync::{Arc, OnceLock, RwLock};
22

3+
use arrow_buffer::BooleanBuffer;
34
use vortex_array::array::{ChunkedArray, StructArray};
45
use vortex_array::stats::{stats_from_bitset_bytes, Stat};
56
use vortex_array::validity::Validity;
6-
use vortex_array::{ArrayData, ContextRef, IntoArrayData};
7+
use vortex_array::{ArrayData, ContextRef, IntoArrayData, IntoArrayVariant};
78
use vortex_dtype::DType;
8-
use vortex_error::{vortex_err, vortex_panic, VortexExpect, VortexResult};
9+
use vortex_error::{vortex_err, vortex_panic, VortexError, VortexExpect, VortexResult};
10+
use vortex_expr::pruning::PruningPredicate;
911

1012
use crate::layouts::chunked::stats::StatsTable;
1113
use crate::layouts::chunked::ChunkedLayout;
@@ -29,13 +31,19 @@ use crate::{LayoutData, LayoutEncoding, RowMask};
2931
/// boundary we will read the chunk twice). However, if we have overlapping row ranges, as can
3032
/// happen if the parent is performing multiple scans (filter + projection), then we may read the
3133
/// same chunk many times.
34+
#[derive(Clone, Debug)]
3235
pub struct ChunkedScan {
3336
layout: LayoutData,
3437
scan: Scan,
3538
dtype: DType,
3639
ctx: ContextRef,
40+
// Shared stats table scanner
3741
stats_scanner: Arc<RwLock<Box<dyn Scanner>>>,
42+
// Cached pruning mask for the scan
43+
pruning_mask: Arc<OnceLock<Option<BooleanBuffer>>>,
44+
// Shared lazy chunk scanners
3845
chunk_scans: Arc<[OnceLock<Box<dyn LayoutScan>>]>,
46+
// The stats that are present in the layout
3947
present_stats: Arc<[Stat]>,
4048
}
4149

@@ -66,7 +74,6 @@ impl ChunkedScan {
6674
.unwrap_or_default()
6775
.into();
6876

69-
// Construct a scanner for the stats array
7077
let stats_scanner = layout
7178
.metadata()
7279
.is_some()
@@ -100,6 +107,7 @@ impl ChunkedScan {
100107
dtype,
101108
ctx,
102109
stats_scanner: Arc::new(RwLock::new(stats_scanner)),
110+
pruning_mask: Arc::new(OnceLock::new()),
103111
chunk_scans,
104112
present_stats,
105113
})
@@ -117,34 +125,23 @@ impl LayoutScan for ChunkedScan {
117125

118126
fn create_scanner(&self, mask: RowMask) -> VortexResult<Box<dyn Scanner>> {
119127
Ok(Box::new(ChunkedScanner {
120-
scan: self.scan.clone(),
121-
layout: self.layout.clone(),
122-
ctx: self.ctx.clone(),
123-
scan_dtype: self.dtype.clone(),
124-
present_stats: self.present_stats.clone(),
125-
stats_scanner: self.stats_scanner.clone(),
126-
chunk_scans: self.chunk_scans.clone(),
128+
chunked_scan: self.clone(),
127129
mask,
128130
chunk_states: None,
129131
}) as _)
130132
}
131133
}
132134

133135
/// A scanner for a chunked layout.
136+
#[derive(Debug)]
134137
struct ChunkedScanner {
135-
scan: Scan,
136-
layout: LayoutData,
137-
ctx: ContextRef,
138-
scan_dtype: DType,
139-
present_stats: Arc<[Stat]>,
140-
stats_scanner: Arc<RwLock<Box<dyn Scanner>>>,
141-
chunk_scans: Arc<[OnceLock<Box<dyn LayoutScan>>]>,
138+
chunked_scan: ChunkedScan,
142139
mask: RowMask,
143-
144140
// State for each chunk in the layout
145141
chunk_states: Option<Vec<ChunkState>>,
146142
}
147143

144+
#[derive(Debug)]
148145
enum ChunkState {
149146
Pending(Box<dyn Scanner>),
150147
Resolved(Option<ArrayData>),
@@ -155,33 +152,55 @@ impl Scanner for ChunkedScanner {
155152
// If we haven't set up our chunk state yet, then fetch the stats table and do so.
156153
if self.chunk_states.is_none() {
157154
// First, we grab the stats table
158-
let _stats_table = match self
155+
let stats_table = match self
156+
.chunked_scan
159157
.stats_scanner
160158
.write()
161159
.map_err(|_| vortex_err!("poisoned"))?
162160
.poll(segments)?
163161
{
164162
Poll::Some(stats_array) => StatsTable::try_new(
165-
self.layout.dtype().clone(),
163+
self.chunked_scan.layout.dtype().clone(),
166164
stats_array,
167-
self.present_stats.clone(),
165+
self.chunked_scan.present_stats.clone(),
168166
)?,
169167
Poll::NeedMore(segments) => {
170168
// Otherwise, we need more segments to read the stats table.
171169
return Ok(Poll::NeedMore(segments));
172170
}
173171
};
174172

173+
// And compute the pruning predicate
174+
let pruning_mask = self.chunked_scan.pruning_mask.get_or_try_init(|| {
175+
Ok::<_, VortexError>(
176+
self.chunked_scan
177+
.scan
178+
.filter
179+
.as_ref()
180+
.and_then(PruningPredicate::try_new)
181+
.and_then(|predicate| predicate.evaluate(stats_table.array()).transpose())
182+
.transpose()?
183+
.map(|mask| mask.into_bool())
184+
.transpose()?
185+
.map(|mask| mask.boolean_buffer()),
186+
)
187+
})?;
188+
175189
// Now we can set up the chunk state.
176-
let mut chunks = Vec::with_capacity(self.chunk_scans.len());
190+
let mut chunks = Vec::with_capacity(self.chunked_scan.chunk_scans.len());
177191
let mut row_offset = 0;
178-
for chunk_idx in 0..self.chunk_scans.len() {
179-
let chunk_scan = self.chunk_scans[chunk_idx].get_or_try_init(|| {
180-
self.layout
181-
.child(chunk_idx, self.layout.dtype().clone())
182-
.vortex_expect("Child index out of bounds")
183-
.new_scan(self.scan.clone(), self.ctx.clone())
184-
})?;
192+
for chunk_idx in 0..self.chunked_scan.chunk_scans.len() {
193+
let chunk_scan =
194+
self.chunked_scan.chunk_scans[chunk_idx].get_or_try_init(|| {
195+
self.chunked_scan
196+
.layout
197+
.child(chunk_idx, self.chunked_scan.layout.dtype().clone())
198+
.vortex_expect("Child index out of bounds")
199+
.new_scan(
200+
self.chunked_scan.scan.clone(),
201+
self.chunked_scan.ctx.clone(),
202+
)
203+
})?;
185204

186205
// Figure out the row range of the chunk
187206
let chunk_len = chunk_scan.layout().row_count();
@@ -194,8 +213,13 @@ impl Scanner for ChunkedScanner {
194213
continue;
195214
}
196215

197-
// Try to skip the chunk based on the stats table
198-
// TODO
216+
// Try to skip the chunk based on the pruning predicate
217+
if let Some(pruning_mask) = pruning_mask {
218+
if pruning_mask.value(chunk_idx) {
219+
chunks.push(ChunkState::Resolved(None));
220+
continue;
221+
}
222+
}
199223

200224
// Otherwise, we need to read it. So we set up a mask for the chunk range.
201225
let chunk_mask = self
@@ -250,7 +274,7 @@ impl Scanner for ChunkedScanner {
250274
.collect::<Vec<_>>();
251275

252276
Ok(Poll::Some(
253-
ChunkedArray::try_new(chunks, self.scan_dtype.clone())?.into_array(),
277+
ChunkedArray::try_new(chunks, self.chunked_scan.dtype.clone())?.into_array(),
254278
))
255279
}
256280
}
@@ -263,25 +287,37 @@ enum PollStats {
263287

264288
#[cfg(test)]
265289
mod test {
266-
use vortex_array::{ArrayDType, ArrayLen, IntoArrayData, IntoArrayVariant};
290+
use std::assert_matches::assert_matches;
291+
292+
use vortex_array::{ArrayLen, IntoArrayData, IntoArrayVariant};
267293
use vortex_buffer::buffer;
294+
use vortex_dtype::Nullability::NonNullable;
295+
use vortex_dtype::{DType, PType};
296+
use vortex_expr::{BinaryExpr, Identity, Literal, Operator};
268297

298+
use crate::layouts::chunked::scan::{ChunkState, ChunkedScan, ChunkedScanner};
269299
use crate::layouts::chunked::writer::ChunkedLayoutWriter;
270-
use crate::scanner::Scan;
300+
use crate::scanner::{Poll, Scan, Scanner};
271301
use crate::segments::test::TestSegments;
272302
use crate::strategies::LayoutWriterExt;
273-
use crate::LayoutData;
303+
use crate::{LayoutData, RowMask};
274304

275-
/// Create a chunked layout with three chunks of `1..=3` primitive arrays.
305+
/// Create a chunked layout with three chunks of primitive arrays.
276306
fn chunked_layout() -> (TestSegments, LayoutData) {
277-
let arr = buffer![1, 2, 3].into_array();
278307
let mut segments = TestSegments::default();
279-
let layout = ChunkedLayoutWriter::new(arr.dtype(), Default::default())
280-
.push_all(
281-
&mut segments,
282-
[Ok(arr.clone()), Ok(arr.clone()), Ok(arr.clone())],
283-
)
284-
.unwrap();
308+
let layout = ChunkedLayoutWriter::new(
309+
&DType::Primitive(PType::I32, NonNullable),
310+
Default::default(),
311+
)
312+
.push_all(
313+
&mut segments,
314+
[
315+
Ok(buffer![1, 2, 3].into_array()),
316+
Ok(buffer![4, 5, 6].into_array()),
317+
Ok(buffer![7, 8, 9].into_array()),
318+
],
319+
)
320+
.unwrap();
285321
(segments, layout)
286322
}
287323

@@ -293,6 +329,46 @@ mod test {
293329
let result = segments.do_scan(scan.as_ref()).into_primitive().unwrap();
294330

295331
assert_eq!(result.len(), 9);
296-
assert_eq!(result.as_slice::<i32>(), &[1, 2, 3, 1, 2, 3, 1, 2, 3]);
332+
assert_eq!(result.as_slice::<i32>(), &[1, 2, 3, 4, 5, 6, 7, 8, 9]);
333+
}
334+
335+
#[test]
336+
fn test_chunked_pruning_mask() {
337+
let (segments, layout) = chunked_layout();
338+
339+
let scan = ChunkedScan::try_new(
340+
layout,
341+
Scan {
342+
projection: Identity::new_expr(),
343+
filter: Some(BinaryExpr::new_expr(
344+
Identity::new_expr(),
345+
Operator::Gt,
346+
Literal::new_expr(6.into()),
347+
)),
348+
},
349+
Default::default(),
350+
)
351+
.unwrap();
352+
353+
// Populate the stats scanner so that we can compute the pruning mask
354+
_ = scan.stats_scanner.write().unwrap().poll(&segments).unwrap();
355+
356+
let mut scanner = ChunkedScanner {
357+
chunked_scan: scan,
358+
mask: RowMask::new_valid_between(0, 9),
359+
chunk_states: None,
360+
};
361+
362+
// Then we poll the chunked scanner without any segments so _only_ the stats were
363+
// available.
364+
let Poll::NeedMore(_segments) = scanner.poll(&TestSegments::default()).unwrap() else {
365+
unreachable!()
366+
};
367+
368+
// Now we validate that based on the pruning mask, we have excluded the first two chunks
369+
let chunk_states = scanner.chunk_states.as_ref().unwrap().as_slice();
370+
assert_matches!(chunk_states[0], ChunkState::Resolved(None));
371+
assert_matches!(chunk_states[1], ChunkState::Resolved(None));
372+
assert_matches!(chunk_states[2], ChunkState::Pending(_));
297373
}
298374
}

vortex-layout/src/layouts/flat/scan.rs

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -9,17 +9,12 @@ use crate::scanner::{LayoutScan, Poll, Scan, Scanner};
99
use crate::segments::{SegmentId, SegmentReader};
1010
use crate::{LayoutData, LayoutEncoding, RowMask};
1111

12-
#[derive(Clone, Eq, PartialEq)]
13-
enum State {
14-
Initial,
15-
}
16-
12+
#[derive(Debug)]
1713
pub struct FlatScan {
1814
layout: LayoutData,
1915
scan: Scan,
2016
dtype: DType,
2117
ctx: ContextRef,
22-
state: State,
2318
}
2419

2520
impl FlatScan {
@@ -33,7 +28,6 @@ impl FlatScan {
3328
scan,
3429
dtype,
3530
ctx,
36-
state: State::Initial,
3731
})
3832
}
3933
}
@@ -67,6 +61,9 @@ impl LayoutScan for FlatScan {
6761
}
6862
}
6963

64+
// TODO(ngates): this needs to move into a shared Scanner inside the Scan. Then each scanner can
65+
// share work.
66+
#[derive(Debug)]
7067
struct FlatScanner {
7168
segment_id: SegmentId,
7269
dtype: DType,
@@ -175,7 +172,7 @@ mod test {
175172
filter: Some(BinaryExpr::new_expr(
176173
Arc::new(Identity),
177174
Operator::Gt,
178-
Literal::new_expr(3.into()),
175+
Literal::new_expr(3i32.into()),
179176
)),
180177
};
181178

vortex-layout/src/layouts/struct_/scan.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ use crate::scanner::{LayoutScan, Poll, Scan, Scanner};
77
use crate::segments::SegmentReader;
88
use crate::{LayoutData, LayoutEncoding, RowMask};
99

10+
#[derive(Debug)]
1011
pub struct StructScan {
1112
layout: LayoutData,
1213
scan: Scan,
@@ -49,11 +50,12 @@ impl LayoutScan for StructScan {
4950
}
5051
}
5152

52-
#[derive(Clone)]
53+
#[derive(Clone, Debug)]
5354
enum State {
5455
Initial,
5556
}
5657

58+
#[derive(Debug)]
5759
struct StructScanner {
5860
layout: LayoutData,
5961
scan: Scan,

vortex-layout/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
#![feature(once_cell_try)]
2+
#![feature(assert_matches)]
23
#![allow(dead_code)]
34
mod data;
45
pub mod scanner;

vortex-layout/src/scanner/mod.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
mod scan;
22

3+
use std::fmt::Debug;
4+
35
pub use scan::*;
46
use vortex_array::ArrayData;
57
use vortex_dtype::DType;
@@ -9,7 +11,7 @@ use crate::segments::{SegmentId, SegmentReader};
911
use crate::{LayoutData, RowMask};
1012

1113
/// A [`LayoutScan`] provides an encapsulation of an invocation of a scan operation.
12-
pub trait LayoutScan: 'static + Send + Sync {
14+
pub trait LayoutScan: 'static + Send + Sync + Debug {
1315
/// Returns the [`LayoutData`] that this scan is operating on.
1416
fn layout(&self) -> &LayoutData;
1517

@@ -45,7 +47,7 @@ pub enum Poll {
4547
}
4648

4749
/// A trait for scanning a single row range of a layout.
48-
pub trait Scanner: 'static + Send + Sync {
50+
pub trait Scanner: 'static + Send + Sync + Debug {
4951
/// Attempts to return the [`ArrayData`] result of this ranged scan. If the scanner cannot
5052
/// make progress, it can return a vec of additional data segments using [`Poll::NeedMore`].
5153
///
@@ -67,6 +69,7 @@ pub trait ScannerExt: Scanner {
6769
impl<S: Scanner> ScannerExt for S {}
6870

6971
/// A scanner with an [`ArrayData`] that is always returned.
72+
#[derive(Debug)]
7073
pub struct ResolvedScanner(pub ArrayData);
7174

7275
impl Scanner for ResolvedScanner {

0 commit comments

Comments
 (0)