Skip to content
Open
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## Unreleased

- Added agent span support for GenAI LangChain instrumentation with `invoke_agent` operation and chain tracking.
([#3788](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3788))
- Added span support for genAI langchain llm invocation.
([#3665](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3665))
([#3665](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3665))
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from typing import Any
from uuid import UUID

from langchain_core.agents import AgentAction, AgentFinish # type: ignore
from langchain_core.callbacks import BaseCallbackHandler # type: ignore
from langchain_core.messages import BaseMessage # type: ignore
from langchain_core.outputs import LLMResult # type: ignore
Expand Down Expand Up @@ -49,9 +50,9 @@ def on_chat_model_start(
messages: list[list[BaseMessage]], # type: ignore
*,
run_id: UUID,
tags: list[str] | None,
parent_run_id: UUID | None,
metadata: dict[str, Any] | None,
tags: list[str] | None = None,
parent_run_id: UUID | None = None,
metadata: dict[str, Any] | None = None,
**kwargs: Any,
) -> None:
# Other providers/LLMs may be supported in the future and telemetry for them is skipped for now.
Expand Down Expand Up @@ -141,7 +142,7 @@ def on_llm_end(
response: LLMResult, # type: ignore [reportUnknownParameterType]
*,
run_id: UUID,
parent_run_id: UUID | None,
parent_run_id: UUID | None = None,
**kwargs: Any,
) -> None:
span = self.span_manager.get_span(run_id)
Expand Down Expand Up @@ -218,7 +219,112 @@ def on_llm_error(
error: BaseException,
*,
run_id: UUID,
parent_run_id: UUID | None,
parent_run_id: UUID | None = None,
**kwargs: Any,
) -> None:
self.span_manager.handle_error(error, run_id)

def on_chain_start(
self,
serialized: dict[str, Any],
inputs: dict[str, Any],
*,
run_id: UUID,
parent_run_id: UUID | None = None,
tags: list[str] | None = None,
metadata: dict[str, Any] | None = None,
**kwargs: Any,
) -> None:
"""Run when chain starts running."""
# Extract chain name from serialized or kwargs
chain_name = "unknown"
if (
serialized
and "kwargs" in serialized
and serialized["kwargs"].get("name")
):
chain_name = serialized["kwargs"]["name"]
elif kwargs.get("name"):
chain_name = kwargs["name"]
elif serialized.get("name"):
chain_name = serialized["name"]
elif "id" in serialized:
chain_name = serialized["id"][-1]

span = self.span_manager.create_chain_span(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's make sure it creates internal span

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sound good - confirmed chain spans use SpanKind.INTERNAL here:

    def create_chain_span(
        self,
        run_id: UUID,
        parent_run_id: Optional[UUID],
        chain_name: str,
    ) -> Span:
        """Create a span for chain execution."""
        span = self._create_span(
            run_id=run_id,
            parent_run_id=parent_run_id,
            span_name=f"chain {chain_name}",
            kind=SpanKind.INTERNAL,
        )

run_id=run_id,
parent_run_id=parent_run_id,
chain_name=chain_name,
)

# If this is an agent chain, set agent-specific attributes
if metadata and "agent_name" in metadata:
span.set_attribute(GenAI.GEN_AI_AGENT_NAME, metadata["agent_name"])
span.set_attribute(GenAI.GEN_AI_OPERATION_NAME, "invoke_agent")

def on_chain_end(
self,
outputs: dict[str, Any],
*,
run_id: UUID,
parent_run_id: UUID | None = None,
tags: list[str] | None = None,
**kwargs: Any,
) -> None:
"""Run when chain ends running."""
self.span_manager.end_span(run_id)

def on_chain_error(
self,
error: BaseException,
*,
run_id: UUID,
parent_run_id: UUID | None = None,
tags: list[str] | None = None,
**kwargs: Any,
) -> None:
"""Run when chain errors."""
self.span_manager.handle_error(error, run_id)

def on_agent_action(
self,
action: AgentAction, # type: ignore[type-arg]
*,
run_id: UUID,
parent_run_id: UUID | None = None,
tags: list[str] | None = None,
**kwargs: Any,
) -> None:
"""Run on agent action."""
# Agent actions are tracked as part of the chain span
# We can add attributes to the existing span if needed
span = self.span_manager.get_span(run_id)
if span:
tool = getattr(action, "tool", None) # type: ignore[arg-type]
if tool:
span.set_attribute("langchain.agent.action.tool", tool)
tool_input = getattr(action, "tool_input", None) # type: ignore[arg-type]
if tool_input:
span.set_attribute(
"langchain.agent.action.tool_input", str(tool_input)
)

def on_agent_finish(
self,
finish: AgentFinish, # type: ignore[type-arg]
*,
run_id: UUID,
parent_run_id: UUID | None = None,
tags: list[str] | None = None,
**kwargs: Any,
) -> None:
"""Run on agent finish."""
# Agent finish is tracked as part of the chain span
span = self.span_manager.get_span(run_id)
if span:
return_values = getattr(finish, "return_values", None) # type: ignore[arg-type]
if return_values and "output" in return_values:
span.set_attribute(
"langchain.agent.finish.output",
str(return_values["output"]),
)
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,48 @@ def create_chat_span(

return span

def create_agent_span(
self,
run_id: UUID,
parent_run_id: Optional[UUID],
agent_name: Optional[str] = None,
) -> Span:
"""Create a span for agent invocation."""
span_name = (
f"invoke_agent {agent_name}" if agent_name else "invoke_agent"
)
span = self._create_span(
run_id=run_id,
parent_run_id=parent_run_id,
span_name=span_name,
kind=SpanKind.CLIENT,
)
span.set_attribute(
GenAI.GEN_AI_OPERATION_NAME,
"invoke_agent",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible that the operation name might be changed in the future, if so, declaring it a constant for it might be better.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point - I'm not sure if this semantic convention will change but agree that this should be moved to a constant since it's currently hardcoded in a few places.

)
if agent_name:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If agent_name is None, in that case, should a default value be passed?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah makes sense, i'll default it to "unknown" for now. Open to suggestions on the naming though.

span.set_attribute(GenAI.GEN_AI_AGENT_NAME, agent_name)

return span

def create_chain_span(
self,
run_id: UUID,
parent_run_id: Optional[UUID],
chain_name: str,
) -> Span:
"""Create a span for chain execution."""
span = self._create_span(
run_id=run_id,
parent_run_id=parent_run_id,
span_name=f"chain {chain_name}",
kind=SpanKind.INTERNAL,
)
# Chains are internal operations, not direct GenAI operations
# We can track them but they don't have a gen_ai.operation.name
return span

def end_span(self, run_id: UUID) -> None:
state = self.spans[run_id]
for child_id in state.children:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
"""Tests for agent-related spans in LangChain instrumentation."""

from unittest.mock import MagicMock
from uuid import uuid4

import pytest
from langchain_core.agents import AgentAction, AgentFinish

from opentelemetry.instrumentation.langchain.callback_handler import (
OpenTelemetryLangChainCallbackHandler,
)
from opentelemetry.semconv._incubating.attributes import (
gen_ai_attributes as GenAI,
)
from opentelemetry.trace import SpanKind


@pytest.fixture
def callback_handler(tracer_provider):
tracer = tracer_provider.get_tracer("test")
return OpenTelemetryLangChainCallbackHandler(tracer=tracer)


def test_agent_chain_span(callback_handler, span_exporter):
"""Test that agent chains create proper invoke_agent spans."""
run_id = uuid4()
parent_run_id = uuid4()

# Start a chain that represents an agent
callback_handler.on_chain_start(
serialized={
"name": "TestAgent",
"id": ["langchain", "agents", "TestAgent"],
},
inputs={"input": "What is the capital of France?"},
run_id=run_id,
parent_run_id=parent_run_id,
metadata={"agent_name": "TestAgent"},
)

# End the chain
callback_handler.on_chain_end(
outputs={"output": "The capital of France is Paris."},
run_id=run_id,
parent_run_id=parent_run_id,
)

# Verify the span
spans = span_exporter.get_finished_spans()
assert len(spans) == 1

span = spans[0]
assert span.name == "chain TestAgent"
assert span.kind == SpanKind.INTERNAL
assert span.attributes.get(GenAI.GEN_AI_AGENT_NAME) == "TestAgent"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the above comment you mentioned that chain spans are internal operations and will not have an operation name but there you are adding assert statements for it, did I miss something?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for your review! Let me try to clarify.

Regular chains don't have gen_ai.operation.name since they're just internal operations. However, Agent chains are different. In LangChain's architecture, agents are implemented as chains so we:

  1. create them using create_agent_span() (which creates an internal span)
  2. detect if it's actually an agent by checking for agent_name in metadata
  3. if it is an agent, we add the agent-specific attributes:
  • gen_ai.agent.name
  • gen_ai.operation.name = "invoke_agent"

I'll update the docstrings to make this distinction clearer.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Awesome. Thanks for the clarification.

assert span.attributes.get(GenAI.GEN_AI_OPERATION_NAME) == "invoke_agent"


def test_agent_action_tracking(callback_handler, span_exporter):
"""Test that agent actions are properly tracked."""
run_id = uuid4()
parent_run_id = uuid4()

# Start a chain
callback_handler.on_chain_start(
serialized={"name": "Agent"},
inputs={"input": "What is 2 + 2?"},
run_id=run_id,
parent_run_id=parent_run_id,
)

# Agent takes an action
action = MagicMock(spec=AgentAction)
action.tool = "calculator"
action.tool_input = "2 + 2"

callback_handler.on_agent_action(
action=action,
run_id=run_id,
parent_run_id=parent_run_id,
)

# Agent finishes
finish = MagicMock(spec=AgentFinish)
finish.return_values = {"output": "The answer is 4"}

callback_handler.on_agent_finish(
finish=finish,
run_id=run_id,
parent_run_id=parent_run_id,
)

# End the chain
callback_handler.on_chain_end(
outputs={"output": "The answer is 4"},
run_id=run_id,
parent_run_id=parent_run_id,
)

# Verify the span
spans = span_exporter.get_finished_spans()
assert len(spans) == 1

span = spans[0]
assert span.attributes.get("langchain.agent.action.tool") == "calculator"
assert span.attributes.get("langchain.agent.action.tool_input") == "2 + 2"
assert (
span.attributes.get("langchain.agent.finish.output")
== "The answer is 4"
)


def test_regular_chain_without_agent(callback_handler, span_exporter):
"""Test that regular chains don't get agent attributes."""
run_id = uuid4()
parent_run_id = uuid4()

# Start a regular chain (not an agent)
callback_handler.on_chain_start(
serialized={"name": "RegularChain"},
inputs={"input": "Test input"},
run_id=run_id,
parent_run_id=parent_run_id,
metadata={}, # No agent_name in metadata
)

# End the chain
callback_handler.on_chain_end(
outputs={"output": "Test output"},
run_id=run_id,
parent_run_id=parent_run_id,
)

# Verify the span
spans = span_exporter.get_finished_spans()
assert len(spans) == 1

span = spans[0]
assert span.name == "chain RegularChain"
assert span.kind == SpanKind.INTERNAL
assert GenAI.GEN_AI_AGENT_NAME not in span.attributes
assert (
GenAI.GEN_AI_OPERATION_NAME not in span.attributes
) # Regular chains don't have operation name


def test_chain_error_handling(callback_handler, span_exporter):
"""Test that chain errors are properly handled."""
run_id = uuid4()
parent_run_id = uuid4()

# Start a chain
callback_handler.on_chain_start(
serialized={"name": "ErrorChain"},
inputs={"input": "Test input"},
run_id=run_id,
parent_run_id=parent_run_id,
)

# Chain encounters an error
error = ValueError("Test error")
callback_handler.on_chain_error(
error=error,
run_id=run_id,
parent_run_id=parent_run_id,
)

# Verify the span
spans = span_exporter.get_finished_spans()
assert len(spans) == 1

span = spans[0]
assert span.name == "chain ErrorChain"
assert span.status.status_code.name == "ERROR"
assert span.attributes.get("error.type") == "ValueError"
Loading