Skip to content

Commit da51419

Browse files
committed
fix: log and trace processor memory leak
Update register_at_fork calls in processors to use weak references Add tests for all processors that us register_at_fork Strong references in register_at_fork persist after the processor objects are deleted. This prevents garbage collection as the reference count for the processor object never drops to 0.
1 parent ac7329c commit da51419

File tree

7 files changed

+70
-3
lines changed

7 files changed

+70
-3
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
1717
([#4444](https://github.com/open-telemetry/opentelemetry-python/pull/4444))
1818
- Updated `tracecontext-integration-test` gitref to `d782773b2cf2fa4afd6a80a93b289d8a74ca894d`
1919
([#4448](https://github.com/open-telemetry/opentelemetry-python/pull/4448))
20+
- Fix memory leak in Log & Trace exporter
21+
([#4449](https://github.com/open-telemetry/opentelemetry-python/pull/4449))
2022

2123
## Version 1.30.0/0.51b0 (2025-02-03)
2224

opentelemetry-sdk/src/opentelemetry/sdk/_logs/_internal/export/__init__.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import os
2020
import sys
2121
import threading
22+
import weakref
2223
from os import environ, linesep
2324
from time import time_ns
2425
from typing import IO, Callable, Deque, List, Optional, Sequence
@@ -215,7 +216,8 @@ def __init__(
215216
self._log_records = [None] * self._max_export_batch_size
216217
self._worker_thread.start()
217218
if hasattr(os, "register_at_fork"):
218-
os.register_at_fork(after_in_child=self._at_fork_reinit) # pylint: disable=protected-access
219+
weak_reinit = weakref.WeakMethod(self._at_fork_reinit)
220+
os.register_at_fork(after_in_child=lambda: weak_reinit()()) # pylint: disable=unnecessary-lambda
219221
self._pid = os.getpid()
220222

221223
def _at_fork_reinit(self):

opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/export/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -492,7 +492,7 @@ def __init__(
492492
weak_at_fork = weakref.WeakMethod(self._at_fork_reinit)
493493

494494
os.register_at_fork(
495-
after_in_child=lambda: weak_at_fork()() # pylint: disable=unnecessary-lambda, protected-access
495+
after_in_child=lambda: weak_at_fork()() # pylint: disable=unnecessary-lambda
496496
)
497497
elif self._export_interval_millis <= 0:
498498
raise ValueError(

opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import sys
1919
import threading
2020
import typing
21+
import weakref
2122
from enum import Enum
2223
from os import environ, linesep
2324
from time import time_ns
@@ -200,7 +201,8 @@ def __init__(
200201
self.spans_list = [None] * self.max_export_batch_size # type: typing.List[typing.Optional[Span]]
201202
self.worker_thread.start()
202203
if hasattr(os, "register_at_fork"):
203-
os.register_at_fork(after_in_child=self._at_fork_reinit) # pylint: disable=protected-access
204+
weak_reinit = weakref.WeakMethod(self._at_fork_reinit)
205+
os.register_at_fork(after_in_child=lambda: weak_reinit()()) # pylint: disable=unnecessary-lambda
204206
self._pid = os.getpid()
205207

206208
def on_start(

opentelemetry-sdk/tests/logs/test_export.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,13 @@
1313
# limitations under the License.
1414

1515
# pylint: disable=protected-access
16+
import gc
1617
import logging
1718
import multiprocessing
1819
import os
1920
import time
2021
import unittest
22+
import weakref
2123
from concurrent.futures import ThreadPoolExecutor
2224
from unittest.mock import Mock, patch
2325

@@ -619,6 +621,23 @@ def _target():
619621

620622
log_record_processor.shutdown()
621623

624+
def test_batch_log_record_processor_gc(self):
625+
# Given a BatchLogRecordProcessor
626+
exporter = InMemoryLogExporter()
627+
processor = BatchLogRecordProcessor(exporter)
628+
weak_ref = weakref.ref(processor)
629+
processor.shutdown()
630+
631+
# When the processor is garbage collected
632+
del processor
633+
gc.collect()
634+
635+
# Then the reference to the processor should no longer exist
636+
self.assertIsNone(
637+
weak_ref(),
638+
"The BatchLogRecordProcessor object created by this test wasn't garbage collected",
639+
)
640+
622641

623642
class TestConsoleLogExporter(unittest.TestCase):
624643
def test_export(self): # pylint: disable=no-self-use

opentelemetry-sdk/tests/metrics/test_periodic_exporting_metric_reader.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,9 @@
1414

1515
# pylint: disable=protected-access,invalid-name,no-self-use
1616

17+
import gc
1718
import math
19+
import weakref
1820
from logging import WARNING
1921
from time import sleep, time_ns
2022
from typing import Optional, Sequence
@@ -257,3 +259,24 @@ def test_metric_timeout_does_not_kill_worker_thread(self):
257259
sleep(0.1)
258260
self.assertTrue(pmr._daemon_thread.is_alive())
259261
pmr.shutdown()
262+
263+
def test_metric_exporer_gc(self):
264+
# Given a PeriodicExportingMetricReader
265+
exporter = FakeMetricsExporter(
266+
preferred_aggregation={
267+
Counter: LastValueAggregation(),
268+
},
269+
)
270+
processor = PeriodicExportingMetricReader(exporter)
271+
weak_ref = weakref.ref(processor)
272+
processor.shutdown()
273+
274+
# When we garbage collect the reader
275+
del processor
276+
gc.collect()
277+
278+
# Then the reference to the reader should no longer exist
279+
self.assertIsNone(
280+
weak_ref(),
281+
"The PeriodicExportingMetricReader object created by this test wasn't garbage collected",
282+
)

opentelemetry-sdk/tests/trace/export/test_export.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,13 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15+
import gc
1516
import multiprocessing
1617
import os
1718
import threading
1819
import time
1920
import unittest
21+
import weakref
2022
from concurrent.futures import ThreadPoolExecutor
2123
from logging import WARNING
2224
from platform import python_implementation, system
@@ -585,6 +587,23 @@ def test_batch_span_processor_parameters(self):
585587
max_export_batch_size=512,
586588
)
587589

590+
def test_batch_span_processor_gc(self):
591+
# Given a BatchSpanProcessor
592+
exporter = MySpanExporter(destination=[])
593+
processor = export.BatchSpanProcessor(exporter)
594+
weak_ref = weakref.ref(processor)
595+
processor.shutdown()
596+
597+
# When the processor is garbage collected
598+
del processor
599+
gc.collect()
600+
601+
# Then the reference to the processor should no longer exist
602+
self.assertIsNone(
603+
weak_ref(),
604+
"The BatchSpanProcessor object created by this test wasn't garbage collected",
605+
)
606+
588607

589608
class TestConsoleSpanExporter(unittest.TestCase):
590609
def test_export(self): # pylint: disable=no-self-use

0 commit comments

Comments
 (0)