@@ -11,9 +11,11 @@ use futures_util::{
1111} ;
1212#[ cfg( feature = "spec_unstable_logs_enabled" ) ]
1313use opentelemetry:: logs:: Severity ;
14- use opentelemetry:: { otel_debug, otel_error, otel_warn, InstrumentationScope } ;
14+ use opentelemetry:: {
15+ global, metrics:: Counter , otel_debug, otel_error, otel_warn, InstrumentationScope ,
16+ } ;
1517
16- use std:: sync:: atomic:: AtomicBool ;
18+ use std:: sync:: atomic:: { AtomicBool , AtomicUsize , Ordering } ;
1719use std:: { cmp:: min, env, sync:: Mutex } ;
1820use std:: {
1921 fmt:: { self , Debug , Formatter } ,
@@ -154,6 +156,11 @@ impl LogProcessor for SimpleLogProcessor {
154156/// them at a pre-configured interval.
155157pub struct BatchLogProcessor < R : RuntimeChannel > {
156158 message_sender : R :: Sender < BatchMessage > ,
159+
160+ // Track dropped logs. We'll log this at shutdown and also emit
161+ // as a metric.
162+ dropped_logs_count : AtomicUsize ,
163+ dropped_logs_metric : Counter < u64 > ,
157164}
158165
159166impl < R : RuntimeChannel > Debug for BatchLogProcessor < R > {
@@ -172,11 +179,10 @@ impl<R: RuntimeChannel> LogProcessor for BatchLogProcessor<R> {
172179 ) ) ) ;
173180
174181 // TODO - Implement throttling to prevent error flooding when the queue is full or closed.
175- if let Err ( err) = result {
176- otel_error ! (
177- name: "BatchLogProcessor.Export.Error" ,
178- error = format!( "{}" , err)
179- ) ;
182+ if result. is_err ( ) {
183+ // Increment dropped logs counter and metric
184+ self . dropped_logs_count . fetch_add ( 1 , Ordering :: Relaxed ) ;
185+ self . dropped_logs_metric . add ( 1 , & [ ] ) ;
180186 }
181187 }
182188
@@ -192,6 +198,15 @@ impl<R: RuntimeChannel> LogProcessor for BatchLogProcessor<R> {
192198 }
193199
194200 fn shutdown ( & self ) -> LogResult < ( ) > {
201+ let dropped_logs = self . dropped_logs_count . load ( Ordering :: Relaxed ) ;
202+ if dropped_logs > 0 {
203+ otel_warn ! (
204+ name: "BatchLogProcessor.Shutdown" ,
205+ dropped_logs = dropped_logs,
206+ message = "Logs were dropped due to a full buffer."
207+ ) ;
208+ }
209+
195210 let ( res_sender, res_receiver) = oneshot:: channel ( ) ;
196211 self . message_sender
197212 . try_send ( BatchMessage :: Shutdown ( res_sender) )
@@ -296,8 +311,18 @@ impl<R: RuntimeChannel> BatchLogProcessor<R> {
296311 }
297312 }
298313 } ) ) ;
314+
315+ let dropped_logs_metric = global:: meter ( "opentelemetry" )
316+ . u64_counter ( "dropped_logs" )
317+ . with_description ( "Number of logs dropped due to full buffer" )
318+ . build ( ) ;
319+
299320 // Return batch processor with link to worker
300- BatchLogProcessor { message_sender }
321+ BatchLogProcessor {
322+ message_sender,
323+ dropped_logs_count : AtomicUsize :: new ( 0 ) ,
324+ dropped_logs_metric,
325+ }
301326 }
302327
303328 /// Create a new batch processor builder
0 commit comments