Skip to content

Commit db27c7c

Browse files
Make force_flush available on SDK's tracer provider (#594)
Co-authored-by: Yusuke Tsutsumi <[email protected]>
1 parent d27979f commit db27c7c

File tree

3 files changed

+463
-7
lines changed

3 files changed

+463
-7
lines changed

opentelemetry-sdk/src/opentelemetry/sdk/trace/__init__.py

Lines changed: 150 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,25 @@
1515

1616
import abc
1717
import atexit
18+
import concurrent.futures
1819
import json
1920
import logging
2021
import random
2122
import threading
2223
from collections import OrderedDict
2324
from contextlib import contextmanager
2425
from types import TracebackType
25-
from typing import Iterator, MutableSequence, Optional, Sequence, Tuple, Type
26+
from typing import (
27+
Any,
28+
Callable,
29+
Iterator,
30+
MutableSequence,
31+
Optional,
32+
Sequence,
33+
Tuple,
34+
Type,
35+
Union,
36+
)
2637

2738
from opentelemetry import context as context_api
2839
from opentelemetry import trace as trace_api
@@ -89,9 +100,12 @@ def force_flush(self, timeout_millis: int = 30000) -> bool:
89100
"""
90101

91102

92-
class MultiSpanProcessor(SpanProcessor):
93-
"""Implementation of :class:`SpanProcessor` that forwards all received
94-
events to a list of `SpanProcessor`.
103+
class SynchronousMultiSpanProcessor(SpanProcessor):
104+
"""Implementation of class:`SpanProcessor` that forwards all received
105+
events to a list of span processors sequentially.
106+
107+
The underlying span processors are called in sequential order as they were
108+
added.
95109
"""
96110

97111
def __init__(self):
@@ -114,9 +128,113 @@ def on_end(self, span: "Span") -> None:
114128
sp.on_end(span)
115129

116130
def shutdown(self) -> None:
    """Sequentially shuts down all underlying span processors."""
    for processor in self._span_processors:
        processor.shutdown()
119135

136+
def force_flush(self, timeout_millis: int = 30000) -> bool:
    """Sequentially calls force_flush on every underlying span processor.

    A single deadline is shared by all processors: each one receives
    whatever portion of the budget is still left when its turn comes, so
    processors late in the list may be skipped entirely if earlier ones
    exhausted the timeout.

    Args:
        timeout_millis: Total time budget, in milliseconds, across all
            span processors.

    Returns:
        True if every span processor flushed its spans within the shared
        deadline, False otherwise.
    """
    deadline = time_ns() + timeout_millis * 1000000
    for processor in self._span_processors:
        remaining_ns = deadline - time_ns()
        if remaining_ns <= 0:
            # Earlier processors consumed the entire budget.
            return False
        # Hand the processor only the time that is still left (in ms).
        if not processor.force_flush(remaining_ns // 1000000):
            return False
    return True
160+
161+
162+
class ConcurrentMultiSpanProcessor(SpanProcessor):
    """Implementation of :class:`SpanProcessor` that forwards all received
    events to a list of span processors in parallel.

    Each call is fanned out to every registered span processor through a
    shared thread pool, and the caller blocks until all of them finish.

    Args:
        num_threads: The number of threads managed by the thread pool
            executor and thus defining how many span processors can work
            in parallel.
    """

    def __init__(self, num_threads: int = 2):
        # The tuple is swapped atomically on registration, so "on_start"
        # and "on_end" can iterate it without racing against
        # add_span_processor.
        self._span_processors = ()  # type: Tuple[SpanProcessor, ...]
        self._lock = threading.Lock()
        self._executor = concurrent.futures.ThreadPoolExecutor(
            max_workers=num_threads
        )

    def add_span_processor(self, span_processor: SpanProcessor) -> None:
        """Adds a SpanProcessor to the list handled by this instance."""
        with self._lock:
            self._span_processors += (span_processor,)

    def _submit_and_await(
        self, func: Callable[[SpanProcessor], Callable[..., None]], *args: Any
    ):
        # Fan the call out to every processor, then block on each result so
        # exceptions raised inside a worker propagate to the caller.
        pending = [
            self._executor.submit(func(processor), *args)
            for processor in self._span_processors
        ]
        for item in pending:
            item.result()

    def on_start(self, span: "Span") -> None:
        self._submit_and_await(lambda processor: processor.on_start, span)

    def on_end(self, span: "Span") -> None:
        self._submit_and_await(lambda processor: processor.on_end, span)

    def shutdown(self) -> None:
        """Shuts down all underlying span processors in parallel."""
        self._submit_and_await(lambda processor: processor.shutdown)

    def force_flush(self, timeout_millis: int = 30000) -> bool:
        """Calls force_flush on all underlying span processors in parallel.

        Args:
            timeout_millis: The maximum amount of time to wait for spans
                to be exported.

        Returns:
            True if all span processors flushed their spans within the
            given timeout, False otherwise.
        """
        pending = [
            self._executor.submit(processor.force_flush, timeout_millis)
            for processor in self._span_processors  # type: SpanProcessor
        ]
        finished, unfinished = concurrent.futures.wait(
            pending, timeout_millis / 1e3
        )
        # Any future still running means the deadline was exceeded.
        if unfinished:
            return False
        return all(future.result() for future in finished)
237+
120238

121239
class EventBase(abc.ABC):
122240
def __init__(self, name: str, timestamp: Optional[int] = None) -> None:
@@ -730,8 +848,13 @@ def __init__(
730848
sampler: sampling.Sampler = trace_api.sampling.ALWAYS_ON,
731849
resource: Resource = Resource.create_empty(),
732850
shutdown_on_exit: bool = True,
851+
active_span_processor: Union[
852+
SynchronousMultiSpanProcessor, ConcurrentMultiSpanProcessor
853+
] = None,
733854
):
734-
self._active_span_processor = MultiSpanProcessor()
855+
self._active_span_processor = (
856+
active_span_processor or SynchronousMultiSpanProcessor()
857+
)
735858
self.resource = resource
736859
self.sampler = sampler
737860
self._atexit_handler = None
@@ -759,8 +882,8 @@ def add_span_processor(self, span_processor: SpanProcessor) -> None:
759882
The span processors are invoked in the same order they are registered.
760883
"""
761884

762-
# no lock here because MultiSpanProcessor.add_span_processor is
763-
# thread safe
885+
# no lock here because add_span_processor is thread safe for both
886+
# SynchronousMultiSpanProcessor and ConcurrentMultiSpanProcessor.
764887
self._active_span_processor.add_span_processor(span_processor)
765888

766889
def shutdown(self):
@@ -769,3 +892,23 @@ def shutdown(self):
769892
if self._atexit_handler is not None:
770893
atexit.unregister(self._atexit_handler)
771894
self._atexit_handler = None
895+
896+
def force_flush(self, timeout_millis: int = 30000) -> bool:
    """Requests the active span processor to process all spans that have
    not yet been processed.

    By default force flush is called sequentially on all added span
    processors, so processors further back in the list have less of the
    budget left for flushing. To flush in parallel instead, initialize
    the tracer provider with a `ConcurrentMultiSpanProcessor`, at the
    cost of using multiple threads.

    Args:
        timeout_millis: The maximum amount of time to wait for spans to
            be processed.

    Returns:
        False if the timeout is exceeded, True otherwise.
    """
    flushed = self._active_span_processor.force_flush(timeout_millis)
    return flushed

0 commit comments

Comments
 (0)