|
3 | 3 |
|
4 | 4 | import pydantic |
5 | 5 | from opentelemetry import context, trace |
| 6 | +from opentelemetry.context import _RUNTIME_CONTEXT |
6 | 7 |
|
7 | 8 | from langfuse._client.attributes import LangfuseOtelSpanAttributes |
| 9 | +from langfuse._client.client import Langfuse |
8 | 10 | from langfuse._client.get_client import get_client |
9 | 11 | from langfuse._client.span import ( |
10 | 12 | LangfuseAgent, |
@@ -272,7 +274,7 @@ def on_chain_start( |
272 | 274 | serialized, "chain", **kwargs |
273 | 275 | ) |
274 | 276 |
|
275 | | - span = self.client.start_observation( |
| 277 | + span = self._get_parent_observation(parent_run_id).start_observation( |
276 | 278 | name=span_name, |
277 | 279 | as_type=observation_type, |
278 | 280 | metadata=span_metadata, |
@@ -336,6 +338,22 @@ def _deregister_langfuse_prompt(self, run_id: Optional[UUID]) -> None: |
336 | 338 | if run_id is not None and run_id in self.prompt_to_parent_run_map: |
337 | 339 | del self.prompt_to_parent_run_map[run_id] |
338 | 340 |
|
| 341 | + def _get_parent_observation( |
| 342 | + self, parent_run_id: Optional[UUID] |
| 343 | + ) -> Union[ |
| 344 | + Langfuse, |
| 345 | + LangfuseAgent, |
| 346 | + LangfuseChain, |
| 347 | + LangfuseGeneration, |
| 348 | + LangfuseRetriever, |
| 349 | + LangfuseSpan, |
| 350 | + LangfuseTool, |
| 351 | + ]: |
| 352 | + if parent_run_id and parent_run_id in self.runs: |
| 353 | + return self.runs[parent_run_id] |
| 354 | + |
| 355 | + return self.client |
| 356 | + |
339 | 357 | def _attach_observation( |
340 | 358 | self, |
341 | 359 | run_id: UUID, |
@@ -369,7 +387,18 @@ def _detach_observation( |
369 | 387 | token = self.context_tokens.pop(run_id, None) |
370 | 388 |
|
371 | 389 | if token: |
372 | | - context.detach(token) |
| 390 | + try: |
| 391 | + # Directly detach from runtime context to avoid error logging |
| 392 | + _RUNTIME_CONTEXT.detach(token) |
| 393 | + except Exception: |
| 394 | + # Context detach can fail in async scenarios - this is expected and safe to ignore |
| 395 | + # The span itself was properly ended and tracing data is correctly captured |
| 396 | + # |
| 397 | + # Examples: |
| 398 | + # 1. Token created in one async task/thread, detached in another |
| 399 | + # 2. Context already detached by framework or other handlers |
| 400 | + # 3. Runtime context state mismatch in concurrent execution |
| 401 | + pass |
373 | 402 |
|
374 | 403 | return cast( |
375 | 404 | Union[ |
@@ -591,7 +620,7 @@ def on_tool_start( |
591 | 620 | serialized, "tool", **kwargs |
592 | 621 | ) |
593 | 622 |
|
594 | | - span = self.client.start_observation( |
| 623 | + span = self._get_parent_observation(parent_run_id).start_observation( |
595 | 624 | name=self.get_langchain_run_name(serialized, **kwargs), |
596 | 625 | as_type=observation_type, |
597 | 626 | input=input_str, |
@@ -626,8 +655,7 @@ def on_retriever_start( |
626 | 655 | observation_type = self._get_observation_type_from_serialized( |
627 | 656 | serialized, "retriever", **kwargs |
628 | 657 | ) |
629 | | - |
630 | | - span = self.client.start_observation( |
| 658 | + span = self._get_parent_observation(parent_run_id).start_observation( |
631 | 659 | name=span_name, |
632 | 660 | as_type=observation_type, |
633 | 661 | metadata=span_metadata, |
@@ -753,7 +781,9 @@ def __on_llm_action( |
753 | 781 | "prompt": registered_prompt, |
754 | 782 | } |
755 | 783 |
|
756 | | - generation = self.client.start_observation(as_type="generation", **content) # type: ignore |
| 784 | + generation = self._get_parent_observation(parent_run_id).start_observation( |
| 785 | + as_type="generation", **content |
| 786 | + ) # type: ignore |
757 | 787 | self._attach_observation(run_id, generation) |
758 | 788 |
|
759 | 789 | self.last_trace_id = self.runs[run_id].trace_id |
|
0 commit comments