Skip to content

Commit 41cd520

Browse files
committed
Respond to comments
1 parent 217463e commit 41cd520

File tree

3 files changed

+23
-22
lines changed

3 files changed

+23
-22
lines changed

opentelemetry-sdk/src/opentelemetry/sdk/_logs/_internal/export/__init__.py

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -199,6 +199,7 @@ def __init__(
199199
self._schedule_delay = schedule_delay_millis / 1e3
200200
self._max_export_batch_size = max_export_batch_size
201201
# Not used. No way currently to pass timeout to export.
202+
# TODO(https://github.com/open-telemetry/opentelemetry-python/issues/4555): figure out what this should do.
202203
self._export_timeout_millis = export_timeout_millis
203204
# Deque is thread safe.
204205
self._queue = collections.deque([], max_queue_size)
@@ -210,7 +211,7 @@ def __init__(
210211

211212
self._shutdown = False
212213
self._export_lock = threading.Lock()
213-
self._worker_sleep = threading.Event()
214+
self._worker_awaken = threading.Event()
214215
self._worker_thread.start()
215216
if hasattr(os, "register_at_fork"):
216217
os.register_at_fork(after_in_child=self._at_fork_reinit) # pylint: disable=protected-access
@@ -224,15 +225,15 @@ def _should_export_batch(
224225
# Always continue to export while queue length exceeds max batch size.
225226
if len(self._queue) >= self._max_export_batch_size:
226227
return True
227-
if batch_strategy == BatchLogExportStrategy.EXPORT_ALL:
228+
if batch_strategy is BatchLogExportStrategy.EXPORT_ALL:
228229
return True
229-
if batch_strategy == BatchLogExportStrategy.EXPORT_AT_LEAST_ONE_BATCH:
230+
if batch_strategy is BatchLogExportStrategy.EXPORT_AT_LEAST_ONE_BATCH:
230231
return num_iterations == 0
231232
return False
232233

233234
def _at_fork_reinit(self):
234235
self._export_lock = threading.Lock()
235-
self._worker_sleep = threading.Event()
236+
self._worker_awaken = threading.Event()
236237
self._queue.clear()
237238
self._worker_thread = threading.Thread(
238239
name="OtelBatchLogRecordProcessor",
@@ -247,15 +248,15 @@ def worker(self):
247248
# Lots of strategies in the spec for setting next timeout.
248249
# https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/sdk.md#batching-processor.
249250
# Shutdown will interrupt this sleep. Emit will interrupt this sleep only if the queue is bigger then threshold.
250-
sleep_interrupted = self._worker_sleep.wait(self._schedule_delay)
251+
sleep_interrupted = self._worker_awaken.wait(self._schedule_delay)
251252
if self._shutdown:
252253
break
253254
self._export(
254255
BatchLogExportStrategy.EXPORT_WHILE_BATCH_EXCEEDS_THRESHOLD
255256
if sleep_interrupted
256257
else BatchLogExportStrategy.EXPORT_AT_LEAST_ONE_BATCH
257258
)
258-
self._worker_sleep.clear()
259+
self._worker_awaken.clear()
259260
self._export(BatchLogExportStrategy.EXPORT_ALL)
260261

261262
def _export(self, batch_strategy: BatchLogExportStrategy) -> None:
@@ -285,7 +286,7 @@ def _export(self, batch_strategy: BatchLogExportStrategy) -> None:
285286

286287
def emit(self, log_data: LogData) -> None:
287288
if self._shutdown:
288-
_logger.warning("Shutdown called, ignoring log.")
289+
_logger.info("Shutdown called, ignoring log.")
289290
return
290291
if self._pid != os.getpid():
291292
_BSP_RESET_ONCE.do_once(self._at_fork_reinit)
@@ -294,15 +295,15 @@ def emit(self, log_data: LogData) -> None:
294295
_logger.warning("Queue full, dropping log.")
295296
self._queue.appendleft(log_data)
296297
if len(self._queue) >= self._max_export_batch_size:
297-
self._worker_sleep.set()
298+
self._worker_awaken.set()
298299

299300
def shutdown(self):
300301
if self._shutdown:
301302
return
302303
# Prevents emit and force_flush from further calling export.
303304
self._shutdown = True
304305
# Interrupts sleep in the worker, if it's sleeping.
305-
self._worker_sleep.set()
306+
self._worker_awaken.set()
306307
# Main worker loop should exit after one final export call with flush all strategy.
307308
self._worker_thread.join()
308309
self._exporter.shutdown()

opentelemetry-sdk/src/opentelemetry/sdk/environment_variables/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@
8787
.. envvar:: OTEL_BLRP_EXPORT_TIMEOUT
8888
8989
The :envvar:`OTEL_BLRP_EXPORT_TIMEOUT` represents the maximum allowed time to export data from the BatchLogRecordProcessor.
90+
This environment variable currently does nothing, see https://github.com/open-telemetry/opentelemetry-python/issues/4555.
9091
Default: 30000
9192
"""
9293

opentelemetry-sdk/tests/logs/test_export.py

Lines changed: 12 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -485,24 +485,19 @@ def test_logs_exported_once_batch_size_reached(self):
485485
exporter.export.assert_called_once()
486486
after_export = time.time_ns()
487487
# Shows the worker's 30 second sleep was interrupted within a second.
488-
self.assertTrue((after_export - before_export) < 1e9)
488+
self.assertLess(after_export - before_export, 1e9)
489489

490-
# pylint: disable=no-self-use
491490
def test_logs_exported_once_schedule_delay_reached(self):
492491
exporter = Mock()
493492
log_record_processor = BatchLogRecordProcessor(
494493
exporter=exporter,
495-
# Should not reach this during the test, instead export should be called when delay millis is hit.
496494
max_queue_size=15,
497495
max_export_batch_size=15,
498496
schedule_delay_millis=100,
499497
)
500-
for _ in range(15):
501-
log_record_processor.emit(EMPTY_LOG)
502-
time.sleep(0.11)
503-
exporter.export.assert_has_calls(
504-
[call([EMPTY_LOG]) for _ in range(15)]
505-
)
498+
log_record_processor.emit(EMPTY_LOG)
499+
time.sleep(0.11)
500+
exporter.export.assert_called_once_with([EMPTY_LOG])
506501

507502
def test_logs_flushed_before_shutdown_and_dropped_after_shutdown(self):
508503
exporter = Mock()
@@ -516,15 +511,16 @@ def test_logs_flushed_before_shutdown_and_dropped_after_shutdown(self):
516511
# This log should be flushed because it was written before shutdown.
517512
log_record_processor.emit(EMPTY_LOG)
518513
log_record_processor.shutdown()
514+
exporter.export.assert_called_once_with([EMPTY_LOG])
519515
self.assertTrue(exporter._stopped)
520516

521-
with self.assertLogs(level="WARNING") as log:
517+
with self.assertLogs(level="INFO") as log:
522518
# This log should not be flushed.
523519
log_record_processor.emit(EMPTY_LOG)
524520
self.assertEqual(len(log.output), 1)
525521
self.assertEqual(len(log.records), 1)
526522
self.assertIn("Shutdown called, ignoring log.", log.output[0])
527-
exporter.export.assert_called_once_with([EMPTY_LOG])
523+
exporter.export.assert_called_once()
528524

529525
# pylint: disable=no-self-use
530526
def test_force_flush_flushes_logs(self):
@@ -570,7 +566,9 @@ def test_batch_log_record_processor_fork(self):
570566
max_export_batch_size=64,
571567
schedule_delay_millis=30000,
572568
)
573-
# These are not expected to be flushed. Calling fork clears any logs not flushed.
569+
# These logs should be flushed only from the parent process.
570+
# _at_fork_reinit should be called in the child process, to
571+
# clear these logs in the child process.
574572
for _ in range(10):
575573
log_record_processor.emit(EMPTY_LOG)
576574

@@ -590,7 +588,8 @@ def child(conn):
590588
process.start()
591589
self.assertTrue(parent_conn.recv())
592590
process.join()
593-
self.assertTrue(len(exporter.get_finished_logs()) == 0)
591+
log_record_processor.force_flush()
592+
self.assertTrue(len(exporter.get_finished_logs()) == 10)
594593

595594

596595
class TestConsoleLogExporter(unittest.TestCase):

0 commit comments

Comments
 (0)