diff --git a/instrumentation-genai/opentelemetry-instrumentation-langchain/CHANGELOG.md b/instrumentation-genai/opentelemetry-instrumentation-langchain/CHANGELOG.md index c3d51f9fef..9b8b4865f7 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-langchain/CHANGELOG.md +++ b/instrumentation-genai/opentelemetry-instrumentation-langchain/CHANGELOG.md @@ -7,5 +7,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## Unreleased +- Added agent span support for GenAI LangChain instrumentation with `invoke_agent` operation and chain tracking. + ([#3788](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3788)) - Added span support for genAI langchain llm invocation. - ([#3665](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3665)) \ No newline at end of file + ([#3665](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3665)) diff --git a/instrumentation-genai/opentelemetry-instrumentation-langchain/src/opentelemetry/instrumentation/langchain/callback_handler.py b/instrumentation-genai/opentelemetry-instrumentation-langchain/src/opentelemetry/instrumentation/langchain/callback_handler.py index 138eb311a2..57948b4f98 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-langchain/src/opentelemetry/instrumentation/langchain/callback_handler.py +++ b/instrumentation-genai/opentelemetry-instrumentation-langchain/src/opentelemetry/instrumentation/langchain/callback_handler.py @@ -17,11 +17,15 @@ from typing import Any from uuid import UUID +from langchain_core.agents import AgentAction, AgentFinish # type: ignore from langchain_core.callbacks import BaseCallbackHandler # type: ignore from langchain_core.messages import BaseMessage # type: ignore from langchain_core.outputs import LLMResult # type: ignore -from opentelemetry.instrumentation.langchain.span_manager import _SpanManager +from opentelemetry.instrumentation.langchain.span_manager import ( + 
_OPERATION_INVOKE_AGENT, + _SpanManager, +) from opentelemetry.semconv._incubating.attributes import ( gen_ai_attributes as GenAI, ) @@ -49,9 +53,9 @@ def on_chat_model_start( messages: list[list[BaseMessage]], # type: ignore *, run_id: UUID, - tags: list[str] | None, - parent_run_id: UUID | None, - metadata: dict[str, Any] | None, + tags: list[str] | None = None, + parent_run_id: UUID | None = None, + metadata: dict[str, Any] | None = None, **kwargs: Any, ) -> None: # Other providers/LLMs may be supported in the future and telemetry for them is skipped for now. @@ -141,7 +145,7 @@ def on_llm_end( response: LLMResult, # type: ignore [reportUnknownParameterType] *, run_id: UUID, - parent_run_id: UUID | None, + parent_run_id: UUID | None = None, **kwargs: Any, ) -> None: span = self.span_manager.get_span(run_id) @@ -218,7 +222,112 @@ def on_llm_error( error: BaseException, *, run_id: UUID, - parent_run_id: UUID | None, + parent_run_id: UUID | None = None, **kwargs: Any, ) -> None: self.span_manager.handle_error(error, run_id) + + def on_chain_start( + self, + serialized: dict[str, Any], + inputs: dict[str, Any], + *, + run_id: UUID, + parent_run_id: UUID | None = None, + tags: list[str] | None = None, + metadata: dict[str, Any] | None = None, + **kwargs: Any, + ) -> None: + """Run when chain starts running.""" + # Extract chain name from serialized or kwargs + chain_name = "unknown" + if ( + serialized + and "kwargs" in serialized + and serialized["kwargs"].get("name") + ): + chain_name = serialized["kwargs"]["name"] + elif kwargs.get("name"): + chain_name = kwargs["name"] + elif serialized.get("name"): + chain_name = serialized["name"] + elif "id" in serialized: + chain_name = serialized["id"][-1] + + span = self.span_manager.create_chain_span( + run_id=run_id, + parent_run_id=parent_run_id, + chain_name=chain_name, + ) + + # If this is an agent chain, set agent-specific attributes + if metadata and "agent_name" in metadata: + 
span.set_attribute(GenAI.GEN_AI_AGENT_NAME, metadata["agent_name"]) + span.set_attribute(GenAI.GEN_AI_OPERATION_NAME, _OPERATION_INVOKE_AGENT) + + def on_chain_end( + self, + outputs: dict[str, Any], + *, + run_id: UUID, + parent_run_id: UUID | None = None, + tags: list[str] | None = None, + **kwargs: Any, + ) -> None: + """Run when chain ends running.""" + self.span_manager.end_span(run_id) + + def on_chain_error( + self, + error: BaseException, + *, + run_id: UUID, + parent_run_id: UUID | None = None, + tags: list[str] | None = None, + **kwargs: Any, + ) -> None: + """Run when chain errors.""" + self.span_manager.handle_error(error, run_id) + + def on_agent_action( + self, + action: AgentAction, # type: ignore[type-arg] + *, + run_id: UUID, + parent_run_id: UUID | None = None, + tags: list[str] | None = None, + **kwargs: Any, + ) -> None: + """Run on agent action.""" + # Agent actions are tracked as part of the chain span + # We can add attributes to the existing span if needed + span = self.span_manager.get_span(run_id) + if span: + tool = getattr(action, "tool", None) # type: ignore[arg-type] + if tool: + span.set_attribute("langchain.agent.action.tool", tool) + tool_input = getattr(action, "tool_input", None) # type: ignore[arg-type] + if tool_input: + span.set_attribute( + "langchain.agent.action.tool_input", str(tool_input) + ) + + def on_agent_finish( + self, + finish: AgentFinish, # type: ignore[type-arg] + *, + run_id: UUID, + parent_run_id: UUID | None = None, + tags: list[str] | None = None, + **kwargs: Any, + ) -> None: + """Run on agent finish.""" + # Agent finish is tracked as part of the chain span + span = self.span_manager.get_span(run_id) + if span: + return_values = getattr(finish, "return_values", None) # type: ignore[arg-type] + if return_values and "output" in return_values: + span.set_attribute( + "langchain.agent.finish.output", + str(return_values["output"]), + ) diff --git 
__all__ = ["_SpanManager", "_OPERATION_INVOKE_AGENT"]

# Value written to gen_ai.operation.name for agent invocations; also used
# as the prefix of agent span names.
_OPERATION_INVOKE_AGENT = "invoke_agent"


def create_agent_span(
    self,
    run_id: UUID,
    parent_run_id: Optional[UUID],
    agent_name: Optional[str] = None,
) -> Span:
    """Start and return a CLIENT-kind span describing an agent invocation.

    The span is named ``"invoke_agent <agent name>"`` and carries the
    operation-name and agent-name attributes. A missing or empty
    ``agent_name`` is replaced by ``"unknown"`` in both places.
    """
    resolved_name = agent_name if agent_name else "unknown"

    agent_span = self._create_span(
        run_id=run_id,
        parent_run_id=parent_run_id,
        span_name=f"{_OPERATION_INVOKE_AGENT} {resolved_name}",
        kind=SpanKind.CLIENT,
    )
    agent_span.set_attribute(
        GenAI.GEN_AI_OPERATION_NAME, _OPERATION_INVOKE_AGENT
    )
    agent_span.set_attribute(GenAI.GEN_AI_AGENT_NAME, resolved_name)
    return agent_span


def create_chain_span(
    self,
    run_id: UUID,
    parent_run_id: Optional[UUID],
    chain_name: str,
) -> Span:
    """Start and return an INTERNAL-kind span named ``"chain <chain_name>"``.

    No attributes are set here. In particular the span has no
    gen_ai.operation.name, so any agent-specific attributes have to be
    added by the caller on the returned span.
    """
    return self._create_span(
        run_id=run_id,
        parent_run_id=parent_run_id,
        span_name=f"chain {chain_name}",
        kind=SpanKind.INTERNAL,
    )
+ """ + run_id = uuid4() + parent_run_id = uuid4() + + # Start a chain that represents an agent (note: metadata includes agent_name) + callback_handler.on_chain_start( + serialized={ + "name": "TestAgent", + "id": ["langchain", "agents", "TestAgent"], + }, + inputs={"input": "What is the capital of France?"}, + run_id=run_id, + parent_run_id=parent_run_id, + metadata={"agent_name": "TestAgent"}, + ) + + # End the chain + callback_handler.on_chain_end( + outputs={"output": "The capital of France is Paris."}, + run_id=run_id, + parent_run_id=parent_run_id, + ) + + # Verify the span + spans = span_exporter.get_finished_spans() + assert len(spans) == 1 + + span = spans[0] + assert span.name == "chain TestAgent" + assert span.kind == SpanKind.INTERNAL + # Agent chains have these attributes set via callback_handler when metadata contains agent_name + assert span.attributes.get(GenAI.GEN_AI_AGENT_NAME) == "TestAgent" + assert span.attributes.get(GenAI.GEN_AI_OPERATION_NAME) == "invoke_agent" + + +def test_agent_action_tracking(callback_handler, span_exporter): + """Test that agent actions are properly tracked.""" + run_id = uuid4() + parent_run_id = uuid4() + + # Start a chain + callback_handler.on_chain_start( + serialized={"name": "Agent"}, + inputs={"input": "What is 2 + 2?"}, + run_id=run_id, + parent_run_id=parent_run_id, + ) + + # Agent takes an action + action = MagicMock(spec=AgentAction) + action.tool = "calculator" + action.tool_input = "2 + 2" + + callback_handler.on_agent_action( + action=action, + run_id=run_id, + parent_run_id=parent_run_id, + ) + + # Agent finishes + finish = MagicMock(spec=AgentFinish) + finish.return_values = {"output": "The answer is 4"} + + callback_handler.on_agent_finish( + finish=finish, + run_id=run_id, + parent_run_id=parent_run_id, + ) + + # End the chain + callback_handler.on_chain_end( + outputs={"output": "The answer is 4"}, + run_id=run_id, + parent_run_id=parent_run_id, + ) + + # Verify the span + spans = 
span_exporter.get_finished_spans() + assert len(spans) == 1 + + span = spans[0] + assert span.attributes.get("langchain.agent.action.tool") == "calculator" + assert span.attributes.get("langchain.agent.action.tool_input") == "2 + 2" + assert ( + span.attributes.get("langchain.agent.finish.output") + == "The answer is 4" + ) + + +def test_regular_chain_without_agent(callback_handler, span_exporter): + """Test that regular chains don't get agent attributes.""" + run_id = uuid4() + parent_run_id = uuid4() + + # Start a regular chain (not an agent) + callback_handler.on_chain_start( + serialized={"name": "RegularChain"}, + inputs={"input": "Test input"}, + run_id=run_id, + parent_run_id=parent_run_id, + metadata={}, # No agent_name in metadata + ) + + # End the chain + callback_handler.on_chain_end( + outputs={"output": "Test output"}, + run_id=run_id, + parent_run_id=parent_run_id, + ) + + # Verify the span + spans = span_exporter.get_finished_spans() + assert len(spans) == 1 + + span = spans[0] + assert span.name == "chain RegularChain" + assert span.kind == SpanKind.INTERNAL + assert GenAI.GEN_AI_AGENT_NAME not in span.attributes + assert ( + GenAI.GEN_AI_OPERATION_NAME not in span.attributes + ) # Regular chains don't have operation name + + +def test_chain_error_handling(callback_handler, span_exporter): + """Test that chain errors are properly handled.""" + run_id = uuid4() + parent_run_id = uuid4() + + # Start a chain + callback_handler.on_chain_start( + serialized={"name": "ErrorChain"}, + inputs={"input": "Test input"}, + run_id=run_id, + parent_run_id=parent_run_id, + ) + + # Chain encounters an error + error = ValueError("Test error") + callback_handler.on_chain_error( + error=error, + run_id=run_id, + parent_run_id=parent_run_id, + ) + + # Verify the span + spans = span_exporter.get_finished_spans() + assert len(spans) == 1 + + span = spans[0] + assert span.name == "chain ErrorChain" + assert span.status.status_code.name == "ERROR" + assert 
span.attributes.get("error.type") == "ValueError" diff --git a/instrumentation-genai/opentelemetry-instrumentation-langchain/tests/test_chain_spans.py b/instrumentation-genai/opentelemetry-instrumentation-langchain/tests/test_chain_spans.py new file mode 100644 index 0000000000..478746fca0 --- /dev/null +++ b/instrumentation-genai/opentelemetry-instrumentation-langchain/tests/test_chain_spans.py @@ -0,0 +1,214 @@ +"""Tests for chain execution spans in LangChain instrumentation.""" + +from uuid import uuid4 + +import pytest + +from opentelemetry.instrumentation.langchain.callback_handler import ( + OpenTelemetryLangChainCallbackHandler, +) +from opentelemetry.trace import SpanKind + + +@pytest.fixture +def callback_handler(tracer_provider): + tracer = tracer_provider.get_tracer("test") + return OpenTelemetryLangChainCallbackHandler(tracer=tracer) + + +def test_basic_chain_span(callback_handler, span_exporter): + """Test that chains create proper spans.""" + run_id = uuid4() + parent_run_id = uuid4() + + # Start a chain + callback_handler.on_chain_start( + serialized={"name": "TestChain"}, + inputs={"question": "What is 2 + 2?"}, + run_id=run_id, + parent_run_id=parent_run_id, + ) + + # End the chain + callback_handler.on_chain_end( + outputs={"answer": "4"}, + run_id=run_id, + parent_run_id=parent_run_id, + ) + + # Verify the span + spans = span_exporter.get_finished_spans() + assert len(spans) == 1 + + span = spans[0] + assert span.name == "chain TestChain" + assert span.kind == SpanKind.INTERNAL + + +def test_nested_chains(callback_handler, span_exporter): + """Test that nested chains create proper parent-child relationships.""" + parent_chain_id = uuid4() + child_chain_id = uuid4() + + # Start parent chain + callback_handler.on_chain_start( + serialized={"name": "ParentChain"}, + inputs={"input": "parent input"}, + run_id=parent_chain_id, + parent_run_id=None, + ) + + # Start child chain + callback_handler.on_chain_start( + serialized={"name": "ChildChain"}, + 
inputs={"input": "child input"}, + run_id=child_chain_id, + parent_run_id=parent_chain_id, + ) + + # End child chain + callback_handler.on_chain_end( + outputs={"output": "child output"}, + run_id=child_chain_id, + parent_run_id=parent_chain_id, + ) + + # End parent chain + callback_handler.on_chain_end( + outputs={"output": "parent output"}, + run_id=parent_chain_id, + parent_run_id=None, + ) + + # Verify spans + spans = span_exporter.get_finished_spans() + assert len(spans) == 2 + + # Find the spans + parent_span = next(s for s in spans if "ParentChain" in s.name) + child_span = next(s for s in spans if "ChildChain" in s.name) + + assert parent_span.name == "chain ParentChain" + assert child_span.name == "chain ChildChain" + + # The child should have been ended before the parent + assert child_span.parent is not None + + +def test_chain_name_extraction_fallbacks(callback_handler, span_exporter): + """Test various methods for extracting chain names.""" + + # Test 1: Name from kwargs in serialized + run_id1 = uuid4() + callback_handler.on_chain_start( + serialized={"kwargs": {"name": "KwargsChain"}}, + inputs={"input": "test"}, + run_id=run_id1, + ) + callback_handler.on_chain_end(outputs={"output": "result"}, run_id=run_id1) + + # Test 2: Name from kwargs parameter + run_id2 = uuid4() + callback_handler.on_chain_start( + serialized={}, + inputs={"input": "test"}, + run_id=run_id2, + name="DirectKwargsChain", + ) + callback_handler.on_chain_end(outputs={"output": "result"}, run_id=run_id2) + + # Test 3: Name from serialized name field + run_id3 = uuid4() + callback_handler.on_chain_start( + serialized={"name": "SerializedNameChain"}, + inputs={"input": "test"}, + run_id=run_id3, + ) + callback_handler.on_chain_end(outputs={"output": "result"}, run_id=run_id3) + + # Test 4: Name from serialized id (last element) + run_id4 = uuid4() + callback_handler.on_chain_start( + serialized={"id": ["langchain", "chains", "IdChain"]}, + inputs={"input": "test"}, + 
run_id=run_id4, + ) + callback_handler.on_chain_end(outputs={"output": "result"}, run_id=run_id4) + + # Test 5: Fallback to "unknown" + run_id5 = uuid4() + callback_handler.on_chain_start( + serialized={}, + inputs={"input": "test"}, + run_id=run_id5, + ) + callback_handler.on_chain_end(outputs={"output": "result"}, run_id=run_id5) + + # Verify all spans + spans = span_exporter.get_finished_spans() + assert len(spans) == 5 + + span_names = [span.name for span in spans] + assert "chain KwargsChain" in span_names + assert "chain DirectKwargsChain" in span_names + assert "chain SerializedNameChain" in span_names + assert "chain IdChain" in span_names + assert "chain unknown" in span_names + + +def test_chain_with_nested_structure(callback_handler, span_exporter): + """Test a complex chain with nested chains.""" + main_chain_id = uuid4() + sub_chain1_id = uuid4() + sub_chain2_id = uuid4() + + # Start the main chain + callback_handler.on_chain_start( + serialized={"name": "MainChain"}, + inputs={"query": "complex task"}, + run_id=main_chain_id, + ) + + # First sub-chain + callback_handler.on_chain_start( + serialized={"name": "SubChain1"}, + inputs={"input": "sub task 1"}, + run_id=sub_chain1_id, + parent_run_id=main_chain_id, + ) + callback_handler.on_chain_end( + outputs={"output": "sub result 1"}, + run_id=sub_chain1_id, + parent_run_id=main_chain_id, + ) + + # Second sub-chain + callback_handler.on_chain_start( + serialized={"name": "SubChain2"}, + inputs={"input": "sub task 2"}, + run_id=sub_chain2_id, + parent_run_id=main_chain_id, + ) + callback_handler.on_chain_end( + outputs={"output": "sub result 2"}, + run_id=sub_chain2_id, + parent_run_id=main_chain_id, + ) + + # End the main chain + callback_handler.on_chain_end( + outputs={"result": "final answer"}, + run_id=main_chain_id, + ) + + # Verify spans + spans = span_exporter.get_finished_spans() + assert len(spans) == 3 + + chain_spans = [s for s in spans if "chain" in s.name] + assert len(chain_spans) == 3 + + 
chain_names = {span.name for span in chain_spans} + assert "chain MainChain" in chain_names + assert "chain SubChain1" in chain_names + assert "chain SubChain2" in chain_names