1313# limitations under the License.
1414from __future__ import annotations
1515
16- import collections
1716import logging
18- import os
1917import sys
20- import threading
2118import typing
22- import weakref
2319from enum import Enum
2420from os import environ , linesep
25- from time import time_ns
2621
2722from opentelemetry .context import (
2823 _SUPPRESS_INSTRUMENTATION_KEY ,
3126 detach ,
3227 set_value ,
3328)
29+ from opentelemetry .sdk ._shared_internal import BatchProcessor
3430from opentelemetry .sdk .environment_variables import (
3531 OTEL_BSP_EXPORT_TIMEOUT ,
3632 OTEL_BSP_MAX_EXPORT_BATCH_SIZE ,
3733 OTEL_BSP_MAX_QUEUE_SIZE ,
3834 OTEL_BSP_SCHEDULE_DELAY ,
3935)
4036from opentelemetry .sdk .trace import ReadableSpan , Span , SpanProcessor
41- from opentelemetry .util ._once import Once
4237
4338_DEFAULT_SCHEDULE_DELAY_MILLIS = 5000
4439_DEFAULT_MAX_EXPORT_BATCH_SIZE = 512
@@ -125,19 +120,6 @@ def force_flush(self, timeout_millis: int = 30000) -> bool:
125120 return True
126121
127122
128- class _FlushRequest :
129- """Represents a request for the BatchSpanProcessor to flush spans."""
130-
131- __slots__ = ["event" , "num_spans" ]
132-
133- def __init__ (self ):
134- self .event = threading .Event ()
135- self .num_spans = 0
136-
137-
138- _BSP_RESET_ONCE = Once ()
139-
140-
141123class BatchSpanProcessor (SpanProcessor ):
142124 """Batch span processor implementation.
143125
@@ -151,6 +133,8 @@ class BatchSpanProcessor(SpanProcessor):
151133 - :envvar:`OTEL_BSP_MAX_QUEUE_SIZE`
152134 - :envvar:`OTEL_BSP_MAX_EXPORT_BATCH_SIZE`
153135 - :envvar:`OTEL_BSP_EXPORT_TIMEOUT`
136+
137+ All the logic for emitting spans, shutting down etc. resides in the `BatchProcessor` class.
154138 """
155139
156140 def __init__ (
@@ -174,6 +158,7 @@ def __init__(
174158 BatchSpanProcessor ._default_max_export_batch_size ()
175159 )
176160
161+ # Not used. No way currently to pass timeout to export.
177162 if export_timeout_millis is None :
178163 export_timeout_millis = (
179164 BatchSpanProcessor ._default_export_timeout_millis ()
@@ -183,227 +168,30 @@ def __init__(
183168 max_queue_size , schedule_delay_millis , max_export_batch_size
184169 )
185170
186- self .span_exporter = span_exporter
187- self .queue = collections .deque ([], max_queue_size ) # type: typing.Deque[Span]
188- self .worker_thread = threading .Thread (
189- name = "OtelBatchSpanProcessor" , target = self .worker , daemon = True
171+ self ._batch_processor = BatchProcessor (
172+ span_exporter ,
173+ schedule_delay_millis ,
174+ max_export_batch_size ,
175+ export_timeout_millis ,
176+ max_queue_size ,
177+ "Span" ,
190178 )
191- self .condition = threading .Condition (threading .Lock ())
192- self ._flush_request = None # type: typing.Optional[_FlushRequest]
193- self .schedule_delay_millis = schedule_delay_millis
194- self .max_export_batch_size = max_export_batch_size
195- self .max_queue_size = max_queue_size
196- self .export_timeout_millis = export_timeout_millis
197- self .done = False
198- # flag that indicates that spans are being dropped
199- self ._spans_dropped = False
200- # precallocated list to send spans to exporter
201- self .spans_list = [None ] * self .max_export_batch_size # type: typing.List[typing.Optional[Span]]
202- self .worker_thread .start ()
203- if hasattr (os , "register_at_fork" ):
204- weak_reinit = weakref .WeakMethod (self ._at_fork_reinit )
205- os .register_at_fork (after_in_child = lambda : weak_reinit ()()) # pylint: disable=unnecessary-lambda
206- self ._pid = os .getpid ()
207179
208180 def on_start (
209181 self , span : Span , parent_context : Context | None = None
210182 ) -> None :
211183 pass
212184
213185 def on_end (self , span : ReadableSpan ) -> None :
214- if self .done :
215- logger .warning ("Already shutdown, dropping span." )
216- return
217186 if not span .context .trace_flags .sampled :
218187 return
219- if self ._pid != os .getpid ():
220- _BSP_RESET_ONCE .do_once (self ._at_fork_reinit )
221-
222- if len (self .queue ) == self .max_queue_size :
223- if not self ._spans_dropped :
224- logger .warning ("Queue is full, likely spans will be dropped." )
225- self ._spans_dropped = True
226-
227- self .queue .appendleft (span )
228-
229- if len (self .queue ) >= self .max_export_batch_size :
230- with self .condition :
231- self .condition .notify ()
232-
233- def _at_fork_reinit (self ):
234- self .condition = threading .Condition (threading .Lock ())
235- self .queue .clear ()
236-
237- # worker_thread is local to a process, only the thread that issued fork continues
238- # to exist. A new worker thread must be started in child process.
239- self .worker_thread = threading .Thread (
240- name = "OtelBatchSpanProcessor" , target = self .worker , daemon = True
241- )
242- self .worker_thread .start ()
243- self ._pid = os .getpid ()
244-
245- def worker (self ):
246- timeout = self .schedule_delay_millis / 1e3
247- flush_request = None # type: typing.Optional[_FlushRequest]
248- while not self .done :
249- with self .condition :
250- if self .done :
251- # done flag may have changed, avoid waiting
252- break
253- flush_request = self ._get_and_unset_flush_request ()
254- if (
255- len (self .queue ) < self .max_export_batch_size
256- and flush_request is None
257- ):
258- self .condition .wait (timeout )
259- flush_request = self ._get_and_unset_flush_request ()
260- if not self .queue :
261- # spurious notification, let's wait again, reset timeout
262- timeout = self .schedule_delay_millis / 1e3
263- self ._notify_flush_request_finished (flush_request )
264- flush_request = None
265- continue
266- if self .done :
267- # missing spans will be sent when calling flush
268- break
269-
270- # subtract the duration of this export call to the next timeout
271- start = time_ns ()
272- self ._export (flush_request )
273- end = time_ns ()
274- duration = (end - start ) / 1e9
275- timeout = self .schedule_delay_millis / 1e3 - duration
276-
277- self ._notify_flush_request_finished (flush_request )
278- flush_request = None
279-
280- # there might have been a new flush request while export was running
281- # and before the done flag switched to true
282- with self .condition :
283- shutdown_flush_request = self ._get_and_unset_flush_request ()
284-
285- # be sure that all spans are sent
286- self ._drain_queue ()
287- self ._notify_flush_request_finished (flush_request )
288- self ._notify_flush_request_finished (shutdown_flush_request )
289-
290- def _get_and_unset_flush_request (
291- self ,
292- ) -> typing .Optional [_FlushRequest ]:
293- """Returns the current flush request and makes it invisible to the
294- worker thread for subsequent calls.
295- """
296- flush_request = self ._flush_request
297- self ._flush_request = None
298- if flush_request is not None :
299- flush_request .num_spans = len (self .queue )
300- return flush_request
301-
302- @staticmethod
303- def _notify_flush_request_finished (
304- flush_request : typing .Optional [_FlushRequest ],
305- ):
306- """Notifies the flush initiator(s) waiting on the given request/event
307- that the flush operation was finished.
308- """
309- if flush_request is not None :
310- flush_request .event .set ()
311-
312- def _get_or_create_flush_request (self ) -> _FlushRequest :
313- """Either returns the current active flush event or creates a new one.
188+ self ._batch_processor .emit (span )
314189
315- The flush event will be visible and read by the worker thread before an
316- export operation starts. Callers of a flush operation may wait on the
317- returned event to be notified when the flush/export operation was
318- finished.
190+ def shutdown (self ):
191+ return self ._batch_processor .shutdown ()
319192
320- This method is not thread-safe, i.e. callers need to take care about
321- synchronization/locking.
322- """
323- if self ._flush_request is None :
324- self ._flush_request = _FlushRequest ()
325- return self ._flush_request
326-
327- def _export (self , flush_request : typing .Optional [_FlushRequest ]):
328- """Exports spans considering the given flush_request.
329-
330- In case of a given flush_requests spans are exported in batches until
331- the number of exported spans reached or exceeded the number of spans in
332- the flush request.
333- In no flush_request was given at most max_export_batch_size spans are
334- exported.
335- """
336- if not flush_request :
337- self ._export_batch ()
338- return
339-
340- num_spans = flush_request .num_spans
341- while self .queue :
342- num_exported = self ._export_batch ()
343- num_spans -= num_exported
344-
345- if num_spans <= 0 :
346- break
347-
348- def _export_batch (self ) -> int :
349- """Exports at most max_export_batch_size spans and returns the number of
350- exported spans.
351- """
352- idx = 0
353- # currently only a single thread acts as consumer, so queue.pop() will
354- # not raise an exception
355- while idx < self .max_export_batch_size and self .queue :
356- self .spans_list [idx ] = self .queue .pop ()
357- idx += 1
358- token = attach (set_value (_SUPPRESS_INSTRUMENTATION_KEY , True ))
359- try :
360- # Ignore type b/c the Optional[None]+slicing is too "clever"
361- # for mypy
362- self .span_exporter .export (self .spans_list [:idx ]) # type: ignore
363- except Exception : # pylint: disable=broad-exception-caught
364- logger .exception ("Exception while exporting Span batch." )
365- detach (token )
366-
367- # clean up list
368- for index in range (idx ):
369- self .spans_list [index ] = None
370- return idx
371-
372- def _drain_queue (self ):
373- """Export all elements until queue is empty.
374-
375- Can only be called from the worker thread context because it invokes
376- `export` that is not thread safe.
377- """
378- while self .queue :
379- self ._export_batch ()
380-
381- def force_flush (self , timeout_millis : int | None = None ) -> bool :
382- if timeout_millis is None :
383- timeout_millis = self .export_timeout_millis
384-
385- if self .done :
386- logger .warning ("Already shutdown, ignoring call to force_flush()." )
387- return True
388-
389- with self .condition :
390- flush_request = self ._get_or_create_flush_request ()
391- # signal the worker thread to flush and wait for it to finish
392- self .condition .notify_all ()
393-
394- # wait for token to be processed
395- ret = flush_request .event .wait (timeout_millis / 1e3 )
396- if not ret :
397- logger .warning ("Timeout was exceeded in force_flush()." )
398- return ret
399-
400- def shutdown (self ) -> None :
401- # signal the worker thread to finish and then wait for it
402- self .done = True
403- with self .condition :
404- self .condition .notify_all ()
405- self .worker_thread .join ()
406- self .span_exporter .shutdown ()
193+ def force_flush (self , timeout_millis : typing .Optional [int ] = None ) -> bool :
194+ return self ._batch_processor .force_flush (timeout_millis )
407195
408196 @staticmethod
409197 def _default_max_queue_size ():
0 commit comments