@@ -12,9 +12,8 @@ use futures_util::{
1212#[ cfg( feature = "logs_level_enabled" ) ]
1313use opentelemetry:: logs:: Severity ;
1414use opentelemetry:: {
15- global,
1615 logs:: { LogError , LogResult } ,
17- otel_error, otel_warn, InstrumentationLibrary ,
16+ otel_debug , otel_error, otel_warn, InstrumentationLibrary ,
1817} ;
1918
2019use std:: sync:: atomic:: AtomicBool ;
@@ -99,26 +98,36 @@ impl LogProcessor for SimpleLogProcessor {
9998 fn emit ( & self , record : & mut LogRecord , instrumentation : & InstrumentationLibrary ) {
10099 // noop after shutdown
101100 if self . is_shutdown . load ( std:: sync:: atomic:: Ordering :: Relaxed ) {
101+ // this is a warning, as the user is trying to log after the processor has been shutdown
102102 otel_warn ! (
103- name: "simple_log_processor_emit_after_shutdown"
103+ name: "SimpleLogProcessor.Emit.ProcessorShutdown" ,
104104 ) ;
105105 return ;
106106 }
107107
108108 let result = self
109109 . exporter
110110 . lock ( )
111- . map_err ( |_| LogError :: Other ( "simple logprocessor mutex poison ". into ( ) ) )
111+ . map_err ( |_| LogError :: MutexPoisoned ( "SimpleLogProcessor ". into ( ) ) )
112112 . and_then ( |mut exporter| {
113113 let log_tuple = & [ ( record as & LogRecord , instrumentation) ] ;
114114 futures_executor:: block_on ( exporter. export ( LogBatch :: new ( log_tuple) ) )
115115 } ) ;
116- if let Err ( err) = result {
117- otel_error ! (
118- name: "simple_log_processor_emit_error" ,
119- error = format!( "{:?}" , err)
120- ) ;
121- global:: handle_error ( err) ;
116+ // Handle errors with specific static names
117+ match result {
118+ Err ( LogError :: MutexPoisoned ( _) ) => {
119+ // logging as debug as this is not a user error
120+ otel_debug ! (
121+ name: "SimpleLogProcessor.Emit.MutexPoisoning" ,
122+ ) ;
123+ }
124+ Err ( err) => {
125+ otel_error ! (
126+ name: "SimpleLogProcessor.Emit.ExportError" ,
127+ error = format!( "{}" , err)
128+ ) ;
129+ }
130+ _ => { }
122131 }
123132 }
124133
@@ -133,12 +142,7 @@ impl LogProcessor for SimpleLogProcessor {
133142 exporter. shutdown ( ) ;
134143 Ok ( ( ) )
135144 } else {
136- otel_error ! (
137- name: "simple_log_processor_shutdown_error"
138- ) ;
139- Err ( LogError :: Other (
140- "simple logprocessor mutex poison during shutdown" . into ( ) ,
141- ) )
145+ Err ( LogError :: MutexPoisoned ( "SimpleLogProcessor" . into ( ) ) )
142146 }
143147 }
144148
@@ -170,12 +174,12 @@ impl<R: RuntimeChannel> LogProcessor for BatchLogProcessor<R> {
170174 instrumentation. clone ( ) ,
171175 ) ) ) ;
172176
177+ // TODO - Implement throttling to prevent error flooding when the queue is full or closed.
173178 if let Err ( err) = result {
174179 otel_error ! (
175- name: "batch_log_processor_emit_error " ,
176- error = format!( "{:? }" , err)
180+ name: "BatchLogProcessor.Export.Error " ,
181+ error = format!( "{}" , err)
177182 ) ;
178- global:: handle_error ( LogError :: Other ( err. into ( ) ) ) ;
179183 }
180184 }
181185
@@ -243,10 +247,9 @@ impl<R: RuntimeChannel> BatchLogProcessor<R> {
243247
244248 if let Err ( err) = result {
245249 otel_error ! (
246- name: "batch_log_processor_export_error " ,
247- error = format!( "{:? }" , err)
250+ name: "BatchLogProcessor.Export.Error " ,
251+ error = format!( "{}" , err)
248252 ) ;
249- global:: handle_error ( err) ;
250253 }
251254 }
252255 }
@@ -261,24 +264,12 @@ impl<R: RuntimeChannel> BatchLogProcessor<R> {
261264 . await ;
262265
263266 if let Some ( channel) = res_channel {
264- if let Err ( result) = channel. send ( result) {
265- global:: handle_error ( LogError :: from ( format ! (
266- "failed to send flush result: {:?}" ,
267- result
268- ) ) ) ;
269- otel_error ! (
270- name: "batch_log_processor_flush_error" ,
271- error = format!( "{:?}" , result) ,
272- message = "Failed to send flush result"
267+ if let Err ( send_error) = channel. send ( result) {
268+ otel_debug ! (
269+ name: "BatchLogProcessor.Flush.SendResultError" ,
270+ error = format!( "{:?}" , send_error) ,
273271 ) ;
274272 }
275- } else if let Err ( err) = result {
276- otel_error ! (
277- name: "batch_log_processor_flush_error" ,
278- error = format!( "{:?}" , err) ,
279- message = "Flush failed"
280- ) ;
281- global:: handle_error ( err) ;
282273 }
283274 }
284275 // Stream has terminated or processor is shutdown, return to finish execution.
@@ -293,21 +284,14 @@ impl<R: RuntimeChannel> BatchLogProcessor<R> {
293284
294285 exporter. shutdown ( ) ;
295286
296- if let Err ( result) = ch. send ( result) {
297- otel_error ! (
298- name: "batch_log_processor_shutdown_error" ,
299- error = format!( "{:?}" , result) ,
300- message = "Failed to send shutdown result"
287+ if let Err ( send_error) = ch. send ( result) {
288+ otel_debug ! (
289+ name: "BatchLogProcessor.Shutdown.SendResultError" ,
290+ error = format!( "{:?}" , send_error) ,
301291 ) ;
302- global:: handle_error ( LogError :: from ( format ! (
303- "failed to send batch processor shutdown result: {:?}" ,
304- result
305- ) ) ) ;
306292 }
307-
308293 break ;
309294 }
310-
311295 // propagate the resource
312296 BatchMessage :: SetResource ( resource) => {
313297 exporter. set_resource ( & resource) ;
@@ -357,13 +341,7 @@ where
357341 pin_mut ! ( timeout) ;
358342 match future:: select ( export, timeout) . await {
359343 Either :: Left ( ( export_res, _) ) => export_res,
360- Either :: Right ( ( _, _) ) => {
361- otel_error ! (
362- name: "export_with_timeout_timeout" ,
363- timeout_duration = time_out. as_millis( )
364- ) ;
365- ExportResult :: Err ( LogError :: ExportTimedOut ( time_out) )
366- }
344+ Either :: Right ( ( _, _) ) => ExportResult :: Err ( LogError :: ExportTimedOut ( time_out) ) ,
367345 }
368346}
369347
0 commit comments