
Commit 60fd40f

Merge branch 'main' into otlp-http-metrics-export-max-batch
2 parents f1ef6c4 + 037e9cb commit 60fd40f

File tree

7 files changed (+526 additions, −337 deletions)


CHANGELOG.md

Lines changed: 5 additions & 0 deletions
@@ -7,6 +7,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 
 ## Unreleased
 
+- Refactor `BatchLogRecordProcessor` to simplify code and make the control flow more
+  clear ([#4562](https://github.com/open-telemetry/opentelemetry-python/pull/4562/)
+  and [#4535](https://github.com/open-telemetry/opentelemetry-python/pull/4535)).
 - Add configurable `max_export_batch_size` to OTLP HTTP metrics exporter
   ([#4576](https://github.com/open-telemetry/opentelemetry-python/pull/4576))
 
@@ -90,6 +93,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
   ([#4353](https://github.com/open-telemetry/opentelemetry-python/pull/4353))
 - sdk: don't log or print warnings when the SDK has been disabled
   ([#4371](https://github.com/open-telemetry/opentelemetry-python/pull/4371))
+- Configurable max retry timeout for grpc exporter
+  ([#4333](https://github.com/open-telemetry/opentelemetry-python/pull/4333))
 - Fix span context manager typing by using ParamSpec from typing_extensions
   ([#4389](https://github.com/open-telemetry/opentelemetry-python/pull/4389))
 - Fix serialization of None values in logs body to match 1.31.0+ data model
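
The changelog entry above references the new `max_export_batch_size` option on the OTLP HTTP metrics exporter ([#4576]). A minimal usage sketch, assuming the keyword argument name follows the changelog wording; verify the exact signature against the released `opentelemetry-exporter-otlp-proto-http` package:

```python
from opentelemetry.exporter.otlp.proto.http.metric_exporter import OTLPMetricExporter
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader

# Split large metric payloads into smaller HTTP requests.
# max_export_batch_size is the new knob from #4576; the value here is illustrative.
exporter = OTLPMetricExporter(
    endpoint="http://localhost:4318/v1/metrics",
    max_export_batch_size=500,
)
reader = PeriodicExportingMetricReader(exporter)
provider = MeterProvider(metric_readers=[reader])
```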

exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/exporter.py

Lines changed: 6 additions & 3 deletions
@@ -187,6 +187,8 @@ class OTLPExporterMixin(
         compression: gRPC compression method to use
     """
 
+    _MAX_RETRY_TIMEOUT = 64
+
     def __init__(
         self,
         endpoint: Optional[str] = None,
@@ -286,12 +288,13 @@ def _export(
         # data.__class__.__name__,
         # delay,
         # )
-        max_value = 64
         # expo returns a generator that yields delay values which grow
         # exponentially. Once delay is greater than max_value, the yielded
         # value will remain constant.
-        for delay in _create_exp_backoff_generator(max_value=max_value):
-            if delay == max_value or self._shutdown:
+        for delay in _create_exp_backoff_generator(
+            max_value=self._MAX_RETRY_TIMEOUT
+        ):
+            if delay == self._MAX_RETRY_TIMEOUT or self._shutdown:
                 return self._result.FAILURE
 
             with self._export_lock:
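
This change replaces the hard-coded `max_value = 64` inside `_export` with a class-level `_MAX_RETRY_TIMEOUT` on `OTLPExporterMixin`, so the cap on the exponential backoff (and the point at which the exporter gives up) lives in one place. A minimal sketch of how a subclass could tune it; `_MAX_RETRY_TIMEOUT` is a private attribute, so this is illustrative rather than a supported API:

```python
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter


class ShortRetryOTLPSpanExporter(OTLPSpanExporter):
    # Stop retrying once the backoff delay reaches 8 seconds instead of 64.
    # The retry loop in _export exits when the yielded delay equals this cap.
    _MAX_RETRY_TIMEOUT = 8
```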

opentelemetry-sdk/src/opentelemetry/sdk/_logs/_internal/export/__init__.py

Lines changed: 16 additions & 137 deletions
@@ -14,15 +14,11 @@
 from __future__ import annotations
 
 import abc
-import collections
 import enum
 import logging
-import os
 import sys
-import threading
-import weakref
 from os import environ, linesep
-from typing import IO, Callable, Deque, Optional, Sequence
+from typing import IO, Callable, Optional, Sequence
 
 from opentelemetry.context import (
     _SUPPRESS_INSTRUMENTATION_KEY,
@@ -31,13 +27,13 @@
     set_value,
 )
 from opentelemetry.sdk._logs import LogData, LogRecord, LogRecordProcessor
+from opentelemetry.sdk._shared_internal import BatchProcessor
 from opentelemetry.sdk.environment_variables import (
     OTEL_BLRP_EXPORT_TIMEOUT,
     OTEL_BLRP_MAX_EXPORT_BATCH_SIZE,
     OTEL_BLRP_MAX_QUEUE_SIZE,
     OTEL_BLRP_SCHEDULE_DELAY,
 )
-from opentelemetry.util._once import Once
 
 _DEFAULT_SCHEDULE_DELAY_MILLIS = 5000
 _DEFAULT_MAX_EXPORT_BATCH_SIZE = 512
@@ -46,7 +42,6 @@
 _ENV_VAR_INT_VALUE_ERROR_MESSAGE = (
     "Unable to parse value for %s as integer. Defaulting to %s."
 )
-
 _logger = logging.getLogger(__name__)
 
 
@@ -55,29 +50,19 @@ class LogExportResult(enum.Enum):
     FAILURE = 1
 
 
-class BatchLogExportStrategy(enum.Enum):
-    EXPORT_ALL = 0
-    EXPORT_WHILE_BATCH_EXCEEDS_THRESHOLD = 1
-    EXPORT_AT_LEAST_ONE_BATCH = 2
-
-
 class LogExporter(abc.ABC):
     """Interface for exporting logs.
-
     Interface to be implemented by services that want to export logs received
     in their own format.
-
     To export data this MUST be registered to the :class`opentelemetry.sdk._logs.Logger` using a
     log processor.
     """
 
     @abc.abstractmethod
     def export(self, batch: Sequence[LogData]):
         """Exports a batch of logs.
-
         Args:
             batch: The list of `LogData` objects to be exported
-
         Returns:
             The result of the export
         """
@@ -146,9 +131,6 @@ def force_flush(self, timeout_millis: int = 30000) -> bool: # pylint: disable=n
         return True
 
 
-_BSP_RESET_ONCE = Once()
-
-
 class BatchLogRecordProcessor(LogRecordProcessor):
     """This is an implementation of LogRecordProcessor which creates batches of
     received logs in the export-friendly LogData representation and
@@ -161,9 +143,9 @@ class BatchLogRecordProcessor(LogRecordProcessor):
     - :envvar:`OTEL_BLRP_MAX_QUEUE_SIZE`
     - :envvar:`OTEL_BLRP_MAX_EXPORT_BATCH_SIZE`
     - :envvar:`OTEL_BLRP_EXPORT_TIMEOUT`
-    """
 
-    _queue: Deque[LogData]
+    All the logic for emitting logs, shutting down etc. resides in the BatchProcessor class.
+    """
 
     def __init__(
         self,
@@ -194,127 +176,24 @@ def __init__(
         BatchLogRecordProcessor._validate_arguments(
             max_queue_size, schedule_delay_millis, max_export_batch_size
         )
-
-        self._exporter = exporter
-        self._max_queue_size = max_queue_size
-        self._schedule_delay = schedule_delay_millis / 1e3
-        self._max_export_batch_size = max_export_batch_size
-        # Not used. No way currently to pass timeout to export.
-        # TODO(https://github.com/open-telemetry/opentelemetry-python/issues/4555): figure out what this should do.
-        self._export_timeout_millis = export_timeout_millis
-        # Deque is thread safe.
-        self._queue = collections.deque([], max_queue_size)
-        self._worker_thread = threading.Thread(
-            name="OtelBatchLogRecordProcessor",
-            target=self.worker,
-            daemon=True,
+        # Initializes BatchProcessor
+        self._batch_processor = BatchProcessor(
+            exporter,
+            schedule_delay_millis,
+            max_export_batch_size,
+            export_timeout_millis,
+            max_queue_size,
+            "Log",
         )
 
-        self._shutdown = False
-        self._export_lock = threading.Lock()
-        self._worker_awaken = threading.Event()
-        self._worker_thread.start()
-        if hasattr(os, "register_at_fork"):
-            weak_reinit = weakref.WeakMethod(self._at_fork_reinit)
-            os.register_at_fork(after_in_child=lambda: weak_reinit()()) # pylint: disable=unnecessary-lambda
-        self._pid = os.getpid()
-
-    def _should_export_batch(
-        self, batch_strategy: BatchLogExportStrategy, num_iterations: int
-    ) -> bool:
-        if not self._queue:
-            return False
-        # Always continue to export while queue length exceeds max batch size.
-        if len(self._queue) >= self._max_export_batch_size:
-            return True
-        if batch_strategy is BatchLogExportStrategy.EXPORT_ALL:
-            return True
-        if batch_strategy is BatchLogExportStrategy.EXPORT_AT_LEAST_ONE_BATCH:
-            return num_iterations == 0
-        return False
-
-    def _at_fork_reinit(self):
-        self._export_lock = threading.Lock()
-        self._worker_awaken = threading.Event()
-        self._queue.clear()
-        self._worker_thread = threading.Thread(
-            name="OtelBatchLogRecordProcessor",
-            target=self.worker,
-            daemon=True,
-        )
-        self._worker_thread.start()
-        self._pid = os.getpid()
-
-    def worker(self):
-        while not self._shutdown:
-            # Lots of strategies in the spec for setting next timeout.
-            # https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/sdk.md#batching-processor.
-            # Shutdown will interrupt this sleep. Emit will interrupt this sleep only if the queue is bigger then threshold.
-            sleep_interrupted = self._worker_awaken.wait(self._schedule_delay)
-            if self._shutdown:
-                break
-            self._export(
-                BatchLogExportStrategy.EXPORT_WHILE_BATCH_EXCEEDS_THRESHOLD
-                if sleep_interrupted
-                else BatchLogExportStrategy.EXPORT_AT_LEAST_ONE_BATCH
-            )
-            self._worker_awaken.clear()
-        self._export(BatchLogExportStrategy.EXPORT_ALL)
-
-    def _export(self, batch_strategy: BatchLogExportStrategy) -> None:
-        with self._export_lock:
-            iteration = 0
-            # We could see concurrent export calls from worker and force_flush. We call _should_export_batch
-            # once the lock is obtained to see if we still need to make the requested export.
-            while self._should_export_batch(batch_strategy, iteration):
-                iteration += 1
-                token = attach(set_value(_SUPPRESS_INSTRUMENTATION_KEY, True))
-                try:
-                    self._exporter.export(
-                        [
-                            # Oldest records are at the back, so pop from there.
-                            self._queue.pop()
-                            for _ in range(
-                                min(
-                                    self._max_export_batch_size,
-                                    len(self._queue),
-                                )
-                            )
-                        ]
-                    )
-                except Exception: # pylint: disable=broad-exception-caught
-                    _logger.exception("Exception while exporting logs.")
-                detach(token)
-
     def emit(self, log_data: LogData) -> None:
-        if self._shutdown:
-            _logger.info("Shutdown called, ignoring log.")
-            return
-        if self._pid != os.getpid():
-            _BSP_RESET_ONCE.do_once(self._at_fork_reinit)
-
-        if len(self._queue) == self._max_queue_size:
-            _logger.warning("Queue full, dropping log.")
-        self._queue.appendleft(log_data)
-        if len(self._queue) >= self._max_export_batch_size:
-            self._worker_awaken.set()
+        return self._batch_processor.emit(log_data)
 
     def shutdown(self):
-        if self._shutdown:
-            return
-        # Prevents emit and force_flush from further calling export.
-        self._shutdown = True
-        # Interrupts sleep in the worker, if it's sleeping.
-        self._worker_awaken.set()
-        # Main worker loop should exit after one final export call with flush all strategy.
-        self._worker_thread.join()
-        self._exporter.shutdown()
+        return self._batch_processor.shutdown()
 
-    def force_flush(self, timeout_millis: Optional[int] = None) -> bool:
-        if self._shutdown:
-            return
-        # Blocking call to export.
-        self._export(BatchLogExportStrategy.EXPORT_ALL)
+    def force_flush(self, timeout_millis: Optional[int] = None):
+        return self._batch_processor.force_flush(timeout_millis)
 
     @staticmethod
     def _default_max_queue_size():
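
After the refactor, `BatchLogRecordProcessor` is a thin facade: `emit`, `shutdown`, and `force_flush` forward to the shared `BatchProcessor` from `opentelemetry.sdk._shared_internal`, while argument validation and the `OTEL_BLRP_*` environment-variable defaults stay in the processor. Its public surface is unchanged, so existing setup code keeps working. A minimal usage sketch with a console exporter; the constructor values shown simply restate the documented defaults:

```python
from opentelemetry.sdk._logs import LoggerProvider
from opentelemetry.sdk._logs.export import BatchLogRecordProcessor, ConsoleLogExporter

provider = LoggerProvider()
processor = BatchLogRecordProcessor(
    ConsoleLogExporter(),
    max_queue_size=2048,          # or OTEL_BLRP_MAX_QUEUE_SIZE
    schedule_delay_millis=5000,   # or OTEL_BLRP_SCHEDULE_DELAY
    max_export_batch_size=512,    # or OTEL_BLRP_MAX_EXPORT_BATCH_SIZE
)
provider.add_log_record_processor(processor)

# Flush any queued records and stop the background worker.
provider.force_flush()
provider.shutdown()
```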
