@@ -1317,21 +1317,29 @@ mod tests {
13171317 /// Helper for multi-stream morsel tests that share one
13181318 /// [`SharedFileStreamState`].
13191319 #[ derive( Clone ) ]
1320- struct SiblingMorselTest {
1320+ struct MultiStreamMorselTest {
13211321 /// Shared mock morselizer used by all sibling streams in the test.
13221322 morselizer : MockMorselizer ,
13231323 /// Per-partition file assignments used to build one sibling
13241324 /// `FileStream` per partition.
13251325 partitions : Vec < Vec < String > > ,
1326+ /// The sequence of sibling streams to poll while exercising the
1327+ /// stealing scenario under test.
1328+ reads : Vec < TestStreamId > ,
13261329 }
13271330
1328- impl SiblingMorselTest {
/// Names one sibling stream of a [`MultiStreamMorselTest`].
///
/// The wrapped `usize` is the position of the stream in the vector of
/// streams built for the test (stream `0` is the first one).
#[derive(Clone, Copy, Debug)]
struct TestStreamId(usize);
1334+
1335+ impl MultiStreamMorselTest {
13291336 /// Create a sibling-stream test harness with `num_partitions`
13301337 /// independent `FileStream`s.
13311338 fn new ( num_partitions : usize ) -> Self {
13321339 Self {
13331340 morselizer : MockMorselizer :: new ( ) ,
13341341 partitions : vec ! [ vec![ ] ; num_partitions] ,
1342+ reads : vec ! [ ] ,
13351343 }
13361344 }
13371345
@@ -1351,6 +1359,13 @@ mod tests {
13511359 self
13521360 }
13531361
1362+ /// Configure the order in which sibling streams are polled while the
1363+ /// test scenario is executing.
1364+ fn with_reads ( mut self , reads : Vec < TestStreamId > ) -> Self {
1365+ self . reads = reads;
1366+ self
1367+ }
1368+
13541369 /// Build a multi-partition `FileScanConfig` matching the configured
13551370 /// sibling test layout.
13561371 fn test_config ( & self ) -> FileScanConfig {
@@ -1378,7 +1393,7 @@ mod tests {
13781393 /// This is the core helper for stealing tests: separate streams have
13791394 /// distinct local queues, but share the same outstanding-I/O budget
13801395 /// and shared ready-work queues.
1381- fn build_streams ( self ) -> Result < ( MorselObserver , Vec < FileStream > ) > {
1396+ fn build_streams ( & self ) -> Result < ( MorselObserver , Vec < FileStream > ) > {
13821397 let observer = self . morselizer . observer ( ) . clone ( ) ;
13831398 observer. clear ( ) ;
13841399
@@ -1401,6 +1416,44 @@ mod tests {
14011416
14021417 Ok ( ( observer, streams) )
14031418 }
1419+
1420+ /// Run the configured poll sequence and format the per-stream outputs
1421+ /// plus shared scheduler events into one snapshot string.
1422+ async fn run ( self ) -> Result < String > {
1423+ let reads = self . reads . clone ( ) ;
1424+ let ( observer, mut streams) = self . build_streams ( ) ?;
1425+ let mut outputs = vec ! [ vec![ ] ; streams. len( ) ] ;
1426+
1427+ for stream_id in reads {
1428+ let batch_id = next_batch_id ( & mut streams[ stream_id. 0 ] ) . await ?;
1429+ assert ! (
1430+ batch_id. is_some( ) ,
1431+ "expected stream {:?} to produce a batch" ,
1432+ stream_id
1433+ ) ;
1434+ outputs[ stream_id. 0 ] . push ( batch_id. unwrap ( ) ) ;
1435+ }
1436+
1437+ for stream in & mut streams {
1438+ assert_eq ! ( next_batch_id( stream) . await ?, None ) ;
1439+ }
1440+
1441+ let mut parts = vec ! [ ] ;
1442+ for ( idx, output) in outputs. iter ( ) . enumerate ( ) {
1443+ parts. push ( format ! ( "----- Stream {idx} Output -----" ) ) ;
1444+ parts. push (
1445+ output
1446+ . iter ( )
1447+ . map ( |batch_id| format ! ( "Batch: {batch_id}" ) )
1448+ . chain ( std:: iter:: once ( "Done" . to_string ( ) ) )
1449+ . collect :: < Vec < _ > > ( )
1450+ . join ( "\n " ) ,
1451+ ) ;
1452+ }
1453+ parts. push ( "----- File Stream Events -----" . to_string ( ) ) ;
1454+ parts. push ( observer. format_summary_events ( ) ) ;
1455+ Ok ( parts. join ( "\n " ) )
1456+ }
14041457 }
14051458
14061459 /// Read the next single-row batch from a test stream and return its batch
@@ -1415,33 +1468,6 @@ mod tests {
14151468 } ) )
14161469 }
14171470
1418- /// Format the outputs of two sibling streams plus the shared scheduler
1419- /// event trace into one snapshot string.
1420- fn format_sibling_outputs (
1421- stream_0 : & [ i32 ] ,
1422- stream_1 : & [ i32 ] ,
1423- observer : & MorselObserver ,
1424- ) -> String {
1425- [
1426- "----- Stream 0 Output -----" . to_string ( ) ,
1427- stream_0
1428- . iter ( )
1429- . map ( |batch_id| format ! ( "Batch: {batch_id}" ) )
1430- . chain ( std:: iter:: once ( "Done" . to_string ( ) ) )
1431- . collect :: < Vec < _ > > ( )
1432- . join ( "\n " ) ,
1433- "----- Stream 1 Output -----" . to_string ( ) ,
1434- stream_1
1435- . iter ( )
1436- . map ( |batch_id| format ! ( "Batch: {batch_id}" ) )
1437- . chain ( std:: iter:: once ( "Done" . to_string ( ) ) )
1438- . collect :: < Vec < _ > > ( )
1439- . join ( "\n " ) ,
1440- "----- File Stream Events -----" . to_string ( ) ,
1441- observer. format_summary_events ( ) ,
1442- ]
1443- . join ( "\n " )
1444- }
14451471
14461472 /// Verifies the simplest morsel-driven flow: one planner produces one
14471473 /// morsel immediately, and the morsel is then scanned to completion.
@@ -1841,31 +1867,27 @@ mod tests {
18411867 /// it has no local files of its own.
18421868 #[ tokio:: test]
18431869 async fn morsel_framework_sibling_stream_steals_when_only_one_has_files ( ) -> Result < ( ) > {
1844- let test = SiblingMorselTest :: new ( 2 ) . with_file_in_partition (
1845- 0 ,
1846- "file1.parquet" ,
1847- MockPlanner :: builder ( )
1848- . with_id ( PlannerId ( 0 ) )
1849- . return_plan (
1850- ReturnPlanBuilder :: new ( )
1851- . with_morsel ( MockMorselSpec :: single_batch ( MorselId ( 10 ) , 41 ) )
1852- . with_morsel ( MockMorselSpec :: single_batch ( MorselId ( 11 ) , 42 ) ) ,
1853- )
1854- . return_none ( )
1855- . build ( ) ,
1856- ) ;
1857-
1858- let ( observer, mut streams) = test. build_streams ( ) ?;
1859- let mut stream_0 = vec ! [ ] ;
1860- let mut stream_1 = vec ! [ ] ;
1861-
1862- stream_0. push ( next_batch_id ( & mut streams[ 0 ] ) . await ?. unwrap ( ) ) ;
1863- stream_1. push ( next_batch_id ( & mut streams[ 1 ] ) . await ?. unwrap ( ) ) ;
1864- assert_eq ! ( next_batch_id( & mut streams[ 0 ] ) . await ?, None ) ;
1865- assert_eq ! ( next_batch_id( & mut streams[ 1 ] ) . await ?, None ) ;
1870+ let test = MultiStreamMorselTest :: new ( 2 )
1871+ . with_file_in_partition (
1872+ 0 ,
1873+ "file1.parquet" ,
1874+ MockPlanner :: builder ( )
1875+ . with_id ( PlannerId ( 0 ) )
1876+ . return_plan (
1877+ ReturnPlanBuilder :: new ( )
1878+ . return_morsel ( MorselId ( 10 ) , 41 )
1879+ . return_morsel ( MorselId ( 11 ) , 42 ) ,
1880+ )
1881+ . return_none ( )
1882+ . build ( ) ,
1883+ )
1884+ // Poll sibling 0 first so it discovers the file and publishes
1885+ // ready morsels. Poll sibling 1 next: because it has no local
1886+ // files, any batch it returns must have been stolen from sibling 0.
1887+ . with_reads ( vec ! [ TestStreamId ( 0 ) , TestStreamId ( 1 ) ] ) ;
18661888
18671889 insta:: assert_snapshot!(
1868- format_sibling_outputs ( & stream_0 , & stream_1 , & observer ) ,
1890+ test . run ( ) . await . unwrap ( ) ,
18691891 @r"
18701892 ----- Stream 0 Output -----
18711893 Batch: 41
@@ -1891,22 +1913,16 @@ mod tests {
18911913 #[ tokio:: test]
18921914 async fn morsel_framework_sibling_stream_steals_while_own_file_waits_on_io (
18931915 ) -> Result < ( ) > {
1894- let test = SiblingMorselTest :: new ( 2 )
1916+ let test = MultiStreamMorselTest :: new ( 2 )
18951917 . with_file_in_partition (
18961918 0 ,
18971919 "fast.parquet" ,
18981920 MockPlanner :: builder ( )
18991921 . with_id ( PlannerId ( 0 ) )
19001922 . return_plan (
19011923 ReturnPlanBuilder :: new ( )
1902- . with_morsel ( MockMorselSpec :: single_batch (
1903- MorselId ( 10 ) ,
1904- 41 ,
1905- ) )
1906- . with_morsel ( MockMorselSpec :: single_batch (
1907- MorselId ( 11 ) ,
1908- 42 ,
1909- ) ) ,
1924+ . return_morsel ( MorselId ( 10 ) , 41 )
1925+ . return_morsel ( MorselId ( 11 ) , 42 ) ,
19101926 )
19111927 . return_none ( )
19121928 . build ( ) ,
@@ -1920,20 +1936,16 @@ mod tests {
19201936 . return_morsel ( MorselId ( 12 ) , 51 )
19211937 . return_none ( )
19221938 . build ( ) ,
1923- ) ;
1924-
1925- let ( observer, mut streams) = test. build_streams ( ) ?;
1926- let mut stream_0 = vec ! [ ] ;
1927- let mut stream_1 = vec ! [ ] ;
1928-
1929- stream_0. push ( next_batch_id ( & mut streams[ 0 ] ) . await ?. unwrap ( ) ) ;
1930- stream_1. push ( next_batch_id ( & mut streams[ 1 ] ) . await ?. unwrap ( ) ) ;
1931- stream_0. push ( next_batch_id ( & mut streams[ 0 ] ) . await ?. unwrap ( ) ) ;
1932- assert_eq ! ( next_batch_id( & mut streams[ 1 ] ) . await ?, None ) ;
1933- assert_eq ! ( next_batch_id( & mut streams[ 0 ] ) . await ?, None ) ;
1939+ )
1940+ // Poll sibling 0 first so it publishes one ready morsel from the
1941+ // fast file. Poll sibling 1 next while its own file is still
1942+ // blocked on I/O: the batch it returns at that point must have
1943+ // been stolen from sibling 0. Poll sibling 0 again last so it can
1944+ // finish once sibling 1's local I/O has resolved.
1945+ . with_reads ( vec ! [ TestStreamId ( 0 ) , TestStreamId ( 1 ) , TestStreamId ( 0 ) ] ) ;
19341946
19351947 insta:: assert_snapshot!(
1936- format_sibling_outputs ( & stream_0 , & stream_1 , & observer ) ,
1948+ test . run ( ) . await . unwrap ( ) ,
19371949 @r"
19381950 ----- Stream 0 Output -----
19391951 Batch: 41
0 commit comments