Skip to content

Commit 8dee0b3

Browse files
committed
Cleanup
1 parent 9629955 commit 8dee0b3

File tree

2 files changed

+36
-51
lines changed

2 files changed

+36
-51
lines changed

datafusion/datasource/src/file_stream.rs

Lines changed: 25 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -663,8 +663,8 @@ impl FileStreamMetrics {
663663
mod tests {
664664
use crate::file_scan_config::{FileScanConfig, FileScanConfigBuilder};
665665
use crate::morsel::test_utils::{
666-
IoFutureId, MockMorselSpec, MockMorselizer, MockPlanner, MorselId,
667-
MorselObserver, PlannerId, ReturnPlanBuilder,
666+
IoFutureId, MockMorselSpec, MockMorselizer, MockPlanner, MorselId, PlannerId,
667+
ReturnPlanBuilder,
668668
};
669669
use crate::tests::make_partition;
670670
use crate::{PartitionedFile, TableSchema};
@@ -834,7 +834,6 @@ mod tests {
834834
struct MorselTest {
835835
morselizer: MockMorselizer,
836836
file_names: Vec<String>,
837-
observer: Option<MorselObserver>,
838837
preserve_order: bool,
839838
}
840839

@@ -843,7 +842,6 @@ mod tests {
843842
Self {
844843
morselizer: MockMorselizer::new(),
845844
file_names: vec![],
846-
observer: None,
847845
preserve_order: false,
848846
}
849847
}
@@ -855,24 +853,18 @@ mod tests {
855853
self
856854
}
857855

858-
fn reset_observer(mut self) -> Self {
859-
let observer = MorselObserver::new();
860-
self.morselizer = self.morselizer.with_observer(observer);
861-
self
862-
}
863-
864-
fn with_observer(mut self, observer: MorselObserver) -> Self {
865-
self.morselizer = self.morselizer.with_observer(observer.clone());
866-
self.observer = Some(observer);
867-
self
868-
}
869-
870856
fn with_preserve_order(mut self, preserve_order: bool) -> Self {
871857
self.preserve_order = preserve_order;
872858
self
873859
}
874860

875861
async fn run(self) -> Result<String> {
862+
// handle to shared observer
863+
let observer = self.morselizer.observer().clone();
864+
// Clear any prior observer events before running the test, so the
865+
// snapshot only includes events from this run.
866+
observer.clear();
867+
876868
let file_names = self.file_names.iter().map(String::as_str).collect();
877869
let config = test_config(file_names);
878870
let metrics_set = ExecutionPlanMetricsSet::new();
@@ -908,10 +900,8 @@ mod tests {
908900
// together. This makes scheduler changes much easier to review than
909901
// maintaining long hand-written event assertions separately.
910902
let mut parts = vec!["----- Output Stream -----".to_string(), output];
911-
if let Some(observer) = self.observer {
912903
parts.push("----- File Stream Events -----".to_string());
913904
parts.push(observer.format_events());
914-
}
915905
Ok(parts.join("\n"))
916906
}
917907
}
@@ -920,7 +910,6 @@ mod tests {
920910
/// morsel immediately, and the morsel is then scanned to completion.
921911
#[tokio::test]
922912
async fn morsel_framework_single_morsel_no_io() -> Result<()> {
923-
let observer = MorselObserver::new();
924913
let test = MorselTest::new()
925914
.with_file(
926915
"file1.parquet",
@@ -929,8 +918,7 @@ mod tests {
929918
.return_morsel(MorselId(10), 42)
930919
.return_none()
931920
.build(),
932-
)
933-
.with_observer(observer.clone());
921+
);
934922

935923
insta::assert_snapshot!(test.run().await.unwrap(), @r"
936924
----- Output Stream -----
@@ -954,7 +942,6 @@ mod tests {
954942
/// then produce its morsel.
955943
#[tokio::test]
956944
async fn morsel_framework_single_morsel_io() -> Result<()> {
957-
let observer = MorselObserver::new();
958945
let test = MorselTest::new()
959946
.with_file(
960947
"file1.parquet",
@@ -964,8 +951,7 @@ mod tests {
964951
.return_morsel(MorselId(10), 42)
965952
.return_none()
966953
.build(),
967-
)
968-
.with_observer(observer.clone());
954+
);
969955

970956
insta::assert_snapshot!(test.run().await.unwrap(), @r"
971957
----- Output Stream -----
@@ -995,7 +981,6 @@ mod tests {
995981
/// the Parquet morsel planner.
996982
#[tokio::test]
997983
async fn morsel_framework_two_cpu_steps_before_morsel() -> Result<()> {
998-
let observer = MorselObserver::new();
999984
let test = MorselTest::new()
1000985
.with_file(
1001986
"file1.parquet",
@@ -1006,8 +991,7 @@ mod tests {
1006991
.return_morsel(MorselId(10), 42)
1007992
.return_none()
1008993
.build(),
1009-
)
1010-
.with_observer(observer.clone());
994+
);
1011995

1012996
insta::assert_snapshot!(test.run().await.unwrap(), @r"
1013997
----- Output Stream -----
@@ -1033,7 +1017,6 @@ mod tests {
10331017
/// batches produced by any returned child planners.
10341018
#[tokio::test]
10351019
async fn morsel_framework_morsels_before_child_planner() -> Result<()> {
1036-
let observer = MorselObserver::new();
10371020
let child_planner = MockPlanner::builder()
10381021
.with_id(PlannerId(1))
10391022
.return_morsel(MorselId(11), 43)
@@ -1052,8 +1035,7 @@ mod tests {
10521035
.build();
10531036

10541037
let test = MorselTest::new()
1055-
.with_file("file1.parquet", parent_planner)
1056-
.with_observer(observer.clone());
1038+
.with_file("file1.parquet", parent_planner);
10571039

10581040
insta::assert_snapshot!(test.run().await.unwrap(), @r"
10591041
----- Output Stream -----
@@ -1086,7 +1068,6 @@ mod tests {
10861068
/// second planner's batches are emitted first.
10871069
#[tokio::test]
10881070
async fn morsel_framework_child_planner_reorder() -> Result<()> {
1089-
let observer = MorselObserver::new();
10901071
let planner_1 = MockPlanner::builder()
10911072
.with_id(PlannerId(1))
10921073
// Note IO required 2 polls
@@ -1113,8 +1094,7 @@ mod tests {
11131094
.build();
11141095

11151096
let test = MorselTest::new()
1116-
.with_file("file1.parquet", parent_planner)
1117-
.with_observer(observer.clone());
1097+
.with_file("file1.parquet", parent_planner);
11181098

11191099
// Expect both futures to be polled, but second planner's (42) batch to be
11201100
// produced first
@@ -1158,7 +1138,7 @@ mod tests {
11581138
// Run same test using `with_preserve_order(true)`, but expect the first
11591139
// planner's batch (41) to be produced before the second's (42), even
11601140
// though the second planner's I/O resolves first.
1161-
let test = test.reset_observer().with_preserve_order(true);
1141+
let test = test.with_preserve_order(true);
11621142

11631143
insta::assert_snapshot!(test.run().await.unwrap(), @r"
11641144
----- Output Stream -----
@@ -1174,27 +1154,27 @@ mod tests {
11741154
planner_called: PlannerId(0)
11751155
planner_called: PlannerId(1)
11761156
io_future_created: PlannerId(1), IoFutureId(100)
1157+
io_future_polled: PlannerId(1), IoFutureId(100)
1158+
io_future_polled: PlannerId(1), IoFutureId(100)
1159+
io_future_polled: PlannerId(1), IoFutureId(100)
1160+
io_future_resolved: PlannerId(1), IoFutureId(100)
11771161
planner_called: PlannerId(2)
11781162
io_future_created: PlannerId(2), IoFutureId(101)
1179-
io_future_polled: PlannerId(1), IoFutureId(100)
11801163
io_future_polled: PlannerId(2), IoFutureId(101)
1181-
io_future_polled: PlannerId(1), IoFutureId(100)
11821164
io_future_polled: PlannerId(2), IoFutureId(101)
11831165
io_future_resolved: PlannerId(2), IoFutureId(101)
1184-
planner_called: PlannerId(2)
1185-
morsel_produced: PlannerId(2), MorselId(12)
1186-
planner_called: PlannerId(2)
1187-
io_future_polled: PlannerId(1), IoFutureId(100)
1188-
io_future_resolved: PlannerId(1), IoFutureId(100)
11891166
planner_called: PlannerId(1)
11901167
morsel_produced: PlannerId(1), MorselId(11)
11911168
planner_called: PlannerId(1)
1192-
morsel_stream_started: MorselId(12)
1193-
morsel_stream_batch_produced: MorselId(12), BatchId(42)
1194-
morsel_stream_finished: MorselId(12)
11951169
morsel_stream_started: MorselId(11)
11961170
morsel_stream_batch_produced: MorselId(11), BatchId(41)
11971171
morsel_stream_finished: MorselId(11)
1172+
planner_called: PlannerId(2)
1173+
morsel_produced: PlannerId(2), MorselId(12)
1174+
planner_called: PlannerId(2)
1175+
morsel_stream_started: MorselId(12)
1176+
morsel_stream_batch_produced: MorselId(12), BatchId(42)
1177+
morsel_stream_finished: MorselId(12)
11981178
");
11991179

12001180
Ok(())
@@ -1205,7 +1185,6 @@ mod tests {
12051185
/// second.
12061186
#[tokio::test]
12071187
async fn morsel_framework_two_files_overlapping_io() -> Result<()> {
1208-
let observer = MorselObserver::new();
12091188
let test = MorselTest::new()
12101189
.with_file(
12111190
"file1.parquet",
@@ -1224,8 +1203,7 @@ mod tests {
12241203
.return_morsel(MorselId(11), 43)
12251204
.return_none()
12261205
.build(),
1227-
)
1228-
.with_observer(observer.clone());
1206+
);
12291207

12301208
insta::assert_snapshot!(test.run().await.unwrap(), @r"
12311209
----- Output Stream -----

datafusion/datasource/src/morsel/test_utils.rs

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -109,10 +109,18 @@ impl MorselObserver {
109109
Self::default()
110110
}
111111

112+
/// Clear any previously buffered events
113+
pub fn clear(&self) {
114+
self.events.lock().unwrap().clear();
115+
}
116+
117+
/// Push a new [`MorselEvent`]
118+
112119
pub fn push(&self, event: MorselEvent) {
113120
self.events.lock().unwrap().push(event);
114121
}
115122

123+
/// Return a copy of the current list of [`MorselEvents`]
116124
pub fn events(&self) -> Vec<MorselEvent> {
117125
self.events.lock().unwrap().clone()
118126
}
@@ -151,10 +159,9 @@ impl MockMorselizer {
151159
}
152160
}
153161

154-
/// Set the observer
155-
pub fn with_observer(mut self, observer: MorselObserver) -> Self {
156-
self.observer = observer;
157-
self
162+
/// Return a reference to the observer
163+
pub fn observer(&self) -> &MorselObserver {
164+
&self.observer
158165
}
159166

160167
/// Add the description of how a file will be planned

0 commit comments

Comments
 (0)