Skip to content

Commit ade1058

Browse files
DylanRussell and xrmx authored
Filter duplicate logs out of some logger's logs that might otherwise endlessly log (#4695)
* Add DuplicateFilter to http/grpc exporter * Precommit and Changelog * Fix lint issue. Add comment * Update CHANGELOG.md Co-authored-by: Riccardo Magliocchetti <[email protected]> * Update exporter/opentelemetry-exporter-otlp-proto-common/src/opentelemetry/exporter/otlp/proto/common/_internal/__init__.py Co-authored-by: Riccardo Magliocchetti <[email protected]> * Add filter to BatchProcessor class * Run precommit and add DuplicateFilter to another place.. * Move DuplicateFilter to SDK * improve changelog entry * Precommit and comment changes * Fix broken test * Precommit * Fix lint issue * precommit * test repro of issue * add print statements * undo debug stuff * Update time to 20s --------- Co-authored-by: Riccardo Magliocchetti <[email protected]>
1 parent 05343a5 commit ade1058

File tree

6 files changed

+52
-3
lines changed

6 files changed

+52
-3
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
1212
1313
## Unreleased
1414

15+
- Filter out duplicate log records emitted by some internal loggers on the log export path, which could otherwise log endlessly or cause a recursion depth exceeded error in cases where logging itself results in an exception.
16+
([#4695](https://github.com/open-telemetry/opentelemetry-python/pull/4695)).
1517
- docs: linked the examples with their github source code location and added Prometheus example
1618
([#4728](https://github.com/open-telemetry/opentelemetry-python/pull/4728))
1719

exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/exporter.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@
5959
KeyValue,
6060
)
6161
from opentelemetry.proto.resource.v1.resource_pb2 import Resource # noqa: F401
62+
from opentelemetry.sdk._shared_internal import DuplicateFilter
6263
from opentelemetry.sdk.environment_variables import (
6364
OTEL_EXPORTER_OTLP_CERTIFICATE,
6465
OTEL_EXPORTER_OTLP_CLIENT_CERTIFICATE,
@@ -87,6 +88,8 @@
8788
)
8889
_MAX_RETRYS = 6
8990
logger = getLogger(__name__)
91+
# This prevents a log generated when a log fails to be written from generating another log which also fails to be written, and so on.
92+
logger.addFilter(DuplicateFilter())
9093
SDKDataT = TypeVar("SDKDataT")
9194
ResourceDataT = TypeVar("ResourceDataT")
9295
TypingResourceT = TypeVar("TypingResourceT")

exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_log_exporter/__init__.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
LogExporter,
3939
LogExportResult,
4040
)
41+
from opentelemetry.sdk._shared_internal import DuplicateFilter
4142
from opentelemetry.sdk.environment_variables import (
4243
OTEL_EXPORTER_OTLP_CERTIFICATE,
4344
OTEL_EXPORTER_OTLP_CLIENT_CERTIFICATE,
@@ -57,6 +58,8 @@
5758
from opentelemetry.util.re import parse_env_headers
5859

5960
_logger = logging.getLogger(__name__)
61+
# This prevents a log generated when a log fails to be written from generating another log which also fails to be written, and so on.
62+
_logger.addFilter(DuplicateFilter())
6063

6164

6265
DEFAULT_COMPRESSION = Compression.NoCompression

opentelemetry-sdk/src/opentelemetry/sdk/_logs/_internal/export/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
set_value,
2828
)
2929
from opentelemetry.sdk._logs import LogData, LogRecord, LogRecordProcessor
30-
from opentelemetry.sdk._shared_internal import BatchProcessor
30+
from opentelemetry.sdk._shared_internal import BatchProcessor, DuplicateFilter
3131
from opentelemetry.sdk.environment_variables import (
3232
OTEL_BLRP_EXPORT_TIMEOUT,
3333
OTEL_BLRP_MAX_EXPORT_BATCH_SIZE,
@@ -43,6 +43,7 @@
4343
"Unable to parse value for %s as integer. Defaulting to %s."
4444
)
4545
_logger = logging.getLogger(__name__)
46+
_logger.addFilter(DuplicateFilter())
4647

4748

4849
class LogExportResult(enum.Enum):

opentelemetry-sdk/src/opentelemetry/sdk/_shared_internal/__init__.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,29 @@
3939
from opentelemetry.util._once import Once
4040

4141

42+
class DuplicateFilter(logging.Filter):
    """Filter that drops repeated records emitted by internal loggers.

    Currently applied to loggers on the export logs path that could otherwise
    cause endless logging of errors or a recursion depth exceeded issue in
    cases where logging itself results in an exception.

    A record is treated as a duplicate when its module, level and message
    template (``record.msg``; the arguments are not compared) match those of
    the last record this filter let through, and that record was let through
    less than ``suppression_window_seconds`` ago.
    """

    # We need to pick a time longer than the OTLP LogExporter timeout which
    # defaults to 10 seconds, but not pick something so long that it filters
    # out useful logs.
    suppression_window_seconds = 20

    # Class-level defaults so that a fresh instance needs no ``__init__``;
    # ``filter`` shadows them with per-instance values on first use.
    last_log = None
    _last_emitted_at = 0.0

    def filter(self, record):
        """Return True to let ``record`` through, False to drop it.

        Args:
            record: the ``logging.LogRecord`` being considered.
        """
        current_log = (record.module, record.levelno, record.msg)
        # Measure elapsed time from the last record that was let through,
        # using a monotonic clock. A fixed wall-clock bucket would let two
        # identical records through when they straddle a bucket boundary,
        # and would be affected by system clock adjustments.
        now = time.monotonic()
        is_duplicate = (
            current_log == self.last_log
            and now - self._last_emitted_at < self.suppression_window_seconds
        )
        if is_duplicate:
            # False means python's `logging` module will no longer process this log.
            return False
        self.last_log = current_log
        self._last_emitted_at = now
        return True
63+
64+
4265
class BatchExportStrategy(enum.Enum):
4366
EXPORT_ALL = 0
4467
EXPORT_WHILE_BATCH_EXCEEDS_THRESHOLD = 1
@@ -89,6 +112,7 @@ def __init__(
89112
daemon=True,
90113
)
91114
self._logger = logging.getLogger(__name__)
115+
self._logger.addFilter(DuplicateFilter())
92116
self._exporting = exporting
93117

94118
self._shutdown = False

opentelemetry-sdk/tests/shared_internal/test_batch_processor.py

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
# pylint: disable=protected-access
1616
import gc
17+
import logging
1718
import multiprocessing
1819
import os
1920
import threading
@@ -33,6 +34,9 @@
3334
from opentelemetry.sdk._logs.export import (
3435
BatchLogRecordProcessor,
3536
)
37+
from opentelemetry.sdk._shared_internal import (
38+
DuplicateFilter,
39+
)
3640
from opentelemetry.sdk.trace import ReadableSpan
3741
from opentelemetry.sdk.trace.export import BatchSpanProcessor
3842
from opentelemetry.sdk.util.instrumentation import InstrumentationScope
@@ -56,6 +60,7 @@ def __init__(self, export_sleep: int):
5660
self.num_export_calls = 0
5761
self.export_sleep = export_sleep
5862
self._shutdown = False
63+
self.sleep_interrupted = False
5964
self.export_sleep_event = threading.Event()
6065

6166
def export(self, _: list[Any]):
@@ -65,6 +70,7 @@ def export(self, _: list[Any]):
6570

6671
sleep_interrupted = self.export_sleep_event.wait(self.export_sleep)
6772
if sleep_interrupted:
73+
self.sleep_interrupted = True
6874
raise ValueError("Did not get to finish !")
6975

7076
def shutdown(self):
@@ -219,7 +225,7 @@ def test_record_processor_is_garbage_collected(
219225
assert weak_ref() is None
220226

221227
def test_shutdown_allows_1_export_to_finish(
222-
self, batch_processor_class, telemetry, caplog
228+
self, batch_processor_class, telemetry
223229
):
224230
# This exporter throws an exception if its export sleep cannot finish.
225231
exporter = MockExporterForTesting(export_sleep=2)
@@ -244,5 +250,15 @@ def test_shutdown_allows_1_export_to_finish(
244250
time.sleep(0.1)
245251
assert processor._batch_processor._worker_thread.is_alive() is False
246252
# Expect the second call to be interrupted by shutdown, and the third call to never be made.
247-
assert "Exception while exporting" in caplog.text
253+
assert exporter.sleep_interrupted is True
248254
assert 2 == exporter.num_export_calls
255+
256+
257+
class TestCommonFuncs(unittest.TestCase):
    """Tests for helpers shared by the batch processors."""

    def test_duplicate_logs_filter_works(self):
        """Two identical consecutive records yield a single captured log."""
        test_logger = logging.getLogger("testLogger")
        duplicate_filter = DuplicateFilter()
        test_logger.addFilter(duplicate_filter)
        # "testLogger" is a process-global logger: remove the filter again so
        # its state cannot affect other tests that log through the same name.
        self.addCleanup(test_logger.removeFilter, duplicate_filter)
        with self.assertLogs("testLogger") as cm:
            test_logger.info("message")
            test_logger.info("message")
        self.assertEqual(len(cm.output), 1)

0 commit comments

Comments
 (0)