Commit c7e309c

fix(langchain): context detach exception (#3255)
1 parent 9603e1b commit c7e309c

2 files changed: +146 −2 lines changed


packages/opentelemetry-instrumentation-langchain/opentelemetry/instrumentation/langchain/callback_handler.py

Lines changed: 5 additions & 2 deletions
@@ -188,9 +188,12 @@ def _end_span(self, span: Span, run_id: UUID) -> None:
         token = self.spans[run_id].token
         if token:
             try:
-                context_api.detach(token)
-            except ValueError:
+                # Use the runtime context directly to avoid logging from context_api.detach()
+                from opentelemetry.context import _RUNTIME_CONTEXT
+                _RUNTIME_CONTEXT.detach(token)
+            except (ValueError, RuntimeError, Exception):
                 # Context detach can fail in async scenarios when tokens are created in different contexts
+                # This includes ValueError, RuntimeError, and other context-related exceptions
                 # This is expected behavior and doesn't affect the correct span hierarchy
                 pass

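For reference, a minimal standalone sketch of the attach/detach pairing this hunk changes (not the instrumentation's actual code: the helper name my_handler_span, the tracer name, and the span name are made up). context_api.attach() returns the token that the handler stores per run_id, and context_api.detach() wraps _RUNTIME_CONTEXT.detach() in a try/except that logs "Failed to detach context" when the reset fails; calling the runtime context directly lets the failure propagate so the handler can suppress it without the error log.

# Sketch only: how the token kept in self.spans[run_id] is produced and consumed.
# `my_handler_span` is a hypothetical helper, not part of the library.
from opentelemetry import context as context_api, trace
from opentelemetry.context import _RUNTIME_CONTEXT

tracer = trace.get_tracer("sketch")


def my_handler_span():
    span = tracer.start_span("langchain.step")
    # attach() returns the token that _end_span() later has to detach
    token = context_api.attach(trace.set_span_in_context(span))
    return span, token


span, token = my_handler_span()
try:
    pass  # callback work runs under the attached context here
finally:
    span.end()
    try:
        # context_api.detach(token) would log ERROR "Failed to detach context"
        # if the reset fails; the commit calls the runtime context directly so
        # the failure can be swallowed silently instead.
        _RUNTIME_CONTEXT.detach(token)
    except Exception:
        pass  # expected in async scenarios; span hierarchy is already correct

The trade-off of the change above is that the handler now decides how detach failures are handled, rather than having opentelemetry.context emit an error log for a situation the comment describes as expected.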

packages/opentelemetry-instrumentation-langchain/tests/test_langgraph.py

Lines changed: 141 additions & 0 deletions
@@ -338,3 +338,144 @@ async def run_test_agent():
     assert http_call_task_span.parent.span_id == workflow_span.context.span_id
     assert otel_span_task_span.parent.span_id == workflow_span.context.span_id
     assert workflow_span.parent.span_id == root_span.context.span_id
+
+
+def test_context_detachment_error_handling(
+    instrument_legacy, span_exporter, tracer_provider, caplog
+):
+    """
+    Test that context detachment errors are handled properly without logging.
+
+    This test specifically validates the fix for the issue where OpenTelemetry
+    context detachment failures in async scenarios would cause error logging:
+    'ERROR:opentelemetry.context:Failed to detach context'
+
+    The test creates conditions that trigger context tokens to be created in
+    one context and detached in another, which previously caused ValueError
+    exceptions to be logged by OpenTelemetry's context_api.detach().
+    """
+    import asyncio
+    import logging
+    from opentelemetry import trace
+    from langgraph.graph import END, START, StateGraph
+
+    trace.set_tracer_provider(tracer_provider)
+    tracer = trace.get_tracer(__name__)
+
+    with caplog.at_level(logging.ERROR):
+
+        class AsyncTestState(TypedDict):
+            counter: int
+            result: str
+
+        async def concurrent_span_node(state: AsyncTestState) -> dict:
+            """Node that creates spans in async context, triggering potential context issues."""
+            with tracer.start_as_current_span("concurrent_async_span") as span:
+                span.set_attribute("node.type", "concurrent_async")
+                span.set_attribute("input.counter", state["counter"])
+
+                await asyncio.sleep(0.001)
+
+                with tracer.start_as_current_span("nested_span") as nested_span:
+                    nested_span.set_attribute("nested.work", True)
+                    await asyncio.sleep(0.001)
+
+                result = f"processed_{state['counter']}"
+                span.set_attribute("output.result", result)
+
+            return {"counter": state["counter"] + 1, "result": result}
+
+        async def parallel_processing_node(state: AsyncTestState) -> dict:
+            """Node that processes multiple tasks in parallel, stressing context management."""
+
+            async def parallel_task(task_id: int):
+                with tracer.start_as_current_span(f"parallel_task_{task_id}") as span:
+                    span.set_attribute("task.id", task_id)
+                    await asyncio.sleep(0.001)
+                    return f"task_{task_id}_done"
+
+            tasks = [parallel_task(i) for i in range(5)]
+            parallel_results = await asyncio.gather(*tasks)
+            combined_result = (
+                f"{state['result']} + parallel_results: {','.join(parallel_results)}"
+            )
+            return {"counter": state["counter"], "result": combined_result}
+
+        def build_context_stress_graph():
+            """Build a graph designed to stress context management."""
+            builder = StateGraph(AsyncTestState)
+            builder.add_node("concurrent", concurrent_span_node)
+            builder.add_node("parallel", parallel_processing_node)
+
+            builder.add_edge(START, "concurrent")
+            builder.add_edge("concurrent", "parallel")
+            builder.add_edge("parallel", END)
+
+            return builder.compile()
+
+        async def run_concurrent_executions():
+            """Run multiple concurrent graph executions to trigger context issues."""
+            graph = build_context_stress_graph()
+
+            tasks = []
+            for i in range(10):
+                task = graph.ainvoke({"counter": i, "result": ""})
+                tasks.append(task)
+
+            results = await asyncio.gather(*tasks, return_exceptions=True)
+            return results
+
+        results = asyncio.run(run_concurrent_executions())
+
+    assert len(results) == 10
+    for i, result in enumerate(results):
+        assert not isinstance(result, Exception), f"Execution {i} failed: {result}"
+        assert result["counter"] == i + 1
+        assert f"processed_{i}" in result["result"]
+
+    spans = span_exporter.get_finished_spans()
+
+    assert len(spans) >= 100, f"Expected at least 100 spans, got {len(spans)}"
+
+    workflow_spans = [s for s in spans if s.name == "LangGraph.workflow"]
+    concurrent_spans = [s for s in spans if s.name == "concurrent_async_span"]
+    nested_spans = [s for s in spans if s.name == "nested_span"]
+    parallel_task_spans = [s for s in spans if s.name.startswith("parallel_task_")]
+
+    assert (
+        len(workflow_spans) == 10
+    ), f"Expected 10 workflow spans, got {len(workflow_spans)}"
+    assert (
+        len(concurrent_spans) == 10
+    ), f"Expected 10 concurrent spans, got {len(concurrent_spans)}"
+    assert (
+        len(nested_spans) == 10
+    ), f"Expected 10 nested spans, got {len(nested_spans)}"
+    assert (
+        len(parallel_task_spans) == 50
+    ), f"Expected 50 parallel task spans, got {len(parallel_task_spans)}"
+
+    error_logs = [
+        record.message
+        for record in caplog.records
+        if record.levelno >= logging.ERROR
+    ]
+    context_errors = [
+        msg for msg in error_logs if "Failed to detach context" in msg
+    ]
+
+    assert len(context_errors) == 0, (
+        f"Found {len(context_errors)} context detachment errors in logs. "
+        f"This indicates the fix is not working properly. Errors: {context_errors}"
+    )
+
+    for nested_span in nested_spans:
+        assert nested_span.parent is not None, "Nested spans should have parents"
+        parent_span = next(
+            (s for s in spans if s.context.span_id == nested_span.parent.span_id),
+            None,
+        )
+        assert parent_span is not None, "Parent span should exist"
+        assert (
+            parent_span.name == "concurrent_async_span"
+        ), "Nested span should be child of concurrent_async_span"
