Commit 217463e

Refactor BatchLogRecordProcessor
1 parent 3644a1e commit 217463e

2 files changed: 176 additions, 273 deletions

opentelemetry-sdk/src/opentelemetry/sdk/_logs/_internal/export/__init__.py

Lines changed: 80 additions & 145 deletions
@@ -21,8 +21,7 @@
 import sys
 import threading
 from os import environ, linesep
-from time import time_ns
-from typing import IO, Callable, Deque, List, Optional, Sequence
+from typing import IO, Callable, Deque, Optional, Sequence
 
 from opentelemetry.context import (
     _SUPPRESS_INSTRUMENTATION_KEY,
@@ -55,6 +54,12 @@ class LogExportResult(enum.Enum):
     FAILURE = 1
 
 
+class BatchLogExportStrategy(enum.Enum):
+    EXPORT_ALL = 0
+    EXPORT_WHILE_BATCH_EXCEEDS_THRESHOLD = 1
+    EXPORT_AT_LEAST_ONE_BATCH = 2
+
+
 class LogExporter(abc.ABC):
     """Interface for exporting logs.
 
@@ -140,14 +145,6 @@ def force_flush(self, timeout_millis: int = 30000) -> bool:  # pylint: disable=n
         return True
 
 
-class _FlushRequest:
-    __slots__ = ["event", "num_log_records"]
-
-    def __init__(self):
-        self.event = threading.Event()
-        self.num_log_records = 0
-
-
 _BSP_RESET_ONCE = Once()
 
 
@@ -166,8 +163,6 @@ class BatchLogRecordProcessor(LogRecordProcessor):
     """
 
     _queue: Deque[LogData]
-    _flush_request: _FlushRequest | None
-    _log_records: List[LogData | None]
 
     def __init__(
         self,
@@ -189,7 +184,7 @@ def __init__(
             max_export_batch_size = (
                 BatchLogRecordProcessor._default_max_export_batch_size()
            )
-
+        # Not used. No way currently to pass timeout to export.
         if export_timeout_millis is None:
             export_timeout_millis = (
                 BatchLogRecordProcessor._default_export_timeout_millis()
@@ -201,26 +196,43 @@ def __init__(
 
         self._exporter = exporter
         self._max_queue_size = max_queue_size
-        self._schedule_delay_millis = schedule_delay_millis
+        self._schedule_delay = schedule_delay_millis / 1e3
         self._max_export_batch_size = max_export_batch_size
+        # Not used. No way currently to pass timeout to export.
         self._export_timeout_millis = export_timeout_millis
+        # Deque is thread safe.
         self._queue = collections.deque([], max_queue_size)
         self._worker_thread = threading.Thread(
             name="OtelBatchLogRecordProcessor",
             target=self.worker,
             daemon=True,
         )
-        self._condition = threading.Condition(threading.Lock())
+
         self._shutdown = False
-        self._flush_request = None
-        self._log_records = [None] * self._max_export_batch_size
+        self._export_lock = threading.Lock()
+        self._worker_sleep = threading.Event()
         self._worker_thread.start()
         if hasattr(os, "register_at_fork"):
             os.register_at_fork(after_in_child=self._at_fork_reinit)  # pylint: disable=protected-access
         self._pid = os.getpid()
 
+    def _should_export_batch(
+        self, batch_strategy: BatchLogExportStrategy, num_iterations: int
+    ) -> bool:
+        if not self._queue:
+            return False
+        # Always continue to export while queue length exceeds max batch size.
+        if len(self._queue) >= self._max_export_batch_size:
+            return True
+        if batch_strategy == BatchLogExportStrategy.EXPORT_ALL:
+            return True
+        if batch_strategy == BatchLogExportStrategy.EXPORT_AT_LEAST_ONE_BATCH:
+            return num_iterations == 0
+        return False
+
     def _at_fork_reinit(self):
-        self._condition = threading.Condition(threading.Lock())
+        self._export_lock = threading.Lock()
+        self._worker_sleep = threading.Event()
         self._queue.clear()
         self._worker_thread = threading.Thread(
             name="OtelBatchLogRecordProcessor",
@@ -231,152 +243,75 @@ def _at_fork_reinit(self):
         self._pid = os.getpid()
 
     def worker(self):
-        timeout = self._schedule_delay_millis / 1e3
-        flush_request: Optional[_FlushRequest] = None
         while not self._shutdown:
-            with self._condition:
-                if self._shutdown:
-                    # shutdown may have been called, avoid further processing
-                    break
-                flush_request = self._get_and_unset_flush_request()
-                if (
-                    len(self._queue) < self._max_export_batch_size
-                    and flush_request is None
-                ):
-                    self._condition.wait(timeout)
-
-                    flush_request = self._get_and_unset_flush_request()
-                    if not self._queue:
-                        timeout = self._schedule_delay_millis / 1e3
-                        self._notify_flush_request_finished(flush_request)
-                        flush_request = None
-                        continue
-                if self._shutdown:
-                    break
-
-            start_ns = time_ns()
-            self._export(flush_request)
-            end_ns = time_ns()
-            # subtract the duration of this export call to the next timeout
-            timeout = self._schedule_delay_millis / 1e3 - (
-                (end_ns - start_ns) / 1e9
-            )
-
-            self._notify_flush_request_finished(flush_request)
-            flush_request = None
-
-        # there might have been a new flush request while export was running
-        # and before the done flag switched to true
-        with self._condition:
-            shutdown_flush_request = self._get_and_unset_flush_request()
-
-        # flush the remaining logs
-        self._drain_queue()
-        self._notify_flush_request_finished(flush_request)
-        self._notify_flush_request_finished(shutdown_flush_request)
-
-    def _export(self, flush_request: Optional[_FlushRequest] = None):
-        """Exports logs considering the given flush_request.
-
-        If flush_request is not None then logs are exported in batches
-        until the number of exported logs reached or exceeded the num of logs in
-        flush_request, otherwise exports at max max_export_batch_size logs.
-        """
-        if flush_request is None:
-            self._export_batch()
-            return
-
-        num_log_records = flush_request.num_log_records
-        while self._queue:
-            exported = self._export_batch()
-            num_log_records -= exported
-
-            if num_log_records <= 0:
+            # Lots of strategies in the spec for setting next timeout.
+            # https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/sdk.md#batching-processor.
+            # Shutdown will interrupt this sleep. Emit will interrupt this sleep only if the queue is bigger then threshold.
+            sleep_interrupted = self._worker_sleep.wait(self._schedule_delay)
+            if self._shutdown:
                 break
-
-    def _export_batch(self) -> int:
-        """Exports at most max_export_batch_size logs and returns the number of
-        exported logs.
-        """
-        idx = 0
-        while idx < self._max_export_batch_size and self._queue:
-            record = self._queue.pop()
-            self._log_records[idx] = record
-            idx += 1
-        token = attach(set_value(_SUPPRESS_INSTRUMENTATION_KEY, True))
-        try:
-            self._exporter.export(self._log_records[:idx])  # type: ignore
-        except Exception:  # pylint: disable=broad-exception-caught
-            _logger.exception("Exception while exporting logs.")
-        detach(token)
-
-        for index in range(idx):
-            self._log_records[index] = None
-        return idx
-
-    def _drain_queue(self):
-        """Export all elements until queue is empty.
-
-        Can only be called from the worker thread context because it invokes
-        `export` that is not thread safe.
-        """
-        while self._queue:
-            self._export_batch()
-
-    def _get_and_unset_flush_request(self) -> Optional[_FlushRequest]:
-        flush_request = self._flush_request
-        self._flush_request = None
-        if flush_request is not None:
-            flush_request.num_log_records = len(self._queue)
-        return flush_request
-
-    @staticmethod
-    def _notify_flush_request_finished(
-        flush_request: Optional[_FlushRequest] = None,
-    ):
-        if flush_request is not None:
-            flush_request.event.set()
-
-    def _get_or_create_flush_request(self) -> _FlushRequest:
-        if self._flush_request is None:
-            self._flush_request = _FlushRequest()
-        return self._flush_request
+            self._export(
+                BatchLogExportStrategy.EXPORT_WHILE_BATCH_EXCEEDS_THRESHOLD
+                if sleep_interrupted
+                else BatchLogExportStrategy.EXPORT_AT_LEAST_ONE_BATCH
+            )
+            self._worker_sleep.clear()
+        self._export(BatchLogExportStrategy.EXPORT_ALL)
+
+    def _export(self, batch_strategy: BatchLogExportStrategy) -> None:
+        with self._export_lock:
+            iteration = 0
+            # We could see concurrent export calls from worker and force_flush. We call _should_export_batch
+            # once the lock is obtained to see if we still need to make the requested export.
+            while self._should_export_batch(batch_strategy, iteration):
+                iteration += 1
+                token = attach(set_value(_SUPPRESS_INSTRUMENTATION_KEY, True))
+                try:
+                    self._exporter.export(
+                        [
+                            # Oldest records are at the back, so pop from there.
+                            self._queue.pop()
+                            for _ in range(
+                                min(
+                                    self._max_export_batch_size,
+                                    len(self._queue),
+                                )
+                            )
+                        ]
+                    )
+                except Exception:  # pylint: disable=broad-exception-caught
+                    _logger.exception("Exception while exporting logs.")
+                detach(token)
 
     def emit(self, log_data: LogData) -> None:
-        """Adds the `LogData` to queue and notifies the waiting threads
-        when size of queue reaches max_export_batch_size.
-        """
         if self._shutdown:
+            _logger.warning("Shutdown called, ignoring log.")
             return
         if self._pid != os.getpid():
             _BSP_RESET_ONCE.do_once(self._at_fork_reinit)
 
+        if len(self._queue) == self._max_queue_size:
+            _logger.warning("Queue full, dropping log.")
         self._queue.appendleft(log_data)
         if len(self._queue) >= self._max_export_batch_size:
-            with self._condition:
-                self._condition.notify()
+            self._worker_sleep.set()
 
     def shutdown(self):
+        if self._shutdown:
+            return
+        # Prevents emit and force_flush from further calling export.
         self._shutdown = True
-        with self._condition:
-            self._condition.notify_all()
+        # Interrupts sleep in the worker, if it's sleeping.
+        self._worker_sleep.set()
+        # Main worker loop should exit after one final export call with flush all strategy.
         self._worker_thread.join()
         self._exporter.shutdown()
 
     def force_flush(self, timeout_millis: Optional[int] = None) -> bool:
-        if timeout_millis is None:
-            timeout_millis = self._export_timeout_millis
         if self._shutdown:
-            return True
-
-        with self._condition:
-            flush_request = self._get_or_create_flush_request()
-            self._condition.notify_all()
-
-        ret = flush_request.event.wait(timeout_millis / 1e3)
-        if not ret:
-            _logger.warning("Timeout was exceeded in force_flush().")
-        return ret
+            return
+        # Blocking call to export.
+        self._export(BatchLogExportStrategy.EXPORT_ALL)
 
     @staticmethod
     def _default_max_queue_size():
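
The heart of the refactor is replacing the Condition-based wait with a plain threading.Event: the worker sleeps in Event.wait(self._schedule_delay), and emit() (once a full batch is queued) or shutdown() sets the event to wake it early. Below is a minimal self-contained sketch of that wake/sleep pattern, using generic names rather than the committed code:

import threading
import time
from collections import deque

queue = deque(maxlen=100)       # stands in for _queue
wake = threading.Event()        # stands in for _worker_sleep
stop = False                    # stands in for _shutdown
BATCH = 5
DELAY = 2.0                     # stands in for _schedule_delay (seconds)


def worker():
    while not stop:
        # True if woken by set(), False if the schedule delay elapsed.
        interrupted = wake.wait(DELAY)
        if stop:
            break
        batch = [queue.pop() for _ in range(min(BATCH, len(queue)))]
        print("exported", len(batch), "records", "(woken early)" if interrupted else "(timer)")
        wake.clear()
    while queue:                # final drain, like EXPORT_ALL at the end of worker()
        queue.pop()


t = threading.Thread(target=worker, daemon=True)
t.start()

for i in range(7):              # emit(): enqueue and wake the worker at the threshold
    queue.appendleft(i)
    if len(queue) >= BATCH:
        wake.set()

time.sleep(0.5)
stop = True                     # shutdown(): stop the loop ...
wake.set()                      # ... and interrupt the sleep
t.join()

The boolean returned by Event.wait is what lets the new worker pick EXPORT_WHILE_BATCH_EXCEEDS_THRESHOLD when it was woken early versus EXPORT_AT_LEAST_ONE_BATCH when the delay simply elapsed.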

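For completeness, a rough usage sketch of the refactored processor wired into the logs SDK. It assumes the LoggerProvider, LoggingHandler, and ConsoleLogExporter helpers exposed by opentelemetry.sdk._logs at the time of this change; it is not part of the commit itself.

import logging

from opentelemetry.sdk._logs import LoggerProvider, LoggingHandler
from opentelemetry.sdk._logs.export import (
    BatchLogRecordProcessor,
    ConsoleLogExporter,
)

provider = LoggerProvider()
# schedule_delay_millis feeds the Event.wait() timeout in the new worker loop.
processor = BatchLogRecordProcessor(ConsoleLogExporter(), schedule_delay_millis=1000)
provider.add_log_record_processor(processor)

handler = LoggingHandler(level=logging.INFO, logger_provider=provider)
logging.getLogger().addHandler(handler)
logging.getLogger().setLevel(logging.INFO)

logging.getLogger(__name__).info("hello batch processor")

# force_flush() is now a blocking EXPORT_ALL; shutdown() wakes the worker,
# joins it, and then shuts the exporter down.
provider.force_flush()
provider.shutdown()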