diff --git a/instrumentation-genai/opentelemetry-instrumentation-openai-agents/.gitignore b/instrumentation-genai/opentelemetry-instrumentation-openai-agents/.gitignore
index 639ef5d194..b6abc5e16a 100644
--- a/instrumentation-genai/opentelemetry-instrumentation-openai-agents/.gitignore
+++ b/instrumentation-genai/opentelemetry-instrumentation-openai-agents/.gitignore
@@ -1,2 +1,3 @@
 examples/.env
 examples/openai_agents_multi_agent_travel/.env
+examples/**/.env
diff --git a/instrumentation-genai/opentelemetry-instrumentation-openai-agents/CHANGELOG.md b/instrumentation-genai/opentelemetry-instrumentation-openai-agents/CHANGELOG.md
index f01a53155a..d4fed08923 100644
--- a/instrumentation-genai/opentelemetry-instrumentation-openai-agents/CHANGELOG.md
+++ b/instrumentation-genai/opentelemetry-instrumentation-openai-agents/CHANGELOG.md
@@ -9,3 +9,6 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 
 - Initial barebones package skeleton: minimal instrumentor stub, version
   module, and packaging metadata/entry point.
+  ([#3805](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3805))
+- Implement OpenAI Agents span processing aligned with GenAI semantic conventions.
+  ([#3817](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3817))
diff --git a/instrumentation-genai/opentelemetry-instrumentation-openai-agents/examples/handoffs/.env.example b/instrumentation-genai/opentelemetry-instrumentation-openai-agents/examples/handoffs/.env.example
new file mode 100644
index 0000000000..6c7ed0b427
--- /dev/null
+++ b/instrumentation-genai/opentelemetry-instrumentation-openai-agents/examples/handoffs/.env.example
@@ -0,0 +1,11 @@
+# Update this with your real OpenAI API key
+OPENAI_API_KEY=sk-YOUR_API_KEY
+
+# Uncomment and adjust if you use a non-default OTLP collector endpoint
+# OTEL_EXPORTER_OTLP_ENDPOINT=http://localhost:4317
+# OTEL_EXPORTER_OTLP_PROTOCOL=grpc
+
+OTEL_SERVICE_NAME=opentelemetry-python-openai-agents-handoffs
+
+# Optionally override the agent name reported on spans
+# OTEL_GENAI_AGENT_NAME=Travel Concierge
diff --git a/instrumentation-genai/opentelemetry-instrumentation-openai-agents/examples/handoffs/README.rst b/instrumentation-genai/opentelemetry-instrumentation-openai-agents/examples/handoffs/README.rst
new file mode 100644
index 0000000000..e3bdd305d8
--- /dev/null
+++ b/instrumentation-genai/opentelemetry-instrumentation-openai-agents/examples/handoffs/README.rst
@@ -0,0 +1,39 @@
+OpenTelemetry OpenAI Agents Handoff Example
+===========================================
+
+This example shows how the OpenTelemetry OpenAI Agents instrumentation captures
+spans in a small multi-agent workflow. Three agents collaborate: a primary
+concierge, a concise assistant with a random-number tool, and a Spanish
+specialist reached through a handoff. Running the sample produces
+``invoke_agent`` spans for each agent as well as an ``execute_tool`` span for
+the random-number function.
+
+Setup
+-----
+
+1. Copy `.env.example <.env.example>`_ to `.env` and populate it with your real
+   ``OPENAI_API_KEY``. Adjust the OTLP exporter settings if your collector does
+   not listen on ``http://localhost:4317``.
+2. 
Create a virtual environment and install the dependencies: + + :: + + python3 -m venv .venv + source .venv/bin/activate + pip install "python-dotenv[cli]" + pip install -r requirements.txt + +Run +--- + +Execute the workflow with ``dotenv`` so the environment variables from ``.env`` +are loaded automatically: + +:: + + dotenv run -- python main.py + +The script emits a short transcript to stdout while spans stream to the OTLP +endpoint defined in your environment. You should see multiple +``invoke_agent`` spans (one per agent) and an ``execute_tool`` span for the +random-number helper triggered during the run. diff --git a/instrumentation-genai/opentelemetry-instrumentation-openai-agents/examples/handoffs/main.py b/instrumentation-genai/opentelemetry-instrumentation-openai-agents/examples/handoffs/main.py new file mode 100644 index 0000000000..3a37d6f838 --- /dev/null +++ b/instrumentation-genai/opentelemetry-instrumentation-openai-agents/examples/handoffs/main.py @@ -0,0 +1,162 @@ +# pylint: skip-file +"""Multi-agent handoff example instrumented with OpenTelemetry.""" + +from __future__ import annotations + +import asyncio +import json +import random + +from agents import Agent, HandoffInputData, Runner, function_tool, handoff +from agents import trace as agent_trace +from agents.extensions import handoff_filters +from agents.models import is_gpt_5_default +from dotenv import load_dotenv + +from opentelemetry import trace as otel_trace +from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import ( + OTLPSpanExporter, +) +from opentelemetry.instrumentation.openai_agents import ( + OpenAIAgentsInstrumentor, +) +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export import BatchSpanProcessor + + +def configure_otel() -> None: + """Configure the OpenTelemetry SDK and enable the Agents instrumentation.""" + + provider = TracerProvider() + provider.add_span_processor(BatchSpanProcessor(OTLPSpanExporter())) + otel_trace.set_tracer_provider(provider) + + OpenAIAgentsInstrumentor().instrument(tracer_provider=provider) + + +@function_tool +def random_number_tool(maximum: int) -> int: + """Return a random integer between 0 and ``maximum``.""" + + return random.randint(0, maximum) + + +def spanish_handoff_message_filter( + handoff_message_data: HandoffInputData, +) -> HandoffInputData: + """Trim the message history forwarded to the Spanish-speaking agent.""" + + if is_gpt_5_default(): + # When GPT-5 is enabled we skip additional filtering. + return HandoffInputData( + input_history=handoff_message_data.input_history, + pre_handoff_items=tuple(handoff_message_data.pre_handoff_items), + new_items=tuple(handoff_message_data.new_items), + ) + + filtered = handoff_filters.remove_all_tools(handoff_message_data) + history = ( + tuple(filtered.input_history[2:]) + if isinstance(filtered.input_history, tuple) + else filtered.input_history[2:] + ) + + return HandoffInputData( + input_history=history, + pre_handoff_items=tuple(filtered.pre_handoff_items), + new_items=tuple(filtered.new_items), + ) + + +assistant = Agent( + name="Assistant", + instructions="Be extremely concise.", + tools=[random_number_tool], +) + +spanish_assistant = Agent( + name="Spanish Assistant", + instructions="You only speak Spanish and are extremely concise.", + handoff_description="A Spanish-speaking assistant.", +) + +concierge = Agent( + name="Concierge", + instructions=( + "Be a helpful assistant. If the traveler switches to Spanish, handoff to" + " the Spanish specialist. 
Use the random number tool when asked for" + " numbers." + ), + handoffs=[ + handoff(spanish_assistant, input_filter=spanish_handoff_message_filter) + ], +) + + +async def run_workflow() -> None: + """Execute a conversation that triggers tool calls and handoffs.""" + + with agent_trace(workflow_name="Travel concierge handoff"): + # Step 1: Basic conversation with the initial assistant. + result = await Runner.run( + assistant, + input="I'm planning a trip to Madrid. Can you help?", + ) + + print("Step 1 complete") + + # Step 2: Ask for a random number to exercise the tool span. + result = await Runner.run( + assistant, + input=result.to_input_list() + + [ + { + "content": "Pick a lucky number between 0 and 20", + "role": "user", + } + ], + ) + + print("Step 2 complete") + + # Step 3: Continue the conversation with the concierge agent. + result = await Runner.run( + concierge, + input=result.to_input_list() + + [ + { + "content": "Recommend some sights in Madrid for a weekend trip.", + "role": "user", + } + ], + ) + + print("Step 3 complete") + + # Step 4: Switch to Spanish to cause a handoff to the specialist. + result = await Runner.run( + concierge, + input=result.to_input_list() + + [ + { + "content": "Por favor habla en español. ¿Puedes resumir el plan?", + "role": "user", + } + ], + ) + + print("Step 4 complete") + + print("\n=== Conversation Transcript ===\n") + for message in result.to_input_list(): + print(json.dumps(message, indent=2, ensure_ascii=False)) + + +def main() -> None: + load_dotenv() + configure_otel() + asyncio.run(run_workflow()) + + +if __name__ == "__main__": + main() diff --git a/instrumentation-genai/opentelemetry-instrumentation-openai-agents/examples/handoffs/requirements.txt b/instrumentation-genai/opentelemetry-instrumentation-openai-agents/examples/handoffs/requirements.txt new file mode 100644 index 0000000000..3510fe42eb --- /dev/null +++ b/instrumentation-genai/opentelemetry-instrumentation-openai-agents/examples/handoffs/requirements.txt @@ -0,0 +1,6 @@ +openai-agents~=0.3.3 +python-dotenv~=1.0 + +opentelemetry-sdk~=1.36.0 +opentelemetry-exporter-otlp-proto-grpc~=1.36.0 +opentelemetry-instrumentation-openai-agents~=0.1.0.dev diff --git a/instrumentation-genai/opentelemetry-instrumentation-openai-agents/examples/manual/.env.example b/instrumentation-genai/opentelemetry-instrumentation-openai-agents/examples/manual/.env.example new file mode 100644 index 0000000000..84c2f28c7f --- /dev/null +++ b/instrumentation-genai/opentelemetry-instrumentation-openai-agents/examples/manual/.env.example @@ -0,0 +1,11 @@ +# Update this with your real OpenAI API key +OPENAI_API_KEY=sk-YOUR_API_KEY + +# Uncomment and adjust if you use a non-default OTLP collector endpoint +# OTEL_EXPORTER_OTLP_ENDPOINT=http://localhost:4317 +# OTEL_EXPORTER_OTLP_PROTOCOL=grpc + +OTEL_SERVICE_NAME=opentelemetry-python-openai-agents-manual + +# Optionally override the agent name reported on spans +# OTEL_GENAI_AGENT_NAME=Travel Concierge diff --git a/instrumentation-genai/opentelemetry-instrumentation-openai-agents/examples/manual/README.rst b/instrumentation-genai/opentelemetry-instrumentation-openai-agents/examples/manual/README.rst new file mode 100644 index 0000000000..1f3be9f4de --- /dev/null +++ b/instrumentation-genai/opentelemetry-instrumentation-openai-agents/examples/manual/README.rst @@ -0,0 +1,42 @@ +OpenTelemetry OpenAI Agents Instrumentation Example +=================================================== + +This example demonstrates how to manually configure the OpenTelemetry SDK 
+alongside the OpenAI Agents instrumentation.
+
+Running `main.py <main.py>`_ produces spans for the end-to-end agent run,
+including tool invocations and model generations. Spans are exported through
+OTLP/gRPC to the endpoint configured in the environment.
+
+Setup
+-----
+
+1. Copy `.env.example <.env.example>`_ to `.env` and update it with your real
+   ``OPENAI_API_KEY``. If your
+   OTLP collector is not reachable via ``http://localhost:4317``, adjust the
+   endpoint variables as needed.
+2. Create a virtual environment and install the dependencies:
+
+   ::
+
+      python3 -m venv .venv
+      source .venv/bin/activate
+      pip install "python-dotenv[cli]"
+      pip install -r requirements.txt
+
+Run
+---
+
+Execute the sample with ``dotenv`` so the environment variables from ``.env``
+are applied:
+
+::
+
+   dotenv run -- python main.py
+
+Because ``main.py`` calls ``load_dotenv`` itself, the values from ``.env`` are
+picked up even without the ``dotenv`` wrapper, so running ``python main.py``
+directly also works.
+
+You should see the agent response printed to the console while spans export to
+your configured observability backend.
diff --git a/instrumentation-genai/opentelemetry-instrumentation-openai-agents/examples/manual/main.py b/instrumentation-genai/opentelemetry-instrumentation-openai-agents/examples/manual/main.py
new file mode 100644
index 0000000000..a750c94f6c
--- /dev/null
+++ b/instrumentation-genai/opentelemetry-instrumentation-openai-agents/examples/manual/main.py
@@ -0,0 +1,65 @@
+# pylint: skip-file
+"""Manual OpenAI Agents instrumentation example."""
+
+from __future__ import annotations
+
+from agents import Agent, Runner, function_tool
+from dotenv import load_dotenv
+
+from opentelemetry import trace
+from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import (
+    OTLPSpanExporter,
+)
+from opentelemetry.instrumentation.openai_agents import (
+    OpenAIAgentsInstrumentor,
+)
+from opentelemetry.sdk.trace import TracerProvider
+from opentelemetry.sdk.trace.export import BatchSpanProcessor
+
+
+def configure_otel() -> None:
+    """Configure the OpenTelemetry SDK for exporting spans."""
+
+    provider = TracerProvider()
+    provider.add_span_processor(BatchSpanProcessor(OTLPSpanExporter()))
+    trace.set_tracer_provider(provider)
+
+    OpenAIAgentsInstrumentor().instrument(tracer_provider=provider)
+
+
+@function_tool
+def get_weather(city: str) -> str:
+    """Return a canned weather response for the requested city."""
+
+    return f"The forecast for {city} is sunny with pleasant temperatures."
+
+
+def run_agent() -> None:
+    """Create a simple agent and execute a single run."""
+
+    assistant = Agent(
+        name="Travel Concierge",
+        instructions=(
+            "You are a concise travel concierge. Use the weather tool when the"
+            " traveler asks about local conditions."
+        ),
+        tools=[get_weather],
+    )
+
+    result = Runner.run_sync(
+        assistant,
+        "I'm visiting Barcelona this weekend. 
How should I pack?",
+    )
+
+    print("Agent response:")
+    print(result.final_output)
+
+
+def main() -> None:
+    load_dotenv()
+    configure_otel()
+    run_agent()
+
+
+if __name__ == "__main__":
+    main()
diff --git a/instrumentation-genai/opentelemetry-instrumentation-openai-agents/examples/manual/requirements.txt b/instrumentation-genai/opentelemetry-instrumentation-openai-agents/examples/manual/requirements.txt
new file mode 100644
index 0000000000..3510fe42eb
--- /dev/null
+++ b/instrumentation-genai/opentelemetry-instrumentation-openai-agents/examples/manual/requirements.txt
@@ -0,0 +1,6 @@
+openai-agents~=0.3.3
+python-dotenv~=1.0
+
+opentelemetry-sdk~=1.36.0
+opentelemetry-exporter-otlp-proto-grpc~=1.36.0
+opentelemetry-instrumentation-openai-agents~=0.1.0.dev
diff --git a/instrumentation-genai/opentelemetry-instrumentation-openai-agents/examples/zero-code/.env.example b/instrumentation-genai/opentelemetry-instrumentation-openai-agents/examples/zero-code/.env.example
new file mode 100644
index 0000000000..8f39668502
--- /dev/null
+++ b/instrumentation-genai/opentelemetry-instrumentation-openai-agents/examples/zero-code/.env.example
@@ -0,0 +1,14 @@
+# Update this with your real OpenAI API key
+OPENAI_API_KEY=sk-YOUR_API_KEY
+
+# Uncomment and adjust if you use a non-default OTLP collector endpoint
+# OTEL_EXPORTER_OTLP_ENDPOINT=http://localhost:4317
+# OTEL_EXPORTER_OTLP_PROTOCOL=grpc
+
+OTEL_SERVICE_NAME=opentelemetry-python-openai-agents-zero-code
+
+# Enable auto-instrumentation for logs if desired
+OTEL_PYTHON_LOGGING_AUTO_INSTRUMENTATION_ENABLED=true
+
+# Optionally override the agent name reported on spans
+# OTEL_GENAI_AGENT_NAME=Travel Concierge
diff --git a/instrumentation-genai/opentelemetry-instrumentation-openai-agents/examples/zero-code/README.rst b/instrumentation-genai/opentelemetry-instrumentation-openai-agents/examples/zero-code/README.rst
new file mode 100644
index 0000000000..75a9ff5385
--- /dev/null
+++ b/instrumentation-genai/opentelemetry-instrumentation-openai-agents/examples/zero-code/README.rst
@@ -0,0 +1,41 @@
+OpenTelemetry OpenAI Agents Zero-Code Instrumentation Example
+=============================================================
+
+This example shows how to capture telemetry from OpenAI Agents without
+changing your application code by using ``opentelemetry-instrument``.
+
+When `main.py <main.py>`_ is executed, spans describing the agent workflow are
+exported to the configured OTLP endpoint. The spans include details such as the
+operation name, tool usage, and token consumption (when available).
+
+Setup
+-----
+
+1. Copy `.env.example <.env.example>`_ to `.env` and update it with your real
+   ``OPENAI_API_KEY``. Adjust the
+   OTLP endpoint settings if your collector is not reachable via
+   ``http://localhost:4317``.
+2. Create a virtual environment and install the dependencies:
+
+   ::
+
+      python3 -m venv .venv
+      source .venv/bin/activate
+      pip install "python-dotenv[cli]"
+      pip install -r requirements.txt
+
+Run
+---
+
+Execute the sample via ``opentelemetry-instrument`` so the OpenAI Agents
+instrumentation is activated automatically:
+
+:: 
+
+   dotenv run -- opentelemetry-instrument python main.py
+
+Because ``main.py`` invokes ``load_dotenv``, the values from ``.env`` are loaded
+at startup, so running ``python main.py`` directly also works.
+
+You should see the agent response printed to the console while spans export to
+your observability backend.
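+
+To sanity-check the pipeline without a collector, you can swap the OTLP
+exporter for the console one (a quick local check; this assumes no other
+exporter configuration is in place):
+
+::
+
+   dotenv run -- opentelemetry-instrument --traces_exporter console python main.py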
diff --git a/instrumentation-genai/opentelemetry-instrumentation-openai-agents/examples/zero-code/main.py b/instrumentation-genai/opentelemetry-instrumentation-openai-agents/examples/zero-code/main.py
new file mode 100644
index 0000000000..4f59c01644
--- /dev/null
+++ b/instrumentation-genai/opentelemetry-instrumentation-openai-agents/examples/zero-code/main.py
@@ -0,0 +1,69 @@
+"""Zero-code OpenAI Agents example."""
+
+from __future__ import annotations
+
+from agents import Agent, Runner, function_tool
+from dotenv import load_dotenv
+
+from opentelemetry import trace
+from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import (
+    OTLPSpanExporter,
+)
+from opentelemetry.instrumentation.openai_agents import (
+    OpenAIAgentsInstrumentor,
+)
+from opentelemetry.sdk.trace import TracerProvider
+from opentelemetry.sdk.trace.export import BatchSpanProcessor
+
+
+def configure_tracing() -> None:
+    """Ensure tracing exports spans even without auto-instrumentation."""
+
+    current_provider = trace.get_tracer_provider()
+    if isinstance(current_provider, TracerProvider):
+        # An SDK provider is already configured (e.g. by
+        # ``opentelemetry-instrument``); reuse it rather than attaching a
+        # second OTLP exporter, which would export every span twice.
+        provider = current_provider
+    else:
+        provider = TracerProvider()
+        provider.add_span_processor(BatchSpanProcessor(OTLPSpanExporter()))
+        trace.set_tracer_provider(provider)
+
+    OpenAIAgentsInstrumentor().instrument(tracer_provider=provider)
+
+
+@function_tool
+def get_weather(city: str) -> str:
+    """Return a canned weather response for the requested city."""
+
+    return f"The forecast for {city} is sunny with pleasant temperatures."
+
+
+def run_agent() -> None:
+    assistant = Agent(
+        name="Travel Concierge",
+        instructions=(
+            "You are a concise travel concierge. Use the weather tool when the"
+            " traveler asks about local conditions."
+        ),
+        tools=[get_weather],
+    )
+
+    result = Runner.run_sync(
+        assistant,
+        "I'm visiting Barcelona this weekend. How should I pack?",
+    )
+
+    print("Agent response:")
+    print(result.final_output)
+
+
+def main() -> None:
+    load_dotenv()
+    configure_tracing()
+    run_agent()
+
+
+if __name__ == "__main__":
+    main()
diff --git a/instrumentation-genai/opentelemetry-instrumentation-openai-agents/examples/zero-code/requirements.txt b/instrumentation-genai/opentelemetry-instrumentation-openai-agents/examples/zero-code/requirements.txt
new file mode 100644
index 0000000000..de86e88601
--- /dev/null
+++ b/instrumentation-genai/opentelemetry-instrumentation-openai-agents/examples/zero-code/requirements.txt
@@ -0,0 +1,7 @@
+openai-agents~=0.3.3
+python-dotenv~=1.0
+
+opentelemetry-sdk~=1.36.0
+opentelemetry-exporter-otlp-proto-grpc~=1.36.0
+opentelemetry-distro~=0.57b0
+opentelemetry-instrumentation-openai-agents~=0.1.0.dev
diff --git a/instrumentation-genai/opentelemetry-instrumentation-openai-agents/src/opentelemetry/instrumentation/openai_agents/__init__.py b/instrumentation-genai/opentelemetry-instrumentation-openai-agents/src/opentelemetry/instrumentation/openai_agents/__init__.py
index 6cf9599b64..985760f5a9 100644
--- a/instrumentation-genai/opentelemetry-instrumentation-openai-agents/src/opentelemetry/instrumentation/openai_agents/__init__.py
+++ b/instrumentation-genai/opentelemetry-instrumentation-openai-agents/src/opentelemetry/instrumentation/openai_agents/__init__.py
@@ -12,34 +12,120 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-"""Barebones OpenAI Agents instrumentation package. 
+"""OpenAI Agents instrumentation for OpenTelemetry.""" -This branch provides only the minimal package skeleton: -- Instrumentor class stub -- Version module -- Packaging metadata/entry point -""" +from __future__ import annotations -from typing import Collection +import importlib +import os +from typing import TYPE_CHECKING, Any, Collection, Protocol from opentelemetry.instrumentation.instrumentor import BaseInstrumentor +from opentelemetry.semconv._incubating.attributes import ( + gen_ai_attributes as GenAI, +) +from opentelemetry.semconv.schemas import Schemas +from opentelemetry.trace import get_tracer from .package import _instruments +from .span_processor import _OpenAIAgentsSpanProcessor from .version import __version__ # noqa: F401 -__all__ = [ - "OpenAIAgentsInstrumentor", -] +if TYPE_CHECKING: + from agents.tracing.processor_interface import TracingProcessor +else: # pragma: no cover - runtime fallback when Agents SDK isn't installed + TracingProcessor = Any + + +class _ProcessorHolder(Protocol): + _processors: Collection[TracingProcessor] + + +class _TraceProviderLike(Protocol): + _multi_processor: _ProcessorHolder + + +__all__ = ["OpenAIAgentsInstrumentor"] + + +def _load_tracing_module(): + return importlib.import_module("agents.tracing") + + +def _resolve_system(value: str | None) -> str: + if not value: + return GenAI.GenAiSystemValues.OPENAI.value + + normalized = value.strip().lower() + for member in GenAI.GenAiSystemValues: + if normalized == member.value: + return member.value + if normalized == member.name.lower(): + return member.value + return value + + +def _get_registered_processors( + provider: _TraceProviderLike, +) -> list[TracingProcessor]: + """Return tracing processors registered on the OpenAI Agents trace provider. + + The provider exposes a private `_multi_processor` attribute with a `_processors` + collection that stores the currently registered processors in execution order. 
+ """ + multi = getattr(provider, "_multi_processor", None) + processors = getattr(multi, "_processors", ()) + return list(processors) class OpenAIAgentsInstrumentor(BaseInstrumentor): - """Minimal instrumentor stub (no-op).""" + """Instrumentation that bridges OpenAI Agents tracing to OpenTelemetry spans.""" + + def __init__(self) -> None: + super().__init__() + self._processor: _OpenAIAgentsSpanProcessor | None = None + + def _instrument(self, **kwargs) -> None: + if self._processor is not None: + return + + tracer_provider = kwargs.get("tracer_provider") + tracer = get_tracer( + __name__, + "", + tracer_provider, + schema_url=Schemas.V1_28_0.value, + ) + + system = _resolve_system(kwargs.get("system")) + agent_name_override = kwargs.get("agent_name") or os.getenv( + "OTEL_GENAI_AGENT_NAME" + ) + + processor = _OpenAIAgentsSpanProcessor( + tracer=tracer, + system=system, + agent_name_override=agent_name_override, + ) + + tracing = _load_tracing_module() + provider = tracing.get_trace_provider() + existing = _get_registered_processors(provider) + provider.set_processors([*existing, processor]) + self._processor = processor + + def _uninstrument(self, **kwargs) -> None: + if self._processor is None: + return - def _instrument(self, **kwargs) -> None: # pragma: no cover - stub - return + tracing = _load_tracing_module() + provider = tracing.get_trace_provider() + current = _get_registered_processors(provider) + filtered = [proc for proc in current if proc is not self._processor] + provider.set_processors(filtered) - def _uninstrument(self, **kwargs) -> None: # pragma: no cover - stub - return + self._processor.shutdown() + self._processor = None def instrumentation_dependencies(self) -> Collection[str]: return _instruments diff --git a/instrumentation-genai/opentelemetry-instrumentation-openai-agents/src/opentelemetry/instrumentation/openai_agents/span_processor.py b/instrumentation-genai/opentelemetry-instrumentation-openai-agents/src/opentelemetry/instrumentation/openai_agents/span_processor.py new file mode 100644 index 0000000000..8c2943799b --- /dev/null +++ b/instrumentation-genai/opentelemetry-instrumentation-openai-agents/src/opentelemetry/instrumentation/openai_agents/span_processor.py @@ -0,0 +1,510 @@ +from __future__ import annotations + +from collections.abc import Mapping, Sequence +from dataclasses import dataclass +from datetime import datetime +from threading import RLock +from time import time_ns +from typing import Any +from urllib.parse import urlparse + +try: # pragma: no cover - used when OpenAI Agents is available + from agents.tracing.processor_interface import TracingProcessor + from agents.tracing.spans import Span as AgentsSpan + from agents.tracing.traces import Trace as AgentsTrace +except ImportError: # pragma: no cover - fallback for tests + + class TracingProcessor: # type: ignore[misc] + pass + + AgentsSpan = Any # type: ignore[assignment] + AgentsTrace = Any # type: ignore[assignment] +from opentelemetry.semconv._incubating.attributes import ( + gen_ai_attributes as GenAI, +) +from opentelemetry.semconv._incubating.attributes import ( + server_attributes as ServerAttributes, +) +from opentelemetry.trace import Span, SpanKind, Tracer, set_span_in_context +from opentelemetry.trace.status import Status, StatusCode + +SPAN_TYPE_GENERATION = "generation" +SPAN_TYPE_RESPONSE = "response" +SPAN_TYPE_AGENT = "agent" +SPAN_TYPE_AGENT_CREATION = "agent_creation" +SPAN_TYPE_FUNCTION = "function" +SPAN_TYPE_SPEECH = "speech" +SPAN_TYPE_TRANSCRIPTION = "transcription" 
+SPAN_TYPE_SPEECH_GROUP = "speech_group" +SPAN_TYPE_GUARDRAIL = "guardrail" +SPAN_TYPE_HANDOFF = "handoff" +SPAN_TYPE_MCP_TOOLS = "mcp_tools" + +_CLIENT_SPAN_TYPES = frozenset( + { + SPAN_TYPE_GENERATION, + SPAN_TYPE_RESPONSE, + SPAN_TYPE_SPEECH, + SPAN_TYPE_TRANSCRIPTION, + SPAN_TYPE_AGENT, + SPAN_TYPE_AGENT_CREATION, + } +) + +_GEN_AI_PROVIDER_NAME = "gen_ai.provider.name" + + +def _parse_iso8601(timestamp: str | None) -> int | None: + """Return nanosecond timestamp for ISO8601 string.""" + + if not timestamp: + return None + + try: + if timestamp.endswith("Z"): + timestamp = timestamp[:-1] + "+00:00" + dt = datetime.fromisoformat(timestamp) + except ValueError: + return None + + return int(dt.timestamp() * 1_000_000_000) + + +def _extract_server_attributes( + config: Mapping[str, Any] | None, +) -> dict[str, Any]: + if not config: + return {} + + base_url = config.get("base_url") + if not isinstance(base_url, str): + return {} + + try: + parsed = urlparse(base_url) + except ValueError: + return {} + + attributes: dict[str, Any] = {} + if parsed.hostname: + attributes[ServerAttributes.SERVER_ADDRESS] = parsed.hostname + if parsed.port: + attributes[ServerAttributes.SERVER_PORT] = parsed.port + + return attributes + + +def _looks_like_chat(messages: Sequence[Mapping[str, Any]] | None) -> bool: + if not messages: + return False + for message in messages: + if isinstance(message, Mapping) and message.get("role"): + return True + return False + + +def _collect_finish_reasons(choices: Sequence[Any] | None) -> list[str]: + reasons: list[str] = [] + if not choices: + return reasons + + for choice in choices: + if isinstance(choice, Mapping): + reason = choice.get("finish_reason") or choice.get("stop_reason") + if reason: + reasons.append(str(reason)) + continue + + finish_reason = getattr(choice, "finish_reason", None) + if finish_reason: + reasons.append(str(finish_reason)) + + return reasons + + +def _clean_stop_sequences(value: Any) -> Sequence[str] | None: + if value is None: + return None + if isinstance(value, str): + return [value] + if isinstance(value, Sequence): + cleaned: list[str] = [] + for item in value: + if item is None: + continue + cleaned.append(str(item)) + return cleaned if cleaned else None + return None + + +@dataclass +class _SpanContext: + span: Span + kind: SpanKind + + +class _OpenAIAgentsSpanProcessor(TracingProcessor): + """Convert OpenAI Agents traces into OpenTelemetry spans.""" + + def __init__( + self, + tracer: Tracer, + system: str, + agent_name_override: str | None = None, + ) -> None: + self._tracer = tracer + self._system = system + self._agent_name_override = ( + agent_name_override.strip() + if isinstance(agent_name_override, str) + and agent_name_override.strip() + else None + ) + self._spans: dict[str, _SpanContext] = {} + self._root_spans: dict[str, Span] = {} + self._lock = RLock() + + def _operation_name(self, span_data: Any) -> str: + span_type = getattr(span_data, "type", None) + explicit_operation = getattr(span_data, "operation", None) + normalized_operation = ( + explicit_operation.strip().lower() + if isinstance(explicit_operation, str) + else None + ) + if span_type == SPAN_TYPE_GENERATION: + if _looks_like_chat(getattr(span_data, "input", None)): + return GenAI.GenAiOperationNameValues.CHAT.value + return GenAI.GenAiOperationNameValues.TEXT_COMPLETION.value + if span_type == SPAN_TYPE_AGENT: + if normalized_operation in {"create", "create_agent"}: + return GenAI.GenAiOperationNameValues.CREATE_AGENT.value + if normalized_operation in 
{"invoke", "invoke_agent"}: + return GenAI.GenAiOperationNameValues.INVOKE_AGENT.value + return GenAI.GenAiOperationNameValues.INVOKE_AGENT.value + if span_type == SPAN_TYPE_AGENT_CREATION: + return GenAI.GenAiOperationNameValues.CREATE_AGENT.value + if span_type == SPAN_TYPE_FUNCTION: + return GenAI.GenAiOperationNameValues.EXECUTE_TOOL.value + if span_type == SPAN_TYPE_RESPONSE: + return GenAI.GenAiOperationNameValues.CHAT.value + return span_type or "operation" + + def _span_kind(self, span_data: Any) -> SpanKind: + span_type = getattr(span_data, "type", None) + if span_type in _CLIENT_SPAN_TYPES: + return SpanKind.CLIENT + # Tool invocations (e.g. span type "function") execute inside the agent + # runtime, so there is no remote peer to model; we keep them INTERNAL. + return SpanKind.INTERNAL + + def _span_name(self, operation: str, attributes: Mapping[str, Any]) -> str: + model = attributes.get(GenAI.GEN_AI_REQUEST_MODEL) or attributes.get( + GenAI.GEN_AI_RESPONSE_MODEL + ) + agent_name = attributes.get(GenAI.GEN_AI_AGENT_NAME) + tool_name = attributes.get(GenAI.GEN_AI_TOOL_NAME) + + if operation in ( + GenAI.GenAiOperationNameValues.CHAT.value, + GenAI.GenAiOperationNameValues.TEXT_COMPLETION.value, + GenAI.GenAiOperationNameValues.GENERATE_CONTENT.value, + GenAI.GenAiOperationNameValues.EMBEDDINGS.value, + ): + return f"{operation} {model}" if model else operation + if operation == GenAI.GenAiOperationNameValues.INVOKE_AGENT.value: + return f"{operation} {agent_name}" if agent_name else operation + if operation == GenAI.GenAiOperationNameValues.EXECUTE_TOOL.value: + return f"{operation} {tool_name}" if tool_name else operation + if operation == GenAI.GenAiOperationNameValues.CREATE_AGENT.value: + return f"{operation} {agent_name}" if agent_name else operation + return operation + + def _base_attributes(self) -> dict[str, Any]: + return {_GEN_AI_PROVIDER_NAME: self._system} + + def _attributes_from_generation(self, span_data: Any) -> dict[str, Any]: + attributes = self._base_attributes() + attributes[GenAI.GEN_AI_OPERATION_NAME] = self._operation_name( + span_data + ) + + model = getattr(span_data, "model", None) + if model: + attributes[GenAI.GEN_AI_REQUEST_MODEL] = model + + attributes.update( + _extract_server_attributes( + getattr(span_data, "model_config", None) + ) + ) + + usage = getattr(span_data, "usage", None) + if isinstance(usage, Mapping): + input_tokens = usage.get("prompt_tokens") or usage.get( + "input_tokens" + ) + if input_tokens is not None: + attributes[GenAI.GEN_AI_USAGE_INPUT_TOKENS] = input_tokens + output_tokens = usage.get("completion_tokens") or usage.get( + "output_tokens" + ) + if output_tokens is not None: + attributes[GenAI.GEN_AI_USAGE_OUTPUT_TOKENS] = output_tokens + + model_config = getattr(span_data, "model_config", None) + if isinstance(model_config, Mapping): + mapping = { + "temperature": GenAI.GEN_AI_REQUEST_TEMPERATURE, + "top_p": GenAI.GEN_AI_REQUEST_TOP_P, + "top_k": GenAI.GEN_AI_REQUEST_TOP_K, + "frequency_penalty": GenAI.GEN_AI_REQUEST_FREQUENCY_PENALTY, + "presence_penalty": GenAI.GEN_AI_REQUEST_PRESENCE_PENALTY, + "seed": GenAI.GEN_AI_REQUEST_SEED, + "n": GenAI.GEN_AI_REQUEST_CHOICE_COUNT, + } + for key, attr in mapping.items(): + value = model_config.get(key) + if value is not None: + attributes[attr] = value + + for max_key in ("max_tokens", "max_completion_tokens"): + value = model_config.get(max_key) + if value is not None: + attributes[GenAI.GEN_AI_REQUEST_MAX_TOKENS] = value + break + + stop_sequences = 
_clean_stop_sequences(model_config.get("stop")) + if stop_sequences: + attributes[GenAI.GEN_AI_REQUEST_STOP_SEQUENCES] = ( + stop_sequences + ) + + attributes[GenAI.GEN_AI_RESPONSE_FINISH_REASONS] = ( + _collect_finish_reasons(getattr(span_data, "output", None)) + ) + + return attributes + + def _attributes_from_response(self, span_data: Any) -> dict[str, Any]: + attributes = self._base_attributes() + attributes[GenAI.GEN_AI_OPERATION_NAME] = self._operation_name( + span_data + ) + + response = getattr(span_data, "response", None) + if response is None: + return attributes + + response_id = getattr(response, "id", None) + if response_id is not None: + attributes[GenAI.GEN_AI_RESPONSE_ID] = response_id + + response_model = getattr(response, "model", None) + if response_model: + attributes[GenAI.GEN_AI_RESPONSE_MODEL] = response_model + + usage = getattr(response, "usage", None) + if usage is not None: + input_tokens = getattr(usage, "input_tokens", None) or getattr( + usage, "prompt_tokens", None + ) + if input_tokens is not None: + attributes[GenAI.GEN_AI_USAGE_INPUT_TOKENS] = input_tokens + output_tokens = getattr(usage, "output_tokens", None) or getattr( + usage, "completion_tokens", None + ) + if output_tokens is not None: + attributes[GenAI.GEN_AI_USAGE_OUTPUT_TOKENS] = output_tokens + + output = getattr(response, "output", None) + if output: + attributes[GenAI.GEN_AI_RESPONSE_FINISH_REASONS] = ( + _collect_finish_reasons(output) + ) + + return attributes + + def _attributes_from_agent(self, span_data: Any) -> dict[str, Any]: + attributes = self._base_attributes() + attributes[GenAI.GEN_AI_OPERATION_NAME] = ( + GenAI.GenAiOperationNameValues.INVOKE_AGENT.value + ) + + name = self._agent_name_override or getattr(span_data, "name", None) + if name: + attributes[GenAI.GEN_AI_AGENT_NAME] = name + output_type = getattr(span_data, "output_type", None) + if output_type: + attributes[GenAI.GEN_AI_OUTPUT_TYPE] = output_type + + return attributes + + def _attributes_from_agent_creation( + self, span_data: Any + ) -> dict[str, Any]: + attributes = self._base_attributes() + attributes[GenAI.GEN_AI_OPERATION_NAME] = ( + GenAI.GenAiOperationNameValues.CREATE_AGENT.value + ) + + name = self._agent_name_override or getattr(span_data, "name", None) + if name: + attributes[GenAI.GEN_AI_AGENT_NAME] = name + description = getattr(span_data, "description", None) + if description: + attributes[GenAI.GEN_AI_AGENT_DESCRIPTION] = description + agent_id = getattr(span_data, "agent_id", None) or getattr( + span_data, "id", None + ) + if agent_id: + attributes[GenAI.GEN_AI_AGENT_ID] = agent_id + model = getattr(span_data, "model", None) + if model: + attributes[GenAI.GEN_AI_REQUEST_MODEL] = model + + return attributes + + def _attributes_from_function(self, span_data: Any) -> dict[str, Any]: + attributes = self._base_attributes() + attributes[GenAI.GEN_AI_OPERATION_NAME] = ( + GenAI.GenAiOperationNameValues.EXECUTE_TOOL.value + ) + + name = getattr(span_data, "name", None) + if name: + attributes[GenAI.GEN_AI_TOOL_NAME] = name + attributes[GenAI.GEN_AI_TOOL_TYPE] = "function" + + return attributes + + def _attributes_from_generic(self, span_data: Any) -> dict[str, Any]: + attributes = self._base_attributes() + attributes[GenAI.GEN_AI_OPERATION_NAME] = self._operation_name( + span_data + ) + return attributes + + def _attributes_for_span(self, span_data: Any) -> dict[str, Any]: + span_type = getattr(span_data, "type", None) + if span_type == SPAN_TYPE_GENERATION: + return 
self._attributes_from_generation(span_data) + if span_type == SPAN_TYPE_RESPONSE: + return self._attributes_from_response(span_data) + if span_type == SPAN_TYPE_AGENT: + operation = getattr(span_data, "operation", None) + if isinstance(operation, str) and operation.strip().lower() in { + "create", + "create_agent", + }: + return self._attributes_from_agent_creation(span_data) + return self._attributes_from_agent(span_data) + if span_type == SPAN_TYPE_AGENT_CREATION: + return self._attributes_from_agent_creation(span_data) + if span_type == SPAN_TYPE_FUNCTION: + return self._attributes_from_function(span_data) + if span_type in { + SPAN_TYPE_GUARDRAIL, + SPAN_TYPE_HANDOFF, + SPAN_TYPE_SPEECH_GROUP, + SPAN_TYPE_SPEECH, + SPAN_TYPE_TRANSCRIPTION, + SPAN_TYPE_MCP_TOOLS, + }: + return self._attributes_from_generic(span_data) + return self._base_attributes() + + def on_trace_start(self, trace: AgentsTrace) -> None: + attributes = self._base_attributes() + start_time = ( + _parse_iso8601(getattr(trace, "started_at", None)) or time_ns() + ) + + with self._lock: + span = self._tracer.start_span( + name=trace.name, + kind=SpanKind.SERVER, + attributes=attributes, + start_time=start_time, + ) + self._root_spans[trace.trace_id] = span + + def on_trace_end(self, trace: AgentsTrace) -> None: + end_time = _parse_iso8601(getattr(trace, "ended_at", None)) + + with self._lock: + span = self._root_spans.pop(trace.trace_id, None) + + if span: + span.end(end_time=end_time) + + def on_span_start(self, span: AgentsSpan[Any]) -> None: + span_data = span.span_data + start_time = _parse_iso8601(span.started_at) + attributes = self._attributes_for_span(span_data) + operation = attributes.get(GenAI.GEN_AI_OPERATION_NAME, "operation") + name = self._span_name(operation, attributes) + kind = self._span_kind(span_data) + + with self._lock: + parent_span = None + if span.parent_id and span.parent_id in self._spans: + parent_span = self._spans[span.parent_id].span + elif span.trace_id in self._root_spans: + parent_span = self._root_spans[span.trace_id] + + context = set_span_in_context(parent_span) if parent_span else None + otel_span = self._tracer.start_span( + name=name, + kind=kind, + attributes=attributes, + start_time=start_time, + context=context, + ) + self._spans[span.span_id] = _SpanContext(span=otel_span, kind=kind) + + def on_span_end(self, span: AgentsSpan[Any]) -> None: + end_time = _parse_iso8601(span.ended_at) + + with self._lock: + context = self._spans.pop(span.span_id, None) + + if context is None: + return + + otel_span = context.span + if otel_span.is_recording(): + attributes = self._attributes_for_span(span.span_data) + for key, value in attributes.items(): + otel_span.set_attribute(key, value) + + error = span.error + if error: + description = error.get("message") or "" + otel_span.set_status(Status(StatusCode.ERROR, description)) + else: + otel_span.set_status(Status(StatusCode.OK)) + + otel_span.end(end_time=end_time) + + def shutdown(self) -> None: + with self._lock: + spans = list(self._spans.values()) + self._spans.clear() + roots = list(self._root_spans.values()) + self._root_spans.clear() + + for context in spans: + context.span.set_status(Status(StatusCode.ERROR, "shutdown")) + context.span.end() + + for root in roots: + root.set_status(Status(StatusCode.ERROR, "shutdown")) + root.end() + + def force_flush(self) -> None: + # no batching + return + + +__all__ = ["_OpenAIAgentsSpanProcessor"] diff --git a/instrumentation-genai/opentelemetry-instrumentation-openai-agents/tests/conftest.py 
b/instrumentation-genai/opentelemetry-instrumentation-openai-agents/tests/conftest.py new file mode 100644 index 0000000000..f652ad6ddc --- /dev/null +++ b/instrumentation-genai/opentelemetry-instrumentation-openai-agents/tests/conftest.py @@ -0,0 +1,20 @@ +from __future__ import annotations + +import sys +from pathlib import Path + +TESTS_ROOT = Path(__file__).resolve().parent +GENAI_ROOT = TESTS_ROOT.parent +REPO_ROOT = GENAI_ROOT.parent +PROJECT_ROOT = REPO_ROOT.parent + +for path in ( + PROJECT_ROOT / "opentelemetry-instrumentation" / "src", + GENAI_ROOT / "src", + REPO_ROOT / "openai_agents_lib", + REPO_ROOT / "openai_lib", + TESTS_ROOT / "stubs", +): + path_str = str(path) + if path_str not in sys.path: + sys.path.insert(0, path_str) diff --git a/instrumentation-genai/opentelemetry-instrumentation-openai-agents/tests/stubs/agents/__init__.py b/instrumentation-genai/opentelemetry-instrumentation-openai-agents/tests/stubs/agents/__init__.py new file mode 100644 index 0000000000..7804f9c08e --- /dev/null +++ b/instrumentation-genai/opentelemetry-instrumentation-openai-agents/tests/stubs/agents/__init__.py @@ -0,0 +1 @@ +# Stub package for tests diff --git a/instrumentation-genai/opentelemetry-instrumentation-openai-agents/tests/stubs/agents/tracing/__init__.py b/instrumentation-genai/opentelemetry-instrumentation-openai-agents/tests/stubs/agents/tracing/__init__.py new file mode 100644 index 0000000000..59d54ddaf8 --- /dev/null +++ b/instrumentation-genai/opentelemetry-instrumentation-openai-agents/tests/stubs/agents/tracing/__init__.py @@ -0,0 +1,230 @@ +from __future__ import annotations + +from contextlib import contextmanager +from dataclasses import dataclass +from itertools import count +from typing import Any, Mapping, Sequence + +from .processor_interface import TracingProcessor +from .spans import Span +from .traces import Trace + +SPAN_TYPE_AGENT = "agent" +SPAN_TYPE_FUNCTION = "function" +SPAN_TYPE_GENERATION = "generation" +SPAN_TYPE_RESPONSE = "response" + +__all__ = [ + "TraceProvider", + "get_trace_provider", + "set_trace_processors", + "trace", + "agent_span", + "generation_span", + "function_span", + "response_span", + "AgentSpanData", + "GenerationSpanData", + "FunctionSpanData", + "ResponseSpanData", +] + + +@dataclass +class AgentSpanData: + name: str | None = None + tools: list[str] | None = None + output_type: str | None = None + description: str | None = None + agent_id: str | None = None + model: str | None = None + operation: str | None = None + + @property + def type(self) -> str: + return SPAN_TYPE_AGENT + + +@dataclass +class FunctionSpanData: + name: str | None = None + input: Any = None + output: Any = None + + @property + def type(self) -> str: + return SPAN_TYPE_FUNCTION + + +@dataclass +class GenerationSpanData: + input: Sequence[Mapping[str, Any]] | None = None + output: Sequence[Mapping[str, Any]] | None = None + model: str | None = None + model_config: Mapping[str, Any] | None = None + usage: Mapping[str, Any] | None = None + + @property + def type(self) -> str: + return SPAN_TYPE_GENERATION + + +@dataclass +class ResponseSpanData: + response: Any = None + + @property + def type(self) -> str: + return SPAN_TYPE_RESPONSE + + +class _ProcessorFanout(TracingProcessor): + def __init__(self) -> None: + self._processors: list[TracingProcessor] = [] + + def add_tracing_processor(self, processor: TracingProcessor) -> None: + self._processors.append(processor) + + def set_processors(self, processors: list[TracingProcessor]) -> None: + self._processors = 
list(processors) + + def on_trace_start(self, trace: Trace) -> None: + for processor in list(self._processors): + processor.on_trace_start(trace) + + def on_trace_end(self, trace: Trace) -> None: + for processor in list(self._processors): + processor.on_trace_end(trace) + + def on_span_start(self, span: Span) -> None: + for processor in list(self._processors): + processor.on_span_start(span) + + def on_span_end(self, span: Span) -> None: + for processor in list(self._processors): + processor.on_span_end(span) + + def shutdown(self) -> None: + for processor in list(self._processors): + processor.shutdown() + + def force_flush(self) -> None: + for processor in list(self._processors): + processor.force_flush() + + +class TraceProvider: + def __init__(self) -> None: + self._multi_processor = _ProcessorFanout() + self._ids = count(1) + + def register_processor(self, processor: TracingProcessor) -> None: + self._multi_processor.add_tracing_processor(processor) + + def set_processors(self, processors: list[TracingProcessor]) -> None: + self._multi_processor.set_processors(processors) + + def create_trace( + self, + name: str, + trace_id: str | None = None, + group_id: str | None = None, + metadata: Mapping[str, Any] | None = None, + disabled: bool = False, + ) -> Trace: + trace_id = trace_id or f"trace_{next(self._ids)}" + return Trace(name, trace_id, self._multi_processor) + + def create_span( + self, + span_data: Any, + span_id: str | None = None, + parent: Trace | Span | None = None, + disabled: bool = False, + ) -> Span: + span_id = span_id or f"span_{next(self._ids)}" + if isinstance(parent, Span): + trace_id = parent.trace_id + parent_id = parent.span_id + elif isinstance(parent, Trace): + trace_id = parent.trace_id + parent_id = None + else: + trace_id = f"trace_{next(self._ids)}" + parent_id = None + return Span( + trace_id, span_id, span_data, parent_id, self._multi_processor + ) + + def shutdown(self) -> None: + self._multi_processor.shutdown() + + +_PROVIDER = TraceProvider() +_CURRENT_TRACE: Trace | None = None + + +def get_trace_provider() -> TraceProvider: + return _PROVIDER + + +def set_trace_processors(processors: list[TracingProcessor]) -> None: + _PROVIDER.set_processors(processors) + + +@contextmanager +def trace(name: str, **kwargs: Any): + global _CURRENT_TRACE + trace_obj = _PROVIDER.create_trace(name, **kwargs) + previous = _CURRENT_TRACE + _CURRENT_TRACE = trace_obj + trace_obj.start() + try: + yield trace_obj + finally: + trace_obj.finish() + _CURRENT_TRACE = previous + + +@contextmanager +def generation_span(**kwargs: Any): + data = GenerationSpanData(**kwargs) + span = _PROVIDER.create_span(data, parent=_CURRENT_TRACE) + span.start() + try: + yield span + finally: + span.finish() + + +@contextmanager +def agent_span(**kwargs: Any): + data = AgentSpanData(**kwargs) + span = _PROVIDER.create_span(data, parent=_CURRENT_TRACE) + span.start() + try: + yield span + finally: + span.finish() + + +@contextmanager +def function_span(**kwargs: Any): + data = FunctionSpanData(**kwargs) + span = _PROVIDER.create_span(data, parent=_CURRENT_TRACE) + span.start() + try: + yield span + finally: + span.finish() + + +@contextmanager +def response_span(**kwargs: Any): + data = ResponseSpanData(**kwargs) + span = _PROVIDER.create_span(data, parent=_CURRENT_TRACE) + span.start() + try: + yield span + finally: + span.finish() diff --git a/instrumentation-genai/opentelemetry-instrumentation-openai-agents/tests/stubs/agents/tracing/processor_interface.py 
b/instrumentation-genai/opentelemetry-instrumentation-openai-agents/tests/stubs/agents/tracing/processor_interface.py
new file mode 100644
index 0000000000..46455551b6
--- /dev/null
+++ b/instrumentation-genai/opentelemetry-instrumentation-openai-agents/tests/stubs/agents/tracing/processor_interface.py
@@ -0,0 +1,21 @@
+from __future__ import annotations
+
+
+class TracingProcessor:
+    def on_trace_start(self, trace):  # pragma: no cover - stub
+        pass
+
+    def on_trace_end(self, trace):  # pragma: no cover - stub
+        pass
+
+    def on_span_start(self, span):  # pragma: no cover - stub
+        pass
+
+    def on_span_end(self, span):  # pragma: no cover - stub
+        pass
+
+    def shutdown(self) -> None:  # pragma: no cover - stub
+        pass
+
+    def force_flush(self) -> None:  # pragma: no cover - stub
+        pass
diff --git a/instrumentation-genai/opentelemetry-instrumentation-openai-agents/tests/stubs/agents/tracing/spans.py b/instrumentation-genai/opentelemetry-instrumentation-openai-agents/tests/stubs/agents/tracing/spans.py
new file mode 100644
index 0000000000..5b7335a650
--- /dev/null
+++ b/instrumentation-genai/opentelemetry-instrumentation-openai-agents/tests/stubs/agents/tracing/spans.py
@@ -0,0 +1,42 @@
+from __future__ import annotations
+
+from datetime import datetime, timezone
+from typing import Any
+
+
+class Span:
+    def __init__(
+        self,
+        trace_id: str,
+        span_id: str,
+        span_data: Any,
+        parent_id: str | None,
+        processor,
+    ) -> None:
+        self.trace_id = trace_id
+        self.span_id = span_id
+        self.span_data = span_data
+        self.parent_id = parent_id
+        self.started_at: str | None = None
+        self.ended_at: str | None = None
+        self.error = None
+        self._processor = processor
+
+    def start(self) -> None:
+        if self.started_at is not None:
+            return
+        self.started_at = datetime.now(timezone.utc).isoformat()
+        self._processor.on_span_start(self)
+
+    def finish(self) -> None:
+        if self.ended_at is not None:
+            return
+        self.ended_at = datetime.now(timezone.utc).isoformat()
+        self._processor.on_span_end(self)
+
+    def __enter__(self) -> "Span":
+        self.start()
+        return self
+
+    def __exit__(self, exc_type, exc, tb) -> None:
+        self.finish()
diff --git a/instrumentation-genai/opentelemetry-instrumentation-openai-agents/tests/stubs/agents/tracing/traces.py b/instrumentation-genai/opentelemetry-instrumentation-openai-agents/tests/stubs/agents/tracing/traces.py
new file mode 100644
index 0000000000..895c0e3e76
--- /dev/null
+++ b/instrumentation-genai/opentelemetry-instrumentation-openai-agents/tests/stubs/agents/tracing/traces.py
@@ -0,0 +1,31 @@
+from __future__ import annotations
+
+from datetime import datetime, timezone
+
+
+class Trace:
+    def __init__(self, name: str, trace_id: str, processor) -> None:
+        self.name = name
+        self.trace_id = trace_id
+        self._processor = processor
+        self.started_at: str | None = None
+        self.ended_at: str | None = None
+
+    def start(self) -> None:
+        if self.started_at is not None:
+            return
+        self.started_at = datetime.now(timezone.utc).isoformat()
+        self._processor.on_trace_start(self)
+
+    def finish(self) -> None:
+        if self.ended_at is not None:
+            return
+        self.ended_at = datetime.now(timezone.utc).isoformat()
+        self._processor.on_trace_end(self)
+
+    def __enter__(self) -> "Trace":
+        self.start()
+        return self
+
+    def __exit__(self, exc_type, exc, tb) -> None:
+        self.finish()
diff --git a/instrumentation-genai/opentelemetry-instrumentation-openai-agents/tests/stubs/opentelemetry/instrumentation/instrumentor.py 
b/instrumentation-genai/opentelemetry-instrumentation-openai-agents/tests/stubs/opentelemetry/instrumentation/instrumentor.py new file mode 100644 index 0000000000..c64d31de90 --- /dev/null +++ b/instrumentation-genai/opentelemetry-instrumentation-openai-agents/tests/stubs/opentelemetry/instrumentation/instrumentor.py @@ -0,0 +1,26 @@ +from __future__ import annotations + +from typing import Collection + + +class BaseInstrumentor: + def __init__(self) -> None: + pass + + def instrument(self, **kwargs) -> None: + self._instrument(**kwargs) + + def uninstrument(self, **kwargs) -> None: + self._uninstrument(**kwargs) + + # Subclasses override + def _instrument(self, **kwargs) -> None: # pragma: no cover - stub + raise NotImplementedError + + def _uninstrument(self, **kwargs) -> None: # pragma: no cover - stub + raise NotImplementedError + + def instrumentation_dependencies( + self, + ) -> Collection[str]: # pragma: no cover + return [] diff --git a/instrumentation-genai/opentelemetry-instrumentation-openai-agents/tests/test_tracer.py b/instrumentation-genai/opentelemetry-instrumentation-openai-agents/tests/test_tracer.py new file mode 100644 index 0000000000..21ffd86ca3 --- /dev/null +++ b/instrumentation-genai/opentelemetry-instrumentation-openai-agents/tests/test_tracer.py @@ -0,0 +1,297 @@ +from __future__ import annotations + +import sys +from pathlib import Path + +TESTS_ROOT = Path(__file__).resolve().parent +stub_path = TESTS_ROOT / "stubs" +if str(stub_path) not in sys.path: + sys.path.insert(0, str(stub_path)) + +sys.modules.pop("agents", None) +sys.modules.pop("agents.tracing", None) + +from agents.tracing import ( # noqa: E402 + agent_span, + function_span, + generation_span, + response_span, + set_trace_processors, + trace, +) + +from opentelemetry.instrumentation.openai_agents import ( # noqa: E402 + OpenAIAgentsInstrumentor, +) +from opentelemetry.sdk.trace import TracerProvider # noqa: E402 + +try: + from opentelemetry.sdk.trace.export import ( # type: ignore[attr-defined] + InMemorySpanExporter, + SimpleSpanProcessor, + ) +except ImportError: # pragma: no cover - support older/newer SDK layouts + from opentelemetry.sdk.trace.export import ( + SimpleSpanProcessor, # noqa: E402 + ) + from opentelemetry.sdk.trace.export.in_memory_span_exporter import ( # noqa: E402 + InMemorySpanExporter, + ) +from opentelemetry.semconv._incubating.attributes import ( # noqa: E402 + gen_ai_attributes as GenAI, +) +from opentelemetry.semconv._incubating.attributes import ( # noqa: E402 + server_attributes as ServerAttributes, +) +from opentelemetry.trace import SpanKind # noqa: E402 + + +def _instrument_with_provider(**instrument_kwargs): + set_trace_processors([]) + provider = TracerProvider() + exporter = InMemorySpanExporter() + provider.add_span_processor(SimpleSpanProcessor(exporter)) + + instrumentor = OpenAIAgentsInstrumentor() + instrumentor.instrument(tracer_provider=provider, **instrument_kwargs) + + return instrumentor, exporter + + +def test_generation_span_creates_client_span(): + instrumentor, exporter = _instrument_with_provider() + + try: + with trace("workflow"): + with generation_span( + input=[{"role": "user", "content": "hi"}], + model="gpt-4o-mini", + model_config={ + "temperature": 0.2, + "base_url": "https://api.openai.com", + }, + usage={"input_tokens": 12, "output_tokens": 3}, + ): + pass + + spans = exporter.get_finished_spans() + client_spans = [span for span in spans if span.kind is SpanKind.CLIENT] + server_spans = [span for span in spans if span.kind is 
SpanKind.SERVER] + + assert len(server_spans) == 1 + server_span = server_spans[0] + assert server_span.name == "workflow" + assert server_span.attributes["gen_ai.provider.name"] == "openai" + assert client_spans + client_span = next(iter(client_spans)) + + assert client_span.attributes["gen_ai.provider.name"] == "openai" + assert client_span.attributes[GenAI.GEN_AI_OPERATION_NAME] == "chat" + assert ( + client_span.attributes[GenAI.GEN_AI_REQUEST_MODEL] == "gpt-4o-mini" + ) + assert client_span.name == "chat gpt-4o-mini" + assert client_span.attributes[GenAI.GEN_AI_USAGE_INPUT_TOKENS] == 12 + assert client_span.attributes[GenAI.GEN_AI_USAGE_OUTPUT_TOKENS] == 3 + assert ( + client_span.attributes[ServerAttributes.SERVER_ADDRESS] + == "api.openai.com" + ) + finally: + instrumentor.uninstrument() + exporter.clear() + + +def test_generation_span_without_roles_uses_text_completion(): + instrumentor, exporter = _instrument_with_provider() + + try: + with trace("workflow"): + with generation_span( + input=[{"content": "tell me a joke"}], + model="gpt-4o-mini", + model_config={"temperature": 0.7}, + ): + pass + + spans = exporter.get_finished_spans() + completion_span = next( + span + for span in spans + if span.attributes[GenAI.GEN_AI_OPERATION_NAME] + == GenAI.GenAiOperationNameValues.TEXT_COMPLETION.value + ) + assert completion_span.kind is SpanKind.CLIENT + server_spans = [span for span in spans if span.kind is SpanKind.SERVER] + assert len(server_spans) == 1 + assert server_spans[0].name == "workflow" + assert server_spans[0].attributes["gen_ai.provider.name"] == "openai" + assert [span for span in spans if span.kind is SpanKind.CLIENT] + + assert completion_span.kind is SpanKind.CLIENT + assert completion_span.name == "text_completion gpt-4o-mini" + assert ( + completion_span.attributes[GenAI.GEN_AI_REQUEST_MODEL] + == "gpt-4o-mini" + ) + finally: + instrumentor.uninstrument() + exporter.clear() + + +def test_function_span_records_tool_attributes(): + instrumentor, exporter = _instrument_with_provider() + + try: + with trace("workflow"): + with function_span( + name="fetch_weather", input='{"city": "Paris"}' + ): + pass + + spans = exporter.get_finished_spans() + tool_span = next( + span for span in spans if span.kind is SpanKind.INTERNAL + ) + + server_spans = [span for span in spans if span.kind is SpanKind.SERVER] + assert len(server_spans) == 1 + assert server_spans[0].name == "workflow" + assert server_spans[0].attributes["gen_ai.provider.name"] == "openai" + + assert ( + tool_span.attributes[GenAI.GEN_AI_OPERATION_NAME] == "execute_tool" + ) + assert tool_span.attributes[GenAI.GEN_AI_TOOL_NAME] == "fetch_weather" + assert tool_span.attributes[GenAI.GEN_AI_TOOL_TYPE] == "function" + assert tool_span.attributes["gen_ai.provider.name"] == "openai" + finally: + instrumentor.uninstrument() + exporter.clear() + + +def test_agent_create_span_records_attributes(): + instrumentor, exporter = _instrument_with_provider() + + try: + with trace("workflow"): + with agent_span( + operation="create", + name="support_bot", + description="Answers support questions", + agent_id="agt_123", + model="gpt-4o-mini", + ): + pass + + spans = exporter.get_finished_spans() + create_span = next( + span + for span in spans + if span.attributes[GenAI.GEN_AI_OPERATION_NAME] + == GenAI.GenAiOperationNameValues.CREATE_AGENT.value + ) + server_spans = [span for span in spans if span.kind is SpanKind.SERVER] + assert len(server_spans) == 1 + assert server_spans[0].name == "workflow" + assert 
server_spans[0].attributes["gen_ai.provider.name"] == "openai" + assert [span for span in spans if span.kind is SpanKind.CLIENT] + + assert create_span.kind is SpanKind.CLIENT + assert create_span.name == "create_agent support_bot" + assert create_span.attributes["gen_ai.provider.name"] == "openai" + assert create_span.attributes[GenAI.GEN_AI_AGENT_NAME] == "support_bot" + assert ( + create_span.attributes[GenAI.GEN_AI_AGENT_DESCRIPTION] + == "Answers support questions" + ) + assert create_span.attributes[GenAI.GEN_AI_AGENT_ID] == "agt_123" + assert ( + create_span.attributes[GenAI.GEN_AI_REQUEST_MODEL] == "gpt-4o-mini" + ) + finally: + instrumentor.uninstrument() + exporter.clear() + + +def test_agent_name_override_applied_to_agent_spans(): + instrumentor, exporter = _instrument_with_provider( + agent_name="Travel Concierge" + ) + + try: + with trace("workflow"): + with agent_span(operation="invoke", name="support_bot"): + pass + + spans = exporter.get_finished_spans() + agent_span_record = next( + span + for span in spans + if span.attributes[GenAI.GEN_AI_OPERATION_NAME] + == GenAI.GenAiOperationNameValues.INVOKE_AGENT.value + ) + server_spans = [span for span in spans if span.kind is SpanKind.SERVER] + assert len(server_spans) == 1 + assert server_spans[0].name == "workflow" + assert server_spans[0].attributes["gen_ai.provider.name"] == "openai" + assert [span for span in spans if span.kind is SpanKind.CLIENT] + + assert agent_span_record.kind is SpanKind.CLIENT + assert agent_span_record.name == "invoke_agent Travel Concierge" + assert ( + agent_span_record.attributes[GenAI.GEN_AI_AGENT_NAME] + == "Travel Concierge" + ) + finally: + instrumentor.uninstrument() + exporter.clear() + + +def test_response_span_records_response_attributes(): + instrumentor, exporter = _instrument_with_provider() + + class _Usage: + def __init__(self, input_tokens: int, output_tokens: int) -> None: + self.input_tokens = input_tokens + self.output_tokens = output_tokens + + class _Response: + def __init__(self) -> None: + self.id = "resp-123" + self.model = "gpt-4o-mini" + self.usage = _Usage(42, 9) + self.output = [{"finish_reason": "stop"}] + + try: + with trace("workflow"): + with response_span(response=_Response()): + pass + + spans = exporter.get_finished_spans() + response = next( + span + for span in spans + if span.attributes[GenAI.GEN_AI_OPERATION_NAME] + == GenAI.GenAiOperationNameValues.CHAT.value + ) + + assert response.kind is SpanKind.CLIENT + assert response.name == "chat gpt-4o-mini" + assert response.attributes["gen_ai.provider.name"] == "openai" + assert response.attributes[GenAI.GEN_AI_RESPONSE_ID] == "resp-123" + assert ( + response.attributes[GenAI.GEN_AI_RESPONSE_MODEL] == "gpt-4o-mini" + ) + assert response.attributes[GenAI.GEN_AI_USAGE_INPUT_TOKENS] == 42 + assert response.attributes[GenAI.GEN_AI_USAGE_OUTPUT_TOKENS] == 9 + assert response.attributes[GenAI.GEN_AI_RESPONSE_FINISH_REASONS] == ( + "stop", + ) + server_spans = [span for span in spans if span.kind is SpanKind.SERVER] + assert len(server_spans) == 1 + assert server_spans[0].name == "workflow" + assert server_spans[0].attributes["gen_ai.provider.name"] == "openai" + finally: + instrumentor.uninstrument() + exporter.clear()
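+
+
+def test_uninstrument_removes_processor():
+    # A minimal extra check (sketch, not part of the original suite): once
+    # ``uninstrument()`` runs, the bridge processor is detached from the
+    # Agents trace provider, so new agent traces should not be exported as
+    # OpenTelemetry spans.
+    instrumentor, exporter = _instrument_with_provider()
+
+    instrumentor.uninstrument()
+    exporter.clear()
+
+    with trace("workflow"):
+        with function_span(name="noop"):
+            pass
+
+    assert not exporter.get_finished_spans()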