@@ -154,11 +154,12 @@ impl PeriodicReader {
154154 {
155155 let ( message_sender, message_receiver) : ( Sender < Message > , Receiver < Message > ) =
156156 mpsc:: channel ( ) ;
157+ let exporter_arc = Arc :: new ( exporter) ;
157158 let reader = PeriodicReader {
158159 inner : Arc :: new ( PeriodicReaderInner {
159160 message_sender : Arc :: new ( message_sender) ,
160161 producer : Mutex :: new ( None ) ,
161- exporter : Arc :: new ( exporter ) ,
162+ exporter : exporter_arc . clone ( ) ,
162163 } ) ,
163164 } ;
164165 let cloned_reader = reader. clone ( ) ;
@@ -213,7 +214,13 @@ impl PeriodicReader {
213214 Ok ( Message :: Shutdown ( response_sender) ) => {
214215 // Perform final export and break out of loop and exit the thread
215216 otel_debug ! ( name: "PeriodReaderThreadExportingDueToShutdown" ) ;
216- if let Err ( _e) = cloned_reader. collect_and_export ( timeout) {
217+ let export_result = cloned_reader. collect_and_export ( timeout) ;
218+ let shutdown_result = exporter_arc. shutdown ( ) ;
219+ otel_debug ! (
220+ name: "PeriodReaderInvokedExporterShutdown" ,
221+ shutdown_result = format!( "{:?}" , shutdown_result)
222+ ) ;
223+ if export_result. is_err ( ) || shutdown_result. is_err ( ) {
217224 response_sender. send ( false ) . unwrap ( ) ;
218225 } else {
219226 response_sender. send ( true ) . unwrap ( ) ;
@@ -474,7 +481,7 @@ mod tests {
474481 use opentelemetry:: metrics:: MeterProvider ;
475482 use std:: {
476483 sync:: {
477- atomic:: { AtomicUsize , Ordering } ,
484+ atomic:: { AtomicBool , AtomicUsize , Ordering } ,
478485 mpsc, Arc ,
479486 } ,
480487 time:: Duration ,
@@ -525,6 +532,31 @@ mod tests {
525532 }
526533 }
527534
535+ #[ derive( Debug , Clone , Default ) ]
536+ struct MockMetricExporter {
537+ is_shutdown : Arc < AtomicBool > ,
538+ }
539+
540+ #[ async_trait]
541+ impl PushMetricExporter for MockMetricExporter {
542+ async fn export ( & self , _metrics : & mut ResourceMetrics ) -> MetricResult < ( ) > {
543+ Ok ( ( ) )
544+ }
545+
546+ async fn force_flush ( & self ) -> MetricResult < ( ) > {
547+ Ok ( ( ) )
548+ }
549+
550+ fn shutdown ( & self ) -> MetricResult < ( ) > {
551+ self . is_shutdown . store ( true , Ordering :: Relaxed ) ;
552+ Ok ( ( ) )
553+ }
554+
555+ fn temporality ( & self ) -> Temporality {
556+ Temporality :: Cumulative
557+ }
558+ }
559+
528560 #[ test]
529561 fn collection_triggered_by_interval_multiple ( ) {
530562 // Arrange
@@ -687,6 +719,24 @@ mod tests {
687719 assert ! ( exporter. get_count( ) >= 2 ) ;
688720 }
689721
722+ #[ test]
723+ fn shutdown_passed_to_exporter ( ) {
724+ // Arrange
725+ let exporter = MockMetricExporter :: default ( ) ;
726+ let reader = PeriodicReader :: builder ( exporter. clone ( ) ) . build ( ) ;
727+
728+ let meter_provider = SdkMeterProvider :: builder ( ) . with_reader ( reader) . build ( ) ;
729+ let meter = meter_provider. meter ( "test" ) ;
730+ let counter = meter. u64_counter ( "sync_counter" ) . build ( ) ;
731+ counter. add ( 1 , & [ ] ) ;
732+
733+ // shutdown the provider, which should call shutdown on periodic reader
734+ // which in turn should call shutdown on exporter.
735+ let result = meter_provider. shutdown ( ) ;
736+ assert ! ( result. is_ok( ) ) ;
737+ assert ! ( exporter. is_shutdown. load( Ordering :: Relaxed ) ) ;
738+ }
739+
690740 #[ test]
691741 fn collection ( ) {
692742 collection_triggered_by_interval_helper ( ) ;
0 commit comments