Skip to content

Commit bdb8cf8

Browse files
authored
chore: cleanup scan task execution to be clearer (#3489)
1 parent 7ea2a52 commit bdb8cf8

File tree

4 files changed

+149
-99
lines changed

4 files changed

+149
-99
lines changed

vortex-expr/src/transform/immediate_access.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ impl<'a> NodeVisitor<'a> for ImmediateScopeAccessesAnalysis<'a> {
6565
fn visit_down(&mut self, node: &'a Self::NodeTy) -> VortexResult<TraversalOrder> {
6666
assert!(
6767
!node.as_any().is::<Select>(),
68-
"cannot analyse select, simply the expression"
68+
"cannot analyze select, simplify the expression"
6969
);
7070
if let Some(get_item) = node.as_any().downcast_ref::<GetItem>() {
7171
if is_root(get_item.child()) {

vortex-layout/src/scan/mod.rs

Lines changed: 28 additions & 97 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,8 @@ use arrow_array::RecordBatch;
66
use arrow_schema::SchemaRef;
77
pub use executor::*;
88
use futures::executor::LocalPool;
9-
use futures::future::ok;
109
use futures::task::LocalSpawnExt;
11-
use futures::{FutureExt, Stream, StreamExt, stream};
10+
use futures::{Stream, StreamExt, stream};
1211
use itertools::Itertools;
1312
pub use selection::*;
1413
pub use split_by::*;
@@ -18,18 +17,21 @@ use vortex_array::stream::{ArrayStream, ArrayStreamAdapter};
1817
use vortex_array::{ArrayRef, ToCanonical};
1918
use vortex_buffer::Buffer;
2019
use vortex_dtype::{DType, Field, FieldMask, FieldName, FieldPath};
21-
use vortex_error::{VortexError, VortexExpect, VortexResult, vortex_err};
20+
use vortex_error::{VortexExpect, VortexResult, vortex_err};
2221
use vortex_expr::transform::immediate_access::immediate_scope_access;
2322
use vortex_expr::transform::simplify_typed::simplify_typed;
2423
use vortex_expr::{ExprRef, ScopeDType, root};
2524
use vortex_metrics::VortexMetrics;
2625

2726
use crate::LayoutReader;
2827
use crate::layouts::filter::FilterLayoutReader;
28+
use crate::scan::tasks::{TaskContext, split_exec};
29+
2930
mod executor;
3031
pub mod row_mask;
3132
mod selection;
3233
mod split_by;
34+
mod tasks;
3335

3436
/// A struct for building a scan operation.
3537
pub struct ScanBuilder<A> {
@@ -53,7 +55,7 @@ pub struct ScanBuilder<A> {
5355
file_stats: Option<Arc<[StatsSet]>>,
5456
}
5557

56-
impl<A: 'static + Send> ScanBuilder<A> {
58+
impl<A: 'static + Send + Sync> ScanBuilder<A> {
5759
pub fn with_filter(mut self, filter: ExprRef) -> Self {
5860
self.filter = Some(filter);
5961
self
@@ -153,10 +155,12 @@ impl<A: 'static + Send> ScanBuilder<A> {
153155
pub fn build(self) -> VortexResult<Vec<impl Future<Output = VortexResult<Option<A>>>>> {
154156
// Spin up the root layout reader, and wrap it in a FilterLayoutReader to perform
155157
// conjunction splitting if a filter is provided.
156-
let mut layout_reader = self.layout_reader;
157-
if self.filter.is_some() {
158-
layout_reader = Arc::new(FilterLayoutReader::new(layout_reader));
159-
}
158+
let layout_reader = if self.filter.is_some() {
159+
Arc::new(FilterLayoutReader::new(self.layout_reader))
160+
} else {
161+
self.layout_reader
162+
};
163+
160164
let ctx = ScopeDType::new(layout_reader.dtype().clone());
161165

162166
// Normalize and simplify the expressions.
@@ -177,98 +181,25 @@ impl<A: 'static + Send> ScanBuilder<A> {
177181
.collect();
178182
let splits = self.split_by.splits(layout_reader.as_ref(), &field_mask)?;
179183

180-
let row_masks = splits
184+
// Create a task that executes the full scan pipeline for each split.
185+
let split_tasks = splits
181186
.into_iter()
182-
.filter_map(|row_range| {
183-
if let Some(scan_range) = &self.row_range {
184-
// If the row range is fully within the scan range, return it.
185-
if row_range.start >= scan_range.end || row_range.end < scan_range.start {
186-
return None;
187-
}
188-
// Otherwise, take the intersection of the range.
189-
return Some(
190-
row_range.start.max(scan_range.start)..row_range.end.min(scan_range.end),
191-
);
192-
} else {
193-
Some(row_range)
194-
}
187+
.map(move |split_range| {
188+
let ctx = Arc::new(TaskContext {
189+
row_range: self.row_range.clone(),
190+
selection: self.selection.clone(),
191+
filter: self.filter.clone(),
192+
reader: layout_reader.clone(),
193+
projection: projection.clone(),
194+
mapper: self.map_fn.clone(),
195+
task_executor: None,
196+
});
197+
198+
split_exec(ctx, split_range)
195199
})
196-
.map(|row_range| self.selection.row_mask(&row_range))
197-
.filter(|mask| !mask.mask().all_false())
198-
.map(|row_mask| {
199-
let row_range = row_mask.row_range();
200-
(row_range, ok(row_mask.mask().clone()).boxed())
201-
})
202-
.collect_vec();
203-
204-
// NOTE(ngates): since segment prefetching occurs in insertion order, we construct
205-
// all pruning tasks, then all filter tasks, then all projection tasks. When a task
206-
// explicitly polls a segment, it jumps to the front of the queue so this shouldn't
207-
// impact the time-to-first-chunk latency.
208-
209-
// If a filter expression is provided, then we set up pruning and filter evaluations.
210-
let row_masks = if let Some(filter) = &filter {
211-
// Map the row masks through the pruning evaluation
212-
let row_masks: Vec<_> = row_masks
213-
.into_iter()
214-
.map(|(row_range, mask_fut)| {
215-
let eval = layout_reader.pruning_evaluation(&row_range, filter)?;
216-
let mask_fut = async move {
217-
let mask = mask_fut.await?;
218-
if mask.all_false() {
219-
Ok(mask)
220-
} else {
221-
eval.invoke(mask).await
222-
}
223-
}
224-
.boxed();
225-
Ok::<_, VortexError>((row_range, mask_fut))
226-
})
227-
.try_collect()?;
228-
229-
// Map the row masks through the filter evaluation
230-
row_masks
231-
.into_iter()
232-
.map(|(row_range, mask_fut)| {
233-
let eval = layout_reader.filter_evaluation(&row_range, filter)?;
234-
let mask_fut = async move {
235-
let mask = mask_fut.await?;
236-
if mask.all_false() {
237-
Ok(mask)
238-
} else {
239-
eval.invoke(mask).await
240-
}
241-
}
242-
.boxed();
243-
Ok::<_, VortexError>((row_range, mask_fut))
244-
})
245-
.try_collect()?
246-
} else {
247-
row_masks
248-
};
200+
.try_collect()?;
249201

250-
// Finally, map the row masks through the projection evaluation and spawn.
251-
row_masks
252-
.into_iter()
253-
.map(|(row_range, mask_fut)| {
254-
let map_fn = self.map_fn.clone();
255-
let eval = layout_reader.projection_evaluation(&row_range, &projection)?;
256-
let array_fut = async move {
257-
let mask = mask_fut.await?;
258-
if mask.all_false() {
259-
Ok(None)
260-
} else {
261-
map_fn(eval.invoke(mask).await?).map(Some)
262-
}
263-
}
264-
.boxed();
265-
266-
Ok(match &self.executor {
267-
None => array_fut,
268-
Some(executor) => executor.spawn(array_fut),
269-
})
270-
})
271-
.try_collect()
202+
Ok(split_tasks)
272203
}
273204

274205
/// Returns a stream over the scan objects.

vortex-layout/src/scan/selection.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use crate::scan::row_mask::RowMask;
88

99
/// A selection identifies a set of rows to include in the scan (in addition to applying any
1010
/// filter predicates).
11-
#[derive(Default)]
11+
#[derive(Default, Clone)]
1212
pub enum Selection {
1313
/// No selection, all rows are included.
1414
#[default]

vortex-layout/src/scan/tasks.rs

Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
//! Split scanning task implementation.
2+
3+
use std::ops::Range;
4+
use std::sync::Arc;
5+
6+
use futures::FutureExt;
7+
use futures::future::{BoxFuture, ok};
8+
use vortex_array::ArrayRef;
9+
use vortex_error::VortexResult;
10+
use vortex_expr::ExprRef;
11+
12+
use crate::LayoutReader;
13+
use crate::scan::{Selection, TaskExecutor, TaskExecutorExt};
14+
15+
pub type TaskFuture<A> = BoxFuture<'static, VortexResult<A>>;
16+
17+
/// Logic for executing a single split reading task.
18+
///
19+
/// # Task execution flow
20+
///
21+
/// First, the tasks's row range (split) is intersected with the global file row-range requested,
22+
/// if any.
23+
///
24+
/// Then intersected row range is then further reduced via expression-based pruning. After pruning
25+
/// has eliminated more blocks, the full filter is executed over the remainder of the split.
26+
///
27+
/// This mask is then provided to the reader to perform a filtered projection over the split data,
28+
/// finally mapping the Vortex columnar record batches into some result type `A`.
29+
pub(super) fn split_exec<A: 'static + Send + Sync>(
30+
ctx: Arc<TaskContext<A>>,
31+
split: Range<u64>,
32+
) -> VortexResult<TaskFuture<Option<A>>> {
33+
// Step 1: using the caller-provided row range and selection, attempt to disregard this split.
34+
let read_range = match &ctx.row_range {
35+
None => split,
36+
Some(row_range) => {
37+
if row_range.start >= split.end || row_range.end < split.start {
38+
// No overlap for this task
39+
return Ok(ok(None).boxed());
40+
}
41+
42+
let intersect_start = row_range.start.max(split.start);
43+
let intersect_end = row_range.end.min(split.end);
44+
intersect_start..intersect_end
45+
}
46+
};
47+
48+
// Apply the selection to calculate a read mask
49+
let read_mask = ctx.selection.row_mask(&read_range);
50+
let row_range = read_mask.row_range();
51+
let row_mask = read_mask.mask().clone();
52+
if row_mask.all_false() {
53+
return Ok(ok(None).boxed());
54+
}
55+
56+
let filter = match ctx.filter.as_ref() {
57+
// No filter == immediate task
58+
None => ok(row_mask).boxed(),
59+
Some(filter) => {
60+
// Step 2: if there is a filter provided, attempt to prune this range based on the filter.
61+
// NOTE: it's very important that the pruning and filter evaluations are built OUTSIDE
62+
// of the future. Registering these row ranges eagerly is a hint to the IO system that
63+
// we want to start prefetching the IO for this split.
64+
let prune = ctx.reader.pruning_evaluation(&row_range, filter)?;
65+
let eval = ctx.reader.filter_evaluation(&row_range, filter)?;
66+
67+
async move {
68+
let pruned_mask = prune.invoke(row_mask).await?;
69+
70+
// Step 3: apply exact filtering. The pruning step has already eliminated entire blocks
71+
// where we know the filter won't match any rows, so the amount of work to do here
72+
// should be a lot less.
73+
eval.invoke(pruned_mask).await
74+
}
75+
.boxed()
76+
}
77+
};
78+
79+
// Step 4: execute the projection, only at the mask for rows which match the filter
80+
let exec = ctx
81+
.reader
82+
.projection_evaluation(&row_range, &ctx.projection)?;
83+
let mapper = ctx.mapper.clone();
84+
let array_fut = async move {
85+
let filtered_mask = filter.await?;
86+
let array_ref = exec.invoke(filtered_mask).await?;
87+
mapper(array_ref).map(Some)
88+
};
89+
90+
match &ctx.task_executor {
91+
None => Ok(array_fut.boxed()),
92+
// If caller provided an executor for the CPU work, spawn onto that and await the result
93+
Some(executor) => Ok(executor.clone().spawn(array_fut.boxed())),
94+
}
95+
}
96+
97+
/// Information needed to execute a single split task.
98+
pub(super) struct TaskContext<A> {
99+
/// A caller-provided range of the file to read. All tasks should intersect their reads
100+
/// with this range to ensure that they are split as well.
101+
pub(super) row_range: Option<Range<u64>>,
102+
103+
/// A row selection to apply.
104+
pub(super) selection: Selection,
105+
106+
/// The filter expression for the current task.
107+
pub(super) filter: Option<ExprRef>,
108+
109+
/// The layout reader.
110+
pub(super) reader: Arc<dyn LayoutReader>,
111+
112+
/// The projection expression to apply to gather the scanned rows.
113+
pub(super) projection: ExprRef,
114+
115+
/// Function that maps into an A.
116+
pub(super) mapper: Arc<dyn Fn(ArrayRef) -> VortexResult<A> + Send + Sync>,
117+
118+
pub(super) task_executor: Option<Arc<dyn TaskExecutor>>,
119+
}

0 commit comments

Comments
 (0)