Skip to content

Commit f28031a

Browse files
authored
Vortex Layouts - Chunked (#1819)
This implementation could definitely be optimized, and doesn't yet use the statistics table for pruning.
1 parent 7ed8f94 commit f28031a

File tree

7 files changed

+330
-126
lines changed

7 files changed

+330
-126
lines changed

vortex-layout/src/data.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -243,7 +243,11 @@ impl LayoutData {
243243
}
244244

245245
/// Create a scan of this layout.
246-
pub fn new_scan(self, scan: Scan, ctx: ContextRef) -> VortexResult<Box<dyn LayoutScan>> {
246+
pub fn new_scan(
247+
self,
248+
scan: Scan,
249+
ctx: ContextRef,
250+
) -> VortexResult<Box<dyn LayoutScan + 'static>> {
247251
self.encoding().scan(self, scan, ctx)
248252
}
249253
}
Lines changed: 201 additions & 100 deletions
Original file line numberDiff line numberDiff line change
@@ -1,44 +1,109 @@
1-
use itertools::Itertools;
2-
use vortex_array::array::ChunkedArray;
1+
use std::sync::{Arc, OnceLock, RwLock};
2+
3+
use vortex_array::array::{ChunkedArray, StructArray};
4+
use vortex_array::stats::{stats_from_bitset_bytes, Stat};
5+
use vortex_array::validity::Validity;
36
use vortex_array::{ArrayData, ContextRef, IntoArrayData};
47
use vortex_dtype::DType;
58
use vortex_error::{vortex_err, vortex_panic, VortexExpect, VortexResult};
69

10+
use crate::layouts::chunked::stats::StatsTable;
711
use crate::layouts::chunked::ChunkedLayout;
8-
use crate::scanner::{LayoutScan, Poll, Scan, Scanner};
9-
use crate::segments::SegmentReader;
12+
use crate::scanner::{LayoutScan, Poll, ResolvedScanner, Scan, Scanner, ScannerExt};
13+
use crate::segments::{SegmentId, SegmentReader};
1014
use crate::{LayoutData, LayoutEncoding, RowMask};
1115

16+
/// Captures the scan state of a chunked layout.
17+
///
18+
/// This scan is used to generate multiple scanners, one per row-range. It may even be the case
19+
/// that the caller requests different row ranges for filter operations as for projection
20+
/// operations. As such, it's beneficial for us to re-use some state across different scanners.
21+
///
22+
/// The obvious state to re-use is that of the statistics table. Each range scan polls the same
23+
/// underlying statistics scanner since a scanner must continue to return its result for subsequent
24+
/// polls.
25+
///
26+
/// There is a question about whether we should lazily construct and share all chunk scanners too.
27+
/// We currently create a new chunk scanner for every chunk read by each range scan. This is sort
28+
/// of fine if we assume row ranges are non-overlapping (although if a range overlaps a chunk
29+
/// boundary we will read the chunk twice). However, if we have overlapping row ranges, as can
30+
/// happen if the parent is performing multiple scans (filter + projection), then we may read the
31+
/// same chunk many times.
1232
pub struct ChunkedScan {
1333
layout: LayoutData,
1434
scan: Scan,
1535
dtype: DType,
1636
ctx: ContextRef,
37+
stats_scanner: Arc<RwLock<Box<dyn Scanner>>>,
38+
chunk_scans: Arc<[OnceLock<Box<dyn LayoutScan>>]>,
39+
present_stats: Arc<[Stat]>,
40+
}
41+
42+
struct ChunkedStats {
43+
present_stats: Vec<Stat>,
44+
scanner: Box<dyn Scanner>,
45+
// Cached stats table once it's been read from the scanner.
46+
table: Option<StatsTable>,
1747
}
1848

1949
impl ChunkedScan {
2050
pub(super) fn try_new(layout: LayoutData, scan: Scan, ctx: ContextRef) -> VortexResult<Self> {
2151
if layout.encoding().id() != ChunkedLayout.id() {
2252
vortex_panic!("Mismatched layout ID")
2353
}
54+
55+
// The number of chunks
56+
let mut nchunks = layout.nchildren();
57+
if layout.metadata().is_some() {
58+
// The final child is the statistics table.
59+
nchunks -= 1;
60+
}
61+
62+
// Figure out which stats are present
63+
let present_stats: Arc<[Stat]> = layout
64+
.metadata()
65+
.map(|m| stats_from_bitset_bytes(m.as_ref()))
66+
.unwrap_or_default()
67+
.into();
68+
69+
// Construct a scanner for the stats array
70+
let stats_scanner = layout
71+
.metadata()
72+
.is_some()
73+
.then(|| {
74+
let stats_dtype = StatsTable::dtype_for_stats_table(layout.dtype(), &present_stats);
75+
let stats_layout = layout.child(layout.nchildren() - 1, stats_dtype)?;
76+
stats_layout
77+
.new_scan(Scan::all(), ctx.clone())?
78+
.create_scanner(RowMask::new_valid_between(0, nchunks as u64))
79+
})
80+
.transpose()?
81+
.unwrap_or_else(|| {
82+
// Otherwise we create a default stats array with no columns.
83+
ResolvedScanner(
84+
StructArray::try_new(vec![].into(), vec![], nchunks, Validity::NonNullable)
85+
.vortex_expect("cannot fail")
86+
.into_array(),
87+
)
88+
.boxed()
89+
});
90+
91+
// Construct a lazy scan for each chunk of the layout.
92+
let chunk_scans = (0..nchunks).map(|_| OnceLock::new()).collect();
93+
94+
// Compute the dtype of the scan.
2495
let dtype = scan.result_dtype(layout.dtype())?;
96+
2597
Ok(Self {
2698
layout,
2799
scan,
28100
dtype,
29101
ctx,
102+
stats_scanner: Arc::new(RwLock::new(stats_scanner)),
103+
chunk_scans,
104+
present_stats,
30105
})
31106
}
32-
33-
/// Returns the number of chunks in the layout.
34-
fn nchunks(&self) -> usize {
35-
let mut nchildren = self.layout.nchildren();
36-
if self.layout.metadata().is_some() {
37-
// The final child is the statistics table.
38-
nchildren -= 1;
39-
}
40-
nchildren
41-
}
42107
}
43108

44109
impl LayoutScan for ChunkedScan {
@@ -50,64 +115,121 @@ impl LayoutScan for ChunkedScan {
50115
&self.dtype
51116
}
52117

53-
/// Note that a [`Scanner`] is intended to return a single batch of data, therefore instead
54-
/// of reading chunks one by one, we attempt to make progress on reading all chunks at the same
55-
/// time. We therefore do as much pruning as we can now and then read all chunks in parallel.
56118
fn create_scanner(&self, mask: RowMask) -> VortexResult<Box<dyn Scanner>> {
57-
let mut chunk_scanners = Vec::with_capacity(self.layout.nchildren());
58-
59-
let mut row_start = 0;
60-
for chunk_idx in 0..self.nchunks() {
61-
let chunk_range = row_start..row_start + self.layout().child_row_count(chunk_idx);
62-
row_start = chunk_range.end;
63-
64-
if mask.is_disjoint(chunk_range.clone()) {
65-
// Skip this chunk if it's not in the mask.
66-
continue;
67-
}
68-
69-
let chunk_layout = self
70-
.layout
71-
.child(chunk_idx, self.layout.dtype().clone())
72-
.vortex_expect("Child index out of bound");
73-
let chunk_scan = chunk_layout.new_scan(self.scan.clone(), self.ctx.clone())?;
74-
let chunk_mask = mask
75-
.clone()
76-
// TODO(ngates): I would have thought slice would also shift?
77-
.slice(chunk_range.start, chunk_range.end)?
78-
.shift(chunk_range.start)?;
79-
let chunk_scanner = chunk_scan.create_scanner(chunk_mask)?;
80-
chunk_scanners.push(chunk_scanner);
81-
}
82-
83119
Ok(Box::new(ChunkedScanner {
84-
chunk_scanners,
85-
chunk_arrays: vec![None; self.nchunks()],
86-
dtype: self.dtype.clone(),
120+
scan: self.scan.clone(),
121+
layout: self.layout.clone(),
122+
ctx: self.ctx.clone(),
123+
scan_dtype: self.dtype.clone(),
124+
present_stats: self.present_stats.clone(),
125+
stats_scanner: self.stats_scanner.clone(),
126+
chunk_scans: self.chunk_scans.clone(),
127+
mask,
128+
chunk_states: None,
87129
}) as _)
88130
}
89131
}
90132

91133
/// A scanner for a chunked layout.
92134
struct ChunkedScanner {
93-
chunk_scanners: Vec<Box<dyn Scanner>>,
94-
chunk_arrays: Vec<Option<ArrayData>>,
95-
dtype: DType,
135+
scan: Scan,
136+
layout: LayoutData,
137+
ctx: ContextRef,
138+
scan_dtype: DType,
139+
present_stats: Arc<[Stat]>,
140+
stats_scanner: Arc<RwLock<Box<dyn Scanner>>>,
141+
chunk_scans: Arc<[OnceLock<Box<dyn LayoutScan>>]>,
142+
mask: RowMask,
143+
144+
// State for each chunk in the layout
145+
chunk_states: Option<Vec<ChunkState>>,
146+
}
147+
148+
enum ChunkState {
149+
Pending(Box<dyn Scanner>),
150+
Resolved(Option<ArrayData>),
96151
}
97152

98153
impl Scanner for ChunkedScanner {
99154
fn poll(&mut self, segments: &dyn SegmentReader) -> VortexResult<Poll> {
100-
// Otherwise, we need to read more data.
101-
let mut needed = vec![];
102-
for (chunk_idx, chunk) in self.chunk_scanners.iter_mut().enumerate() {
103-
if self.chunk_arrays[chunk_idx].is_some() {
104-
// We've already read this chunk, so skip it.
105-
continue;
155+
// If we haven't set up our chunk state yet, then fetch the stats table and do so.
156+
if self.chunk_states.is_none() {
157+
// First, we grab the stats table
158+
let _stats_table = match self
159+
.stats_scanner
160+
.write()
161+
.map_err(|_| vortex_err!("poisoned"))?
162+
.poll(segments)?
163+
{
164+
Poll::Some(stats_array) => StatsTable::try_new(
165+
self.layout.dtype().clone(),
166+
stats_array,
167+
self.present_stats.clone(),
168+
)?,
169+
Poll::NeedMore(segments) => {
170+
// Otherwise, we need more segments to read the stats table.
171+
return Ok(Poll::NeedMore(segments));
172+
}
173+
};
174+
175+
// Now we can set up the chunk state.
176+
let mut chunks = Vec::with_capacity(self.chunk_scans.len());
177+
let mut row_offset = 0;
178+
for chunk_idx in 0..self.chunk_scans.len() {
179+
let chunk_scan = self.chunk_scans[chunk_idx].get_or_try_init(|| {
180+
self.layout
181+
.child(chunk_idx, self.layout.dtype().clone())
182+
.vortex_expect("Child index out of bounds")
183+
.new_scan(self.scan.clone(), self.ctx.clone())
184+
})?;
185+
186+
// Figure out the row range of the chunk
187+
let chunk_len = chunk_scan.layout().row_count();
188+
let chunk_range = row_offset..row_offset + chunk_len;
189+
row_offset += chunk_len;
190+
191+
// Try to skip the chunk based on the row-mask
192+
if self.mask.is_disjoint(chunk_range.clone()) {
193+
chunks.push(ChunkState::Resolved(None));
194+
continue;
195+
}
196+
197+
// Try to skip the chunk based on the stats table
198+
// TODO
199+
200+
// Otherwise, we need to read it. So we set up a mask for the chunk range.
201+
let chunk_mask = self
202+
.mask
203+
.slice(chunk_range.start, chunk_range.end)?
204+
.shift(chunk_range.start)?;
205+
chunks.push(ChunkState::Pending(chunk_scan.create_scanner(chunk_mask)?));
106206
}
107207

108-
match chunk.poll(segments)? {
109-
Poll::Some(array) => self.chunk_arrays[chunk_idx] = Some(array),
110-
Poll::NeedMore(segment_ids) => needed.extend(segment_ids),
208+
self.chunk_states = Some(chunks);
209+
}
210+
211+
let chunk_states = self
212+
.chunk_states
213+
.as_mut()
214+
.vortex_expect("chunk state not set");
215+
216+
// Now we try to read the chunks.
217+
let mut needed = vec![];
218+
for chunk_state in chunk_states.iter_mut() {
219+
match chunk_state {
220+
ChunkState::Pending(scanner) => match scanner.poll(segments)? {
221+
Poll::Some(array) => {
222+
// Resolve the chunk
223+
*chunk_state = ChunkState::Resolved(Some(array));
224+
}
225+
Poll::NeedMore(segment_ids) => {
226+
// Request more segments
227+
needed.extend(segment_ids);
228+
}
229+
},
230+
ChunkState::Resolved(_) => {
231+
// Already resolved
232+
}
111233
}
112234
}
113235

@@ -117,26 +239,38 @@ impl Scanner for ChunkedScanner {
117239
}
118240

119241
// Otherwise, we've read all the chunks, so we're done.
120-
Ok(Poll::Some(ChunkedArray::try_new(
121-
self.chunk_arrays.iter_mut()
122-
.map(|array| array.take()
123-
.ok_or_else(|| vortex_err!("This is a bug. Missing a chunk array with no more segments to read")))
124-
.try_collect()?,
125-
self.dtype.clone(),
126-
)?.into_array()))
242+
let chunks = chunk_states
243+
.iter_mut()
244+
.filter_map(|state| match state {
245+
ChunkState::Resolved(array) => array.take(),
246+
_ => vortex_panic!(
247+
"This is a bug. Missing a chunk array with no more segments to read"
248+
),
249+
})
250+
.collect::<Vec<_>>();
251+
252+
Ok(Poll::Some(
253+
ChunkedArray::try_new(chunks, self.scan_dtype.clone())?.into_array(),
254+
))
127255
}
128256
}
129257

258+
enum PollStats {
259+
None,
260+
Some(StatsTable),
261+
NeedMore(Vec<SegmentId>),
262+
}
263+
130264
#[cfg(test)]
131265
mod test {
132266
use vortex_array::{ArrayDType, ArrayLen, IntoArrayData, IntoArrayVariant};
133267
use vortex_buffer::buffer;
134268

135269
use crate::layouts::chunked::writer::ChunkedLayoutWriter;
136-
use crate::scanner::{Poll, Scan};
270+
use crate::scanner::Scan;
137271
use crate::segments::test::TestSegments;
138272
use crate::strategies::LayoutWriterExt;
139-
use crate::{LayoutData, RowMask};
273+
use crate::LayoutData;
140274

141275
/// Create a chunked layout with three chunks of `1..=3` primitive arrays.
142276
fn chunked_layout() -> (TestSegments, LayoutData) {
@@ -161,37 +295,4 @@ mod test {
161295
assert_eq!(result.len(), 9);
162296
assert_eq!(result.as_slice::<i32>(), &[1, 2, 3, 1, 2, 3, 1, 2, 3]);
163297
}
164-
165-
#[test]
166-
fn test_chunked_scan_pruned() {
167-
let (_segments, layout) = chunked_layout();
168-
169-
let scan = layout.new_scan(Scan::all(), Default::default()).unwrap();
170-
let full_mask = RowMask::new_valid_between(0, scan.layout().row_count());
171-
172-
// We poll scanners with empty segments to count how many segments they would need.
173-
let mut full_scanner = scan.create_scanner(full_mask.clone()).unwrap();
174-
let Poll::NeedMore(full_segments) = full_scanner.poll(&TestSegments::default()).unwrap()
175-
else {
176-
unreachable!()
177-
};
178-
179-
// Using a row-mask that only covers one chunk should have 3x fewer segments.
180-
let mut one_chunk_scanner = scan.create_scanner(full_mask.slice(0, 3).unwrap()).unwrap();
181-
let Poll::NeedMore(one_segments) =
182-
one_chunk_scanner.poll(&TestSegments::default()).unwrap()
183-
else {
184-
unreachable!()
185-
};
186-
assert_eq!(one_segments.len() * 3, full_segments.len());
187-
188-
// Using a row-mask that covers two chunks should have 2/3 the full_segments.
189-
let mut two_chunk_scanner = scan.create_scanner(full_mask.slice(2, 5).unwrap()).unwrap();
190-
let Poll::NeedMore(two_segments) =
191-
two_chunk_scanner.poll(&TestSegments::default()).unwrap()
192-
else {
193-
unreachable!()
194-
};
195-
assert_eq!(two_segments.len(), one_segments.len() * 2);
196-
}
197298
}

0 commit comments

Comments
 (0)