Skip to content

Commit 495f087

Browse files
committed
Reintroduce weakref, I accidentlly undid that change in my last PR
1 parent ff1d626 commit 495f087

File tree

3 files changed

+23
-10
lines changed

3 files changed

+23
-10
lines changed

opentelemetry-sdk/src/opentelemetry/sdk/_logs/_internal/export/__init__.py

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -50,12 +50,6 @@ class LogExportResult(enum.Enum):
5050
FAILURE = 1
5151

5252

53-
class BatchLogExportStrategy(enum.Enum):
54-
EXPORT_ALL = 0
55-
EXPORT_WHILE_BATCH_EXCEEDS_THRESHOLD = 1
56-
EXPORT_AT_LEAST_ONE_BATCH = 2
57-
58-
5953
class LogExporter(abc.ABC):
6054
"""Interface for exporting logs.
6155
Interface to be implemented by services that want to export logs received

opentelemetry-sdk/src/opentelemetry/sdk/_shared_internal/__init__.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import logging
2020
import os
2121
import threading
22+
import weakref
2223
from abc import ABC
2324
from typing import (
2425
TYPE_CHECKING,
@@ -38,7 +39,7 @@
3839
if TYPE_CHECKING:
3940
from opentelemetry.sdk._logs import LogData
4041
from opentelemetry.sdk._logs.export import LogExporter
41-
from opentelemetry.sdk.trace import Span
42+
from opentelemetry.sdk.trace import ReadableSpan
4243
from opentelemetry.sdk.trace.export import SpanExporter
4344

4445

@@ -49,7 +50,7 @@ class BatchExportStrategy(enum.Enum):
4950

5051

5152
class BatchProcessor(ABC):
52-
_queue: Deque["Union[LogData, Span]"]
53+
_queue: Deque["Union[LogData, ReadableSpan]"]
5354

5455
def __init__(
5556
self,
@@ -84,7 +85,8 @@ def __init__(
8485
self._worker_awaken = threading.Event()
8586
self._worker_thread.start()
8687
if hasattr(os, "register_at_fork"):
87-
os.register_at_fork(after_in_child=self._at_fork_reinit) # pylint: disable=protected-access
88+
weak_reinit = weakref.WeakMethod(self._at_fork_reinit)
89+
os.register_at_fork(after_in_child=lambda: weak_reinit()()) # pyright: ignore[reportOptionalCall] pylint: disable=unnecessary-lambda
8890
self._pid = os.getpid()
8991

9092
def _should_export_batch(
@@ -156,7 +158,7 @@ def _export(self, batch_strategy: BatchExportStrategy) -> None:
156158
)
157159
detach(token)
158160

159-
def emit(self, data: "Union[LogData, Span]") -> None:
161+
def emit(self, data: "Union[LogData, ReadableSpan]") -> None:
160162
if self._shutdown:
161163
self._logger.info("Shutdown called, ignoring %s.", self._exporting)
162164
return

opentelemetry-sdk/tests/shared_internal/test_batch_processor.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,12 @@
1313
# limitations under the License.
1414

1515
# pylint: disable=protected-access
16+
import gc
1617
import multiprocessing
1718
import os
1819
import time
1920
import unittest
21+
import weakref
2022
from concurrent.futures import ThreadPoolExecutor
2123
from platform import system
2224
from sys import version_info
@@ -194,3 +196,18 @@ def child(conn):
194196
batch_processor.force_flush()
195197
# Single export for the telemetry we emitted at the start of the test.
196198
assert exporter.export.call_count == 1
199+
200+
def test_record_processor_is_garbage_collected(
201+
self, batch_processor_class, telemetry
202+
):
203+
exporter = Mock()
204+
processor = batch_processor_class(exporter)
205+
weak_ref = weakref.ref(processor)
206+
processor.shutdown()
207+
208+
# When the processor is garbage collected
209+
del processor
210+
gc.collect()
211+
212+
# Then the reference to the processor should no longer exist
213+
assert weak_ref() is None

0 commit comments

Comments
 (0)