Skip to content

Commit 9629955

Browse files
committed
Add with_preserve_order flag
1 parent 9ed8aa4 commit 9629955

File tree

2 files changed

+150
-49
lines changed

2 files changed

+150
-49
lines changed

datafusion/datasource/src/file_stream.rs

Lines changed: 138 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,17 @@ pub struct FileStream {
117117
baseline_metrics: BaselineMetrics,
118118
/// Describes the behavior of the `FileStream` if file opening or scanning fails
119119
on_error: OnError,
120+
/// Preserve the logical planner/morsel order defined by the
121+
/// [`MorselPlan`] API?
122+
///
123+
/// If false (the default) morsels will be produced in the order
124+
/// that they are ready to be run.
125+
///
126+
/// If true, morsels will be produced in the logical order defined by
127+
/// [`MorselPlan`]
128+
///
129+
/// [`MorselPlan`]: crate::morsel::MorselPlan
130+
preserve_order: bool,
120131
/// Is the stream complete?
121132
state: StreamState,
122133
}
@@ -169,6 +180,7 @@ impl FileStream {
169180
file_stream_metrics: FileStreamMetrics::new(metrics, partition),
170181
baseline_metrics: BaselineMetrics::new(metrics, partition),
171182
on_error: OnError::Fail,
183+
preserve_order: false,
172184
state: StreamState::Active,
173185
})
174186
}
@@ -181,6 +193,13 @@ impl FileStream {
181193
self
182194
}
183195

196+
/// Specify whether this `FileStream` should preserve the logical output
197+
/// order implied by `MorselPlan`s.
198+
pub fn with_preserve_order(mut self, preserve_order: bool) -> Self {
199+
self.preserve_order = preserve_order;
200+
self
201+
}
202+
184203
/// Run a planner on CPU until it either needs I/O or fully completes.
185204
///
186205
/// Any morsels produced along the way are appended to `self.morsels`. If
@@ -224,6 +243,18 @@ impl FileStream {
224243
while (self.waiting_planners.len() + self.ready_planners.len())
225244
< TARGET_CONCURRENT_PLANNERS
226245
{
246+
// In ordered mode, do not admit later files while there is any
247+
// earlier file work still buffered, waiting on I/O, or actively
248+
// being scanned. This keeps file-level planning from introducing
249+
// later output ahead of earlier files.
250+
if self.preserve_order
251+
&& (self.reader.is_some()
252+
|| !self.morsels.is_empty()
253+
|| !self.ready_planners.is_empty()
254+
|| !self.waiting_planners.is_empty())
255+
{
256+
break;
257+
}
227258
if self.morsels.len() >= max_buffered_morsels {
228259
break;
229260
}
@@ -307,6 +338,17 @@ impl FileStream {
307338
// reader is currently active. This avoids starving planner work
308339
// behind a reader that is itself waiting on I/O.
309340
while self.morsels.len() < max_buffered_morsels() {
341+
// In ordered mode, once an earlier planner has produced a
342+
// morsel or is blocked on I/O, do not advance later sibling
343+
// planners yet. This preserves the logical `MorselPlan` order:
344+
// direct morsels first, then child planners in API order.
345+
if self.preserve_order
346+
&& (self.reader.is_some()
347+
|| !self.morsels.is_empty()
348+
|| !self.waiting_planners.is_empty())
349+
{
350+
break;
351+
}
310352
let Some(planner) = self.ready_planners.pop_front() else {
311353
break;
312354
};
@@ -337,6 +379,14 @@ impl FileStream {
337379
// planners get CPU time before we consider returning `Pending`.
338380
if !self.ready_planners.is_empty()
339381
&& self.morsels.len() < max_buffered_morsels()
382+
// In ordered mode, only loop back for more planner CPU when
383+
// there is no earlier reader, buffered morsel, or waiting I/O
384+
// that should be drained first. Otherwise, drop to
385+
// `start_next_morsel()` so output is produced in order.
386+
&& (!self.preserve_order
387+
|| (self.reader.is_none()
388+
&& self.morsels.is_empty()
389+
&& self.waiting_planners.is_empty()))
340390
{
341391
continue;
342392
}
@@ -780,10 +830,12 @@ mod tests {
780830

781831
/// Helper for morsel-driven `FileStream` tests that bundles the mock
782832
/// `Morselizer` setup with the corresponding `FileScanConfig`.
833+
#[derive(Clone)]
783834
struct MorselTest {
784835
morselizer: MockMorselizer,
785836
file_names: Vec<String>,
786837
observer: Option<MorselObserver>,
838+
preserve_order: bool,
787839
}
788840

789841
impl MorselTest {
@@ -792,6 +844,7 @@ mod tests {
792844
morselizer: MockMorselizer::new(),
793845
file_names: vec![],
794846
observer: None,
847+
preserve_order: false,
795848
}
796849
}
797850

@@ -802,16 +855,54 @@ mod tests {
802855
self
803856
}
804857

858+
fn reset_observer(mut self) -> Self {
859+
let observer = MorselObserver::new();
860+
self.morselizer = self.morselizer.with_observer(observer);
861+
self
862+
}
863+
805864
fn with_observer(mut self, observer: MorselObserver) -> Self {
806865
self.morselizer = self.morselizer.with_observer(observer.clone());
807866
self.observer = Some(observer);
808867
self
809868
}
810869

870+
fn with_preserve_order(mut self, preserve_order: bool) -> Self {
871+
self.preserve_order = preserve_order;
872+
self
873+
}
874+
811875
async fn run(self) -> Result<String> {
812876
let file_names = self.file_names.iter().map(String::as_str).collect();
813877
let config = test_config(file_names);
814-
let output = run_stream(self.morselizer, config).await?;
878+
let metrics_set = ExecutionPlanMetricsSet::new();
879+
let mut stream = FileStream::new_with_morselizer(
880+
&config,
881+
0,
882+
Box::new(self.morselizer),
883+
&metrics_set,
884+
)?
885+
.with_preserve_order(self.preserve_order);
886+
887+
let mut stream_contents = Vec::new();
888+
while let Some(result) = stream.next().await {
889+
match result {
890+
Ok(batch) => {
891+
// Each batch should have a single int32 column with the
892+
// mocked batch id, which keeps snapshot output compact.
893+
let col = batch.column(0).as_primitive::<Int32Type>();
894+
assert_eq!(col.len(), 1);
895+
assert!(col.is_valid(0));
896+
let batch_id = col.value(0);
897+
stream_contents.push(format!("Batch: {batch_id}"));
898+
}
899+
Err(e) => {
900+
stream_contents.push(format!("Error: {e}"));
901+
}
902+
}
903+
}
904+
stream_contents.push("Done".to_string());
905+
let output = stream_contents.join("\n");
815906

816907
// Snapshot both the produced output and the scheduler trace
817908
// together. This makes scheduler changes much easier to review than
@@ -998,12 +1089,14 @@ mod tests {
9981089
let observer = MorselObserver::new();
9991090
let planner_1 = MockPlanner::builder()
10001091
.with_id(PlannerId(1))
1092+
// Note: this IO requires 2 polls
10011093
.return_plan(ReturnPlanBuilder::new().with_io(IoFutureId(100), 2))
10021094
.return_morsel(MorselId(11), 41)
10031095
.return_none()
10041096
.build();
10051097
let planner_2 = MockPlanner::builder()
10061098
.with_id(PlannerId(2))
1099+
// IO only requires 1 poll, so it will resolve before planner 1's IO
10071100
.return_plan(ReturnPlanBuilder::new().with_io(IoFutureId(101), 1)) // IO returns after 1 poll
10081101
.return_morsel(MorselId(12), 42)
10091102
.return_none()
@@ -1023,9 +1116,9 @@ mod tests {
10231116
.with_file("file1.parquet", parent_planner)
10241117
.with_observer(observer.clone());
10251118

1026-
// Expect both futures to be polled, but second planner (42) batch to be
1119+
// Expect both futures to be polled, but second planner's (42) batch to be
10271120
// produced first
1028-
insta::assert_snapshot!(test.run().await.unwrap(), @r"
1121+
insta::assert_snapshot!(test.clone().run().await.unwrap(), @r"
10291122
----- Output Stream -----
10301123
Batch: 42
10311124
Batch: 41
@@ -1062,6 +1155,48 @@ mod tests {
10621155
morsel_stream_finished: MorselId(11)
10631156
");
10641157

1158+
// Run same test using `with_preserve_order(true)`, but expect the first
1159+
// planner's batch (41) to be produced before the second's (42), even
1160+
// though the second planner's I/O resolves first.
1161+
let test = test.reset_observer().with_preserve_order(true);
1162+
1163+
insta::assert_snapshot!(test.run().await.unwrap(), @r"
1164+
----- Output Stream -----
1165+
Batch: 41
1166+
Batch: 42
1167+
Done
1168+
----- File Stream Events -----
1169+
morselize_file: file1.parquet
1170+
planner_created: PlannerId(0)
1171+
planner_called: PlannerId(0)
1172+
planner_produced_child: PlannerId(0) -> PlannerId(1)
1173+
planner_produced_child: PlannerId(0) -> PlannerId(2)
1174+
planner_called: PlannerId(0)
1175+
planner_called: PlannerId(1)
1176+
io_future_created: PlannerId(1), IoFutureId(100)
1177+
planner_called: PlannerId(2)
1178+
io_future_created: PlannerId(2), IoFutureId(101)
1179+
io_future_polled: PlannerId(1), IoFutureId(100)
1180+
io_future_polled: PlannerId(2), IoFutureId(101)
1181+
io_future_polled: PlannerId(1), IoFutureId(100)
1182+
io_future_polled: PlannerId(2), IoFutureId(101)
1183+
io_future_resolved: PlannerId(2), IoFutureId(101)
1184+
planner_called: PlannerId(2)
1185+
morsel_produced: PlannerId(2), MorselId(12)
1186+
planner_called: PlannerId(2)
1187+
io_future_polled: PlannerId(1), IoFutureId(100)
1188+
io_future_resolved: PlannerId(1), IoFutureId(100)
1189+
planner_called: PlannerId(1)
1190+
morsel_produced: PlannerId(1), MorselId(11)
1191+
planner_called: PlannerId(1)
1192+
morsel_stream_started: MorselId(12)
1193+
morsel_stream_batch_produced: MorselId(12), BatchId(42)
1194+
morsel_stream_finished: MorselId(12)
1195+
morsel_stream_started: MorselId(11)
1196+
morsel_stream_batch_produced: MorselId(11), BatchId(41)
1197+
morsel_stream_finished: MorselId(11)
1198+
");
1199+
10651200
Ok(())
10661201
}
10671202

@@ -1146,44 +1281,6 @@ mod tests {
11461281
.build()
11471282
}
11481283

1149-
/// Creates a [`FileStream`] for reading the specified config, reads all
1150-
/// record batches, and returns a stringified version of the results for
1151-
/// easy comparison in tests.
1152-
///
1153-
/// If the FileStream returns an error during execution, formats that error
1154-
/// in the output stream, rather than returning an error
1155-
async fn run_stream(
1156-
morselizer: MockMorselizer,
1157-
config: FileScanConfig,
1158-
) -> Result<String> {
1159-
let metrics_set = ExecutionPlanMetricsSet::new();
1160-
let mut stream = FileStream::new_with_morselizer(
1161-
&config,
1162-
0,
1163-
Box::new(morselizer),
1164-
&metrics_set,
1165-
)?;
1166-
1167-
let mut stream_contents = Vec::new();
1168-
while let Some(result) = stream.next().await {
1169-
match result {
1170-
Ok(batch) => {
1171-
// each batch should have a single int32 column with batch id
1172-
let col = batch.column(0).as_primitive::<Int32Type>();
1173-
assert_eq!(col.len(), 1);
1174-
assert!(col.is_valid(0));
1175-
let batch_id = col.value(0);
1176-
stream_contents.push(format!("Batch: {batch_id}"));
1177-
}
1178-
Err(e) => {
1179-
stream_contents.push(format!("Error: {e}"));
1180-
}
1181-
}
1182-
}
1183-
stream_contents.push("Done".to_string());
1184-
Ok(stream_contents.join("\n"))
1185-
}
1186-
11871284
#[tokio::test]
11881285
async fn on_error_opening() -> Result<()> {
11891286
let batches = FileStreamTest::new()

datafusion/datasource/src/morsel/mod.rs

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -65,10 +65,12 @@ pub trait Morselizer: Send + Sync + Debug {
6565
/// single scan of the file. Returning multiple MorselPlanners allows for
6666
/// multiple concurrent scans of the same file.
6767
///
68-
/// This may involve CPU work, such as parsing parquet metadata and evaluating pruning predicates.
69-
/// It should NOT do any IO work, such as reading from the file. If IO is required, it should
70-
/// return a future that the caller can poll to drive the IO work to completion, and once the future
71-
/// is complete, the caller can call `morselize` again to get the next morsels.
68+
/// This may involve CPU work, such as parsing parquet metadata and
69+
/// evaluating pruning predicates. It should NOT do any IO work, such as
70+
/// reading from the file. If IO is required, it should return a future that
71+
/// the caller can poll to drive the IO work to completion, and once the
72+
/// future is complete, the caller can call `morselize` again to get the
73+
/// next morsels.
7274
fn morselize(&self, file: PartitionedFile) -> Result<Vec<Box<dyn MorselPlanner>>>;
7375
}
7476

@@ -103,22 +105,24 @@ pub trait MorselPlanner: Send + Debug {
103105
///
104106
/// Returns `None` if the MorselPlanner has no more work to do (is done).
105107
///
106-
/// # Notes:
108+
/// # Empty Morsel Plans
109+
///
107110
/// It may return Some(..) with an empty MorselPlan, which means it is ready
108111
/// for more CPU work and should be called again.
109112
///
110-
/// See the comments on [`MorselPlan`] for ordering
113+
/// # Output Ordering
114+
///
115+
/// See the comments on [`MorselPlan`] for the logical output order
111116
fn plan(&mut self) -> Result<Option<MorselPlan>>;
112117
}
113118

114119
/// Return result of [`MorselPlanner::plan`]
115120
///
116-
/// # Ordering
121+
/// # Logical Ordering
117122
/// For plans where the output order of rows is maintained, the output order of
118123
/// a [`MorselPlanner`] is logically defined as follows:
119124
/// 1. All morsels that are directly produced
120125
/// 2. (recursively) All morsels produced by the returned `planners`
121-
///
122126
#[derive(Default)]
123127
pub struct MorselPlan {
124128
/// Any Morsels that are ready for processing.

0 commit comments

Comments
 (0)