Skip to content

Commit add50da

Browse files
committed
less abstraction
1 parent 451f335 commit add50da

File tree

2 files changed

+41
-31
lines changed

2 files changed

+41
-31
lines changed

datafusion/datasource/src/file_stream.rs

Lines changed: 38 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ pub struct FileStream {
6262
/// Any morsels that have already been prepared for processing
6363
/// (TODO steal morsels from other streams)
6464
morsels: Vec<Box<dyn Morsel>>,
65-
/// The current RecordBatches, if any
65+
/// The current reader, if any
6666
reader: Option<BoxStream<'static, Result<RecordBatch>>>,
6767
/// The stream schema (file schema including partition columns and after
6868
/// projection).
@@ -133,7 +133,8 @@ impl FileStream {
133133
.expect("planner queue checked to be non-empty");
134134
self.file_stream_metrics.time_opening.start();
135135
self.state = FileStreamState::Active {
136-
morsel_plan: MorselPlan::new(planner),
136+
planner,
137+
io_future: None,
137138
};
138139
Ok(true)
139140
}
@@ -155,7 +156,8 @@ impl FileStream {
155156
.expect("planner queue checked to be non-empty");
156157
self.file_stream_metrics.time_opening.start();
157158
self.state = FileStreamState::Active {
158-
morsel_plan: MorselPlan::new(planner),
159+
planner,
160+
io_future: None,
159161
};
160162
Ok(())
161163
}
@@ -183,19 +185,16 @@ impl FileStream {
183185
/// stream is either pending or has produced/failed with a batch.
184186
fn poll_active_state(
185187
&mut self,
186-
mut morsel_plan: MorselPlan,
188+
planner: Arc<dyn MorselPlanner>,
189+
mut io_future: Option<BoxFuture<'static, Result<()>>>,
187190
cx: &mut Context<'_>,
188191
) -> StatePoll {
189192
// State transitions for the active planner:
190193
// 1. Poll any outstanding planner I/O before touching batch readers.
191-
// 2. If a morsel is already ready, promote/drain it before asking the
192-
// planner for more work.
193-
// 3. If there is no queued morsel, run the planner's CPU-only `plan`
194-
// step to produce more work or discover completion.
195194

196195
let mut io_completed = false;
197-
if let Some(mut io_future) = morsel_plan.take_io_future() {
198-
match io_future.as_mut().poll(cx) {
196+
if let Some(mut future) = io_future.take() {
197+
match future.as_mut().poll(cx) {
199198
Poll::Ready(Ok(())) => {
200199
self.file_stream_metrics.files_opened.add(1);
201200
self.file_stream_metrics.time_opening.stop();
@@ -217,25 +216,32 @@ impl FileStream {
217216
}
218217
}
219218
Poll::Pending => {
220-
morsel_plan.set_io_future(io_future);
221-
self.state = FileStreamState::Active { morsel_plan };
219+
self.state = FileStreamState::Active {
220+
planner,
221+
io_future: Some(future),
222+
};
222223
return StatePoll::Return(Poll::Pending);
223224
}
224225
}
225226
}
226227

228+
// If there is no active reader, start one from the next available morsel
227229
if self.reader.is_none() {
230+
// TODO: also try and drive any other planners to get more morsels /
231+
// discover more IO before launching into the next morsel.
228232
if let Some(morsel) = self.morsels.pop() {
229233
self.reader = Some(morsel.into_stream());
230234
self.file_stream_metrics.time_scanning_until_data.start();
231235
self.file_stream_metrics.time_scanning_total.start();
232236
}
233237
}
234238

239+
240+
// drive to the next record batch from the active reader, if any
235241
if let Some(reader) = self.reader.as_mut() {
236242
match reader.poll_next_unpin(cx) {
237243
Poll::Pending => {
238-
self.state = FileStreamState::Active { morsel_plan };
244+
self.state = FileStreamState::Active { planner, io_future };
239245
return StatePoll::Return(Poll::Pending);
240246
}
241247
Poll::Ready(Some(Ok(batch))) => {
@@ -258,7 +264,7 @@ impl FileStream {
258264
None => batch,
259265
};
260266
self.file_stream_metrics.time_scanning_total.start();
261-
self.state = FileStreamState::Active { morsel_plan };
267+
self.state = FileStreamState::Active { planner, io_future };
262268
return StatePoll::Return(Poll::Ready(Some(Ok(batch))));
263269
}
264270
Poll::Ready(Some(Err(err))) => {
@@ -293,14 +299,16 @@ impl FileStream {
293299
}
294300
}
295301

296-
if io_completed || !morsel_plan.has_io_future() {
297-
let planner = morsel_plan.planner();
298-
let current_plan = mem::replace(&mut morsel_plan, MorselPlan::new(planner));
302+
if io_completed || io_future.is_none() {
303+
let current_plan = MorselPlan::new(Arc::clone(&planner));
299304
match current_plan.plan() {
300305
Ok(Some(mut new_plan)) => {
301306
self.morsels.extend(new_plan.take_morsels());
307+
let next_planner = new_plan.planner();
308+
let next_io_future = new_plan.take_io_future();
302309
self.state = FileStreamState::Active {
303-
morsel_plan: new_plan,
310+
planner: next_planner,
311+
io_future: next_io_future,
304312
};
305313
return StatePoll::Continue;
306314
}
@@ -329,7 +337,7 @@ impl FileStream {
329337
}
330338
}
331339

332-
self.state = FileStreamState::Active { morsel_plan };
340+
self.state = FileStreamState::Active { planner, io_future };
333341
StatePoll::Return(Poll::Pending)
334342
}
335343

@@ -340,10 +348,12 @@ impl FileStream {
340348
StatePoll::Continue => continue,
341349
StatePoll::Return(result) => return result,
342350
},
343-
FileStreamState::Active { morsel_plan } => match self.poll_active_state(morsel_plan, cx) {
344-
StatePoll::Continue => continue,
345-
StatePoll::Return(result) => return result,
346-
},
351+
FileStreamState::Active { planner, io_future } => {
352+
match self.poll_active_state(planner, io_future, cx) {
353+
StatePoll::Continue => continue,
354+
StatePoll::Return(result) => return result,
355+
}
356+
}
347357
FileStreamState::Error | FileStreamState::Limit => {
348358
return Poll::Ready(None);
349359
}
@@ -422,12 +432,12 @@ pub enum NextOpen {
422432
pub enum FileStreamState {
423433
/// The idle state, no file is currently being read
424434
Idle,
425-
/// Actively planning morsels
435+
/// Actively managing a single planner and its in-flight I/O, if any.
426436
Active {
427-
/// Plan to get more readers
428-
///
429-
/// TODO make the number of outstanding plans configurable to allow more parallelism
430-
morsel_plan: MorselPlan
437+
/// Planner that will produce the next morsels for this file.
438+
planner: Arc<dyn MorselPlanner>,
439+
/// In-flight I/O for the active planner, if it is currently blocked.
440+
io_future: Option<BoxFuture<'static, Result<()>>>,
431441
},
432442
/// Encountered an error
433443
Error,

datafusion/datasource/src/morsel/mod.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -74,9 +74,9 @@ pub trait Morselizer: Send + Sync + Debug {
7474
/// A Morsel Planner is responsible for creating morsels for a given scan.
7575
///
7676
/// The MorselPlanner is the unit of I/O -- there is only ever a single IO
77-
/// outstanding for a MorselPlanner. DataFusion will potentially run multiple
78-
/// MorselPlanners in parallel which corresponds to multiple parallel I/O
79-
/// requests.
77+
/// outstanding for a specific MorselPlanner. DataFusion will potentially run
78+
/// multiple MorselPlanners in parallel which corresponds to multiple parallel
79+
/// I/O requests.
8080
///
8181
/// It is not a Rust [`Stream`] so that it can explicitly separate CPU bound
8282
/// work from IO work.

0 commit comments

Comments
 (0)