Skip to content

Commit 4dc6b3b

Browse files
authored
fix: log and trace processor memory leak (#4449)
1 parent 34b3ac6 commit 4dc6b3b

File tree

7 files changed

+70
-3
lines changed

7 files changed

+70
-3
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
5757
([#4458](https://github.com/open-telemetry/opentelemetry-python/pull/4458))
5858
- pylint-ci updated python version to 3.13
5959
([#4450](https://github.com/open-telemetry/opentelemetry-python/pull/4450))
60+
- Fix memory leak in Log & Trace exporter
61+
([#4449](https://github.com/open-telemetry/opentelemetry-python/pull/4449))
6062

6163
## Version 1.30.0/0.51b0 (2025-02-03)
6264

opentelemetry-sdk/src/opentelemetry/sdk/_logs/_internal/export/__init__.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import os
2121
import sys
2222
import threading
23+
import weakref
2324
from os import environ, linesep
2425
from time import time_ns
2526
from typing import IO, Callable, Deque, List, Optional, Sequence
@@ -216,7 +217,8 @@ def __init__(
216217
self._log_records = [None] * self._max_export_batch_size
217218
self._worker_thread.start()
218219
if hasattr(os, "register_at_fork"):
219-
os.register_at_fork(after_in_child=self._at_fork_reinit) # pylint: disable=protected-access
220+
weak_reinit = weakref.WeakMethod(self._at_fork_reinit)
221+
os.register_at_fork(after_in_child=lambda: weak_reinit()()) # pylint: disable=unnecessary-lambda
220222
self._pid = os.getpid()
221223

222224
def _at_fork_reinit(self):

opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/export/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -501,7 +501,7 @@ def __init__(
501501
weak_at_fork = weakref.WeakMethod(self._at_fork_reinit)
502502

503503
os.register_at_fork(
504-
after_in_child=lambda: weak_at_fork()() # pylint: disable=unnecessary-lambda, protected-access
504+
after_in_child=lambda: weak_at_fork()() # pylint: disable=unnecessary-lambda
505505
)
506506
elif self._export_interval_millis <= 0:
507507
raise ValueError(

opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import sys
2020
import threading
2121
import typing
22+
import weakref
2223
from enum import Enum
2324
from os import environ, linesep
2425
from time import time_ns
@@ -200,7 +201,8 @@ def __init__(
200201
self.spans_list = [None] * self.max_export_batch_size # type: typing.List[typing.Optional[Span]]
201202
self.worker_thread.start()
202203
if hasattr(os, "register_at_fork"):
203-
os.register_at_fork(after_in_child=self._at_fork_reinit) # pylint: disable=protected-access
204+
weak_reinit = weakref.WeakMethod(self._at_fork_reinit)
205+
os.register_at_fork(after_in_child=lambda: weak_reinit()()) # pylint: disable=unnecessary-lambda
204206
self._pid = os.getpid()
205207

206208
def on_start(

opentelemetry-sdk/tests/logs/test_export.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,13 @@
1313
# limitations under the License.
1414

1515
# pylint: disable=protected-access
16+
import gc
1617
import logging
1718
import multiprocessing
1819
import os
1920
import time
2021
import unittest
22+
import weakref
2123
from concurrent.futures import ThreadPoolExecutor
2224
from unittest.mock import Mock, patch
2325

@@ -619,6 +621,23 @@ def _target():
619621

620622
log_record_processor.shutdown()
621623

624+
def test_batch_log_record_processor_gc(self):
625+
# Given a BatchLogRecordProcessor
626+
exporter = InMemoryLogExporter()
627+
processor = BatchLogRecordProcessor(exporter)
628+
weak_ref = weakref.ref(processor)
629+
processor.shutdown()
630+
631+
# When the processor is garbage collected
632+
del processor
633+
gc.collect()
634+
635+
# Then the reference to the processor should no longer exist
636+
self.assertIsNone(
637+
weak_ref(),
638+
"The BatchLogRecordProcessor object created by this test wasn't garbage collected",
639+
)
640+
622641

623642
class TestConsoleLogExporter(unittest.TestCase):
624643
def test_export(self): # pylint: disable=no-self-use

opentelemetry-sdk/tests/metrics/test_periodic_exporting_metric_reader.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,9 @@
1414

1515
# pylint: disable=protected-access,invalid-name,no-self-use
1616

17+
import gc
1718
import math
19+
import weakref
1820
from logging import WARNING
1921
from time import sleep, time_ns
2022
from typing import Optional, Sequence
@@ -257,3 +259,24 @@ def test_metric_timeout_does_not_kill_worker_thread(self):
257259
sleep(0.1)
258260
self.assertTrue(pmr._daemon_thread.is_alive())
259261
pmr.shutdown()
262+
263+
def test_metric_exporer_gc(self):
264+
# Given a PeriodicExportingMetricReader
265+
exporter = FakeMetricsExporter(
266+
preferred_aggregation={
267+
Counter: LastValueAggregation(),
268+
},
269+
)
270+
processor = PeriodicExportingMetricReader(exporter)
271+
weak_ref = weakref.ref(processor)
272+
processor.shutdown()
273+
274+
# When we garbage collect the reader
275+
del processor
276+
gc.collect()
277+
278+
# Then the reference to the reader should no longer exist
279+
self.assertIsNone(
280+
weak_ref(),
281+
"The PeriodicExportingMetricReader object created by this test wasn't garbage collected",
282+
)

opentelemetry-sdk/tests/trace/export/test_export.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,13 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15+
import gc
1516
import multiprocessing
1617
import os
1718
import threading
1819
import time
1920
import unittest
21+
import weakref
2022
from concurrent.futures import ThreadPoolExecutor
2123
from logging import WARNING
2224
from platform import python_implementation, system
@@ -585,6 +587,23 @@ def test_batch_span_processor_parameters(self):
585587
max_export_batch_size=512,
586588
)
587589

590+
def test_batch_span_processor_gc(self):
591+
# Given a BatchSpanProcessor
592+
exporter = MySpanExporter(destination=[])
593+
processor = export.BatchSpanProcessor(exporter)
594+
weak_ref = weakref.ref(processor)
595+
processor.shutdown()
596+
597+
# When the processor is garbage collected
598+
del processor
599+
gc.collect()
600+
601+
# Then the reference to the processor should no longer exist
602+
self.assertIsNone(
603+
weak_ref(),
604+
"The BatchSpanProcessor object created by this test wasn't garbage collected",
605+
)
606+
588607

589608
class TestConsoleSpanExporter(unittest.TestCase):
590609
def test_export(self): # pylint: disable=no-self-use

0 commit comments

Comments
 (0)