Skip to content
Open
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
([#4782](https://github.com/open-telemetry/opentelemetry-python/pull/4782))
- semantic-conventions: Bump to 1.38.0
([#4791](https://github.com/open-telemetry/opentelemetry-python/pull/4791))
- Prevent possible endless recursion from happening in `SimpleLogRecordProcessor.on_emit`,
([#4799](https://github.com/open-telemetry/opentelemetry-python/pull/4799)).

## Version 1.38.0/0.59b0 (2025-10-16)

Expand Down
2 changes: 1 addition & 1 deletion opentelemetry-api/test-requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,5 @@ wrapt==1.16.0
zipp==3.20.2
-e opentelemetry-sdk
-e opentelemetry-semantic-conventions
-e tests/opentelemetry-test-utils
-e opentelemetry-api
-e tests/opentelemetry-test-utils
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import enum
import logging
import sys
import traceback
from os import environ, linesep
from typing import IO, Callable, Optional, Sequence

Expand Down Expand Up @@ -49,6 +50,10 @@
_logger = logging.getLogger(__name__)
_logger.addFilter(DuplicateFilter())

_propagate_false_logger = logging.getLogger(__name__ + ".propagate.false")
_propagate_false_logger.addFilter(DuplicateFilter())
_propagate_false_logger.propagate = False


class LogExportResult(enum.Enum):
SUCCESS = 0
Expand Down Expand Up @@ -118,15 +123,36 @@ def __init__(self, exporter: LogExporter):
self._shutdown = False

def on_emit(self, log_data: LogData):
if self._shutdown:
_logger.warning("Processor is already shutdown, ignoring call")
# Prevent entering a recursive loop.
if (
sum(
item.name == "on_emit"
and (
item.filename.endswith("export/__init__.py")
or item.filename.endswith(
r"export\__init__.py"
) # backward slash on windows..
)
for item in traceback.extract_stack()
)
> 3
):
_propagate_false_logger.warning(
"SimpleLogRecordProcessor.on_emit has entered a recursive loop. Dropping log and exiting the loop."
)
return
token = attach(set_value(_SUPPRESS_INSTRUMENTATION_KEY, True))
try:
self._exporter.export((log_data,))
except Exception: # pylint: disable=broad-exception-caught
_logger.exception("Exception while exporting logs.")
detach(token)
if self._shutdown:
_logger.warning("Processor is already shutdown, ignoring call")
return

try:
self._exporter.export((log_data,))
except Exception: # pylint: disable=broad-exception-caught
_logger.exception("Exception while exporting logs.")
finally:
detach(token)

def shutdown(self):
self._shutdown = True
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@
class DuplicateFilter(logging.Filter):
"""Filter that can be applied to internal `logger`'s.

Currently applied to `logger`s on the export logs path that could otherwise cause endless logging of errors or a
recursion depth exceeded issue in cases where logging itself results in an exception."""
Currently applied to `logger`'s on the export logs path to prevent endlessly logging the same log
in cases where logging itself is failing."""

def filter(self, record):
current_log = (
Expand Down Expand Up @@ -81,6 +81,13 @@ def shutdown(self):
raise NotImplementedError


_logger = logging.getLogger(__name__)
_logger.addFilter(DuplicateFilter())
_propagate_false_logger = logging.getLogger(__name__ + ".propagate.false")
_propagate_false_logger.addFilter(DuplicateFilter())
_propagate_false_logger.propagate = False


class BatchProcessor(Generic[Telemetry]):
"""This class can be used with exporter's that implement the above
Exporter interface to buffer and send telemetry in batch through
Expand Down Expand Up @@ -111,8 +118,6 @@ def __init__(
target=self.worker,
daemon=True,
)
self._logger = logging.getLogger(__name__)
self._logger.addFilter(DuplicateFilter())
self._exporting = exporting

self._shutdown = False
Expand Down Expand Up @@ -189,20 +194,20 @@ def _export(self, batch_strategy: BatchExportStrategy) -> None:
]
)
except Exception: # pylint: disable=broad-exception-caught
self._logger.exception(
_logger.exception(
"Exception while exporting %s.", self._exporting
)
detach(token)

# Do not add any logging.log statements to this function, they can be being routed back to this `emit` function,
# resulting in endless recursive calls that crash the program.
# See https://github.com/open-telemetry/opentelemetry-python/issues/4261
def emit(self, data: Telemetry) -> None:
if self._shutdown:
_logger.info("Shutdown called, ignoring %s.", self._exporting)
return
if self._pid != os.getpid():
self._bsp_reset_once.do_once(self._at_fork_reinit)
# This will drop a log from the right side if the queue is at _max_queue_length.
if len(self._queue) == self._max_queue_size:
_logger.warning("Queue full, dropping %s.", self._exporting)
# This will drop a log from the right side if the queue is at _max_queue_size.
self._queue.appendleft(data)
if len(self._queue) >= self._max_export_batch_size:
self._worker_awaken.set()
Expand Down
2 changes: 1 addition & 1 deletion opentelemetry-sdk/test-requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ tomli==2.0.1
typing_extensions==4.10.0
wrapt==1.16.0
zipp==3.19.2
-e tests/opentelemetry-test-utils
-e opentelemetry-api
-e tests/opentelemetry-test-utils
-e opentelemetry-semantic-conventions
-e opentelemetry-sdk
77 changes: 43 additions & 34 deletions opentelemetry-sdk/tests/logs/test_export.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,11 @@
# pylint: disable=protected-access
import logging
import os
import sys
import time
import unittest
from concurrent.futures import ThreadPoolExecutor
from sys import version_info
from typing import Sequence
from unittest.mock import Mock, patch

from pytest import mark
Expand All @@ -36,6 +37,7 @@
BatchLogRecordProcessor,
ConsoleLogExporter,
InMemoryLogExporter,
LogExporter,
SimpleLogRecordProcessor,
)
from opentelemetry.sdk.environment_variables import (
Expand All @@ -61,6 +63,46 @@


class TestSimpleLogRecordProcessor(unittest.TestCase):
@mark.skipif(
sys.version_info == (3, 13),
reason="This will fail on 3.13 due to https://github.com/python/cpython/pull/131812 which prevents recursive log messages but was later rolled back.",
)
def test_simple_log_record_processor_doesnt_enter_recursive_loop(self):
class Exporter(LogExporter):
def shutdown(self):
pass

def export(self, batch: Sequence[LogData]):
logger = logging.getLogger("any logger..")
logger.warning("Something happened.")

exporter = Exporter()
logger_provider = LoggerProvider()
logger_provider.add_log_record_processor(
SimpleLogRecordProcessor(exporter)
)
root_logger = logging.getLogger()
# Add the OTLP handler to the root logger like is done in auto instrumentation.
# This causes logs generated from within SimpleLogRecordProcessor.on_emit (such as the above log in export)
# to be sent back to SimpleLogRecordProcessor.on_emit
handler = LoggingHandler(
level=logging.DEBUG, logger_provider=logger_provider
)
root_logger.addHandler(handler)
propagate_false_logger = logging.getLogger(
"opentelemetry.sdk._logs._internal.export.propagate.false"
)
# This would cause a max recursion depth exceeded error..
try:
with self.assertLogs(propagate_false_logger) as cm:
root_logger.warning("hello!")
assert (
"SimpleLogRecordProcessor.on_emit has entered a recursive loop"
in cm.output[0]
)
finally:
root_logger.removeHandler(handler)

def test_simple_log_record_processor_default_level(self):
exporter = InMemoryLogExporter()
logger_provider = LoggerProvider()
Expand Down Expand Up @@ -383,39 +425,6 @@ def bulk_emit(num_emit):
time.sleep(2)
assert len(exporter.get_finished_logs()) == total_expected_logs

@mark.skipif(
version_info < (3, 10),
reason="assertNoLogs only exists in python 3.10+.",
)
def test_logging_lib_not_invoked_in_batch_log_record_emit(self): # pylint: disable=no-self-use
# See https://github.com/open-telemetry/opentelemetry-python/issues/4261
exporter = Mock()
processor = BatchLogRecordProcessor(exporter)
logger_provider = LoggerProvider(
resource=SDKResource.create(
{
"service.name": "shoppingcart",
"service.instance.id": "instance-12",
}
),
)
logger_provider.add_log_record_processor(processor)
handler = LoggingHandler(
level=logging.INFO, logger_provider=logger_provider
)
sdk_logger = logging.getLogger("opentelemetry.sdk")
# Attach OTLP handler to SDK logger
sdk_logger.addHandler(handler)
# If `emit` calls logging.log then this test will throw a maximum recursion depth exceeded exception and fail.
try:
with self.assertNoLogs(sdk_logger, logging.NOTSET):
processor.on_emit(EMPTY_LOG)
processor.shutdown()
with self.assertNoLogs(sdk_logger, logging.NOTSET):
processor.on_emit(EMPTY_LOG)
finally:
sdk_logger.removeHandler(handler)

def test_args(self):
exporter = InMemoryLogExporter()
log_record_processor = BatchLogRecordProcessor(
Expand Down
Loading