Skip to content

Commit e97e9ad

Browse files
committed
comments and test with IO
1 parent dda13e4 commit e97e9ad

File tree

2 files changed

+65
-2
lines changed

2 files changed

+65
-2
lines changed

datafusion/datasource/src/file_stream.rs

Lines changed: 45 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -613,8 +613,8 @@ impl FileStreamMetrics {
613613
mod tests {
614614
use crate::file_scan_config::{FileScanConfig, FileScanConfigBuilder};
615615
use crate::morsel::test_utils::{
616-
BatchId, MockMorselizer, MockPlanner, MorselEventsBuilder, MorselId,
617-
MorselObserver, PlannerId,
616+
BatchId, IoFutureId, MockMorselizer, MockPlanner, MorselEventsBuilder, MorselId,
617+
MorselObserver, PlannerId, ReturnPlanBuilder,
618618
};
619619
use crate::tests::make_partition;
620620
use crate::{PartitionedFile, TableSchema};
@@ -815,6 +815,49 @@ mod tests {
815815
Ok(())
816816
}
817817

818+
#[tokio::test]
819+
async fn morsel_framework_single_planner_single_io_then_morsel() -> Result<()> {
820+
let observer = MorselObserver::new();
821+
let morselizer = MockMorselizer::new()
822+
.with_file(
823+
"file1.parquet",
824+
MockPlanner::builder()
825+
.with_id(PlannerId(0))
826+
.return_plan(ReturnPlanBuilder::new().with_io(IoFutureId(100), 1))
827+
.return_morsel(MorselId(10), 42)
828+
.return_none()
829+
.build(),
830+
)
831+
.with_observer(observer.clone());
832+
833+
let config = test_config(vec!["file1.parquet"]);
834+
insta::assert_snapshot!(run_stream(morselizer, config).await.unwrap(), @r"
835+
**** Batch: 42
836+
**** Done
837+
");
838+
839+
assert_eq!(
840+
observer.events(),
841+
MorselEventsBuilder::new()
842+
.morselize_file("file1.parquet")
843+
.planner_created(PlannerId(0))
844+
.planner_called(PlannerId(0))
845+
.io_future_created(PlannerId(0), IoFutureId(100))
846+
.io_future_polled(PlannerId(0), IoFutureId(100))
847+
.io_future_polled(PlannerId(0), IoFutureId(100))
848+
.io_future_resolved(PlannerId(0), IoFutureId(100))
849+
.planner_called(PlannerId(0))
850+
.morsel_produced(PlannerId(0), MorselId(10))
851+
.planner_called(PlannerId(0))
852+
.morsel_stream_started(MorselId(10))
853+
.morsel_stream_batch_produced(MorselId(10), BatchId(42))
854+
.morsel_stream_finished(MorselId(10))
855+
.build()
856+
);
857+
858+
Ok(())
859+
}
860+
818861
fn test_config(file_names: Vec<&str>) -> FileScanConfig {
819862
let file_group = file_names
820863
.into_iter()

datafusion/datasource/src/morsel/test_utils.rs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -370,33 +370,53 @@ impl MockPlannerBuilder {
370370
/// Builder for `PlannerStep::ReturnPlan`.
371371
#[derive(Debug, Default)]
372372
pub struct ReturnPlanBuilder {
373+
/// Morsels that should be returned immediately by this planner step.
373374
morsels: Vec<MockMorselSpec>,
375+
/// Child planners that should be returned immediately by this planner step.
374376
planners: Vec<MockPlanner>,
377+
/// Identifier for the mock I/O future returned by this step, if any.
375378
io_future_id: Option<IoFutureId>,
379+
/// Number of `Poll::Pending` results the mock I/O future should yield
380+
/// before resolving successfully.
381+
///
382+
/// This is a deterministic test-only knob. It does not model elapsed time
383+
/// or bytes read; it only controls how many scheduler polls are required
384+
/// before the mock I/O future becomes ready.
376385
io_polls: usize,
377386
}
378387

379388
impl ReturnPlanBuilder {
389+
/// Create an empty return-plan builder.
380390
pub fn new() -> Self {
381391
Self::default()
382392
}
383393

394+
/// Add a morsel that should be returned immediately by this planner step.
384395
pub fn with_morsel(mut self, morsel: MockMorselSpec) -> Self {
385396
self.morsels.push(morsel);
386397
self
387398
}
388399

400+
/// Add a child planner that should be returned immediately by this step.
389401
pub fn with_planner(mut self, planner: MockPlanner) -> Self {
390402
self.planners.push(planner);
391403
self
392404
}
393405

406+
/// Return a mock I/O future from this step.
407+
///
408+
/// `io_future_id` is recorded in the emitted `MorselEvent`s so tests can
409+
/// distinguish multiple I/O phases from the same planner.
410+
///
411+
/// `io_polls` controls how many times that future returns `Poll::Pending`
412+
/// before it resolves with `Poll::Ready(Ok(()))`.
394413
pub fn with_io(mut self, io_future_id: IoFutureId, io_polls: usize) -> Self {
395414
self.io_future_id = Some(io_future_id);
396415
self.io_polls = io_polls;
397416
self
398417
}
399418

419+
/// Build the corresponding [`PlannerStep::ReturnPlan`]
400420
pub fn build(self) -> PlannerStep {
401421
PlannerStep::ReturnPlan {
402422
morsels: self.morsels,

0 commit comments

Comments
 (0)