@@ -9,7 +9,7 @@ use std::sync::mpsc::{self, RecvTimeoutError, SyncSender};
99use opentelemetry:: logs:: Severity ;
1010use opentelemetry:: { otel_debug, otel_error, otel_warn, InstrumentationScope } ;
1111
12- use std:: sync:: atomic:: { AtomicBool , AtomicUsize , Ordering } ;
12+ use std:: sync:: atomic:: { AtomicUsize , Ordering } ;
1313use std:: { cmp:: min, env, sync:: Mutex } ;
1414use std:: {
1515 fmt:: { self , Debug , Formatter } ,
@@ -87,29 +87,18 @@ pub trait LogProcessor: Send + Sync + Debug {
8787#[ derive( Debug ) ]
8888pub struct SimpleLogProcessor < T : LogExporter > {
8989 exporter : Mutex < T > ,
90- is_shutdown : AtomicBool ,
9190}
9291
9392impl < T : LogExporter > SimpleLogProcessor < T > {
9493 pub ( crate ) fn new ( exporter : T ) -> Self {
9594 SimpleLogProcessor {
9695 exporter : Mutex :: new ( exporter) ,
97- is_shutdown : AtomicBool :: new ( false ) ,
9896 }
9997 }
10098}
10199
102100impl < T : LogExporter > LogProcessor for SimpleLogProcessor < T > {
103101 fn emit ( & self , record : & mut LogRecord , instrumentation : & InstrumentationScope ) {
104- // noop after shutdown
105- if self . is_shutdown . load ( std:: sync:: atomic:: Ordering :: Relaxed ) {
106- // this is a warning, as the user is trying to log after the processor has been shutdown
107- otel_warn ! (
108- name: "SimpleLogProcessor.Emit.ProcessorShutdown" ,
109- ) ;
110- return ;
111- }
112-
113102 let result = self
114103 . exporter
115104 . lock ( )
@@ -141,8 +130,6 @@ impl<T: LogExporter> LogProcessor for SimpleLogProcessor<T> {
141130 }
142131
143132 fn shutdown ( & self ) -> LogResult < ( ) > {
144- self . is_shutdown
145- . store ( true , std:: sync:: atomic:: Ordering :: Relaxed ) ;
146133 if let Ok ( mut exporter) = self . exporter . lock ( ) {
147134 exporter. shutdown ( ) ;
148135 Ok ( ( ) )
@@ -182,7 +169,6 @@ pub struct BatchLogProcessor {
182169 handle : Mutex < Option < thread:: JoinHandle < ( ) > > > ,
183170 forceflush_timeout : Duration ,
184171 shutdown_timeout : Duration ,
185- is_shutdown : AtomicBool ,
186172
187173 // Track dropped logs - we'll log this at shutdown
188174 dropped_logs_count : AtomicUsize ,
@@ -201,15 +187,6 @@ impl Debug for BatchLogProcessor {
201187
202188impl LogProcessor for BatchLogProcessor {
203189 fn emit ( & self , record : & mut LogRecord , instrumentation : & InstrumentationScope ) {
204- // noop after shutdown
205- if self . is_shutdown . load ( std:: sync:: atomic:: Ordering :: Relaxed ) {
206- otel_warn ! (
207- name: "BatchLogProcessor.Emit.ProcessorShutdown" ,
208- message = "BatchLogProcessor has been shutdown. No further logs will be emitted."
209- ) ;
210- return ;
211- }
212-
213190 let result = self
214191 . message_sender
215192 . try_send ( BatchMessage :: ExportLog ( Box :: new ( (
@@ -229,11 +206,6 @@ impl LogProcessor for BatchLogProcessor {
229206 }
230207
231208 fn force_flush ( & self ) -> LogResult < ( ) > {
232- if self . is_shutdown . load ( std:: sync:: atomic:: Ordering :: Relaxed ) {
233- return LogResult :: Err ( LogError :: Other (
234- "BatchLogProcessor is already shutdown" . into ( ) ,
235- ) ) ;
236- }
237209 let ( sender, receiver) = mpsc:: sync_channel ( 1 ) ;
238210 self . message_sender
239211 . try_send ( BatchMessage :: ForceFlush ( sender) )
@@ -251,20 +223,6 @@ impl LogProcessor for BatchLogProcessor {
251223 }
252224
253225 fn shutdown ( & self ) -> LogResult < ( ) > {
254- // test and set is_shutdown flag if it is not set
255- if self
256- . is_shutdown
257- . swap ( true , std:: sync:: atomic:: Ordering :: Relaxed )
258- {
259- otel_warn ! (
260- name: "BatchLogProcessor.Shutdown.ProcessorShutdown" ,
261- message = "BatchLogProcessor has been shutdown. No further logs will be emitted."
262- ) ;
263- return LogResult :: Err ( LogError :: AlreadyShutdown (
264- "BatchLogProcessor is already shutdown" . into ( ) ,
265- ) ) ;
266- }
267-
268226 let dropped_logs = self . dropped_logs_count . load ( Ordering :: Relaxed ) ;
269227 let max_queue_size = self . max_queue_size ;
270228 if dropped_logs > 0 {
@@ -406,7 +364,6 @@ impl BatchLogProcessor {
406364 handle : Mutex :: new ( Some ( handle) ) ,
407365 forceflush_timeout : Duration :: from_secs ( 5 ) , // TODO: make this configurable
408366 shutdown_timeout : Duration :: from_secs ( 5 ) , // TODO: make this configurable
409- is_shutdown : AtomicBool :: new ( false ) ,
410367 dropped_logs_count : AtomicUsize :: new ( 0 ) ,
411368 max_queue_size,
412369 }
@@ -1183,20 +1140,18 @@ mod tests {
11831140 . keep_records_on_shutdown ( )
11841141 . build ( ) ;
11851142 let processor = SimpleLogProcessor :: new ( exporter. clone ( ) ) ;
1143+ let provider = LoggerProvider :: builder ( )
1144+ . with_log_processor ( processor)
1145+ . build ( ) ;
1146+ let logger = provider. logger ( "test-simple-logger" ) ;
11861147
1187- let mut record: LogRecord = Default :: default ( ) ;
1188- let instrumentation: InstrumentationScope = Default :: default ( ) ;
1189-
1190- processor. emit ( & mut record, & instrumentation) ;
1148+ let record: LogRecord = Default :: default ( ) ;
11911149
1192- processor . shutdown ( ) . unwrap ( ) ;
1150+ logger . emit ( record . clone ( ) ) ;
11931151
1194- let is_shutdown = processor
1195- . is_shutdown
1196- . load ( std:: sync:: atomic:: Ordering :: Relaxed ) ;
1197- assert ! ( is_shutdown) ;
1152+ provider. shutdown ( ) . unwrap ( ) ;
11981153
1199- processor . emit ( & mut record, & instrumentation ) ;
1154+ logger . emit ( record) ;
12001155
12011156 assert_eq ! ( 1 , exporter. get_emitted_logs( ) . unwrap( ) . len( ) )
12021157 }
0 commit comments