2222import threading
2323import weakref
2424from os import environ , linesep
25- from time import time_ns
26- from typing import IO , Callable , Deque , List , Optional , Sequence
25+ from typing import IO , Callable , Deque , Optional , Sequence
2726
2827from opentelemetry .context import (
2928 _SUPPRESS_INSTRUMENTATION_KEY ,
@@ -56,6 +55,12 @@ class LogExportResult(enum.Enum):
5655 FAILURE = 1
5756
5857
58+ class BatchLogExportStrategy (enum .Enum ):
59+ EXPORT_ALL = 0
60+ EXPORT_WHILE_BATCH_EXCEEDS_THRESHOLD = 1
61+ EXPORT_AT_LEAST_ONE_BATCH = 2
62+
63+
5964class LogExporter (abc .ABC ):
6065 """Interface for exporting logs.
6166
@@ -141,14 +146,6 @@ def force_flush(self, timeout_millis: int = 30000) -> bool: # pylint: disable=n
141146 return True
142147
143148
144- class _FlushRequest :
145- __slots__ = ["event" , "num_log_records" ]
146-
147- def __init__ (self ):
148- self .event = threading .Event ()
149- self .num_log_records = 0
150-
151-
152149_BSP_RESET_ONCE = Once ()
153150
154151
@@ -167,8 +164,6 @@ class BatchLogRecordProcessor(LogRecordProcessor):
167164 """
168165
169166 _queue : Deque [LogData ]
170- _flush_request : _FlushRequest | None
171- _log_records : List [LogData | None ]
172167
173168 def __init__ (
174169 self ,
@@ -190,7 +185,7 @@ def __init__(
190185 max_export_batch_size = (
191186 BatchLogRecordProcessor ._default_max_export_batch_size ()
192187 )
193-
188+ # Not used. No way currently to pass timeout to export.
194189 if export_timeout_millis is None :
195190 export_timeout_millis = (
196191 BatchLogRecordProcessor ._default_export_timeout_millis ()
@@ -202,27 +197,45 @@ def __init__(
202197
203198 self ._exporter = exporter
204199 self ._max_queue_size = max_queue_size
205- self ._schedule_delay_millis = schedule_delay_millis
200+ self ._schedule_delay = schedule_delay_millis / 1e3
206201 self ._max_export_batch_size = max_export_batch_size
202+ # Not used. No way currently to pass timeout to export.
203+ # TODO(https://github.com/open-telemetry/opentelemetry-python/issues/4555): figure out what this should do.
207204 self ._export_timeout_millis = export_timeout_millis
205+ # Deque is thread safe.
208206 self ._queue = collections .deque ([], max_queue_size )
209207 self ._worker_thread = threading .Thread (
210208 name = "OtelBatchLogRecordProcessor" ,
211209 target = self .worker ,
212210 daemon = True ,
213211 )
214- self . _condition = threading . Condition ( threading . Lock ())
212+
215213 self ._shutdown = False
216- self ._flush_request = None
217- self ._log_records = [ None ] * self . _max_export_batch_size
214+ self ._export_lock = threading . Lock ()
215+ self ._worker_awaken = threading . Event ()
218216 self ._worker_thread .start ()
219217 if hasattr (os , "register_at_fork" ):
220218 weak_reinit = weakref .WeakMethod (self ._at_fork_reinit )
221219 os .register_at_fork (after_in_child = lambda : weak_reinit ()()) # pylint: disable=unnecessary-lambda
222220 self ._pid = os .getpid ()
223221
222+ def _should_export_batch (
223+ self , batch_strategy : BatchLogExportStrategy , num_iterations : int
224+ ) -> bool :
225+ if not self ._queue :
226+ return False
227+ # Always continue to export while queue length exceeds max batch size.
228+ if len (self ._queue ) >= self ._max_export_batch_size :
229+ return True
230+ if batch_strategy is BatchLogExportStrategy .EXPORT_ALL :
231+ return True
232+ if batch_strategy is BatchLogExportStrategy .EXPORT_AT_LEAST_ONE_BATCH :
233+ return num_iterations == 0
234+ return False
235+
224236 def _at_fork_reinit (self ):
225- self ._condition = threading .Condition (threading .Lock ())
237+ self ._export_lock = threading .Lock ()
238+ self ._worker_awaken = threading .Event ()
226239 self ._queue .clear ()
227240 self ._worker_thread = threading .Thread (
228241 name = "OtelBatchLogRecordProcessor" ,
@@ -233,152 +246,75 @@ def _at_fork_reinit(self):
233246 self ._pid = os .getpid ()
234247
235248 def worker (self ):
236- timeout = self ._schedule_delay_millis / 1e3
237- flush_request : Optional [_FlushRequest ] = None
238249 while not self ._shutdown :
239- with self ._condition :
240- if self ._shutdown :
241- # shutdown may have been called, avoid further processing
242- break
243- flush_request = self ._get_and_unset_flush_request ()
244- if (
245- len (self ._queue ) < self ._max_export_batch_size
246- and flush_request is None
247- ):
248- self ._condition .wait (timeout )
249-
250- flush_request = self ._get_and_unset_flush_request ()
251- if not self ._queue :
252- timeout = self ._schedule_delay_millis / 1e3
253- self ._notify_flush_request_finished (flush_request )
254- flush_request = None
255- continue
256- if self ._shutdown :
257- break
258-
259- start_ns = time_ns ()
260- self ._export (flush_request )
261- end_ns = time_ns ()
262- # subtract the duration of this export call to the next timeout
263- timeout = self ._schedule_delay_millis / 1e3 - (
264- (end_ns - start_ns ) / 1e9
265- )
266-
267- self ._notify_flush_request_finished (flush_request )
268- flush_request = None
269-
270- # there might have been a new flush request while export was running
271- # and before the done flag switched to true
272- with self ._condition :
273- shutdown_flush_request = self ._get_and_unset_flush_request ()
274-
275- # flush the remaining logs
276- self ._drain_queue ()
277- self ._notify_flush_request_finished (flush_request )
278- self ._notify_flush_request_finished (shutdown_flush_request )
279-
280- def _export (self , flush_request : Optional [_FlushRequest ] = None ):
281- """Exports logs considering the given flush_request.
282-
283- If flush_request is not None then logs are exported in batches
284- until the number of exported logs reached or exceeded the num of logs in
285- flush_request, otherwise exports at max max_export_batch_size logs.
286- """
287- if flush_request is None :
288- self ._export_batch ()
289- return
290-
291- num_log_records = flush_request .num_log_records
292- while self ._queue :
293- exported = self ._export_batch ()
294- num_log_records -= exported
295-
296- if num_log_records <= 0 :
250+ # Lots of strategies in the spec for setting next timeout.
251+ # https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/sdk.md#batching-processor.
252+ # Shutdown will interrupt this sleep. Emit will interrupt this sleep only if the queue is bigger then threshold.
253+ sleep_interrupted = self ._worker_awaken .wait (self ._schedule_delay )
254+ if self ._shutdown :
297255 break
298-
299- def _export_batch (self ) -> int :
300- """Exports at most max_export_batch_size logs and returns the number of
301- exported logs.
302- """
303- idx = 0
304- while idx < self ._max_export_batch_size and self ._queue :
305- record = self ._queue .pop ()
306- self ._log_records [idx ] = record
307- idx += 1
308- token = attach (set_value (_SUPPRESS_INSTRUMENTATION_KEY , True ))
309- try :
310- self ._exporter .export (self ._log_records [:idx ]) # type: ignore
311- except Exception : # pylint: disable=broad-exception-caught
312- _logger .exception ("Exception while exporting logs." )
313- detach (token )
314-
315- for index in range (idx ):
316- self ._log_records [index ] = None
317- return idx
318-
319- def _drain_queue (self ):
320- """Export all elements until queue is empty.
321-
322- Can only be called from the worker thread context because it invokes
323- `export` that is not thread safe.
324- """
325- while self ._queue :
326- self ._export_batch ()
327-
328- def _get_and_unset_flush_request (self ) -> Optional [_FlushRequest ]:
329- flush_request = self ._flush_request
330- self ._flush_request = None
331- if flush_request is not None :
332- flush_request .num_log_records = len (self ._queue )
333- return flush_request
334-
335- @staticmethod
336- def _notify_flush_request_finished (
337- flush_request : Optional [_FlushRequest ] = None ,
338- ):
339- if flush_request is not None :
340- flush_request .event .set ()
341-
342- def _get_or_create_flush_request (self ) -> _FlushRequest :
343- if self ._flush_request is None :
344- self ._flush_request = _FlushRequest ()
345- return self ._flush_request
256+ self ._export (
257+ BatchLogExportStrategy .EXPORT_WHILE_BATCH_EXCEEDS_THRESHOLD
258+ if sleep_interrupted
259+ else BatchLogExportStrategy .EXPORT_AT_LEAST_ONE_BATCH
260+ )
261+ self ._worker_awaken .clear ()
262+ self ._export (BatchLogExportStrategy .EXPORT_ALL )
263+
264+ def _export (self , batch_strategy : BatchLogExportStrategy ) -> None :
265+ with self ._export_lock :
266+ iteration = 0
267+ # We could see concurrent export calls from worker and force_flush. We call _should_export_batch
268+ # once the lock is obtained to see if we still need to make the requested export.
269+ while self ._should_export_batch (batch_strategy , iteration ):
270+ iteration += 1
271+ token = attach (set_value (_SUPPRESS_INSTRUMENTATION_KEY , True ))
272+ try :
273+ self ._exporter .export (
274+ [
275+ # Oldest records are at the back, so pop from there.
276+ self ._queue .pop ()
277+ for _ in range (
278+ min (
279+ self ._max_export_batch_size ,
280+ len (self ._queue ),
281+ )
282+ )
283+ ]
284+ )
285+ except Exception : # pylint: disable=broad-exception-caught
286+ _logger .exception ("Exception while exporting logs." )
287+ detach (token )
346288
347289 def emit (self , log_data : LogData ) -> None :
348- """Adds the `LogData` to queue and notifies the waiting threads
349- when size of queue reaches max_export_batch_size.
350- """
351290 if self ._shutdown :
291+ _logger .info ("Shutdown called, ignoring log." )
352292 return
353293 if self ._pid != os .getpid ():
354294 _BSP_RESET_ONCE .do_once (self ._at_fork_reinit )
355295
296+ if len (self ._queue ) == self ._max_queue_size :
297+ _logger .warning ("Queue full, dropping log." )
356298 self ._queue .appendleft (log_data )
357299 if len (self ._queue ) >= self ._max_export_batch_size :
358- with self ._condition :
359- self ._condition .notify ()
300+ self ._worker_awaken .set ()
360301
361302 def shutdown (self ):
303+ if self ._shutdown :
304+ return
305+ # Prevents emit and force_flush from further calling export.
362306 self ._shutdown = True
363- with self ._condition :
364- self ._condition .notify_all ()
307+ # Interrupts sleep in the worker, if it's sleeping.
308+ self ._worker_awaken .set ()
309+ # Main worker loop should exit after one final export call with flush all strategy.
365310 self ._worker_thread .join ()
366311 self ._exporter .shutdown ()
367312
368313 def force_flush (self , timeout_millis : Optional [int ] = None ) -> bool :
369- if timeout_millis is None :
370- timeout_millis = self ._export_timeout_millis
371314 if self ._shutdown :
372- return True
373-
374- with self ._condition :
375- flush_request = self ._get_or_create_flush_request ()
376- self ._condition .notify_all ()
377-
378- ret = flush_request .event .wait (timeout_millis / 1e3 )
379- if not ret :
380- _logger .warning ("Timeout was exceeded in force_flush()." )
381- return ret
315+ return
316+ # Blocking call to export.
317+ self ._export (BatchLogExportStrategy .EXPORT_ALL )
382318
383319 @staticmethod
384320 def _default_max_queue_size ():
0 commit comments