@@ -611,22 +611,26 @@ impl FileStreamMetrics {
611611
612612#[ cfg( test) ]
613613mod tests {
614- use crate :: PartitionedFile ;
615- use crate :: file_scan_config:: FileScanConfigBuilder ;
614+ use crate :: file_scan_config:: { FileScanConfig , FileScanConfigBuilder } ;
615+ use crate :: morsel:: test_utils:: {
616+ MockMorselSpec , MockMorselizer , MockPlannerSteps , MorselEvent , MorselObserver ,
617+ PlannerStep ,
618+ } ;
616619 use crate :: tests:: make_partition;
620+ use crate :: { PartitionedFile , TableSchema } ;
621+ use arrow:: datatypes:: Int32Type ;
617622 use datafusion_common:: error:: Result ;
618623 use datafusion_execution:: object_store:: ObjectStoreUrl ;
619624 use datafusion_physical_plan:: metrics:: ExecutionPlanMetricsSet ;
620- use futures:: { FutureExt as _, StreamExt as _} ;
621625 use std:: sync:: Arc ;
622626 use std:: sync:: atomic:: { AtomicUsize , Ordering } ;
623627
624628 use crate :: file_stream:: { FileOpenFuture , FileOpener , FileStream , OnError } ;
625629 use crate :: test_util:: MockSource ;
626- use arrow:: array:: RecordBatch ;
627- use arrow:: datatypes:: Schema ;
628-
630+ use arrow:: array:: { Array , AsArray , RecordBatch } ;
631+ use arrow:: datatypes:: { DataType , Field , Schema } ;
629632 use datafusion_common:: { assert_batches_eq, exec_err, internal_err} ;
633+ use futures:: { FutureExt , StreamExt } ;
630634
631635 /// Test `FileOpener` which will simulate errors during file opening or scanning
632636 #[ derive( Default ) ]
@@ -741,7 +745,7 @@ mod tests {
741745
742746 let on_error = self . on_error ;
743747
744- let table_schema = crate :: table_schema :: TableSchema :: new ( file_schema, vec ! [ ] ) ;
748+ let table_schema = TableSchema :: new ( file_schema, vec ! [ ] ) ;
745749 let config = FileScanConfigBuilder :: new (
746750 ObjectStoreUrl :: parse ( "test:///" ) . unwrap ( ) ,
747751 Arc :: new ( MockSource :: new ( table_schema) ) ,
@@ -774,6 +778,113 @@ mod tests {
774778 . expect ( "error executing stream" )
775779 }
776780
    /// Exercises the morsel framework happy path: one file, one planner that
    /// emits a single morsel containing a single batch, and no I/O futures.
    ///
    /// Checks both the stream output (via an inline snapshot) and the exact
    /// ordered sequence of lifecycle events recorded by the [`MorselObserver`].
    #[tokio::test]
    async fn morsel_framework_single_planner_single_morsel_no_io() -> Result<()> {
        let observer = MorselObserver::new();
        // Planner 0 first returns a plan containing one morsel (id 10) that
        // yields a single batch (id 42), then signals exhaustion via ReturnNone.
        let morselizer = MockMorselizer::new()
            .with_file(
                "file1.parquet",
                vec![MockPlannerSteps::new(
                    0,
                    vec![
                        PlannerStep::ReturnPlan {
                            morsels: vec![MockMorselSpec::single_batch(10, 42)],
                            planners: vec![],
                            // No async I/O is simulated in this scenario.
                            io_future_id: None,
                            io_polls: 0,
                        },
                        PlannerStep::ReturnNone,
                    ],
                )],
            )
            .with_observer(observer.clone());

        let config = test_config(vec!["file1.parquet"]);
        insta::assert_snapshot!(run_stream(morselizer, config).await.unwrap(), @r"
        **** Batch: 42
        **** Done
        ");

        // The observer must have seen the full lifecycle, in order:
        // morselize file -> planner created -> plan() produces the morsel ->
        // plan() called again (exhausted) -> morsel stream runs to completion.
        assert_eq!(
            observer.events(),
            vec![
                MorselEvent::MorselizeFile {
                    path: "file1.parquet".to_string()
                },
                MorselEvent::PlannerCreated { planner_id: 0 },
                MorselEvent::PlannerPlanCalled { planner_id: 0 },
                MorselEvent::MorselProduced {
                    planner_id: 0,
                    morsel_id: 10
                },
                MorselEvent::PlannerPlanCalled { planner_id: 0 },
                MorselEvent::MorselStreamStarted { morsel_id: 10 },
                MorselEvent::MorselStreamBatchProduced {
                    morsel_id: 10,
                    batch_id: 42
                },
                MorselEvent::MorselStreamFinished { morsel_id: 10 },
            ]
        );

        Ok(())
    }
832+
833+ fn test_config ( file_names : Vec < & str > ) -> FileScanConfig {
834+ let file_group = file_names
835+ . into_iter ( )
836+ . map ( |name| PartitionedFile :: new ( name, 10 ) )
837+ . collect ( ) ;
838+ let table_schema = TableSchema :: new (
839+ Arc :: new ( Schema :: new ( vec ! [ Field :: new( "i" , DataType :: Int32 , false ) ] ) ) ,
840+ vec ! [ ] ,
841+ ) ;
842+ FileScanConfigBuilder :: new (
843+ ObjectStoreUrl :: parse ( "test:///" ) . unwrap ( ) ,
844+ Arc :: new ( MockSource :: new ( table_schema) ) ,
845+ )
846+ . with_file_group ( file_group)
847+ . build ( )
848+ }
849+
850+ /// Creates a [`FileStream`] for reading the specified config, reads all
851+ /// record batches, and returns a stringified version of the results for
852+ /// easy comparison in tests.
853+ ///
854+ /// If the FileStream returns an error during execution, formats that error
855+ /// in the output stream, rather than returning an error
856+ async fn run_stream (
857+ morselizer : MockMorselizer ,
858+ config : FileScanConfig ,
859+ ) -> Result < String > {
860+ let metrics_set = ExecutionPlanMetricsSet :: new ( ) ;
861+ let mut stream = FileStream :: new_with_morselizer (
862+ & config,
863+ 0 ,
864+ Box :: new ( morselizer) ,
865+ & metrics_set,
866+ ) ?;
867+
868+ let mut stream_contents = Vec :: new ( ) ;
869+ while let Some ( result) = stream. next ( ) . await {
870+ match result {
871+ Ok ( batch) => {
872+ // each batch should have a single int32 column with batch id
873+ let col = batch. column ( 0 ) . as_primitive :: < Int32Type > ( ) ;
874+ assert_eq ! ( col. len( ) , 1 ) ;
875+ assert ! ( col. is_valid( 0 ) ) ;
876+ let batch_id = col. value ( 0 ) ;
877+ stream_contents. push ( format ! ( "**** Batch: {batch_id}" ) ) ;
878+ }
879+ Err ( e) => {
880+ stream_contents. push ( format ! ( "**** Error: {e}" ) ) ;
881+ }
882+ }
883+ }
884+ stream_contents. push ( "**** Done" . to_string ( ) ) ;
885+ Ok ( stream_contents. join ( "\n " ) )
886+ }
887+
777888 #[ tokio:: test]
778889 async fn on_error_opening ( ) -> Result < ( ) > {
779890 let batches = FileStreamTest :: new ( )
0 commit comments