Skip to content

Filter duplicate logs out of some logger's logs that might otherwise endlessly log #4695

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 22 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## Unreleased

- Filter duplicate logs out of some internal `logger`s' logs on the log-export path that might otherwise log endlessly, or cause a recursion-depth-exceeded error, in cases where logging itself raises an exception.
  ([#4695](https://github.com/open-telemetry/opentelemetry-python/pull/4695)).

## Version 1.36.0/0.57b0 (2025-07-29)

- Add missing Prometheus exporter documentation
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
KeyValue,
)
from opentelemetry.proto.resource.v1.resource_pb2 import Resource # noqa: F401
from opentelemetry.sdk._shared_internal import DuplicateFilter
from opentelemetry.sdk.environment_variables import (
OTEL_EXPORTER_OTLP_CERTIFICATE,
OTEL_EXPORTER_OTLP_CLIENT_CERTIFICATE,
Expand Down Expand Up @@ -87,6 +88,8 @@
)
_MAX_RETRYS = 6
logger = getLogger(__name__)
# This prevents a log emitted when a log fails to be written from generating another log that also fails to be written, and so on.
logger.addFilter(DuplicateFilter())
SDKDataT = TypeVar("SDKDataT")
ResourceDataT = TypeVar("ResourceDataT")
TypingResourceT = TypeVar("TypingResourceT")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
LogExporter,
LogExportResult,
)
from opentelemetry.sdk._shared_internal import DuplicateFilter
from opentelemetry.sdk.environment_variables import (
OTEL_EXPORTER_OTLP_CERTIFICATE,
OTEL_EXPORTER_OTLP_CLIENT_CERTIFICATE,
Expand All @@ -57,6 +58,8 @@
from opentelemetry.util.re import parse_env_headers

_logger = logging.getLogger(__name__)
# This prevents a log emitted when a log fails to be written from generating another log that also fails to be written, and so on.
_logger.addFilter(DuplicateFilter())


DEFAULT_COMPRESSION = Compression.NoCompression
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
set_value,
)
from opentelemetry.sdk._logs import LogData, LogRecord, LogRecordProcessor
from opentelemetry.sdk._shared_internal import BatchProcessor
from opentelemetry.sdk._shared_internal import BatchProcessor, DuplicateFilter
from opentelemetry.sdk.environment_variables import (
OTEL_BLRP_EXPORT_TIMEOUT,
OTEL_BLRP_MAX_EXPORT_BATCH_SIZE,
Expand All @@ -43,6 +43,7 @@
"Unable to parse value for %s as integer. Defaulting to %s."
)
_logger = logging.getLogger(__name__)
_logger.addFilter(DuplicateFilter())


class LogExportResult(enum.Enum):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,26 @@
from opentelemetry.util._once import Once


class DuplicateFilter(logging.Filter):
    """Filter that suppresses consecutive duplicate records on internal loggers.

    Applied to `logger`s on the log-export path that could otherwise emit
    errors endlessly, or hit a recursion-depth-exceeded issue, in cases where
    logging itself results in an exception.

    A record is dropped when its module, level, and message template match the
    previously emitted record within the same wall-clock minute.
    """

    def __init__(self, name: str = ""):
        super().__init__(name)
        # Fingerprint of the most recently emitted record; None until the
        # first record passes through. Declared here so the attribute is not
        # created outside __init__ (avoids the pylint suppression).
        self.last_log = None

    def filter(self, record):
        """Return True to emit ``record``; False to drop it as a duplicate."""
        # Use record.msg (the unformatted template) so records differing only
        # in their args still dedupe; bucket by minute so a persistent error
        # is still surfaced roughly once per minute.
        current_log = (
            record.module,
            record.levelno,
            record.msg,
            time.time() // 60,
        )
        if current_log != self.last_log:
            self.last_log = current_log
            return True
        # False means python's `logging` module will no longer process this log.
        return False


class BatchExportStrategy(enum.Enum):
EXPORT_ALL = 0
EXPORT_WHILE_BATCH_EXCEEDS_THRESHOLD = 1
Expand Down Expand Up @@ -89,6 +109,7 @@ def __init__(
daemon=True,
)
self._logger = logging.getLogger(__name__)
self._logger.addFilter(DuplicateFilter())
self._exporting = exporting

self._shutdown = False
Expand Down
20 changes: 18 additions & 2 deletions opentelemetry-sdk/tests/shared_internal/test_batch_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

# pylint: disable=protected-access
import gc
import logging
import multiprocessing
import os
import threading
Expand All @@ -33,6 +34,9 @@
from opentelemetry.sdk._logs.export import (
BatchLogRecordProcessor,
)
from opentelemetry.sdk._shared_internal import (
DuplicateFilter,
)
from opentelemetry.sdk.trace import ReadableSpan
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.sdk.util.instrumentation import InstrumentationScope
Expand All @@ -56,6 +60,7 @@ def __init__(self, export_sleep: int):
self.num_export_calls = 0
self.export_sleep = export_sleep
self._shutdown = False
self.sleep_interrupted = False
self.export_sleep_event = threading.Event()

def export(self, _: list[Any]):
Expand All @@ -65,6 +70,7 @@ def export(self, _: list[Any]):

sleep_interrupted = self.export_sleep_event.wait(self.export_sleep)
if sleep_interrupted:
self.sleep_interrupted = True
raise ValueError("Did not get to finish !")

def shutdown(self):
Expand Down Expand Up @@ -219,7 +225,7 @@ def test_record_processor_is_garbage_collected(
assert weak_ref() is None

def test_shutdown_allows_1_export_to_finish(
self, batch_processor_class, telemetry, caplog
self, batch_processor_class, telemetry
):
# This exporter throws an exception if it's export sleep cannot finish.
exporter = MockExporterForTesting(export_sleep=2)
Expand All @@ -244,5 +250,15 @@ def test_shutdown_allows_1_export_to_finish(
time.sleep(0.1)
assert processor._batch_processor._worker_thread.is_alive() is False
# Expect the second call to be interrupted by shutdown, and the third call to never be made.
assert "Exception while exporting" in caplog.text
assert exporter.sleep_interrupted is True
assert 2 == exporter.num_export_calls


class TestCommonFuncs(unittest.TestCase):
    def test_duplicate_logs_filter_works(self):
        """Two identical back-to-back records should yield a single log line."""
        logger_under_test = logging.getLogger("testLogger")
        logger_under_test.addFilter(DuplicateFilter())
        with self.assertLogs("testLogger") as captured:
            for _ in range(2):
                logger_under_test.info("message")
        self.assertEqual(1, len(captured.output))
Loading