Skip to content

Commit a48b45a

Browse files
committed
Make changes to approach
1 parent 9b904ac commit a48b45a

File tree

3 files changed

+63
-58
lines changed

3 files changed

+63
-58
lines changed

opentelemetry-sdk/src/opentelemetry/sdk/_logs/_internal/export/__init__.py

Lines changed: 24 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
import enum
1818
import logging
1919
import sys
20-
import traceback
20+
from contextvars import ContextVar
2121
from os import environ, linesep
2222
from typing import IO, Callable, Optional, Sequence
2323

@@ -50,6 +50,10 @@
5050
_logger = logging.getLogger(__name__)
5151
_logger.addFilter(DuplicateFilter())
5252

53+
_propagate_false_logger = logging.getLogger(__name__ + ".propagate.false")
54+
_propagate_false_logger.addFilter(DuplicateFilter())
55+
_propagate_false_logger.propagate = False
56+
5357

5458
class LogExportResult(enum.Enum):
5559
SUCCESS = 0
@@ -115,26 +119,31 @@ class SimpleLogRecordProcessor(LogRecordProcessor):
115119
"""
116120

117121
def __init__(self, exporter: LogExporter):
122+
self._emit_executing = ContextVar("var", default=False)
118123
self._exporter = exporter
119124
self._shutdown = False
120125

121126
def on_emit(self, log_data: LogData):
122-
# Prevent entering recursive loop.
123-
if sum(
124-
item.name == "on_emit"
125-
and item.filename.endswith("export/__init__.py")
126-
for item in traceback.extract_stack()
127-
) > 1:
128-
return
129-
if self._shutdown:
130-
_logger.warning("Processor is already shutdown, ignoring call")
127+
# Prevent entering a recursive loop.
128+
if self._emit_executing.get():
129+
_propagate_false_logger.warning(
130+
"SimpleLogRecordProcessor.on_emit has entered a recursive loop. Dropping log and exiting the loop."
131+
)
131132
return
132-
token = attach(set_value(_SUPPRESS_INSTRUMENTATION_KEY, True))
133+
emit_token = self._emit_executing.set(True)
134+
suppress_token = attach(set_value(_SUPPRESS_INSTRUMENTATION_KEY, True))
133135
try:
134-
self._exporter.export((log_data,))
135-
except Exception: # pylint: disable=broad-exception-caught
136-
_logger.exception("Exception while exporting logs.")
137-
detach(token)
136+
if self._shutdown:
137+
_logger.warning("Processor is already shutdown, ignoring call")
138+
return
139+
140+
try:
141+
self._exporter.export((log_data,))
142+
except Exception: # pylint: disable=broad-exception-caught
143+
_logger.exception("Exception while exporting logs.")
144+
finally:
145+
self._emit_executing.reset(emit_token)
146+
detach(suppress_token)
138147

139148
def shutdown(self):
140149
self._shutdown = True

opentelemetry-sdk/src/opentelemetry/sdk/_shared_internal/__init__.py

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,8 @@
4242
class DuplicateFilter(logging.Filter):
4343
"""Filter that can be applied to internal `logger`'s.
4444
45-
Currently applied to `logger`s on the export logs path that could otherwise cause endless logging of errors or a
46-
recursion depth exceeded issue in cases where logging itself results in an exception."""
45+
Currently applied to `logger`'s on the export logs path to prevent endlessly logging the same log
46+
in cases where logging itself is failing."""
4747

4848
def filter(self, record):
4949
current_log = (
@@ -83,6 +83,9 @@ def shutdown(self):
8383

8484
_logger = logging.getLogger(__name__)
8585
_logger.addFilter(DuplicateFilter())
86+
_propagate_false_logger = logging.getLogger(__name__ + ".propagate.false")
87+
_propagate_false_logger.addFilter(DuplicateFilter())
88+
_propagate_false_logger.propagate = False
8689

8790

8891
class BatchProcessor(Generic[Telemetry]):
@@ -196,15 +199,19 @@ def _export(self, batch_strategy: BatchExportStrategy) -> None:
196199
)
197200
detach(token)
198201

199-
# Do not add any logging.log statements to this function, they can be being routed back to this `emit` function,
200-
# resulting in endless recursive calls that crash the program.
201-
# See https://github.com/open-telemetry/opentelemetry-python/issues/4261
202202
def emit(self, data: Telemetry) -> None:
203203
if self._shutdown:
204+
_logger.info(
205+
"Shutdown called, ignoring %s.", self._exporting
206+
)
204207
return
205208
if self._pid != os.getpid():
206209
self._bsp_reset_once.do_once(self._at_fork_reinit)
207-
# This will drop a log from the right side if the queue is at _max_queue_length.
210+
if len(self._queue) == self._max_queue_size:
211+
_logger.warning(
212+
"Queue full, dropping %s.", self._exporting
213+
)
214+
# This will drop a log from the right side if the queue is at _max_queue_size.
208215
self._queue.appendleft(data)
209216
if len(self._queue) >= self._max_export_batch_size:
210217
self._worker_awaken.set()

opentelemetry-sdk/tests/logs/test_export.py

Lines changed: 26 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,10 @@
1515
# pylint: disable=protected-access
1616
import logging
1717
import os
18+
import random
1819
import time
1920
import unittest
2021
from concurrent.futures import ThreadPoolExecutor
21-
from sys import version_info
2222
from typing import Sequence
2323
from unittest.mock import Mock, patch
2424

@@ -63,23 +63,45 @@
6363

6464

6565
class TestSimpleLogRecordProcessor(unittest.TestCase):
66+
@mark.skipif(
67+
version_info=(3, 13),
68+
reason="This will fail on 3.13 due to https://github.com/python/cpython/pull/131812 which prevents recursive log messages but was later rolled back.",
69+
)
6670
def test_simple_log_record_processor_doesnt_enter_recursive_loop(self):
6771
class Exporter(LogExporter):
6872
def shutdown(self):
6973
pass
7074

7175
def export(self, batch: Sequence[LogData]):
72-
raise ValueError("Exception raised !")
76+
raise ValueError(
77+
"Exception raised ! {}".format(random.randint(1, 10000))
78+
)
7379

7480
exporter = Exporter()
7581
logger_provider = LoggerProvider()
7682
logger_provider.add_log_record_processor(
7783
SimpleLogRecordProcessor(exporter)
7884
)
7985
root_logger = logging.getLogger()
80-
root_logger.addHandler(LoggingHandler(level=logging.NOTSET,logger_provider=logger_provider))
86+
# Add the OTLP handler to the root logger like is done in auto instrumentation.
87+
# This means logs generated from within SimpleLogRecordProcessor.on_emit are sent back to SimpleLogRecordProcessor.on_emit
88+
handler = LoggingHandler(
89+
level=logging.DEBUG, logger_provider=logger_provider
90+
)
91+
root_logger.addHandler(handler)
92+
propagate_false_logger = logging.getLogger(
93+
"opentelemetry.sdk._logs._internal.export.propagate.false"
94+
)
8195
# This would cause a max recursion depth exceeded error..
82-
root_logger.warning("Something is wrong")
96+
try:
97+
with self.assertLogs(propagate_false_logger) as cm:
98+
root_logger.warning("hello!")
99+
assert (
100+
"SimpleLogRecordProcessor.on_emit has entered a recursive loop"
101+
in cm.output[0]
102+
)
103+
finally:
104+
root_logger.removeHandler(handler)
83105

84106
def test_simple_log_record_processor_default_level(self):
85107
exporter = InMemoryLogExporter()
@@ -403,39 +425,6 @@ def bulk_emit(num_emit):
403425
time.sleep(2)
404426
assert len(exporter.get_finished_logs()) == total_expected_logs
405427

406-
@mark.skipif(
407-
version_info < (3, 10),
408-
reason="assertNoLogs only exists in python 3.10+.",
409-
)
410-
def test_logging_lib_not_invoked_in_batch_log_record_emit(self): # pylint: disable=no-self-use
411-
# See https://github.com/open-telemetry/opentelemetry-python/issues/4261
412-
exporter = Mock()
413-
processor = BatchLogRecordProcessor(exporter)
414-
logger_provider = LoggerProvider(
415-
resource=SDKResource.create(
416-
{
417-
"service.name": "shoppingcart",
418-
"service.instance.id": "instance-12",
419-
}
420-
),
421-
)
422-
logger_provider.add_log_record_processor(processor)
423-
handler = LoggingHandler(
424-
level=logging.INFO, logger_provider=logger_provider
425-
)
426-
sdk_logger = logging.getLogger("opentelemetry.sdk")
427-
# Attach OTLP handler to SDK logger
428-
sdk_logger.addHandler(handler)
429-
# If `emit` calls logging.log then this test will throw a maximum recursion depth exceeded exception and fail.
430-
try:
431-
with self.assertNoLogs(sdk_logger, logging.NOTSET):
432-
processor.on_emit(EMPTY_LOG)
433-
processor.shutdown()
434-
with self.assertNoLogs(sdk_logger, logging.NOTSET):
435-
processor.on_emit(EMPTY_LOG)
436-
finally:
437-
sdk_logger.removeHandler(handler)
438-
439428
def test_args(self):
440429
exporter = InMemoryLogExporter()
441430
log_record_processor = BatchLogRecordProcessor(

0 commit comments

Comments
 (0)