Skip to content

Commit 5985aee

Browse files
committed
more tests
1 parent e97e9ad commit 5985aee

File tree

2 files changed

+51
-2
lines changed

2 files changed

+51
-2
lines changed

datafusion/datasource/src/file_stream.rs

Lines changed: 45 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -779,7 +779,7 @@ mod tests {
779779
}
780780

781781
#[tokio::test]
782-
async fn morsel_framework_single_planner_single_morsel_no_io() -> Result<()> {
782+
async fn morsel_framework_single_morsel_no_io() -> Result<()> {
783783
let observer = MorselObserver::new();
784784
let morselizer = MockMorselizer::new()
785785
.with_file(
@@ -816,7 +816,7 @@ mod tests {
816816
}
817817

818818
#[tokio::test]
819-
async fn morsel_framework_single_planner_single_io_then_morsel() -> Result<()> {
819+
async fn morsel_framework_single_io() -> Result<()> {
820820
let observer = MorselObserver::new();
821821
let morselizer = MockMorselizer::new()
822822
.with_file(
@@ -843,7 +843,9 @@ mod tests {
843843
.planner_created(PlannerId(0))
844844
.planner_called(PlannerId(0))
845845
.io_future_created(PlannerId(0), IoFutureId(100))
846+
// `with_io(IoFutureId(100), 1)` means the first poll is pending.
846847
.io_future_polled(PlannerId(0), IoFutureId(100))
848+
// The second poll resolves the future.
847849
.io_future_polled(PlannerId(0), IoFutureId(100))
848850
.io_future_resolved(PlannerId(0), IoFutureId(100))
849851
.planner_called(PlannerId(0))
@@ -858,6 +860,47 @@ mod tests {
858860
Ok(())
859861
}
860862

863+
#[tokio::test]
864+
async fn morsel_framework_two_cpu_steps_before_morsel() -> Result<()> {
865+
let observer = MorselObserver::new();
866+
let morselizer = MockMorselizer::new()
867+
.with_file(
868+
"file1.parquet",
869+
MockPlanner::builder()
870+
.with_id(PlannerId(0))
871+
.return_plan(ReturnPlanBuilder::new())
872+
.return_plan(ReturnPlanBuilder::new())
873+
.return_morsel(MorselId(10), 42)
874+
.return_none()
875+
.build(),
876+
)
877+
.with_observer(observer.clone());
878+
879+
let config = test_config(vec!["file1.parquet"]);
880+
insta::assert_snapshot!(run_stream(morselizer, config).await.unwrap(), @r"
881+
**** Batch: 42
882+
**** Done
883+
");
884+
885+
assert_eq!(
886+
observer.events(),
887+
MorselEventsBuilder::new()
888+
.morselize_file("file1.parquet")
889+
.planner_created(PlannerId(0))
890+
.planner_called(PlannerId(0))
891+
.planner_called(PlannerId(0))
892+
.planner_called(PlannerId(0))
893+
.morsel_produced(PlannerId(0), MorselId(10))
894+
.planner_called(PlannerId(0))
895+
.morsel_stream_started(MorselId(10))
896+
.morsel_stream_batch_produced(MorselId(10), BatchId(42))
897+
.morsel_stream_finished(MorselId(10))
898+
.build()
899+
);
900+
901+
Ok(())
902+
}
903+
861904
fn test_config(file_names: Vec<&str>) -> FileScanConfig {
862905
let file_group = file_names
863906
.into_iter()

datafusion/datasource/src/morsel/test_utils.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -410,6 +410,12 @@ impl ReturnPlanBuilder {
410410
///
411411
/// `io_polls` controls how many times that future returns `Poll::Pending`
412412
/// before it resolves with `Poll::Ready(Ok(()))`.
413+
///
414+
/// For example, `with_io(id, 1)` means:
415+
/// - first poll: `Poll::Pending`
416+
/// - second poll: `Poll::Ready(Ok(()))`
417+
///
418+
/// So the total number of polls observed in the trace is `io_polls + 1`.
413419
pub fn with_io(mut self, io_future_id: IoFutureId, io_polls: usize) -> Self {
414420
self.io_future_id = Some(io_future_id);
415421
self.io_polls = io_polls;

0 commit comments

Comments
 (0)