diff --git a/instrumentation-genai/opentelemetry-instrumentation-langchain/CHANGELOG.md b/instrumentation-genai/opentelemetry-instrumentation-langchain/CHANGELOG.md index c3d51f9fef..3c1c9beb97 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-langchain/CHANGELOG.md +++ b/instrumentation-genai/opentelemetry-instrumentation-langchain/CHANGELOG.md @@ -8,4 +8,6 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## Unreleased - Added span support for genAI langchain llm invocation. - ([#3665](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3665)) \ No newline at end of file + ([#3665](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3665)) +- Use weak reference in langchain instrumentation span map. + ([#3735](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3735)) \ No newline at end of file diff --git a/instrumentation-genai/opentelemetry-instrumentation-langchain/pyproject.toml b/instrumentation-genai/opentelemetry-instrumentation-langchain/pyproject.toml index 4f3e88115b..c1f217b68a 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-langchain/pyproject.toml +++ b/instrumentation-genai/opentelemetry-instrumentation-langchain/pyproject.toml @@ -27,7 +27,8 @@ classifiers = [ dependencies = [ "opentelemetry-api >= 1.31.0", "opentelemetry-instrumentation ~= 0.57b0", - "opentelemetry-semantic-conventions ~= 0.57b0" + "opentelemetry-semantic-conventions ~= 0.57b0", + "cachetools >= 5.2.0", ] [project.optional-dependencies] diff --git a/instrumentation-genai/opentelemetry-instrumentation-langchain/src/opentelemetry/instrumentation/langchain/span_manager.py b/instrumentation-genai/opentelemetry-instrumentation-langchain/src/opentelemetry/instrumentation/langchain/span_manager.py index 636bfc3bc3..f02f828f3b 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-langchain/src/opentelemetry/instrumentation/langchain/span_manager.py +++ b/instrumentation-genai/opentelemetry-instrumentation-langchain/src/opentelemetry/instrumentation/langchain/span_manager.py @@ -13,9 +13,11 @@ # limitations under the License. from dataclasses import dataclass, field +from threading import RLock from typing import Dict, List, Optional from uuid import UUID +from cachetools import TTLCache from opentelemetry.semconv._incubating.attributes import ( gen_ai_attributes as GenAI, ) @@ -40,10 +42,11 @@ def __init__( tracer: Tracer, ) -> None: self._tracer = tracer + self._lock = RLock() # Map from run_id -> _SpanState, to keep track of spans and parent/child relationships - # TODO: Use weak references or a TTL cache to avoid memory leaks in long-running processes. See #3735 - self.spans: Dict[UUID, _SpanState] = {} + # Using a TTL cache to avoid memory leaks in long-running processes where end_span might not be called. + self.spans: TTLCache[UUID, _SpanState] = TTLCache(maxsize=1024, ttl=3600) def _create_span( self, @@ -52,23 +55,24 @@ def _create_span( span_name: str, kind: SpanKind = SpanKind.INTERNAL, ) -> Span: - if parent_run_id is not None and parent_run_id in self.spans: - parent_state = self.spans[parent_run_id] - parent_span = parent_state.span - ctx = set_span_in_context(parent_span) - span = self._tracer.start_span( - name=span_name, kind=kind, context=ctx - ) - parent_state.children.append(run_id) - else: - # top-level or missing parent - span = self._tracer.start_span(name=span_name, kind=kind) - set_span_in_context(span) - - span_state = _SpanState(span=span) - self.spans[run_id] = span_state - - return span + with self._lock: + if parent_run_id is not None and parent_run_id in self.spans: + parent_state = self.spans[parent_run_id] + parent_span = parent_state.span + ctx = set_span_in_context(parent_span) + span = self._tracer.start_span( + name=span_name, kind=kind, context=ctx + ) + parent_state.children.append(run_id) + else: + # top-level or missing parent + span = self._tracer.start_span(name=span_name, kind=kind) + set_span_in_context(span) + + span_state = _SpanState(span=span) + self.spans[run_id] = span_state + + return span def create_chat_span( self, @@ -92,18 +96,25 @@ def create_chat_span( return span def end_span(self, run_id: UUID) -> None: - state = self.spans[run_id] - for child_id in state.children: - child_state = self.spans.get(child_id) - if child_state: - child_state.span.end() - del self.spans[child_id] - state.span.end() - del self.spans[run_id] + with self._lock: + state = self.spans.get(run_id) + if not state: + return + # End children first (make a copy to avoid modification during iteration) + for child_id in list(state.children): + child_state = self.spans.get(child_id) + if child_state: + child_state.span.end() + # Use pop to avoid KeyError if already expired + self.spans.pop(child_id, None) + state.span.end() + # Use pop to avoid KeyError if already expired + self.spans.pop(run_id, None) def get_span(self, run_id: UUID) -> Optional[Span]: - state = self.spans.get(run_id) - return state.span if state else None + with self._lock: + state = self.spans.get(run_id) + return state.span if state else None def handle_error(self, error: BaseException, run_id: UUID): span = self.get_span(run_id) diff --git a/instrumentation-genai/opentelemetry-instrumentation-langchain/tests/test_span_manager.py b/instrumentation-genai/opentelemetry-instrumentation-langchain/tests/test_span_manager.py index 69de5a7146..131e0409e6 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-langchain/tests/test_span_manager.py +++ b/instrumentation-genai/opentelemetry-instrumentation-langchain/tests/test_span_manager.py @@ -1,5 +1,6 @@ import unittest.mock import uuid +import time import pytest @@ -98,3 +99,24 @@ def test_end_span(self, handler): child_mock_span.end.assert_called_once() assert run_id not in handler.spans assert child_run_id not in handler.spans + + def test_ttl_cache_expires_spans(self, tracer): + # Arrange - Create handler with short TTL + from cachetools import TTLCache + + handler = _SpanManager(tracer=tracer) + # Replace the cache with one that has a very short TTL + handler.spans = TTLCache(maxsize=1024, ttl=0.05) + + run_id = uuid.uuid4() + mock_span = unittest.mock.Mock(spec=Span) + handler.spans[run_id] = _SpanState(span=mock_span) + + # Assert - Span exists immediately + assert run_id in handler.spans + + # Act - Wait for TTL to expire + time.sleep(0.1) + + # Assert - Span is automatically removed by TTL + assert run_id not in handler.spans