@@ -65,13 +65,16 @@ pub struct ScanExec {
     pub input_source_description: String,
     /// The data types of columns of the input batch. Converted from Spark schema.
     pub data_types: Vec<DataType>,
+    /// Schema of the first batch
+    pub schema: SchemaRef,
     /// The input batch of input data. Used to determine the schema of the input data.
     /// It is also used in unit tests to mock the input data from the JVM.
     pub batch: Arc<Mutex<Option<InputBatch>>>,
     /// Cache of expensive-to-compute plan properties
     cache: PlanProperties,
     /// Metrics collector
     metrics: ExecutionPlanMetricsSet,
+    baseline_metrics: BaselineMetrics,
 }
 
 impl ScanExec {
@@ -81,22 +84,30 @@ impl ScanExec {
         input_source_description: &str,
         data_types: Vec<DataType>,
     ) -> Result<Self, CometError> {
+        let metrics_set = ExecutionPlanMetricsSet::default();
+        let baseline_metrics = BaselineMetrics::new(&metrics_set, 0);
+
         // Scan's schema is determined by the input batch, so we need to set it before execution.
         // Note that we determine whether arrays are dictionary-encoded based on the
         // first batch. An array may be dictionary-encoded in some batches and not others, and
         // ScanExec will cast arrays from all future batches to the type determined here, so we
         // may end up either unpacking dictionary arrays or dictionary-encoding arrays.
         // Dictionary-encoded primitive arrays are always unpacked.
         let first_batch = if let Some(input_source) = input_source.as_ref() {
-            ScanExec::get_next(exec_context_id, input_source.as_obj(), data_types.len())?
+            let mut timer = baseline_metrics.elapsed_compute().timer();
+            let batch =
+                ScanExec::get_next(exec_context_id, input_source.as_obj(), data_types.len())?;
+            timer.stop();
+            baseline_metrics.record_output(batch.num_rows());
+            batch
         } else {
             InputBatch::EOF
         };
 
         let schema = scan_schema(&first_batch, &data_types);
 
         let cache = PlanProperties::new(
-            EquivalenceProperties::new(schema),
+            EquivalenceProperties::new(Arc::clone(&schema)),
             // The partitioning is not important because we are not using DataFusion's
             // query planner or optimizer
             Partitioning::UnknownPartitioning(1),
@@ -110,7 +121,9 @@ impl ScanExec {
             data_types,
             batch: Arc::new(Mutex::new(Some(first_batch))),
             cache,
-            metrics: ExecutionPlanMetricsSet::default(),
+            metrics: metrics_set,
+            baseline_metrics,
+            schema,
         })
     }
 
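
The comment in the hunk above carries the key invariant: column types are fixed by the first batch, and arrays in later batches are cast to match, with dictionary-encoded primitives always unpacked. A minimal standalone sketch of that unpacking via arrow's cast kernel (illustrative only; the function name is hypothetical, not part of this change):

    use std::sync::Arc;
    use arrow::array::{Array, ArrayRef, DictionaryArray};
    use arrow::compute::cast;
    use arrow::datatypes::{DataType, Int32Type};

    // Hypothetical demo: unpack a dictionary-encoded string column the way
    // ScanExec does when the first batch established a plain Utf8 type.
    fn unpack_dictionary_sketch() -> arrow::error::Result<()> {
        let dict: DictionaryArray<Int32Type> = vec!["a", "b", "a"].into_iter().collect();
        let col: ArrayRef = Arc::new(dict);
        let unpacked = cast(&col, &DataType::Utf8)?; // Dictionary(Int32, Utf8) -> Utf8
        assert_eq!(unpacked.data_type(), &DataType::Utf8);
        Ok(())
    }
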
@@ -276,11 +289,15 @@ impl ExecutionPlan for ScanExec {
     }
 
     fn schema(&self) -> SchemaRef {
-        // `unwrap` is safe because `schema` is only called while converting the
-        // Spark plan to a DataFusion plan. At that moment, `batch` is not EOF.
-        let binding = self.batch.try_lock().unwrap();
-        let input_batch = binding.as_ref().unwrap();
-        scan_schema(input_batch, &self.data_types)
+        if self.exec_context_id == TEST_EXEC_CONTEXT_ID {
+            // `unwrap` is safe because `schema` is only called while converting the
+            // Spark plan to a DataFusion plan. At that moment, `batch` is not EOF.
+            let binding = self.batch.try_lock().unwrap();
+            let input_batch = binding.as_ref().unwrap();
+            scan_schema(input_batch, &self.data_types)
+        } else {
+            Arc::clone(&self.schema)
+        }
     }
 
     fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
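
With the schema now captured once in the constructor, the non-test branch above returns it without touching the batch mutex. Cloning a SchemaRef is only a reference-count bump; a hedged standalone sketch (names are illustrative):

    use std::sync::Arc;
    use arrow::datatypes::{DataType, Field, Schema, SchemaRef};

    // Illustrative only: Arc::clone on a cached SchemaRef is a cheap pointer
    // copy, unlike re-deriving the schema from the input batch under a lock.
    fn cached_schema_sketch() {
        let schema: SchemaRef =
            Arc::new(Schema::new(vec![Field::new("c0", DataType::Int32, true)]));
        let copy = Arc::clone(&schema);
        assert!(Arc::ptr_eq(&schema, &copy)); // same allocation, no deep copy
    }
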
@@ -303,6 +320,7 @@ impl ExecutionPlan for ScanExec {
             self.clone(),
             self.schema(),
             partition,
+            self.baseline_metrics.clone(),
         )))
     }
 
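
execute now hands the stream a clone of the node's BaselineMetrics rather than building a fresh one per stream. DataFusion's metric counters are Arc-backed, so the constructor, the plan node, and every stream record into the same aggregates, and the timer-guard pattern from ScanExec::new composes with it. A hedged sketch under those assumptions (helper names are hypothetical):

    use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet};

    // Hypothetical helper mirroring the guard-based timing in ScanExec::new:
    // time a fetch, then credit the rows it produced to output_rows.
    fn timed_fetch(baseline: &BaselineMetrics, fetch: impl FnOnce() -> usize) -> usize {
        let mut timer = baseline.elapsed_compute().timer();
        let rows = fetch(); // e.g. pulling the next batch from the JVM side
        timer.stop(); // dropping the guard would also stop the clock
        baseline.record_output(rows);
        rows
    }

    fn shared_counters_sketch() {
        let metrics_set = ExecutionPlanMetricsSet::new();
        let baseline = BaselineMetrics::new(&metrics_set, 0);
        // A clone shares the Arc-backed counters, so rows recorded through it
        // stay visible through the node's ExecutionPlanMetricsSet.
        let rows = timed_fetch(&baseline.clone(), || 100);
        assert_eq!(rows, 100);
        assert_eq!(metrics_set.clone_inner().output_rows(), Some(100));
    }
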
@@ -352,8 +370,12 @@ struct ScanStream<'a> {
 }
 
 impl<'a> ScanStream<'a> {
-    pub fn new(scan: ScanExec, schema: SchemaRef, partition: usize) -> Self {
-        let baseline_metrics = BaselineMetrics::new(&scan.metrics, partition);
+    pub fn new(
+        scan: ScanExec,
+        schema: SchemaRef,
+        partition: usize,
+        baseline_metrics: BaselineMetrics,
+    ) -> Self {
         let cast_time = MetricBuilder::new(&scan.metrics).subset_time("cast_time", partition);
         Self {
             scan,
@@ -465,4 +487,12 @@ impl InputBatch {
 
         InputBatch::Batch(columns, num_rows)
     }
+
+    /// Get the number of rows in this batch
+    fn num_rows(&self) -> usize {
+        match self {
+            Self::EOF => 0,
+            Self::Batch(_, num_rows) => *num_rows,
+        }
+    }
 }
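
A quick sketch of the new helper's contract (hedged: assumes InputBatch::Batch holds (Vec<ArrayRef>, usize), and this would have to live in the same module since num_rows is private):

    use std::sync::Arc;
    use arrow::array::{ArrayRef, Int32Array};

    // Module-internal sketch exercising InputBatch::num_rows.
    fn num_rows_sketch() {
        assert_eq!(InputBatch::EOF.num_rows(), 0);
        let col: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
        let batch = InputBatch::Batch(vec![col], 3);
        assert_eq!(batch.num_rows(), 3);
    }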