@@ -14,7 +14,7 @@ use futures_util::{
1414use opentelemetry:: {
1515 global,
1616 metrics:: { MetricsError , Result } ,
17- otel_error,
17+ otel_debug , otel_error,
1818} ;
1919
2020use crate :: runtime:: Runtime ;
@@ -245,36 +245,56 @@ impl<RT: Runtime> PeriodicReaderWorker<RT> {
245245 Either :: Left ( ( res, _) ) => {
246246 res // return the status of export.
247247 }
248- Either :: Right ( _) => {
249- otel_error ! (
250- name: "collect_and_export" ,
251- status = "timed_out"
252- ) ;
253- Err ( MetricsError :: Other ( "export timed out" . into ( ) ) )
254- }
248+ Either :: Right ( _) => Err ( MetricsError :: ExportTimeout (
249+ "PeriodicReader" . into ( ) ,
250+ self . timeout . as_nanos ( ) ,
251+ )
252+ . into ( ) ) ,
255253 }
256254 }
257255
258256 async fn process_message ( & mut self , message : Message ) -> bool {
259257 match message {
260258 Message :: Export => {
261259 if let Err ( err) = self . collect_and_export ( ) . await {
262- global:: handle_error ( err)
260+ match err {
261+ MetricsError :: ExportTimeout ( _, _) => {
262+ otel_error ! ( name: "PeriodicReader.ExportFailed" , error = format!( "{:?}" , err) ) ;
263+ }
264+ MetricsError :: ReaderShutdown => {
265+ otel_debug ! ( name: "PeriodicReader.ReaderShutdown" , error = format!( "{:?}" , err) ) ;
266+ }
267+ MetricsError :: ReaderNotRegistered => {
268+ otel_debug ! ( name: "PeriodicReader.ReaderNotRegistered" , error = format!( "{:?}" , err) ) ;
269+ }
270+ _ => {
271+ // TBD: This includes errors from both collection and export. Need to be made more specific
272+ // and identify the levels to log them.
273+ otel_error ! ( name: "PeriodicReader.ExportFailed" , error = format!( "{:?}" , err) ) ;
274+ }
275+ }
263276 }
264277 }
265278 Message :: Flush ( ch) => {
266279 let res = self . collect_and_export ( ) . await ;
267- if ch. send ( res) . is_err ( ) {
268- global:: handle_error ( MetricsError :: Other ( "flush channel closed" . into ( ) ) )
280+ if let Err ( send_error) = ch. send ( res) {
281+ otel_debug ! (
282+ name: "PeriodicReader.Flush.SendResultError" ,
283+ error = format!( "{:?}" , send_error) ,
284+ ) ;
269285 }
270286 }
271287 Message :: Shutdown ( ch) => {
272288 let res = self . collect_and_export ( ) . await ;
273289 let _ = self . reader . exporter . shutdown ( ) ;
274- if ch. send ( res) . is_err ( ) {
275- global:: handle_error ( MetricsError :: Other ( "shutdown channel closed" . into ( ) ) )
290+ if let Err ( send_error) = ch. send ( res) {
291+ otel_debug ! (
292+ name: "PeriodicReader.Flush.SendResultError" ,
293+ error = format!( "{:?}" , send_error) ,
294+ ) ;
295+ //Return false to break the loop and shutdown the worker.
296+ return false ;
276297 }
277- return false ;
278298 }
279299 }
280300
@@ -300,9 +320,7 @@ impl MetricReader for PeriodicReader {
300320 let worker = match & mut inner. sdk_producer_or_worker {
301321 ProducerOrWorker :: Producer ( _) => {
302322 // Only register once. If producer is already set, do nothing.
303- global:: handle_error ( MetricsError :: Other (
304- "duplicate meter registration, did not register manual reader" . into ( ) ,
305- ) ) ;
323+ otel_debug ! ( name: "PeriodicReader.RegisterPipeline.DuplicateRegistration" ) ;
306324 return ;
307325 }
308326 ProducerOrWorker :: Worker ( w) => mem:: replace ( w, Box :: new ( |_| { } ) ) ,
@@ -315,7 +333,7 @@ impl MetricReader for PeriodicReader {
315333 fn collect ( & self , rm : & mut ResourceMetrics ) -> Result < ( ) > {
316334 let inner = self . inner . lock ( ) ?;
317335 if inner. is_shutdown {
318- return Err ( MetricsError :: Other ( "reader is shut down" . into ( ) ) ) ;
336+ return Err ( MetricsError :: ReaderShutdown ) ;
319337 }
320338
321339 if let Some ( producer) = match & inner. sdk_producer_or_worker {
@@ -324,7 +342,7 @@ impl MetricReader for PeriodicReader {
324342 } {
325343 producer. produce ( rm) ?;
326344 } else {
327- return Err ( MetricsError :: Other ( "reader is not registered" . into ( ) ) ) ;
345+ return Err ( MetricsError :: ReaderNotRegistered ) ;
328346 }
329347
330348 Ok ( ( ) )
0 commit comments