Skip to content

Commit 610234e

Browse files
committed
fix: log and trace processor memory leak
Update register_at_fork calls in processors to use weak references. Add tests for all processors that use register_at_fork. Strong references in register_at_fork persist after the processor objects are deleted. This prevents garbage collection, as the reference count for the processor object never drops to 0.
1 parent ac7329c commit 610234e

File tree

7 files changed

+73
-3
lines changed

7 files changed

+73
-3
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
1717
([#4444](https://github.com/open-telemetry/opentelemetry-python/pull/4444))
1818
- Updated `tracecontext-integration-test` gitref to `d782773b2cf2fa4afd6a80a93b289d8a74ca894d`
1919
([#4448](https://github.com/open-telemetry/opentelemetry-python/pull/4448))
20+
- Fix memory leak in Log & Trace exporter
21+
([#4449](https://github.com/open-telemetry/opentelemetry-python/pull/4449))
2022

2123
## Version 1.30.0/0.51b0 (2025-02-03)
2224

opentelemetry-sdk/src/opentelemetry/sdk/_logs/_internal/export/__init__.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import os
2020
import sys
2121
import threading
22+
import weakref
2223
from os import environ, linesep
2324
from time import time_ns
2425
from typing import IO, Callable, Deque, List, Optional, Sequence
@@ -215,7 +216,8 @@ def __init__(
215216
self._log_records = [None] * self._max_export_batch_size
216217
self._worker_thread.start()
217218
if hasattr(os, "register_at_fork"):
218-
os.register_at_fork(after_in_child=self._at_fork_reinit) # pylint: disable=protected-access
219+
weak_reinit = weakref.WeakMethod(self._at_fork_reinit)
220+
os.register_at_fork(after_in_child=lambda: weak_reinit()()) # pylint: disable=unnecessary-lambda
219221
self._pid = os.getpid()
220222

221223
def _at_fork_reinit(self):

opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/export/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -492,7 +492,7 @@ def __init__(
492492
weak_at_fork = weakref.WeakMethod(self._at_fork_reinit)
493493

494494
os.register_at_fork(
495-
after_in_child=lambda: weak_at_fork()() # pylint: disable=unnecessary-lambda, protected-access
495+
after_in_child=lambda: weak_at_fork()() # pylint: disable=unnecessary-lambda
496496
)
497497
elif self._export_interval_millis <= 0:
498498
raise ValueError(

opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import sys
1919
import threading
2020
import typing
21+
import weakref
2122
from enum import Enum
2223
from os import environ, linesep
2324
from time import time_ns
@@ -200,7 +201,8 @@ def __init__(
200201
self.spans_list = [None] * self.max_export_batch_size # type: typing.List[typing.Optional[Span]]
201202
self.worker_thread.start()
202203
if hasattr(os, "register_at_fork"):
203-
os.register_at_fork(after_in_child=self._at_fork_reinit) # pylint: disable=protected-access
204+
weak_reinit = weakref.WeakMethod(self._at_fork_reinit)
205+
os.register_at_fork(after_in_child=lambda: weak_reinit()()) # pylint: disable=unnecessary-lambda
204206
self._pid = os.getpid()
205207

206208
def on_start(

opentelemetry-sdk/tests/logs/test_export.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,13 @@
1313
# limitations under the License.
1414

1515
# pylint: disable=protected-access
16+
import gc
1617
import logging
1718
import multiprocessing
1819
import os
1920
import time
2021
import unittest
22+
import weakref
2123
from concurrent.futures import ThreadPoolExecutor
2224
from unittest.mock import Mock, patch
2325

@@ -619,6 +621,24 @@ def _target():
619621

620622
log_record_processor.shutdown()
621623

624+
def test_batch_log_record_processor_gc(self):
625+
exporter = InMemoryLogExporter()
626+
processor = BatchLogRecordProcessor(exporter)
627+
628+
weak_ref = weakref.ref(processor)
629+
630+
processor.shutdown()
631+
del processor
632+
633+
gc.collect()
634+
635+
processor_ref = weak_ref()
636+
637+
self.assertIsNone(
638+
processor_ref,
639+
"The specific BatchLogRecordProcessor object wasn't garbage collected",
640+
)
641+
622642

623643
class TestConsoleLogExporter(unittest.TestCase):
624644
def test_export(self): # pylint: disable=no-self-use

opentelemetry-sdk/tests/metrics/test_periodic_exporting_metric_reader.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,9 @@
1414

1515
# pylint: disable=protected-access,invalid-name,no-self-use
1616

17+
import gc
1718
import math
19+
import weakref
1820
from logging import WARNING
1921
from time import sleep, time_ns
2022
from typing import Optional, Sequence
@@ -257,3 +259,25 @@ def test_metric_timeout_does_not_kill_worker_thread(self):
257259
sleep(0.1)
258260
self.assertTrue(pmr._daemon_thread.is_alive())
259261
pmr.shutdown()
262+
263+
def test_metric_exporer_gc(self):
264+
exporter = FakeMetricsExporter(
265+
preferred_aggregation={
266+
Counter: LastValueAggregation(),
267+
},
268+
)
269+
processor = PeriodicExportingMetricReader(exporter)
270+
271+
weak_ref = weakref.ref(processor)
272+
273+
processor.shutdown()
274+
del processor
275+
276+
gc.collect()
277+
278+
processor_ref = weak_ref()
279+
280+
self.assertIsNone(
281+
processor_ref,
282+
"The BatchSpanProcessor object created by this test wasn't garbage collected",
283+
)

opentelemetry-sdk/tests/trace/export/test_export.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,13 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15+
import gc
1516
import multiprocessing
1617
import os
1718
import threading
1819
import time
1920
import unittest
21+
import weakref
2022
from concurrent.futures import ThreadPoolExecutor
2123
from logging import WARNING
2224
from platform import python_implementation, system
@@ -585,6 +587,24 @@ def test_batch_span_processor_parameters(self):
585587
max_export_batch_size=512,
586588
)
587589

590+
def test_batch_span_processor_gc(self):
591+
exporter = MySpanExporter(destination=[])
592+
processor = export.BatchSpanProcessor(exporter)
593+
594+
weak_ref = weakref.ref(processor)
595+
596+
processor.shutdown()
597+
del processor
598+
599+
gc.collect()
600+
601+
processor_ref = weak_ref()
602+
603+
self.assertIsNone(
604+
processor_ref,
605+
"The BatchSpanProcessor object created by this test wasn't garbage collected",
606+
)
607+
588608

589609
class TestConsoleSpanExporter(unittest.TestCase):
590610
def test_export(self): # pylint: disable=no-self-use

0 commit comments

Comments
 (0)