@@ -324,4 +324,147 @@ mod tests {
 
         Ok(())
     }
+
+    /// Test that ParquetSink exposes rows_written, bytes_written, and
+    /// elapsed_compute metrics via DataSinkExec.
+    #[tokio::test]
+    async fn test_parquet_sink_metrics() -> Result<()> {
+        use arrow::array::Int32Array;
+        use arrow::datatypes::{DataType, Field, Schema};
+        use arrow::record_batch::RecordBatch;
+        use datafusion_execution::TaskContext;
+
+        use futures::TryStreamExt;
+
+        let ctx = SessionContext::new();
+        let tmp_dir = TempDir::new()?;
+        let output_path = tmp_dir.path().join("metrics_test.parquet");
+        let output_path_str = output_path.to_str().unwrap();
+
+        // Register a table with 100 rows
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("id", DataType::Int32, false),
+            Field::new("val", DataType::Int32, false),
+        ]));
+        let ids: Vec<i32> = (0..100).collect();
+        let vals: Vec<i32> = (100..200).collect();
+        let batch = RecordBatch::try_new(
+            Arc::clone(&schema),
+            vec![
+                Arc::new(Int32Array::from(ids)),
+                Arc::new(Int32Array::from(vals)),
+            ],
+        )?;
+        ctx.register_batch("source", batch)?;
+
+        // Create the physical plan for COPY TO
+        let df = ctx
+            .sql(&format!(
+                "COPY source TO '{output_path_str}' STORED AS PARQUET"
+            ))
+            .await?;
+        let plan = df.create_physical_plan().await?;
+
+        // Execute the plan
+        let task_ctx = Arc::new(TaskContext::from(&ctx.state()));
+        let stream = plan.execute(0, task_ctx)?;
+        let _batches: Vec<_> = stream.try_collect().await?;
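+        // Note: draining the stream drives the sink to completion, so the
+        // metrics read below reflect the finished write.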
+
+        // Check metrics on the DataSinkExec (top-level plan)
+        let metrics = plan
+            .metrics()
+            .expect("DataSinkExec should return metrics from ParquetSink");
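+        // aggregate_by_name() combines metrics that share a name across
+        // partitions, yielding one value per metric for the whole node.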
+        let aggregated = metrics.aggregate_by_name();
+
+        // rows_written should be 100
+        let rows_written = aggregated
+            .iter()
+            .find(|m| m.value().name() == "rows_written")
+            .expect("should have rows_written metric");
+        assert_eq!(
+            rows_written.value().as_usize(),
+            100,
+            "expected 100 rows written"
+        );
+
+        // bytes_written should be > 0
+        let bytes_written = aggregated
+            .iter()
+            .find(|m| m.value().name() == "bytes_written")
+            .expect("should have bytes_written metric");
+        assert!(
+            bytes_written.value().as_usize() > 0,
+            "expected bytes_written > 0, got {}",
+            bytes_written.value().as_usize()
+        );
+
+        // elapsed_compute should be > 0
+        let elapsed = aggregated
+            .iter()
+            .find(|m| m.value().name() == "elapsed_compute")
+            .expect("should have elapsed_compute metric");
+        assert!(
+            elapsed.value().as_usize() > 0,
+            "expected elapsed_compute > 0"
+        );
+
+        Ok(())
+    }
+
+    /// Test that ParquetSink metrics work with single_file_parallelism enabled.
+    #[tokio::test]
+    async fn test_parquet_sink_metrics_parallel() -> Result<()> {
+        use arrow::array::Int32Array;
+        use arrow::datatypes::{DataType, Field, Schema};
+        use arrow::record_batch::RecordBatch;
+        use datafusion_execution::TaskContext;
+
+        use futures::TryStreamExt;
+
+        let ctx = SessionContext::new();
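+        // Opt in to the parallelized single-file parquet writer so the sink
+        // exercises the parallel code path.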
+        ctx.sql("SET datafusion.execution.parquet.allow_single_file_parallelism = true")
+            .await?
+            .collect()
+            .await?;
+
+        let tmp_dir = TempDir::new()?;
+        let output_path = tmp_dir.path().join("metrics_parallel.parquet");
+        let output_path_str = output_path.to_str().unwrap();
+
+        let schema =
+            Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
+        let ids: Vec<i32> = (0..50).collect();
+        let batch = RecordBatch::try_new(
+            Arc::clone(&schema),
+            vec![Arc::new(Int32Array::from(ids))],
+        )?;
+        ctx.register_batch("source2", batch)?;
+
+        let df = ctx
+            .sql(&format!(
+                "COPY source2 TO '{output_path_str}' STORED AS PARQUET"
+            ))
+            .await?;
+        let plan = df.create_physical_plan().await?;
+        let task_ctx = Arc::new(TaskContext::from(&ctx.state()));
+        let stream = plan.execute(0, task_ctx)?;
+        let _batches: Vec<_> = stream.try_collect().await?;
+
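+        // The parallel writer should report the same aggregated metrics once
+        // the write completes.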
+        let metrics = plan.metrics().expect("DataSinkExec should return metrics");
+        let aggregated = metrics.aggregate_by_name();
+
+        let rows_written = aggregated
+            .iter()
+            .find(|m| m.value().name() == "rows_written")
+            .expect("should have rows_written metric");
+        assert_eq!(rows_written.value().as_usize(), 50);
+
+        let bytes_written = aggregated
+            .iter()
+            .find(|m| m.value().name() == "bytes_written")
+            .expect("should have bytes_written metric");
+        assert!(bytes_written.value().as_usize() > 0);
+
+        Ok(())
+    }
 }