Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,6 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## Unreleased

- Added span support for genAI langchain llm invocation.
([#3665](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3665))
([#3665](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3665))
- Use a TTL cache in langchain instrumentation span map to avoid memory leaks.
([#3735](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3735))
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ classifiers = [
dependencies = [
"opentelemetry-api >= 1.31.0",
"opentelemetry-instrumentation ~= 0.57b0",
"opentelemetry-semantic-conventions ~= 0.57b0"
"opentelemetry-semantic-conventions ~= 0.57b0",
"cachetools >= 5.2.0",
]

[project.optional-dependencies]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,11 @@
# limitations under the License.

from dataclasses import dataclass, field
from threading import RLock
from typing import Dict, List, Optional
from uuid import UUID

from cachetools import TTLCache
from opentelemetry.semconv._incubating.attributes import (
gen_ai_attributes as GenAI,
)
Expand All @@ -40,10 +42,11 @@ def __init__(
tracer: Tracer,
) -> None:
self._tracer = tracer
self._lock = RLock()

# Map from run_id -> _SpanState, to keep track of spans and parent/child relationships
# TODO: Use weak references or a TTL cache to avoid memory leaks in long-running processes. See #3735
self.spans: Dict[UUID, _SpanState] = {}
# Using a TTL cache to avoid memory leaks in long-running processes where end_span might not be called.
self.spans: TTLCache[UUID, _SpanState] = TTLCache(maxsize=1024, ttl=3600)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I went ahead and experimented with using a WeakValueDictionary, but it would be a much larger refactor, as we
don't hold a strong reference to _SpanState outside of span_manager, if I am correct. Please let me know your thoughts. Thank you.


def _create_span(
self,
Expand All @@ -52,23 +55,24 @@ def _create_span(
span_name: str,
kind: SpanKind = SpanKind.INTERNAL,
) -> Span:
if parent_run_id is not None and parent_run_id in self.spans:
parent_state = self.spans[parent_run_id]
parent_span = parent_state.span
ctx = set_span_in_context(parent_span)
span = self._tracer.start_span(
name=span_name, kind=kind, context=ctx
)
parent_state.children.append(run_id)
else:
# top-level or missing parent
span = self._tracer.start_span(name=span_name, kind=kind)
set_span_in_context(span)

span_state = _SpanState(span=span)
self.spans[run_id] = span_state

return span
with self._lock:
if parent_run_id is not None and parent_run_id in self.spans:
parent_state = self.spans[parent_run_id]
parent_span = parent_state.span
ctx = set_span_in_context(parent_span)
span = self._tracer.start_span(
name=span_name, kind=kind, context=ctx
)
parent_state.children.append(run_id)
else:
# top-level or missing parent
span = self._tracer.start_span(name=span_name, kind=kind)
set_span_in_context(span)

span_state = _SpanState(span=span)
self.spans[run_id] = span_state

return span

def create_chat_span(
self,
Expand All @@ -92,18 +96,25 @@ def create_chat_span(
return span

def end_span(self, run_id: UUID) -> None:
state = self.spans[run_id]
for child_id in state.children:
child_state = self.spans.get(child_id)
if child_state:
child_state.span.end()
del self.spans[child_id]
state.span.end()
del self.spans[run_id]
with self._lock:
state = self.spans.get(run_id)
if not state:
return
# End children first (make a copy to avoid modification during iteration)
for child_id in list(state.children):
child_state = self.spans.get(child_id)
if child_state:
child_state.span.end()
# Use pop to avoid KeyError if already expired
self.spans.pop(child_id, None)
state.span.end()
# Use pop to avoid KeyError if already expired
self.spans.pop(run_id, None)

def get_span(self, run_id: UUID) -> Optional[Span]:
state = self.spans.get(run_id)
return state.span if state else None
with self._lock:
state = self.spans.get(run_id)
return state.span if state else None

def handle_error(self, error: BaseException, run_id: UUID):
span = self.get_span(run_id)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import unittest.mock
import uuid
import time

import pytest

Expand Down Expand Up @@ -98,3 +99,24 @@ def test_end_span(self, handler):
child_mock_span.end.assert_called_once()
assert run_id not in handler.spans
assert child_run_id not in handler.spans

def test_ttl_cache_expires_spans(self, tracer):
    """Entries in the span map disappear once their TTL has elapsed.

    The cache is driven by a hand-advanced fake clock instead of
    ``time.sleep`` so the test is deterministic and adds no wall-clock
    delay to the suite.
    """
    from cachetools import TTLCache

    # Arrange - a controllable clock; TTLCache reads the time via ``timer``.
    now = [0.0]
    handler = _SpanManager(tracer=tracer)
    # Replace the cache with one that has a very short TTL
    handler.spans = TTLCache(maxsize=1024, ttl=0.05, timer=lambda: now[0])

    run_id = uuid.uuid4()
    mock_span = unittest.mock.Mock(spec=Span)
    handler.spans[run_id] = _SpanState(span=mock_span)

    # Assert - Span exists immediately
    assert run_id in handler.spans

    # Act - Move the fake clock past the TTL (no real waiting)
    now[0] += 0.1

    # Assert - Span is automatically removed by TTL
    assert run_id not in handler.spans