Skip to content

Commit 5cc1fc4

Browse files
committed
Make BatchProcessor a member of BLRP instead of having BLRP subclass it
1 parent 8b3aa91 commit 5cc1fc4

File tree

3 files changed

+38
-29
lines changed

3 files changed

+38
-29
lines changed

opentelemetry-sdk/src/opentelemetry/sdk/_logs/_internal/export/__init__.py

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
import logging
1919
import sys
2020
from os import environ, linesep
21-
from typing import IO, Callable, Sequence
21+
from typing import IO, Callable, Optional, Sequence
2222

2323
from opentelemetry.context import (
2424
_SUPPRESS_INSTRUMENTATION_KEY,
@@ -131,7 +131,7 @@ def force_flush(self, timeout_millis: int = 30000) -> bool: # pylint: disable=n
131131
return True
132132

133133

134-
class BatchLogRecordProcessor(BatchProcessor, LogRecordProcessor):
134+
class BatchLogRecordProcessor(LogRecordProcessor):
135135
"""This is an implementation of LogRecordProcessor which creates batches of
136136
received logs in the export-friendly LogData representation and
137137
send to the configured LogExporter, as soon as they are emitted.
@@ -177,7 +177,7 @@ def __init__(
177177
max_queue_size, schedule_delay_millis, max_export_batch_size
178178
)
179179
# Initializes BatchProcessor
180-
super().__init__(
180+
self._batch_processor = BatchProcessor(
181181
exporter,
182182
schedule_delay_millis,
183183
max_export_batch_size,
@@ -186,6 +186,15 @@ def __init__(
186186
"Log",
187187
)
188188

189+
def emit(self, log_data: LogData) -> None:
190+
return self._batch_processor.emit(log_data)
191+
192+
def shutdown(self):
193+
return self._batch_processor.shutdown()
194+
195+
def force_flush(self, timeout_millis: Optional[int] = None):
196+
return self._batch_processor.force_flush(timeout_millis)
197+
189198
@staticmethod
190199
def _default_max_queue_size():
191200
try:

opentelemetry-sdk/tests/logs/test_export.py

Lines changed: 25 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -351,11 +351,11 @@ def test_args(self):
351351
max_export_batch_size=256,
352352
export_timeout_millis=15000,
353353
)
354-
self.assertEqual(log_record_processor._exporter, exporter)
355-
self.assertEqual(log_record_processor._max_queue_size, 1024)
356-
self.assertEqual(log_record_processor._schedule_delay, 2.5)
357-
self.assertEqual(log_record_processor._max_export_batch_size, 256)
358-
self.assertEqual(log_record_processor._export_timeout_millis, 15000)
354+
self.assertEqual(log_record_processor._batch_processor._exporter, exporter)
355+
self.assertEqual(log_record_processor._batch_processor._max_queue_size, 1024)
356+
self.assertEqual(log_record_processor._batch_processor._schedule_delay, 2.5)
357+
self.assertEqual(log_record_processor._batch_processor._max_export_batch_size, 256)
358+
self.assertEqual(log_record_processor._batch_processor._export_timeout_millis, 15000)
359359

360360
@patch.dict(
361361
"os.environ",
@@ -369,20 +369,20 @@ def test_args(self):
369369
def test_env_vars(self):
370370
exporter = InMemoryLogExporter()
371371
log_record_processor = BatchLogRecordProcessor(exporter)
372-
self.assertEqual(log_record_processor._exporter, exporter)
373-
self.assertEqual(log_record_processor._max_queue_size, 1024)
374-
self.assertEqual(log_record_processor._schedule_delay, 2.5)
375-
self.assertEqual(log_record_processor._max_export_batch_size, 256)
376-
self.assertEqual(log_record_processor._export_timeout_millis, 15000)
372+
self.assertEqual(log_record_processor._batch_processor._exporter, exporter)
373+
self.assertEqual(log_record_processor._batch_processor._max_queue_size, 1024)
374+
self.assertEqual(log_record_processor._batch_processor._schedule_delay, 2.5)
375+
self.assertEqual(log_record_processor._batch_processor._max_export_batch_size, 256)
376+
self.assertEqual(log_record_processor._batch_processor._export_timeout_millis, 15000)
377377

378378
def test_args_defaults(self):
379379
exporter = InMemoryLogExporter()
380380
log_record_processor = BatchLogRecordProcessor(exporter)
381-
self.assertEqual(log_record_processor._exporter, exporter)
382-
self.assertEqual(log_record_processor._max_queue_size, 2048)
383-
self.assertEqual(log_record_processor._schedule_delay, 5)
384-
self.assertEqual(log_record_processor._max_export_batch_size, 512)
385-
self.assertEqual(log_record_processor._export_timeout_millis, 30000)
381+
self.assertEqual(log_record_processor._batch_processor._exporter, exporter)
382+
self.assertEqual(log_record_processor._batch_processor._max_queue_size, 2048)
383+
self.assertEqual(log_record_processor._batch_processor._schedule_delay, 5)
384+
self.assertEqual(log_record_processor._batch_processor._max_export_batch_size, 512)
385+
self.assertEqual(log_record_processor._batch_processor._export_timeout_millis, 30000)
386386

387387
@patch.dict(
388388
"os.environ",
@@ -398,11 +398,11 @@ def test_args_env_var_value_error(self):
398398
_logger.disabled = True
399399
log_record_processor = BatchLogRecordProcessor(exporter)
400400
_logger.disabled = False
401-
self.assertEqual(log_record_processor._exporter, exporter)
402-
self.assertEqual(log_record_processor._max_queue_size, 2048)
403-
self.assertEqual(log_record_processor._schedule_delay, 5)
404-
self.assertEqual(log_record_processor._max_export_batch_size, 512)
405-
self.assertEqual(log_record_processor._export_timeout_millis, 30000)
401+
self.assertEqual(log_record_processor._batch_processor._exporter, exporter)
402+
self.assertEqual(log_record_processor._batch_processor._max_queue_size, 2048)
403+
self.assertEqual(log_record_processor._batch_processor._schedule_delay, 5)
404+
self.assertEqual(log_record_processor._batch_processor._max_export_batch_size, 512)
405+
self.assertEqual(log_record_processor._batch_processor._export_timeout_millis, 30000)
406406

407407
def test_args_none_defaults(self):
408408
exporter = InMemoryLogExporter()
@@ -413,11 +413,11 @@ def test_args_none_defaults(self):
413413
max_export_batch_size=None,
414414
export_timeout_millis=None,
415415
)
416-
self.assertEqual(log_record_processor._exporter, exporter)
417-
self.assertEqual(log_record_processor._max_queue_size, 2048)
418-
self.assertEqual(log_record_processor._schedule_delay, 5)
419-
self.assertEqual(log_record_processor._max_export_batch_size, 512)
420-
self.assertEqual(log_record_processor._export_timeout_millis, 30000)
416+
self.assertEqual(log_record_processor._batch_processor._exporter, exporter)
417+
self.assertEqual(log_record_processor._batch_processor._max_queue_size, 2048)
418+
self.assertEqual(log_record_processor._batch_processor._schedule_delay, 5)
419+
self.assertEqual(log_record_processor._batch_processor._max_export_batch_size, 512)
420+
self.assertEqual(log_record_processor._batch_processor._export_timeout_millis, 30000)
421421

422422
def test_validation_negative_max_queue_size(self):
423423
exporter = InMemoryLogExporter()

opentelemetry-sdk/tests/shared_internal/test_batch_processor.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ def test_telemetry_flushed_before_shutdown_and_dropped_after_shutdown(
103103
batch_processor.emit(telemetry)
104104
batch_processor.shutdown()
105105
exporter.export.assert_called_once_with([telemetry])
106-
assert batch_processor._shutdown is True
106+
assert batch_processor._batch_processor._shutdown is True
107107

108108
# This should not be flushed.
109109
batch_processor.emit(telemetry)

0 commit comments

Comments
 (0)