@@ -117,9 +117,6 @@ where
117117/// return metric data to the user. It will not automatically send that data to
118118/// the exporter outside of the predefined interval.
119119///
120- /// As this spuns up own background thread, this is recommended to be used with push exporters
121- /// that do not require any particular async runtime. As of now, this cannot be used with
122- /// OTLP exporters as they requires async runtime
123120///
124121/// [collect]: MetricReader::collect
125122///
@@ -160,7 +157,7 @@ impl PeriodicReader {
160157 mpsc:: channel ( ) ;
161158 let reader = PeriodicReader {
162159 inner : Arc :: new ( PeriodicReaderInner {
163- message_sender : Arc :: new ( Mutex :: new ( message_sender) ) ,
160+ message_sender : Arc :: new ( message_sender) ,
164161 is_shutdown : AtomicBool :: new ( false ) ,
165162 producer : Mutex :: new ( None ) ,
166163 exporter : Arc :: new ( exporter) ,
@@ -223,6 +220,11 @@ impl PeriodicReader {
223220 } else {
224221 response_sender. send ( true ) . unwrap ( ) ;
225222 }
223+
224+ otel_debug ! (
225+ name: "PeriodReaderThreadExiting" ,
226+ reason = "ShutdownRequested"
227+ ) ;
226228 break ;
227229 }
228230 Err ( mpsc:: RecvTimeoutError :: Timeout ) => {
@@ -255,8 +257,13 @@ impl PeriodicReader {
255257 interval_start = Instant :: now ( ) ;
256258 }
257259 }
258- Err ( _) => {
259- // Some other error. Break out and exit the thread.
260+ Err ( mpsc:: RecvTimeoutError :: Disconnected ) => {
261+ // Channel disconnected, only thing to do is break
262+ // out (i.e exit the thread)
263+ otel_debug ! (
264+ name: "PeriodReaderThreadExiting" ,
265+ reason = "MessageReceiverDisconnected"
266+ ) ;
260267 break ;
261268 }
262269 }
@@ -271,6 +278,7 @@ impl PeriodicReader {
271278 if let Err ( e) = result_thread_creation {
272279 otel_error ! (
273280 name: "PeriodReaderThreadStartError" ,
281+ message = "Failed to start PeriodicReader thread. Metrics will not be exported." ,
274282 error = format!( "{:?}" , e)
275283 ) ;
276284 }
@@ -290,7 +298,7 @@ impl fmt::Debug for PeriodicReader {
290298
291299struct PeriodicReaderInner {
292300 exporter : Arc < dyn PushMetricExporter > ,
293- message_sender : Arc < Mutex < mpsc:: Sender < Message > > > ,
301+ message_sender : Arc < mpsc:: Sender < Message > > ,
294302 producer : Mutex < Option < Weak < dyn SdkProducer > > > ,
295303 is_shutdown : AtomicBool ,
296304}
@@ -374,20 +382,9 @@ impl PeriodicReaderInner {
374382 return Err ( MetricError :: Other ( "reader is shut down" . into ( ) ) ) ;
375383 }
376384 let ( response_tx, response_rx) = mpsc:: channel ( ) ;
377- match self . message_sender . lock ( ) {
378- Ok ( sender) => {
379- sender
380- . send ( Message :: Flush ( response_tx) )
381- . map_err ( |e| MetricError :: Other ( e. to_string ( ) ) ) ?;
382- }
383- Err ( e) => {
384- otel_debug ! (
385- name: "PeriodReaderForceFlushError" ,
386- error = format!( "{:?}" , e)
387- ) ;
388- return Err ( MetricError :: Other ( e. to_string ( ) ) ) ;
389- }
390- }
385+ self . message_sender
386+ . send ( Message :: Flush ( response_tx) )
387+ . map_err ( |e| MetricError :: Other ( e. to_string ( ) ) ) ?;
391388
392389 if let Ok ( response) = response_rx. recv ( ) {
393390 // TODO: call exporter's force_flush method.
@@ -408,20 +405,9 @@ impl PeriodicReaderInner {
408405
409406 // TODO: See if this is better to be created upfront.
410407 let ( response_tx, response_rx) = mpsc:: channel ( ) ;
411- match self . message_sender . lock ( ) {
412- Ok ( sender) => {
413- sender
414- . send ( Message :: Shutdown ( response_tx) )
415- . map_err ( |e| MetricError :: Other ( e. to_string ( ) ) ) ?;
416- }
417- Err ( e) => {
418- otel_debug ! (
419- name: "PeriodReaderShutdownError" ,
420- error = format!( "{:?}" , e)
421- ) ;
422- return Err ( MetricError :: Other ( e. to_string ( ) ) ) ;
423- }
424- }
408+ self . message_sender
409+ . send ( Message :: Shutdown ( response_tx) )
410+ . map_err ( |e| MetricError :: Other ( e. to_string ( ) ) ) ?;
425411
426412 if let Ok ( response) = response_rx. recv ( ) {
427413 self . is_shutdown
0 commit comments