diff --git a/instrumentation-genai/opentelemetry-instrumentation-openai-agents-v2/CHANGELOG.md b/instrumentation-genai/opentelemetry-instrumentation-openai-agents-v2/CHANGELOG.md index d4fed08923..d4a9abde6d 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-openai-agents-v2/CHANGELOG.md +++ b/instrumentation-genai/opentelemetry-instrumentation-openai-agents-v2/CHANGELOG.md @@ -12,3 +12,5 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ([#3805](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3805)) - Implement OpenAI Agents span processing aligned with GenAI semantic conventions. ([#3817](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3817)) +- Input and output according to GenAI spec. + ([#3824](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3824)) diff --git a/instrumentation-genai/opentelemetry-instrumentation-openai-agents-v2/examples/content-capture/.env.example b/instrumentation-genai/opentelemetry-instrumentation-openai-agents-v2/examples/content-capture/.env.example new file mode 100644 index 0000000000..97060c4e4b --- /dev/null +++ b/instrumentation-genai/opentelemetry-instrumentation-openai-agents-v2/examples/content-capture/.env.example @@ -0,0 +1,8 @@ +# Copy to .env and add values before running the sample. 
+# Required for OpenAI client (only used if you swap in a real OpenAI call) +OPENAI_API_KEY= + +# Optional overrides for span attributes / exporters +OTEL_SERVICE_NAME=openai-agents-content-capture-demo +OTEL_EXPORTER_OTLP_ENDPOINT=http://localhost:4317 +OTEL_EXPORTER_OTLP_PROTOCOL=grpc diff --git a/instrumentation-genai/opentelemetry-instrumentation-openai-agents-v2/examples/content-capture/README.md b/instrumentation-genai/opentelemetry-instrumentation-openai-agents-v2/examples/content-capture/README.md new file mode 100644 index 0000000000..5ab7ae8302 --- /dev/null +++ b/instrumentation-genai/opentelemetry-instrumentation-openai-agents-v2/examples/content-capture/README.md @@ -0,0 +1,42 @@ +# OpenAI Agents Content Capture Demo + +This example exercises the `OpenAIAgentsInstrumentor` with message content +capture enabled, illustrating how prompts, responses, and tool payloads are +recorded on spans and span events. + +> The demo uses the local tracing utilities from the `openai-agents` +> package—no outbound API calls are made. + +## Prerequisites + +1. Activate the repository virtual environment: + + ```bash + source ../../.venv/bin/activate + ``` + +2. Copy `.env.example` to `.env` and provide any overrides you need (for example, + setting `OTEL_EXPORTER_OTLP_ENDPOINT`). +3. Ensure `openai-agents` is installed in the environment (it is included in + the shared development venv for this repository). + +## Run the demo + +```bash +python main.py +``` + +The script will: + +- Configure the OpenTelemetry SDK with an OTLP exporter so spans reach your collector. +- Instrument the OpenAI Agents tracing hooks with content capture enabled. +- Simulate an agent invocation that performs a generation and a tool call. +- Print the resulting spans, attributes, and events (including JSON-encoded + prompts and responses) to stdout. + +## Customisation tips + +- Set `OTEL_SERVICE_NAME` before running to override the default service name. 
+- Adjust the OTLP exporter configuration (endpoint, protocol) through `.env`. +- Modify the prompts, tool payloads, or add additional spans in `run_workflow` + to explore different content capture scenarios. diff --git a/instrumentation-genai/opentelemetry-instrumentation-openai-agents-v2/examples/content-capture/main.py b/instrumentation-genai/opentelemetry-instrumentation-openai-agents-v2/examples/content-capture/main.py new file mode 100644 index 0000000000..23afcb5f50 --- /dev/null +++ b/instrumentation-genai/opentelemetry-instrumentation-openai-agents-v2/examples/content-capture/main.py @@ -0,0 +1,122 @@ +""" +Content capture demo for the OpenAI Agents instrumentation. + +This script spins up the instrumentation with message capture enabled and +simulates an agent invocation plus a tool call using the tracing helpers from +the ``openai-agents`` package. Spans are exported to the console so you can +inspect captured prompts, responses, and tool payloads without making any +OpenAI API calls. +""" + +from __future__ import annotations + +import json +import os +from typing import Any + +from agents.tracing import agent_span, function_span, generation_span, trace +from dotenv import load_dotenv + +from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import ( + OTLPSpanExporter, +) +from opentelemetry.instrumentation.openai_agents import ( + OpenAIAgentsInstrumentor, +) +from opentelemetry.sdk.resources import Resource +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export import BatchSpanProcessor + +load_dotenv() # take environment variables from .env. 
+ + +def configure_tracing() -> None: + """Configure a tracer provider that exports spans via OTLP.""" + resource = Resource.create( + { + "service.name": os.environ.get( + "OTEL_SERVICE_NAME", "openai-agents-content-capture-demo" + ) + } + ) + provider = TracerProvider(resource=resource) + provider.add_span_processor(BatchSpanProcessor(OTLPSpanExporter())) + + # Instrument with explicit content capture mode to ensure prompts/responses are recorded. + OpenAIAgentsInstrumentor().instrument( + tracer_provider=provider, + capture_message_content="span_and_event", + system="openai", + agent_name="Travel Concierge", + base_url="https://api.openai.com/v1", + ) + + +def dump(title: str, payload: Any) -> None: + """Pretty-print helper used to show intermediate context.""" + print(f"\n=== {title} ===") + print(json.dumps(payload, indent=2)) + + +def run_workflow() -> None: + """Simulate an agent workflow with a generation and a tool invocation.""" + itinerary_prompt = [ + {"role": "system", "content": "Plan high level travel itineraries."}, + { + "role": "user", + "content": "I'm visiting Paris for 3 days in November.", + }, + ] + + tool_args = {"city": "Paris", "date": "2025-11-12"} + tool_result = { + "forecast": "Mostly sunny, highs 15°C", + "packing_tips": ["light jacket", "comfortable shoes"], + } + + with trace("travel-booking-workflow"): + with agent_span(name="travel_planner") as agent: + dump( + "Agent span started", + {"span_id": agent.span_id, "trace_id": agent.trace_id}, + ) + + with generation_span( + input=itinerary_prompt, + output=[ + { + "role": "assistant", + "content": ( + "Day 1 visit the Louvre, Day 2 tour Versailles, " + "Day 3 explore Montmartre." 
+ ), + } + ], + model="gpt-4o-mini", + usage={ + "input_tokens": 128, + "output_tokens": 96, + "total_tokens": 224, + }, + ): + pass + + with function_span( + name="fetch_weather", + input=json.dumps(tool_args), + output=tool_result, + ): + pass + + print( + "\nWorkflow complete – spans exported to the configured OTLP endpoint." + ) + + +def main() -> None: + configure_tracing() + run_workflow() + + +if __name__ == "__main__": + main() diff --git a/instrumentation-genai/opentelemetry-instrumentation-openai-agents-v2/examples/manual/.env.example b/instrumentation-genai/opentelemetry-instrumentation-openai-agents-v2/examples/manual/.env.example index 84c2f28c7f..5a6d8779d2 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-openai-agents-v2/examples/manual/.env.example +++ b/instrumentation-genai/opentelemetry-instrumentation-openai-agents-v2/examples/manual/.env.example @@ -1,11 +1,5 @@ -# Update this with your real OpenAI API key -OPENAI_API_KEY=sk-YOUR_API_KEY - -# Uncomment and adjust if you use a non-default OTLP collector endpoint -# OTEL_EXPORTER_OTLP_ENDPOINT=http://localhost:4317 -# OTEL_EXPORTER_OTLP_PROTOCOL=grpc - -OTEL_SERVICE_NAME=opentelemetry-python-openai-agents-manual - -# Optionally override the agent name reported on spans -# OTEL_GENAI_AGENT_NAME=Travel Concierge +# Copy to .env and add real values before running main.py +OPENAI_API_KEY= +OTEL_EXPORTER_OTLP_ENDPOINT=http://localhost:4317 +OTEL_EXPORTER_OTLP_PROTOCOL=grpc +OTEL_SERVICE_NAME=openai-agents-manual-demo diff --git a/instrumentation-genai/opentelemetry-instrumentation-openai-agents-v2/examples/manual/README.rst b/instrumentation-genai/opentelemetry-instrumentation-openai-agents-v2/examples/manual/README.rst index 1f3be9f4de..18abec481c 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-openai-agents-v2/examples/manual/README.rst +++ b/instrumentation-genai/opentelemetry-instrumentation-openai-agents-v2/examples/manual/README.rst @@ -22,7 +22,7 @@ Setup 
python3 -m venv .venv source .venv/bin/activate pip install "python-dotenv[cli]" - pip install -r requirements.txt + uv pip install -r requirements.txt --prerelease=allow Run --- @@ -34,6 +34,8 @@ are applied: dotenv run -- python main.py +Ensure ``OPENAI_API_KEY`` is present in your environment (or ``.env`` file); the OpenAI client raises ``OpenAIError`` if the key is missing. + The script automatically loads environment variables from ``.env`` so running ``python main.py`` directly also works if the shell already has the required values exported. diff --git a/instrumentation-genai/opentelemetry-instrumentation-openai-agents-v2/examples/zero-code/README.rst b/instrumentation-genai/opentelemetry-instrumentation-openai-agents-v2/examples/zero-code/README.rst index 75a9ff5385..e2a76b4ea7 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-openai-agents-v2/examples/zero-code/README.rst +++ b/instrumentation-genai/opentelemetry-instrumentation-openai-agents-v2/examples/zero-code/README.rst @@ -22,7 +22,7 @@ Setup python3 -m venv .venv source .venv/bin/activate pip install "python-dotenv[cli]" - pip install -r requirements.txt + uv pip install -r requirements.txt --prerelease=allow Run --- @@ -34,6 +34,8 @@ instrumentation is activated automatically: dotenv run -- opentelemetry-instrument python main.py +Ensure ``OPENAI_API_KEY`` is set in your shell or `.env`; the OpenAI client raises ``OpenAIError`` if the key is missing. + Because ``main.py`` invokes ``load_dotenv``, running ``python main.py`` directly also works when the required environment variables are already exported. 
diff --git a/instrumentation-genai/opentelemetry-instrumentation-openai-agents-v2/pyproject.toml b/instrumentation-genai/opentelemetry-instrumentation-openai-agents-v2/pyproject.toml index 3f72413c32..6d692a4c64 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-openai-agents-v2/pyproject.toml +++ b/instrumentation-genai/opentelemetry-instrumentation-openai-agents-v2/pyproject.toml @@ -26,7 +26,8 @@ classifiers = [ dependencies = [ "opentelemetry-api >= 1.37", "opentelemetry-instrumentation >= 0.58b0", - "opentelemetry-semantic-conventions >= 0.58b0" + "opentelemetry-semantic-conventions >= 0.58b0", + "opentelemetry-util-genai" ] [project.optional-dependencies] diff --git a/instrumentation-genai/opentelemetry-instrumentation-openai-agents-v2/src/opentelemetry/instrumentation/openai_agents/__init__.py b/instrumentation-genai/opentelemetry-instrumentation-openai-agents-v2/src/opentelemetry/instrumentation/openai_agents/__init__.py index cdb4246f11..16e19238c7 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-openai-agents-v2/src/opentelemetry/instrumentation/openai_agents/__init__.py +++ b/instrumentation-genai/opentelemetry-instrumentation-openai-agents-v2/src/opentelemetry/instrumentation/openai_agents/__init__.py @@ -1,26 +1,11 @@ -# Copyright The OpenTelemetry Authors -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
- """OpenAI Agents instrumentation for OpenTelemetry.""" from __future__ import annotations +import importlib +import logging import os -from typing import Collection, Protocol - -from agents import tracing -from agents.tracing.processor_interface import TracingProcessor +from typing import Any, Collection from opentelemetry.instrumentation.instrumentor import BaseInstrumentor from opentelemetry.semconv._incubating.attributes import ( @@ -30,45 +15,116 @@ from opentelemetry.trace import get_tracer from .package import _instruments -from .span_processor import _OpenAIAgentsSpanProcessor -from .version import __version__ # noqa: F401 +from .span_processor import ( + ContentCaptureMode, + GenAIEvaluationAttributes, + GenAIOperationName, + GenAIOutputType, + GenAIProvider, + GenAISemanticProcessor, + GenAIToolType, +) +__all__ = [ + "OpenAIAgentsInstrumentor", + "GenAIProvider", + "GenAIOperationName", + "GenAIToolType", + "GenAIOutputType", + "GenAIEvaluationAttributes", +] -class _ProcessorHolder(Protocol): - _processors: Collection[TracingProcessor] +logger = logging.getLogger(__name__) +_CONTENT_CAPTURE_ENV = "OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT" +_SYSTEM_OVERRIDE_ENV = "OTEL_INSTRUMENTATION_OPENAI_AGENTS_SYSTEM" +_CAPTURE_CONTENT_ENV = "OTEL_INSTRUMENTATION_OPENAI_AGENTS_CAPTURE_CONTENT" +_CAPTURE_METRICS_ENV = "OTEL_INSTRUMENTATION_OPENAI_AGENTS_CAPTURE_METRICS" -class _TraceProviderLike(Protocol): - _multi_processor: _ProcessorHolder +def _load_tracing_module(): # pragma: no cover - exercised via tests + return importlib.import_module("agents.tracing") -__all__ = ["OpenAIAgentsInstrumentor"] +def _get_registered_processors(provider) -> list: + multi = getattr(provider, "_multi_processor", None) + processors = getattr(multi, "_processors", ()) + return list(processors) -def _resolve_system(_: str | None) -> str: - # OpenAI spans must report provider name "openai" per semantic conventions. 
- return GenAI.GenAiSystemValues.OPENAI.value +def _resolve_system(value: str | None) -> str: + if not value: + return GenAI.GenAiSystemValues.OPENAI.value -def _get_registered_processors( - provider: _TraceProviderLike, -) -> list[TracingProcessor]: - """Return tracing processors registered on the OpenAI Agents trace provider. + normalized = value.strip().lower() + for member in GenAI.GenAiSystemValues: + if normalized == member.value: + return member.value + if normalized == member.name.lower(): + return member.value + return value - The provider exposes a private `_multi_processor` attribute with a `_processors` - collection that stores the currently registered processors in execution order. - """ - multi = getattr(provider, "_multi_processor", None) - processors = getattr(multi, "_processors", ()) - return list(processors) + +def _resolve_content_mode(value: Any) -> ContentCaptureMode: + if isinstance(value, ContentCaptureMode): + return value + if isinstance(value, bool): + return ( + ContentCaptureMode.SPAN_AND_EVENT + if value + else ContentCaptureMode.NO_CONTENT + ) + + if value is None: + return ContentCaptureMode.SPAN_AND_EVENT + + text = str(value).strip().lower() + if not text: + return ContentCaptureMode.SPAN_AND_EVENT + + mapping = { + "span_only": ContentCaptureMode.SPAN_ONLY, + "span-only": ContentCaptureMode.SPAN_ONLY, + "span": ContentCaptureMode.SPAN_ONLY, + "event_only": ContentCaptureMode.EVENT_ONLY, + "event-only": ContentCaptureMode.EVENT_ONLY, + "event": ContentCaptureMode.EVENT_ONLY, + "span_and_event": ContentCaptureMode.SPAN_AND_EVENT, + "span-and-event": ContentCaptureMode.SPAN_AND_EVENT, + "span_and_events": ContentCaptureMode.SPAN_AND_EVENT, + "all": ContentCaptureMode.SPAN_AND_EVENT, + "true": ContentCaptureMode.SPAN_AND_EVENT, + "1": ContentCaptureMode.SPAN_AND_EVENT, + "yes": ContentCaptureMode.SPAN_AND_EVENT, + "no_content": ContentCaptureMode.NO_CONTENT, + "false": ContentCaptureMode.NO_CONTENT, + "0": 
ContentCaptureMode.NO_CONTENT, + "no": ContentCaptureMode.NO_CONTENT, + "none": ContentCaptureMode.NO_CONTENT, + } + + return mapping.get(text, ContentCaptureMode.SPAN_AND_EVENT) + + +def _resolve_bool(value: Any, default: bool) -> bool: + if value is None: + return default + if isinstance(value, bool): + return value + text = str(value).strip().lower() + if text in {"true", "1", "yes", "on"}: + return True + if text in {"false", "0", "no", "off"}: + return False + return default class OpenAIAgentsInstrumentor(BaseInstrumentor): - """Instrumentation that bridges OpenAI Agents tracing to OpenTelemetry spans.""" + """Instrumentation that bridges OpenAI Agents tracing to OpenTelemetry.""" def __init__(self) -> None: super().__init__() - self._processor: _OpenAIAgentsSpanProcessor | None = None + self._processor: GenAISemanticProcessor | None = None def _instrument(self, **kwargs) -> None: if self._processor is not None: @@ -82,17 +138,52 @@ def _instrument(self, **kwargs) -> None: schema_url=Schemas.V1_28_0.value, ) - system = _resolve_system(kwargs.get("system")) - agent_name_override = kwargs.get("agent_name") or os.getenv( - "OTEL_GENAI_AGENT_NAME" + system_override = kwargs.get("system") or os.getenv( + _SYSTEM_OVERRIDE_ENV ) - - processor = _OpenAIAgentsSpanProcessor( + system = _resolve_system(system_override) + + content_override = kwargs.get("capture_message_content") + if content_override is None: + content_override = os.getenv(_CONTENT_CAPTURE_ENV) or os.getenv( + _CAPTURE_CONTENT_ENV + ) + content_mode = _resolve_content_mode(content_override) + + metrics_override = kwargs.get("capture_metrics") + if metrics_override is None: + metrics_override = os.getenv(_CAPTURE_METRICS_ENV) + metrics_enabled = _resolve_bool(metrics_override, default=True) + + agent_name = kwargs.get("agent_name") + agent_id = kwargs.get("agent_id") + agent_description = kwargs.get("agent_description") + base_url = kwargs.get("base_url") + server_address = kwargs.get("server_address") + 
server_port = kwargs.get("server_port") + + processor = GenAISemanticProcessor( tracer=tracer, - system=system, - agent_name_override=agent_name_override, + system_name=system, + include_sensitive_data=content_mode + != ContentCaptureMode.NO_CONTENT, + content_mode=content_mode, + metrics_enabled=metrics_enabled, + agent_name=agent_name, + agent_id=agent_id, + agent_description=agent_description, + base_url=base_url, + server_address=server_address, + server_port=server_port, + agent_name_default="OpenAI Agent", + agent_id_default="agent", + agent_description_default="OpenAI Agents instrumentation", + base_url_default="https://api.openai.com", + server_address_default="api.openai.com", + server_port_default=443, ) + tracing = _load_tracing_module() provider = tracing.get_trace_provider() existing = _get_registered_processors(provider) provider.set_processors([*existing, processor]) @@ -102,13 +193,16 @@ def _uninstrument(self, **kwargs) -> None: if self._processor is None: return + tracing = _load_tracing_module() provider = tracing.get_trace_provider() current = _get_registered_processors(provider) filtered = [proc for proc in current if proc is not self._processor] provider.set_processors(filtered) - self._processor.shutdown() - self._processor = None + try: + self._processor.shutdown() + finally: + self._processor = None def instrumentation_dependencies(self) -> Collection[str]: return _instruments diff --git a/instrumentation-genai/opentelemetry-instrumentation-openai-agents-v2/src/opentelemetry/instrumentation/openai_agents/span_processor.py b/instrumentation-genai/opentelemetry-instrumentation-openai-agents-v2/src/opentelemetry/instrumentation/openai_agents/span_processor.py index 52cc3ac462..68e5dc1f01 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-openai-agents-v2/src/opentelemetry/instrumentation/openai_agents/span_processor.py +++ 
b/instrumentation-genai/opentelemetry-instrumentation-openai-agents-v2/src/opentelemetry/instrumentation/openai_agents/span_processor.py @@ -1,513 +1,2186 @@ +""" +GenAI Semantic Convention Trace Processor + +This module implements a custom trace processor that enriches spans with +OpenTelemetry GenAI semantic conventions attributes following the +OpenInference processor pattern. It adds standardized attributes for +generative AI operations using iterator-based attribute extraction. + +References: +- OpenTelemetry GenAI Semantic Conventions: + https://opentelemetry.io/docs/specs/semconv/gen-ai/gen-ai-spans/ +- OpenInference Pattern: https://github.com/Arize-ai/openinference +""" + from __future__ import annotations -from collections.abc import Mapping, Sequence +import importlib +import logging from dataclasses import dataclass -from datetime import datetime -from threading import RLock -from time import time_ns -from typing import Any +from datetime import datetime, timezone +from enum import Enum +from typing import TYPE_CHECKING, Any, Dict, Iterator, Optional, Sequence from urllib.parse import urlparse -try: # pragma: no cover - used when OpenAI Agents is available - from agents.tracing.processor_interface import TracingProcessor - from agents.tracing.spans import Span as AgentsSpan - from agents.tracing.traces import Trace as AgentsTrace -except ImportError: # pragma: no cover - fallback for tests - - class TracingProcessor: # type: ignore[misc] - pass - - AgentsSpan = Any # type: ignore[assignment] - AgentsTrace = Any # type: ignore[assignment] +from opentelemetry.util.genai.utils import gen_ai_json_dumps + +try: + from agents.tracing import Span, Trace, TracingProcessor + from agents.tracing.span_data import ( + AgentSpanData, + FunctionSpanData, + GenerationSpanData, + GuardrailSpanData, + HandoffSpanData, + ResponseSpanData, + SpeechSpanData, + TranscriptionSpanData, + ) +except ModuleNotFoundError: # pragma: no cover - test stubs + tracing_module = 
importlib.import_module("agents.tracing") + Span = getattr(tracing_module, "Span") + Trace = getattr(tracing_module, "Trace") + TracingProcessor = getattr(tracing_module, "TracingProcessor") + AgentSpanData = getattr(tracing_module, "AgentSpanData", Any) # type: ignore[assignment] + FunctionSpanData = getattr(tracing_module, "FunctionSpanData", Any) # type: ignore[assignment] + GenerationSpanData = getattr(tracing_module, "GenerationSpanData", Any) # type: ignore[assignment] + GuardrailSpanData = getattr(tracing_module, "GuardrailSpanData", Any) # type: ignore[assignment] + HandoffSpanData = getattr(tracing_module, "HandoffSpanData", Any) # type: ignore[assignment] + ResponseSpanData = getattr(tracing_module, "ResponseSpanData", Any) # type: ignore[assignment] + SpeechSpanData = getattr(tracing_module, "SpeechSpanData", Any) # type: ignore[assignment] + TranscriptionSpanData = getattr( + tracing_module, "TranscriptionSpanData", Any + ) # type: ignore[assignment] + +from opentelemetry.context import attach, detach +from opentelemetry.metrics import Histogram, get_meter from opentelemetry.semconv._incubating.attributes import ( - gen_ai_attributes as GenAI, + gen_ai_attributes as GenAIAttributes, ) from opentelemetry.semconv._incubating.attributes import ( server_attributes as ServerAttributes, ) -from opentelemetry.trace import Span, SpanKind, Tracer, set_span_in_context -from opentelemetry.trace.status import Status, StatusCode - -SPAN_TYPE_GENERATION = "generation" -SPAN_TYPE_RESPONSE = "response" -SPAN_TYPE_AGENT = "agent" -SPAN_TYPE_AGENT_CREATION = "agent_creation" -SPAN_TYPE_FUNCTION = "function" -SPAN_TYPE_SPEECH = "speech" -SPAN_TYPE_TRANSCRIPTION = "transcription" -SPAN_TYPE_SPEECH_GROUP = "speech_group" -SPAN_TYPE_GUARDRAIL = "guardrail" -SPAN_TYPE_HANDOFF = "handoff" -SPAN_TYPE_MCP_TOOLS = "mcp_tools" - -_CLIENT_SPAN_TYPES = frozenset( - { - SPAN_TYPE_GENERATION, - SPAN_TYPE_RESPONSE, - SPAN_TYPE_SPEECH, - SPAN_TYPE_TRANSCRIPTION, - SPAN_TYPE_AGENT, - 
SPAN_TYPE_AGENT_CREATION, +from opentelemetry.trace import Span as OtelSpan +from opentelemetry.trace import ( + SpanKind, + Status, + StatusCode, + Tracer, + set_span_in_context, +) +from opentelemetry.util.types import AttributeValue + +# Import all semantic convention constants +# ---- GenAI semantic convention helpers (embedded from constants.py) ---- + + +def _enum_values(enum_cls) -> dict[str, str]: + """Return mapping of enum member name to value.""" + return {member.name: member.value for member in enum_cls} + + +_PROVIDER_VALUES = _enum_values(GenAIAttributes.GenAiProviderNameValues) + + +class GenAIProvider: + OPENAI = _PROVIDER_VALUES["OPENAI"] + GCP_GEN_AI = _PROVIDER_VALUES["GCP_GEN_AI"] + GCP_VERTEX_AI = _PROVIDER_VALUES["GCP_VERTEX_AI"] + GCP_GEMINI = _PROVIDER_VALUES["GCP_GEMINI"] + ANTHROPIC = _PROVIDER_VALUES["ANTHROPIC"] + COHERE = _PROVIDER_VALUES["COHERE"] + AZURE_AI_INFERENCE = _PROVIDER_VALUES["AZURE_AI_INFERENCE"] + AZURE_AI_OPENAI = _PROVIDER_VALUES["AZURE_AI_OPENAI"] + IBM_WATSONX_AI = _PROVIDER_VALUES["IBM_WATSONX_AI"] + AWS_BEDROCK = _PROVIDER_VALUES["AWS_BEDROCK"] + PERPLEXITY = _PROVIDER_VALUES["PERPLEXITY"] + X_AI = _PROVIDER_VALUES["X_AI"] + DEEPSEEK = _PROVIDER_VALUES["DEEPSEEK"] + GROQ = _PROVIDER_VALUES["GROQ"] + MISTRAL_AI = _PROVIDER_VALUES["MISTRAL_AI"] + + ALL = set(_PROVIDER_VALUES.values()) + + +_OPERATION_VALUES = _enum_values(GenAIAttributes.GenAiOperationNameValues) + + +class GenAIOperationName: + CHAT = _OPERATION_VALUES["CHAT"] + GENERATE_CONTENT = _OPERATION_VALUES["GENERATE_CONTENT"] + TEXT_COMPLETION = _OPERATION_VALUES["TEXT_COMPLETION"] + EMBEDDINGS = _OPERATION_VALUES["EMBEDDINGS"] + CREATE_AGENT = _OPERATION_VALUES["CREATE_AGENT"] + INVOKE_AGENT = _OPERATION_VALUES["INVOKE_AGENT"] + EXECUTE_TOOL = _OPERATION_VALUES["EXECUTE_TOOL"] + # Operations below are not yet covered by the spec but remain for backwards compatibility + TRANSCRIPTION = "transcription" + SPEECH = "speech_generation" + GUARDRAIL = 
"guardrail_check" + HANDOFF = "agent_handoff" + RESPONSE = "response" # internal aggregator in current processor + + CLASS_FALLBACK = { + "generationspan": CHAT, + "responsespan": RESPONSE, + "functionspan": EXECUTE_TOOL, + "agentspan": INVOKE_AGENT, } + + +_OUTPUT_VALUES = _enum_values(GenAIAttributes.GenAiOutputTypeValues) + + +class GenAIOutputType: + TEXT = _OUTPUT_VALUES["TEXT"] + JSON = _OUTPUT_VALUES["JSON"] + IMAGE = _OUTPUT_VALUES["IMAGE"] + SPEECH = _OUTPUT_VALUES["SPEECH"] + + +class GenAIToolType: + FUNCTION = "function" + EXTENSION = "extension" + DATASTORE = "datastore" + + ALL = {FUNCTION, EXTENSION, DATASTORE} + + +class GenAIEvaluationAttributes: + NAME = "gen_ai.evaluation.name" + SCORE_VALUE = "gen_ai.evaluation.score.value" + SCORE_LABEL = "gen_ai.evaluation.score.label" + EXPLANATION = "gen_ai.evaluation.explanation" + + +def _attr(name: str, fallback: str) -> str: + return getattr(GenAIAttributes, name, fallback) + + +GEN_AI_PROVIDER_NAME = _attr("GEN_AI_PROVIDER_NAME", "gen_ai.provider.name") +GEN_AI_OPERATION_NAME = _attr("GEN_AI_OPERATION_NAME", "gen_ai.operation.name") +GEN_AI_REQUEST_MODEL = _attr("GEN_AI_REQUEST_MODEL", "gen_ai.request.model") +GEN_AI_REQUEST_MAX_TOKENS = _attr( + "GEN_AI_REQUEST_MAX_TOKENS", "gen_ai.request.max_tokens" +) +GEN_AI_REQUEST_TEMPERATURE = _attr( + "GEN_AI_REQUEST_TEMPERATURE", "gen_ai.request.temperature" +) +GEN_AI_REQUEST_TOP_P = _attr("GEN_AI_REQUEST_TOP_P", "gen_ai.request.top_p") +GEN_AI_REQUEST_TOP_K = _attr("GEN_AI_REQUEST_TOP_K", "gen_ai.request.top_k") +GEN_AI_REQUEST_FREQUENCY_PENALTY = _attr( + "GEN_AI_REQUEST_FREQUENCY_PENALTY", "gen_ai.request.frequency_penalty" +) +GEN_AI_REQUEST_PRESENCE_PENALTY = _attr( + "GEN_AI_REQUEST_PRESENCE_PENALTY", "gen_ai.request.presence_penalty" +) +GEN_AI_REQUEST_CHOICE_COUNT = _attr( + "GEN_AI_REQUEST_CHOICE_COUNT", "gen_ai.request.choice.count" +) +GEN_AI_REQUEST_STOP_SEQUENCES = _attr( + "GEN_AI_REQUEST_STOP_SEQUENCES", "gen_ai.request.stop_sequences" +) 
+GEN_AI_REQUEST_ENCODING_FORMATS = _attr( + "GEN_AI_REQUEST_ENCODING_FORMATS", "gen_ai.request.encoding_formats" +) +GEN_AI_REQUEST_SEED = _attr("GEN_AI_REQUEST_SEED", "gen_ai.request.seed") +GEN_AI_RESPONSE_ID = _attr("GEN_AI_RESPONSE_ID", "gen_ai.response.id") +GEN_AI_RESPONSE_MODEL = _attr("GEN_AI_RESPONSE_MODEL", "gen_ai.response.model") +GEN_AI_RESPONSE_FINISH_REASONS = _attr( + "GEN_AI_RESPONSE_FINISH_REASONS", "gen_ai.response.finish_reasons" +) +GEN_AI_USAGE_INPUT_TOKENS = _attr( + "GEN_AI_USAGE_INPUT_TOKENS", "gen_ai.usage.input_tokens" +) +GEN_AI_USAGE_OUTPUT_TOKENS = _attr( + "GEN_AI_USAGE_OUTPUT_TOKENS", "gen_ai.usage.output_tokens" +) +GEN_AI_CONVERSATION_ID = _attr( + "GEN_AI_CONVERSATION_ID", "gen_ai.conversation.id" +) +GEN_AI_AGENT_ID = _attr("GEN_AI_AGENT_ID", "gen_ai.agent.id") +GEN_AI_AGENT_NAME = _attr("GEN_AI_AGENT_NAME", "gen_ai.agent.name") +GEN_AI_AGENT_DESCRIPTION = _attr( + "GEN_AI_AGENT_DESCRIPTION", "gen_ai.agent.description" ) +GEN_AI_TOOL_NAME = _attr("GEN_AI_TOOL_NAME", "gen_ai.tool.name") +GEN_AI_TOOL_TYPE = _attr("GEN_AI_TOOL_TYPE", "gen_ai.tool.type") +GEN_AI_TOOL_CALL_ID = _attr("GEN_AI_TOOL_CALL_ID", "gen_ai.tool.call.id") +GEN_AI_TOOL_DESCRIPTION = _attr( + "GEN_AI_TOOL_DESCRIPTION", "gen_ai.tool.description" +) +GEN_AI_OUTPUT_TYPE = _attr("GEN_AI_OUTPUT_TYPE", "gen_ai.output.type") +GEN_AI_SYSTEM_INSTRUCTIONS = _attr( + "GEN_AI_SYSTEM_INSTRUCTIONS", "gen_ai.system_instructions" +) +GEN_AI_INPUT_MESSAGES = _attr("GEN_AI_INPUT_MESSAGES", "gen_ai.input.messages") +GEN_AI_OUTPUT_MESSAGES = _attr( + "GEN_AI_OUTPUT_MESSAGES", "gen_ai.output.messages" +) +GEN_AI_DATA_SOURCE_ID = _attr("GEN_AI_DATA_SOURCE_ID", "gen_ai.data_source.id") -_GEN_AI_PROVIDER_NAME = GenAI.GEN_AI_PROVIDER_NAME +# The semantic conventions currently expose multiple usage token attributes; we retain the +# completion/prompt aliases for backwards compatibility where used. 
GEN_AI_USAGE_PROMPT_TOKENS = _attr(
    "GEN_AI_USAGE_PROMPT_TOKENS", "gen_ai.usage.prompt_tokens"
)
GEN_AI_USAGE_COMPLETION_TOKENS = _attr(
    "GEN_AI_USAGE_COMPLETION_TOKENS", "gen_ai.usage.completion_tokens"
)
# Attributes not (yet) defined in the spec retain their literal values.
GEN_AI_TOOL_CALL_ARGUMENTS = "gen_ai.tool.call.arguments"
GEN_AI_TOOL_CALL_RESULT = "gen_ai.tool.call.result"
GEN_AI_TOOL_DEFINITIONS = "gen_ai.tool.definitions"
GEN_AI_ORCHESTRATOR_AGENT_DEFINITIONS = "gen_ai.orchestrator.agent.definitions"
GEN_AI_GUARDRAIL_NAME = "gen_ai.guardrail.name"
GEN_AI_GUARDRAIL_TRIGGERED = "gen_ai.guardrail.triggered"
GEN_AI_HANDOFF_FROM_AGENT = "gen_ai.handoff.from_agent"
GEN_AI_HANDOFF_TO_AGENT = "gen_ai.handoff.to_agent"
GEN_AI_EMBEDDINGS_DIMENSION_COUNT = "gen_ai.embeddings.dimension.count"
GEN_AI_TOKEN_TYPE = _attr("GEN_AI_TOKEN_TYPE", "gen_ai.token.type")


# ---- Normalization utilities (embedded from utils.py) ----


def normalize_provider(provider: Optional[str]) -> Optional[str]:
    """Normalize a provider name to its spec-compliant lowercase value.

    Args:
        provider: Raw provider name (e.g. ``"OpenAI"``); may be ``None``.

    Returns:
        The lowercase spec value when recognized, ``None`` for empty input,
        or the original string unchanged for unknown providers (kept as a
        passthrough for forward compatibility with future spec values).
    """
    if not provider:
        return None
    candidate = provider.strip().lower()
    if candidate in GenAIProvider.ALL:
        return candidate
    return provider  # passthrough if unknown (forward compat)


def validate_tool_type(tool_type: Optional[str]) -> str:
    """Validate and normalize a tool type.

    Falls back to ``GenAIToolType.FUNCTION`` for empty or unrecognized
    values, so callers always receive a spec-compliant type.
    """
    if not tool_type:
        return GenAIToolType.FUNCTION  # default
    candidate = tool_type.strip().lower()
    if candidate in GenAIToolType.ALL:
        return candidate
    return GenAIToolType.FUNCTION


def normalize_output_type(output_type: Optional[str]) -> str:
    """Normalize an output type string to a spec-compliant value.

    Common provider aliases (``json_object``, ``speech_audio``, ...) are
    mapped onto the four spec values; anything unrecognized defaults to
    ``GenAIOutputType.TEXT``.
    """
    if not output_type:
        return GenAIOutputType.TEXT  # default
    candidate = output_type.strip().lower()
    # Aliases observed in provider payloads -> spec values.
    alias_map = {
        "json_object": GenAIOutputType.JSON,
        "jsonschema": GenAIOutputType.JSON,
        "speech_audio": GenAIOutputType.SPEECH,
        "audio_speech": GenAIOutputType.SPEECH,
        "image_png": GenAIOutputType.IMAGE,
        "function_arguments_json": GenAIOutputType.JSON,
        "tool_call": GenAIOutputType.JSON,
        "transcription_json": GenAIOutputType.JSON,
    }
    if candidate in alias_map:
        return alias_map[candidate]
    if candidate in {
        GenAIOutputType.TEXT,
        GenAIOutputType.JSON,
        GenAIOutputType.IMAGE,
        GenAIOutputType.SPEECH,
    }:
        return candidate
    return GenAIOutputType.TEXT  # default for unknown


# Legacy attributes removed.

logger = logging.getLogger(__name__)

# gen_ai.system was deprecated in newer semconv releases; fall back to the
# literal key when the constant is absent from the installed semconv package.
GEN_AI_SYSTEM_KEY = getattr(GenAIAttributes, "GEN_AI_SYSTEM", "gen_ai.system")


class ContentCaptureMode(Enum):
    """Controls whether sensitive content is recorded on spans, events, or both."""

    NO_CONTENT = "no_content"
    SPAN_ONLY = "span_only"
    EVENT_ONLY = "event_only"
    SPAN_AND_EVENT = "span_and_event"

    @property
    def capture_in_span(self) -> bool:
        """True when content may be attached as span attributes."""
        return self in (
            ContentCaptureMode.SPAN_ONLY,
            ContentCaptureMode.SPAN_AND_EVENT,
        )

    @property
    def capture_in_event(self) -> bool:
        """True when content may be emitted as span events."""
        return self in (
            ContentCaptureMode.EVENT_ONLY,
            ContentCaptureMode.SPAN_AND_EVENT,
        )


@dataclass
class ContentPayload:
    """Container for normalized content associated with a span."""

    # Messages normalized to the role+parts schema.
    input_messages: Optional[list[dict[str, Any]]] = None
    output_messages: Optional[list[dict[str, Any]]] = None
    # Typed text parts extracted from system/ai role messages.
    system_instructions: Optional[list[dict[str, str]]] = None
    # Serialized tool IO (JSON string or str()).
    tool_arguments: Any = None
    tool_result: Any = None


def _is_instance_of(value: Any, classes: Any) -> bool:
    """Safe ``isinstance`` that tolerates ``typing.Any`` placeholders.

    Some span-data symbols may resolve to ``typing.Any`` when the agents
    package is absent; ``isinstance`` then raises ``TypeError``, which is
    treated as a non-match here.
    """
    if not isinstance(classes, tuple):
        classes = (classes,)
    for cls in classes:
        try:
            if isinstance(value, cls):
                return True
        except TypeError:
            continue
    return False


def _infer_server_attributes(base_url: Optional[str]) -> dict[str, Any]:
    """Return ``server.address`` / ``server.port`` attributes for a base URL.

    Returns an empty dict when the URL is missing or unparsable.
    """
    out: dict[str, Any] = {}
    if not base_url:
        return out
    try:
        parsed = urlparse(base_url)
        if parsed.hostname:
            out[ServerAttributes.SERVER_ADDRESS] = parsed.hostname
        # ``.port`` raises ValueError for out-of-range/non-numeric ports.
        if parsed.port:
            out[ServerAttributes.SERVER_PORT] = parsed.port
    except ValueError:
        return out
    return out


def safe_json_dumps(obj: Any) -> str:
    """Serialize *obj* to JSON, falling back to ``str()`` when not serializable."""
    try:
        return gen_ai_json_dumps(obj)
    except (TypeError, ValueError):
        return str(obj)


def _as_utc_nano(dt: datetime) -> int:
    """Convert a datetime to a UTC timestamp in nanoseconds."""
    return int(dt.astimezone(timezone.utc).timestamp() * 1_000_000_000)


def _get_span_status(span: Span[Any]) -> Status:
    """Map an agent span's error payload to an OpenTelemetry ``Status``."""
    if error := getattr(span, "error", None):
        return Status(
            status_code=StatusCode.ERROR,
            description=f"{error.get('message', '')}: {error.get('data', '')}",
        )
    return Status(StatusCode.OK)
in choices: - if isinstance(choice, Mapping): - reason = choice.get("finish_reason") or choice.get("stop_reason") - if reason: - reasons.append(str(reason)) - continue - finish_reason = getattr(choice, "finish_reason", None) - if finish_reason: - reasons.append(str(finish_reason)) +def get_span_name( + operation_name: str, + model: Optional[str] = None, + agent_name: Optional[str] = None, + tool_name: Optional[str] = None, +) -> str: + """Generate spec-compliant span name based on operation type.""" + base_name = operation_name - return reasons + if operation_name in { + GenAIOperationName.CHAT, + GenAIOperationName.TEXT_COMPLETION, + GenAIOperationName.EMBEDDINGS, + GenAIOperationName.TRANSCRIPTION, + GenAIOperationName.SPEECH, + }: + return f"{base_name} {model}" if model else base_name + if operation_name == GenAIOperationName.CREATE_AGENT: + return f"{base_name} {agent_name}" if agent_name else base_name -def _clean_stop_sequences(value: Any) -> Sequence[str] | None: - if value is None: - return None - if isinstance(value, str): - return [value] - if isinstance(value, Sequence): - cleaned: list[str] = [] - for item in value: - if item is None: - continue - cleaned.append(str(item)) - return cleaned if cleaned else None - return None + if operation_name == GenAIOperationName.INVOKE_AGENT: + return f"{base_name} {agent_name}" if agent_name else base_name + if operation_name == GenAIOperationName.EXECUTE_TOOL: + return f"{base_name} {tool_name}" if tool_name else base_name -@dataclass -class _SpanContext: - span: Span - kind: SpanKind + if operation_name == GenAIOperationName.HANDOFF: + return f"{base_name} {agent_name}" if agent_name else base_name + return base_name -class _OpenAIAgentsSpanProcessor(TracingProcessor): - """Convert OpenAI Agents traces into OpenTelemetry spans.""" + +class GenAISemanticProcessor(TracingProcessor): + """Trace processor adding GenAI semantic convention attributes with metrics.""" def __init__( self, - tracer: Tracer, - system: str, 
- agent_name_override: str | None = None, - ) -> None: + tracer: Optional[Tracer] = None, + system_name: str = "openai", + include_sensitive_data: bool = True, + content_mode: ContentCaptureMode = ContentCaptureMode.SPAN_AND_EVENT, + base_url: Optional[str] = None, + agent_name: Optional[str] = None, + agent_id: Optional[str] = None, + agent_description: Optional[str] = None, + server_address: Optional[str] = None, + server_port: Optional[int] = None, + metrics_enabled: bool = True, + agent_name_default: Optional[str] = None, + agent_id_default: Optional[str] = None, + agent_description_default: Optional[str] = None, + base_url_default: Optional[str] = None, + server_address_default: Optional[str] = None, + server_port_default: Optional[int] = None, + ): + """Initialize processor with metrics support. + + Args: + tracer: Optional OpenTelemetry tracer + system_name: Provider name (openai/azure.ai.inference/etc.) + include_sensitive_data: Include model/tool IO when True + base_url: API endpoint for server.address/port + agent_name: Name of the agent (can be overridden by env var) + agent_id: ID of the agent (can be overridden by env var) + agent_description: Description of the agent (can be overridden by env var) + server_address: Server address (can be overridden by env var or base_url) + server_port: Server port (can be overridden by env var or base_url) + """ self._tracer = tracer - self._system = system - self._agent_name_override = ( - agent_name_override.strip() - if isinstance(agent_name_override, str) - and agent_name_override.strip() - else None + self.system_name = normalize_provider(system_name) or system_name + self._content_mode = content_mode + self.include_sensitive_data = include_sensitive_data and ( + content_mode.capture_in_span or content_mode.capture_in_event + ) + effective_base_url = base_url or base_url_default + self.base_url = effective_base_url + + # Agent information - prefer explicit overrides; otherwise defer to span data + 
self.agent_name = agent_name + self.agent_id = agent_id + self.agent_description = agent_description + self._agent_name_default = agent_name_default + self._agent_id_default = agent_id_default + self._agent_description_default = agent_description_default + + # Server information - use init parameters, then base_url inference + self.server_address = server_address or server_address_default + resolved_port = ( + server_port if server_port is not None else server_port_default ) - self._root_spans: dict[str, Span] = {} - self._spans: dict[str, _SpanContext] = {} - self._lock = RLock() + self.server_port = resolved_port + + # If server info not provided, try to extract from base_url + if ( + not self.server_address or not self.server_port + ) and effective_base_url: + server_attrs = _infer_server_attributes(effective_base_url) + if not self.server_address: + self.server_address = server_attrs.get( + ServerAttributes.SERVER_ADDRESS + ) + if not self.server_port: + self.server_port = server_attrs.get( + ServerAttributes.SERVER_PORT + ) - def _operation_name(self, span_data: Any) -> str: - span_type = getattr(span_data, "type", None) - explicit_operation = getattr(span_data, "operation", None) - normalized_operation = ( - explicit_operation.strip().lower() - if isinstance(explicit_operation, str) - else None + # Content capture configuration + self._capture_messages = ( + content_mode.capture_in_span or content_mode.capture_in_event + ) + self._capture_system_instructions = True + self._capture_tool_definitions = True + + # Span tracking + self._root_spans: dict[str, OtelSpan] = {} + self._otel_spans: dict[str, OtelSpan] = {} + self._tokens: dict[str, object] = {} + self._span_parents: dict[str, Optional[str]] = {} + self._agent_content: dict[str, Dict[str, list[Any]]] = {} + + # Metrics configuration + self._metrics_enabled = metrics_enabled + self._meter = None + self._duration_histogram: Optional[Histogram] = None + self._token_usage_histogram: Optional[Histogram] = 
None + if self._metrics_enabled: + self._init_metrics() + + def _get_server_attributes(self) -> dict[str, Any]: + """Get server attributes from configured values.""" + attrs = {} + if self.server_address: + attrs[ServerAttributes.SERVER_ADDRESS] = self.server_address + if self.server_port: + attrs[ServerAttributes.SERVER_PORT] = self.server_port + return attrs + + def _init_metrics(self): + """Initialize metric instruments.""" + self._meter = get_meter( + "opentelemetry.instrumentation.openai_agents", "0.1.0" + ) + + # Operation duration histogram + self._duration_histogram = self._meter.create_histogram( + name="gen_ai.client.operation.duration", + description="GenAI operation duration", + unit="s", ) - if span_type == SPAN_TYPE_GENERATION: - if _looks_like_chat(getattr(span_data, "input", None)): - return GenAI.GenAiOperationNameValues.CHAT.value - return GenAI.GenAiOperationNameValues.TEXT_COMPLETION.value - if span_type == SPAN_TYPE_AGENT: - if normalized_operation in {"create", "create_agent"}: - return GenAI.GenAiOperationNameValues.CREATE_AGENT.value - if normalized_operation in {"invoke", "invoke_agent"}: - return GenAI.GenAiOperationNameValues.INVOKE_AGENT.value - return GenAI.GenAiOperationNameValues.INVOKE_AGENT.value - if span_type == SPAN_TYPE_AGENT_CREATION: - return GenAI.GenAiOperationNameValues.CREATE_AGENT.value - if span_type == SPAN_TYPE_FUNCTION: - return GenAI.GenAiOperationNameValues.EXECUTE_TOOL.value - if span_type == SPAN_TYPE_RESPONSE: - return GenAI.GenAiOperationNameValues.CHAT.value - return span_type or "operation" - - def _span_kind(self, span_data: Any) -> SpanKind: - span_type = getattr(span_data, "type", None) - if span_type in _CLIENT_SPAN_TYPES: - return SpanKind.CLIENT - # Tool invocations (e.g. span type "function") execute inside the agent - # runtime, so there is no remote peer to model; we keep them INTERNAL. 
- return SpanKind.INTERNAL - def _span_name(self, operation: str, attributes: Mapping[str, Any]) -> str: - model = attributes.get(GenAI.GEN_AI_REQUEST_MODEL) or attributes.get( - GenAI.GEN_AI_RESPONSE_MODEL + # Token usage histogram + self._token_usage_histogram = self._meter.create_histogram( + name="gen_ai.client.token.usage", + description="Number of input and output tokens used", + unit="{token}", ) - agent_name = attributes.get(GenAI.GEN_AI_AGENT_NAME) - tool_name = attributes.get(GenAI.GEN_AI_TOOL_NAME) - if operation in ( - GenAI.GenAiOperationNameValues.CHAT.value, - GenAI.GenAiOperationNameValues.TEXT_COMPLETION.value, - GenAI.GenAiOperationNameValues.GENERATE_CONTENT.value, - GenAI.GenAiOperationNameValues.EMBEDDINGS.value, + def _record_metrics( + self, span: Span[Any], attributes: dict[str, AttributeValue] + ) -> None: + """Record metrics for the span.""" + if not self._metrics_enabled or ( + self._duration_histogram is None + and self._token_usage_histogram is None ): - return f"{operation} {model}" if model else operation - if operation == GenAI.GenAiOperationNameValues.INVOKE_AGENT.value: - return f"{operation} {agent_name}" if agent_name else operation - if operation == GenAI.GenAiOperationNameValues.EXECUTE_TOOL.value: - return f"{operation} {tool_name}" if tool_name else operation - if operation == GenAI.GenAiOperationNameValues.CREATE_AGENT.value: - return f"{operation} {agent_name}" if agent_name else operation - return operation + return + + try: + # Calculate duration + duration = None + if hasattr(span, "started_at") and hasattr(span, "ended_at"): + try: + start = datetime.fromisoformat(span.started_at) + end = datetime.fromisoformat(span.ended_at) + duration = (end - start).total_seconds() + except Exception: + pass + + # Build metric attributes + metric_attrs = { + GEN_AI_PROVIDER_NAME: attributes.get(GEN_AI_PROVIDER_NAME), + GEN_AI_OPERATION_NAME: attributes.get(GEN_AI_OPERATION_NAME), + GEN_AI_REQUEST_MODEL: ( + 
attributes.get(GEN_AI_REQUEST_MODEL) + or attributes.get(GEN_AI_RESPONSE_MODEL) + ), + ServerAttributes.SERVER_ADDRESS: attributes.get( + ServerAttributes.SERVER_ADDRESS + ), + ServerAttributes.SERVER_PORT: attributes.get( + ServerAttributes.SERVER_PORT + ), + } + + # Add error type if present + if error := getattr(span, "error", None): + error_type = error.get("type") or error.get("name") + if error_type: + metric_attrs["error.type"] = error_type + + # Remove None values + metric_attrs = { + k: v for k, v in metric_attrs.items() if v is not None + } - def _base_attributes(self) -> dict[str, Any]: - return {_GEN_AI_PROVIDER_NAME: self._system} + # Record duration + if duration is not None and self._duration_histogram is not None: + self._duration_histogram.record(duration, metric_attrs) + + # Record token usage + if self._token_usage_histogram: + input_tokens = attributes.get(GEN_AI_USAGE_INPUT_TOKENS) + if isinstance(input_tokens, (int, float)): + token_attrs = dict(metric_attrs) + token_attrs[GEN_AI_TOKEN_TYPE] = "input" + self._token_usage_histogram.record( + input_tokens, token_attrs + ) + + output_tokens = attributes.get(GEN_AI_USAGE_OUTPUT_TOKENS) + if isinstance(output_tokens, (int, float)): + token_attrs = dict(metric_attrs) + token_attrs[GEN_AI_TOKEN_TYPE] = "output" + self._token_usage_histogram.record( + output_tokens, token_attrs + ) + + except Exception as e: + logger.debug("Failed to record metrics: %s", e) + + def _emit_content_events( + self, + span: Span[Any], + otel_span: OtelSpan, + payload: ContentPayload, + agent_content: Optional[Dict[str, list[Any]]] = None, + ) -> None: + """Intentionally skip emitting gen_ai.* events to avoid payload duplication.""" + if ( + not self.include_sensitive_data + or not self._content_mode.capture_in_event + or not otel_span.is_recording() + ): + return - def _attributes_from_generation(self, span_data: Any) -> dict[str, Any]: - attributes = self._base_attributes() - attributes[GenAI.GEN_AI_OPERATION_NAME] = 
self._operation_name( - span_data + logger.debug( + "Event capture requested for span %s but is currently disabled", + getattr(span, "span_id", ""), ) + return - model = getattr(span_data, "model", None) - if model: - attributes[GenAI.GEN_AI_REQUEST_MODEL] = model + def _collect_system_instructions( + self, messages: Sequence[Any] | None + ) -> list[dict[str, str]]: + """Return system/ai role instructions as typed text objects. + + Enforces format: [{"type": "text", "content": "..."}]. + Handles message content that may be a string, list of parts, + or a dict with text/content fields. + """ + if not messages: + return [] + out: list[dict[str, str]] = [] + for m in messages: + if not isinstance(m, dict): + continue + role = m.get("role") + if role in {"system", "ai"}: + content = m.get("content") + out.extend(self._normalize_to_text_parts(content)) + return out + + def _normalize_to_text_parts(self, content: Any) -> list[dict[str, str]]: + """Normalize arbitrary content into typed text parts. 
+ + - String -> [{type: text, content: }] + - List/Tuple -> map each item to a text part (string/dict supported) + - Dict -> use 'text' or 'content' field when available; else str(dict) + - Other -> str(value) + """ + parts: list[dict[str, str]] = [] + if content is None: + return parts + if isinstance(content, str): + parts.append({"type": "text", "content": content}) + return parts + if isinstance(content, (list, tuple)): + for item in content: + if isinstance(item, str): + parts.append({"type": "text", "content": item}) + elif isinstance(item, dict): + txt = item.get("text") or item.get("content") + if isinstance(txt, str) and txt: + parts.append({"type": "text", "content": txt}) + else: + parts.append({"type": "text", "content": str(item)}) + else: + parts.append({"type": "text", "content": str(item)}) + return parts + if isinstance(content, dict): + txt = content.get("text") or content.get("content") + if isinstance(txt, str) and txt: + parts.append({"type": "text", "content": txt}) + else: + parts.append({"type": "text", "content": str(content)}) + return parts + # Fallback for other types + parts.append({"type": "text", "content": str(content)}) + return parts + + def _redacted_text_parts(self) -> list[dict[str, str]]: + """Return a single redacted text part for system instructions.""" + return [{"type": "text", "content": "readacted"}] + + def _normalize_messages_to_role_parts( + self, messages: Sequence[Any] | None + ) -> list[dict[str, Any]]: + """Normalize input messages to enforced role+parts schema. + + Each message becomes: {"role": , "parts": [ {"type": ..., ...} ]} + Redaction: when include_sensitive_data is False, replace text content, + tool_call arguments, and tool_call_response result with "readacted". 
+ """ + if not messages: + return [] + normalized: list[dict[str, Any]] = [] + for m in messages: + if not isinstance(m, dict): + # Fallback: treat as user text + normalized.append( + { + "role": "user", + "parts": [ + { + "type": "text", + "content": "readacted" + if not self.include_sensitive_data + else str(m), + } + ], + } + ) + continue - attributes.update( - _extract_server_attributes( - getattr(span_data, "model_config", None) - ) + role = m.get("role") or "user" + parts: list[dict[str, Any]] = [] + + # Existing parts array + if isinstance(m.get("parts"), (list, tuple)): + for p in m["parts"]: + if isinstance(p, dict): + ptype = p.get("type") or "text" + newp: dict[str, Any] = {"type": ptype} + if ptype == "text": + txt = p.get("content") or p.get("text") + newp["content"] = ( + "readacted" + if not self.include_sensitive_data + else (txt if isinstance(txt, str) else str(p)) + ) + elif ptype == "tool_call": + newp["id"] = p.get("id") + newp["name"] = p.get("name") + args = p.get("arguments") + newp["arguments"] = ( + "readacted" + if not self.include_sensitive_data + else args + ) + elif ptype == "tool_call_response": + newp["id"] = p.get("id") or m.get("tool_call_id") + result = p.get("result") or p.get("content") + newp["result"] = ( + "readacted" + if not self.include_sensitive_data + else result + ) + else: + newp["content"] = ( + "readacted" + if not self.include_sensitive_data + else str(p) + ) + parts.append(newp) + else: + parts.append( + { + "type": "text", + "content": "readacted" + if not self.include_sensitive_data + else str(p), + } + ) + + # OpenAI content + content = m.get("content") + if isinstance(content, str): + parts.append( + { + "type": "text", + "content": "readacted" + if not self.include_sensitive_data + else content, + } + ) + elif isinstance(content, (list, tuple)): + for item in content: + if isinstance(item, dict): + itype = item.get("type") or "text" + if itype == "text": + txt = item.get("text") or item.get("content") + 
parts.append( + { + "type": "text", + "content": "readacted" + if not self.include_sensitive_data + else ( + txt + if isinstance(txt, str) + else str(item) + ), + } + ) + else: + # Fallback for other part types + parts.append( + { + "type": "text", + "content": "readacted" + if not self.include_sensitive_data + else str(item), + } + ) + else: + parts.append( + { + "type": "text", + "content": "readacted" + if not self.include_sensitive_data + else str(item), + } + ) + + # Assistant tool_calls + if role == "assistant" and isinstance( + m.get("tool_calls"), (list, tuple) + ): + for tc in m["tool_calls"]: + if not isinstance(tc, dict): + continue + p = {"type": "tool_call"} + p["id"] = tc.get("id") + fn = tc.get("function") or {} + if isinstance(fn, dict): + p["name"] = fn.get("name") + args = fn.get("arguments") + p["arguments"] = ( + "readacted" + if not self.include_sensitive_data + else args + ) + parts.append(p) + + # Tool call response + if role in {"tool", "function"}: + p = {"type": "tool_call_response"} + p["id"] = m.get("tool_call_id") or m.get("id") + result = m.get("result") or m.get("content") + p["result"] = ( + "readacted" if not self.include_sensitive_data else result + ) + parts.append(p) + + if parts: + normalized.append({"role": role, "parts": parts}) + elif not self.include_sensitive_data: + normalized.append( + {"role": role, "parts": self._redacted_text_parts()} + ) + + return normalized + + def _normalize_output_messages_to_role_parts( + self, span_data: Any + ) -> list[dict[str, Any]]: + """Normalize output messages to enforced role+parts schema. + + Produces: [{"role": "assistant", "parts": [{"type": "text", "content": "..."}], + optional "finish_reason": "..." 
}] + """ + messages: list[dict[str, Any]] = [] + parts: list[dict[str, Any]] = [] + finish_reason: Optional[str] = None + + # Response span: prefer consolidated output_text + response = getattr(span_data, "response", None) + if response is not None: + # Collect text content + output_text = getattr(response, "output_text", None) + if isinstance(output_text, str) and output_text: + parts.append( + { + "type": "text", + "content": ( + "readacted" + if not self.include_sensitive_data + else output_text + ), + } + ) + else: + output = getattr(response, "output", None) + if isinstance(output, Sequence): + for item in output: + # ResponseOutputMessage may have a string representation + txt = getattr(item, "content", None) + if isinstance(txt, str) and txt: + parts.append( + { + "type": "text", + "content": ( + "readacted" + if not self.include_sensitive_data + else txt + ), + } + ) + else: + # Fallback: stringified + parts.append( + { + "type": "text", + "content": ( + "readacted" + if not self.include_sensitive_data + else str(item) + ), + } + ) + # Capture finish_reason from parts when present + fr = getattr(item, "finish_reason", None) + if isinstance(fr, str) and not finish_reason: + finish_reason = fr + + # Generation span: use span_data.output + if not parts: + output = getattr(span_data, "output", None) + if isinstance(output, Sequence): + for item in output: + if isinstance(item, dict): + if item.get("type") == "text": + txt = item.get("content") or item.get("text") + if isinstance(txt, str) and txt: + parts.append( + { + "type": "text", + "content": ( + "readacted" + if not self.include_sensitive_data + else txt + ), + } + ) + elif "content" in item and isinstance( + item["content"], str + ): + parts.append( + { + "type": "text", + "content": ( + "readacted" + if not self.include_sensitive_data + else item["content"] + ), + } + ) + else: + parts.append( + { + "type": "text", + "content": ( + "readacted" + if not self.include_sensitive_data + else str(item) + ), + 
} + ) + if not finish_reason and isinstance( + item.get("finish_reason"), str + ): + finish_reason = item.get("finish_reason") + elif isinstance(item, str): + parts.append( + { + "type": "text", + "content": ( + "readacted" + if not self.include_sensitive_data + else item + ), + } + ) + else: + parts.append( + { + "type": "text", + "content": ( + "readacted" + if not self.include_sensitive_data + else str(item) + ), + } + ) + + # Build assistant message + msg: dict[str, Any] = {"role": "assistant", "parts": parts} + if finish_reason: + msg["finish_reason"] = finish_reason + # Only include if there is content + if parts: + messages.append(msg) + return messages + + def _build_content_payload(self, span: Span[Any]) -> ContentPayload: + """Normalize content from span data for attribute/event capture.""" + payload = ContentPayload() + span_data = getattr(span, "span_data", None) + if span_data is None or not self.include_sensitive_data: + return payload + + capture_messages = self._capture_messages and ( + self._content_mode.capture_in_span + or self._content_mode.capture_in_event + ) + capture_system = self._capture_system_instructions and ( + self._content_mode.capture_in_span + or self._content_mode.capture_in_event ) + capture_tools = self._content_mode.capture_in_span or ( + self._content_mode.capture_in_event + and _is_instance_of(span_data, FunctionSpanData) + ) + + if _is_instance_of(span_data, GenerationSpanData): + span_input = getattr(span_data, "input", None) + if capture_messages and span_input: + payload.input_messages = ( + self._normalize_messages_to_role_parts(span_input) + ) + if capture_system and span_input: + sys_instr = self._collect_system_instructions(span_input) + if sys_instr: + payload.system_instructions = sys_instr + if capture_messages and ( + getattr(span_data, "output", None) + or getattr(span_data, "response", None) + ): + normalized_out = self._normalize_output_messages_to_role_parts( + span_data + ) + if normalized_out: + 
payload.output_messages = normalized_out + + elif _is_instance_of(span_data, ResponseSpanData): + span_input = getattr(span_data, "input", None) + if capture_messages and span_input: + payload.input_messages = ( + self._normalize_messages_to_role_parts(span_input) + ) + if capture_system and span_input: + sys_instr = self._collect_system_instructions(span_input) + if sys_instr: + payload.system_instructions = sys_instr + if capture_messages: + normalized_out = self._normalize_output_messages_to_role_parts( + span_data + ) + if normalized_out: + payload.output_messages = normalized_out + + elif _is_instance_of(span_data, FunctionSpanData) and capture_tools: + + def _serialize_tool_value(value: Any) -> Optional[str]: + if value is None: + return None + if isinstance(value, (dict, list)): + return safe_json_dumps(value) + return str(value) - usage = getattr(span_data, "usage", None) - if isinstance(usage, Mapping): - input_tokens = usage.get("prompt_tokens") or usage.get( - "input_tokens" + payload.tool_arguments = _serialize_tool_value( + getattr(span_data, "input", None) ) - if input_tokens is not None: - attributes[GenAI.GEN_AI_USAGE_INPUT_TOKENS] = input_tokens - output_tokens = usage.get("completion_tokens") or usage.get( - "output_tokens" + payload.tool_result = _serialize_tool_value( + getattr(span_data, "output", None) ) - if output_tokens is not None: - attributes[GenAI.GEN_AI_USAGE_OUTPUT_TOKENS] = output_tokens - - model_config = getattr(span_data, "model_config", None) - if isinstance(model_config, Mapping): - mapping = { - "temperature": GenAI.GEN_AI_REQUEST_TEMPERATURE, - "top_p": GenAI.GEN_AI_REQUEST_TOP_P, - "top_k": GenAI.GEN_AI_REQUEST_TOP_K, - "frequency_penalty": GenAI.GEN_AI_REQUEST_FREQUENCY_PENALTY, - "presence_penalty": GenAI.GEN_AI_REQUEST_PRESENCE_PENALTY, - "seed": GenAI.GEN_AI_REQUEST_SEED, - "n": GenAI.GEN_AI_REQUEST_CHOICE_COUNT, - } - for key, attr in mapping.items(): - value = model_config.get(key) - if value is not None: - 
attributes[attr] = value - for max_key in ("max_tokens", "max_completion_tokens"): - value = model_config.get(max_key) - if value is not None: - attributes[GenAI.GEN_AI_REQUEST_MAX_TOKENS] = value - break + return payload + + def _find_agent_parent_span_id( + self, span_id: Optional[str] + ) -> Optional[str]: + """Return nearest ancestor span id that represents an agent.""" + current = span_id + visited: set[str] = set() + while current: + if current in visited: + break + visited.add(current) + if current in self._agent_content: + return current + current = self._span_parents.get(current) + return None - stop_sequences = _clean_stop_sequences(model_config.get("stop")) - if stop_sequences: - attributes[GenAI.GEN_AI_REQUEST_STOP_SEQUENCES] = ( - stop_sequences - ) + def _update_agent_aggregate( + self, span: Span[Any], payload: ContentPayload + ) -> None: + """Accumulate child span content for parent agent span.""" + agent_id = self._find_agent_parent_span_id(span.parent_id) + if not agent_id: + return + entry = self._agent_content.setdefault( + agent_id, + { + "input_messages": [], + "output_messages": [], + "system_instructions": [], + "request_model": None, + }, + ) + if payload.input_messages: + entry["input_messages"] = self._merge_content_sequence( + entry["input_messages"], payload.input_messages + ) + if payload.output_messages: + entry["output_messages"] = self._merge_content_sequence( + entry["output_messages"], payload.output_messages + ) + if payload.system_instructions: + entry["system_instructions"] = self._merge_content_sequence( + entry["system_instructions"], payload.system_instructions + ) - attributes[GenAI.GEN_AI_RESPONSE_FINISH_REASONS] = ( - _collect_finish_reasons(getattr(span_data, "output", None)) + if not entry.get("request_model"): + model = getattr(span.span_data, "model", None) + if not model: + response_obj = getattr(span.span_data, "response", None) + model = getattr(response_obj, "model", None) + if model: + entry["request_model"] = 
model + + def _infer_output_type(self, span_data: Any) -> str: + """Infer gen_ai.output.type for multiple span kinds.""" + if _is_instance_of(span_data, FunctionSpanData): + # Tool results are typically JSON + return GenAIOutputType.JSON + if _is_instance_of(span_data, TranscriptionSpanData): + return GenAIOutputType.TEXT + if _is_instance_of(span_data, SpeechSpanData): + return GenAIOutputType.SPEECH + if _is_instance_of(span_data, GuardrailSpanData): + return GenAIOutputType.TEXT + if _is_instance_of(span_data, HandoffSpanData): + return GenAIOutputType.TEXT + + # Check for embeddings operation + if _is_instance_of(span_data, GenerationSpanData): + if hasattr(span_data, "embedding_dimension"): + return ( + GenAIOutputType.TEXT + ) # Embeddings are numeric but represented as text + + # Generation/Response - check output structure + output = getattr(span_data, "output", None) or getattr( + getattr(span_data, "response", None), "output", None ) + if isinstance(output, Sequence) and output: + first = output[0] + if isinstance(first, dict): + item_type = first.get("type") + if isinstance(item_type, str): + normalized = item_type.strip().lower() + if normalized in {"image", "image_url"}: + return GenAIOutputType.IMAGE + if normalized in {"audio", "speech", "audio_url"}: + return GenAIOutputType.SPEECH + if normalized in { + "json", + "json_object", + "jsonschema", + "function_call", + "tool_call", + "tool_result", + }: + return GenAIOutputType.JSON + if normalized in { + "text", + "output_text", + "message", + "assistant", + }: + return GenAIOutputType.TEXT + + # Conversation style payloads + if "role" in first: + parts = first.get("parts") + if isinstance(parts, Sequence) and parts: + # If all parts are textual (or missing explicit type), treat as text + textual = True + for part in parts: + if isinstance(part, dict): + part_type = str(part.get("type", "")).lower() + if part_type in {"image", "image_url"}: + return GenAIOutputType.IMAGE + if part_type in { + "audio", 
+ "speech", + "audio_url", + }: + return GenAIOutputType.SPEECH + if part_type and part_type not in { + "text", + "output_text", + "assistant", + }: + textual = False + elif not isinstance(part, str): + textual = False + if textual: + return GenAIOutputType.TEXT + content_value = first.get("content") + if isinstance(content_value, str): + return GenAIOutputType.TEXT + + # Detect structured data without explicit type + json_like_keys = { + "schema", + "properties", + "arguments", + "result", + "data", + "json", + "output_json", + } + if json_like_keys.intersection(first.keys()): + return GenAIOutputType.JSON + + return GenAIOutputType.TEXT + + @staticmethod + def _sanitize_usage_payload(usage: Any) -> None: + """Remove non-spec usage fields (e.g., total tokens) in-place.""" + if not usage: + return + if isinstance(usage, dict): + usage.pop("total_tokens", None) + return + if hasattr(usage, "total_tokens"): + try: + setattr(usage, "total_tokens", None) + except Exception: # pragma: no cover - defensive + try: + delattr(usage, "total_tokens") + except Exception: # pragma: no cover - defensive + pass + + def _get_span_kind(self, span_data: Any) -> SpanKind: + """Determine appropriate span kind based on span data type.""" + if _is_instance_of(span_data, FunctionSpanData): + return SpanKind.INTERNAL # Tool execution is internal + if _is_instance_of( + span_data, + ( + GenerationSpanData, + ResponseSpanData, + TranscriptionSpanData, + SpeechSpanData, + ), + ): + return SpanKind.CLIENT # API calls to model providers + if _is_instance_of(span_data, AgentSpanData): + return SpanKind.CLIENT + if _is_instance_of(span_data, (GuardrailSpanData, HandoffSpanData)): + return SpanKind.INTERNAL # Agent operations are internal + return SpanKind.INTERNAL + + def on_trace_start(self, trace: Trace) -> None: + """Create root span when trace starts.""" + if self._tracer: + attributes = { + GEN_AI_PROVIDER_NAME: self.system_name, + GEN_AI_SYSTEM_KEY: self.system_name, + 
GEN_AI_OPERATION_NAME: GenAIOperationName.INVOKE_AGENT, + } + # Legacy emission removed + + # Add configured agent and server attributes + if self.agent_name: + attributes[GEN_AI_AGENT_NAME] = self.agent_name + if self.agent_id: + attributes[GEN_AI_AGENT_ID] = self.agent_id + if self.agent_description: + attributes[GEN_AI_AGENT_DESCRIPTION] = self.agent_description + attributes.update(self._get_server_attributes()) + + otel_span = self._tracer.start_span( + name=trace.name, + attributes=attributes, + kind=SpanKind.SERVER, # Root span is typically server + ) + self._root_spans[trace.trace_id] = otel_span + + def on_trace_end(self, trace: Trace) -> None: + """End root span when trace ends.""" + if root_span := self._root_spans.pop(trace.trace_id, None): + if root_span.is_recording(): + root_span.set_status(Status(StatusCode.OK)) + root_span.end() + self._cleanup_spans_for_trace(trace.trace_id) + + def on_span_start(self, span: Span[Any]) -> None: + """Start child span for agent span.""" + if not self._tracer or not span.started_at: + return - return attributes + self._span_parents[span.span_id] = span.parent_id + if ( + _is_instance_of(span.span_data, AgentSpanData) + and span.span_id not in self._agent_content + ): + self._agent_content[span.span_id] = { + "input_messages": [], + "output_messages": [], + "system_instructions": [], + "request_model": None, + } - def _attributes_from_response(self, span_data: Any) -> dict[str, Any]: - attributes = self._base_attributes() - attributes[GenAI.GEN_AI_OPERATION_NAME] = self._operation_name( - span_data + parent_span = ( + self._otel_spans.get(span.parent_id) + if span.parent_id + else self._root_spans.get(span.trace_id) + ) + context = set_span_in_context(parent_span) if parent_span else None + + # Get operation details for span naming + operation_name = self._get_operation_name(span.span_data) + model = getattr(span.span_data, "model", None) + if model is None: + response_obj = getattr(span.span_data, "response", None) + 
model = getattr(response_obj, "model", None) + + # Use configured agent name or get from span data + agent_name = self.agent_name + if not agent_name and _is_instance_of(span.span_data, AgentSpanData): + agent_name = getattr(span.span_data, "name", None) + if not agent_name: + agent_name = self._agent_name_default + + tool_name = ( + getattr(span.span_data, "name", None) + if _is_instance_of(span.span_data, FunctionSpanData) + else None ) - response = getattr(span_data, "response", None) - if response is None: - return attributes + # Generate spec-compliant span name + span_name = get_span_name(operation_name, model, agent_name, tool_name) + + attributes = { + GEN_AI_PROVIDER_NAME: self.system_name, + GEN_AI_SYSTEM_KEY: self.system_name, + GEN_AI_OPERATION_NAME: operation_name, + } + # Legacy emission removed + + # Add configured agent and server attributes + agent_name_override = self.agent_name or self._agent_name_default + agent_id_override = self.agent_id or self._agent_id_default + agent_desc_override = ( + self.agent_description or self._agent_description_default + ) + if agent_name_override: + attributes[GEN_AI_AGENT_NAME] = agent_name_override + if agent_id_override: + attributes[GEN_AI_AGENT_ID] = agent_id_override + if agent_desc_override: + attributes[GEN_AI_AGENT_DESCRIPTION] = agent_desc_override + attributes.update(self._get_server_attributes()) + + otel_span = self._tracer.start_span( + name=span_name, + context=context, + attributes=attributes, + kind=self._get_span_kind(span.span_data), + ) + self._otel_spans[span.span_id] = otel_span + self._tokens[span.span_id] = attach(set_span_in_context(otel_span)) + + def on_span_end(self, span: Span[Any]) -> None: + """Finalize span with attributes, events, and metrics.""" + if token := self._tokens.pop(span.span_id, None): + detach(token) + + payload = self._build_content_payload(span) + self._update_agent_aggregate(span, payload) + agent_content = ( + self._agent_content.get(span.span_id) + if 
_is_instance_of(span.span_data, AgentSpanData) + else None + ) - response_id = getattr(response, "id", None) - if response_id is not None: - attributes[GenAI.GEN_AI_RESPONSE_ID] = response_id + if not (otel_span := self._otel_spans.pop(span.span_id, None)): + # Log attributes even without OTel span + try: + attributes = dict( + self._extract_genai_attributes( + span, payload, agent_content + ) + ) + for key, value in attributes.items(): + logger.debug( + "GenAI attr span %s: %s=%s", span.span_id, key, value + ) + except Exception as e: + logger.warning( + "Failed to extract attributes for span %s: %s", + span.span_id, + e, + ) + if _is_instance_of(span.span_data, AgentSpanData): + self._agent_content.pop(span.span_id, None) + self._span_parents.pop(span.span_id, None) + return - response_model = getattr(response, "model", None) - if response_model: - attributes[GenAI.GEN_AI_RESPONSE_MODEL] = response_model + try: + # Extract and set attributes + attributes: dict[str, AttributeValue] = {} + # Optimize for non-sampled spans to avoid heavy work + if not otel_span.is_recording(): + otel_span.end() + return + for key, value in self._extract_genai_attributes( + span, payload, agent_content + ): + otel_span.set_attribute(key, value) + attributes[key] = value + + if _is_instance_of( + span.span_data, (GenerationSpanData, ResponseSpanData) + ): + operation_name = attributes.get(GEN_AI_OPERATION_NAME) + model_for_name = attributes.get(GEN_AI_REQUEST_MODEL) or ( + attributes.get(GEN_AI_RESPONSE_MODEL) + ) + if operation_name and model_for_name: + agent_name_for_name = attributes.get(GEN_AI_AGENT_NAME) + tool_name_for_name = attributes.get(GEN_AI_TOOL_NAME) + new_name = get_span_name( + operation_name, + model_for_name, + agent_name_for_name, + tool_name_for_name, + ) + if new_name != otel_span.name: + otel_span.update_name(new_name) + + # Emit span events for captured content when configured + self._emit_content_events(span, otel_span, payload, agent_content) + + # Emit 
operation details event if configured + # Set error status if applicable + otel_span.set_status(status=_get_span_status(span)) + if getattr(span, "error", None): + err_obj = span.error + err_type = err_obj.get("type") or err_obj.get("name") + if err_type: + otel_span.set_attribute("error.type", err_type) + + # Record metrics before ending span + self._record_metrics(span, attributes) + + # End the span + otel_span.end() + + except Exception as e: + logger.warning("Failed to enrich span %s: %s", span.span_id, e) + otel_span.set_status(Status(StatusCode.ERROR, str(e))) + otel_span.end() + finally: + if _is_instance_of(span.span_data, AgentSpanData): + self._agent_content.pop(span.span_id, None) + self._span_parents.pop(span.span_id, None) - usage = getattr(response, "usage", None) - if usage is not None: - input_tokens = getattr(usage, "input_tokens", None) or getattr( - usage, "prompt_tokens", None + def shutdown(self) -> None: + """Clean up resources on shutdown.""" + for span_id, otel_span in list(self._otel_spans.items()): + otel_span.set_status( + Status(StatusCode.ERROR, "Application shutdown") ) - if input_tokens is not None: - attributes[GenAI.GEN_AI_USAGE_INPUT_TOKENS] = input_tokens - output_tokens = getattr(usage, "output_tokens", None) or getattr( - usage, "completion_tokens", None + otel_span.end() + + for trace_id, root_span in list(self._root_spans.items()): + root_span.set_status( + Status(StatusCode.ERROR, "Application shutdown") ) - if output_tokens is not None: - attributes[GenAI.GEN_AI_USAGE_OUTPUT_TOKENS] = output_tokens + root_span.end() + + self._otel_spans.clear() + self._root_spans.clear() + self._tokens.clear() + self._span_parents.clear() + self._agent_content.clear() + + def force_flush(self) -> None: + """Force flush (no-op for this processor).""" + pass - output = getattr(response, "output", None) - if output: - attributes[GenAI.GEN_AI_RESPONSE_FINISH_REASONS] = ( - _collect_finish_reasons(output) + def _get_operation_name(self, 
span_data: Any) -> str: + """Determine operation name from span data type.""" + if _is_instance_of(span_data, GenerationSpanData): + # Check if it's embeddings + if hasattr(span_data, "embedding_dimension"): + return GenAIOperationName.EMBEDDINGS + # Check if it's chat or completion + if span_data.input: + first_input = span_data.input[0] if span_data.input else None + if isinstance(first_input, dict) and "role" in first_input: + return GenAIOperationName.CHAT + return GenAIOperationName.TEXT_COMPLETION + elif _is_instance_of(span_data, AgentSpanData): + # Could be create_agent or invoke_agent based on context + operation = getattr(span_data, "operation", None) + normalized = ( + operation.strip().lower() + if isinstance(operation, str) + else None ) + if normalized in {"create", "create_agent"}: + return GenAIOperationName.CREATE_AGENT + if normalized in {"invoke", "invoke_agent"}: + return GenAIOperationName.INVOKE_AGENT + return GenAIOperationName.INVOKE_AGENT + elif _is_instance_of(span_data, FunctionSpanData): + return GenAIOperationName.EXECUTE_TOOL + elif _is_instance_of(span_data, ResponseSpanData): + return GenAIOperationName.CHAT # Response typically from chat + elif _is_instance_of(span_data, TranscriptionSpanData): + return GenAIOperationName.TRANSCRIPTION + elif _is_instance_of(span_data, SpeechSpanData): + return GenAIOperationName.SPEECH + elif _is_instance_of(span_data, GuardrailSpanData): + return GenAIOperationName.GUARDRAIL + elif _is_instance_of(span_data, HandoffSpanData): + return GenAIOperationName.HANDOFF + return "unknown" + + def _extract_genai_attributes( + self, + span: Span[Any], + payload: ContentPayload, + agent_content: Optional[Dict[str, list[Any]]] = None, + ) -> Iterator[tuple[str, AttributeValue]]: + """Yield (attr, value) pairs for GenAI semantic conventions.""" + span_data = span.span_data - return attributes + # Base attributes + yield GEN_AI_PROVIDER_NAME, self.system_name + yield GEN_AI_SYSTEM_KEY, self.system_name + # 
Legacy emission removed - def _attributes_from_agent(self, span_data: Any) -> dict[str, Any]: - attributes = self._base_attributes() - attributes[GenAI.GEN_AI_OPERATION_NAME] = ( - GenAI.GenAiOperationNameValues.INVOKE_AGENT.value + # Add configured agent attributes (always include when set) + agent_name_override = self.agent_name or self._agent_name_default + agent_id_override = self.agent_id or self._agent_id_default + agent_desc_override = ( + self.agent_description or self._agent_description_default ) + if agent_name_override: + yield GEN_AI_AGENT_NAME, agent_name_override + if agent_id_override: + yield GEN_AI_AGENT_ID, agent_id_override + if agent_desc_override: + yield GEN_AI_AGENT_DESCRIPTION, agent_desc_override + + # Server attributes + for key, value in self._get_server_attributes().items(): + yield key, value + + # Process different span types + if _is_instance_of(span_data, GenerationSpanData): + yield from self._get_attributes_from_generation_span_data( + span_data, payload + ) + elif _is_instance_of(span_data, AgentSpanData): + yield from self._get_attributes_from_agent_span_data( + span_data, agent_content + ) + elif _is_instance_of(span_data, FunctionSpanData): + yield from self._get_attributes_from_function_span_data( + span_data, payload + ) + elif _is_instance_of(span_data, ResponseSpanData): + yield from self._get_attributes_from_response_span_data( + span_data, payload + ) + elif _is_instance_of(span_data, TranscriptionSpanData): + yield from self._get_attributes_from_transcription_span_data( + span_data + ) + elif _is_instance_of(span_data, SpeechSpanData): + yield from self._get_attributes_from_speech_span_data(span_data) + elif _is_instance_of(span_data, GuardrailSpanData): + yield from self._get_attributes_from_guardrail_span_data(span_data) + elif _is_instance_of(span_data, HandoffSpanData): + yield from self._get_attributes_from_handoff_span_data(span_data) + + def _get_attributes_from_generation_span_data( + self, span_data: 
GenerationSpanData, payload: ContentPayload + ) -> Iterator[tuple[str, AttributeValue]]: + """Extract attributes from generation span.""" + # Operation name + operation_name = self._get_operation_name(span_data) + yield GEN_AI_OPERATION_NAME, operation_name + + # Model information + if span_data.model: + yield GEN_AI_REQUEST_MODEL, span_data.model + + # Check for embeddings-specific attributes + if hasattr(span_data, "embedding_dimension"): + yield ( + GEN_AI_EMBEDDINGS_DIMENSION_COUNT, + span_data.embedding_dimension, + ) - name = self._agent_name_override or getattr(span_data, "name", None) - if name: - attributes[GenAI.GEN_AI_AGENT_NAME] = name - output_type = getattr(span_data, "output_type", None) - if output_type: - attributes[GenAI.GEN_AI_OUTPUT_TYPE] = output_type + # Check for data source + if hasattr(span_data, "data_source_id"): + yield GEN_AI_DATA_SOURCE_ID, span_data.data_source_id + + finish_reasons: list[Any] = [] + if span_data.output: + for part in span_data.output: + if isinstance(part, dict): + fr = part.get("finish_reason") or part.get("stop_reason") + else: + fr = getattr(part, "finish_reason", None) + if fr: + finish_reasons.append( + fr if isinstance(fr, str) else str(fr) + ) + if finish_reasons: + yield GEN_AI_RESPONSE_FINISH_REASONS, finish_reasons + + # Usage information + if span_data.usage: + usage = span_data.usage + self._sanitize_usage_payload(usage) + if "prompt_tokens" in usage or "input_tokens" in usage: + tokens = usage.get("prompt_tokens") or usage.get( + "input_tokens" + ) + if tokens is not None: + yield GEN_AI_USAGE_INPUT_TOKENS, tokens + if "completion_tokens" in usage or "output_tokens" in usage: + tokens = usage.get("completion_tokens") or usage.get( + "output_tokens" + ) + if tokens is not None: + yield GEN_AI_USAGE_OUTPUT_TOKENS, tokens + + # Model configuration + if span_data.model_config: + mc = span_data.model_config + param_map = { + "temperature": GEN_AI_REQUEST_TEMPERATURE, + "top_p": GEN_AI_REQUEST_TOP_P, + 
"top_k": GEN_AI_REQUEST_TOP_K, + "max_tokens": GEN_AI_REQUEST_MAX_TOKENS, + "presence_penalty": GEN_AI_REQUEST_PRESENCE_PENALTY, + "frequency_penalty": GEN_AI_REQUEST_FREQUENCY_PENALTY, + "seed": GEN_AI_REQUEST_SEED, + "n": GEN_AI_REQUEST_CHOICE_COUNT, + "stop": GEN_AI_REQUEST_STOP_SEQUENCES, + "encoding_formats": GEN_AI_REQUEST_ENCODING_FORMATS, + } + for k, attr in param_map.items(): + if hasattr(mc, "__contains__") and k in mc: + value = mc[k] + else: + value = getattr(mc, k, None) + if value is not None: + yield attr, value - return attributes + if hasattr(mc, "get"): + base_url = ( + mc.get("base_url") + or mc.get("baseUrl") + or mc.get("endpoint") + ) + else: + base_url = ( + getattr(mc, "base_url", None) + or getattr(mc, "baseUrl", None) + or getattr(mc, "endpoint", None) + ) + for key, value in _infer_server_attributes(base_url).items(): + yield key, value + + # Sensitive data capture + if ( + self.include_sensitive_data + and self._content_mode.capture_in_span + and self._capture_messages + and payload.input_messages + ): + yield ( + GEN_AI_INPUT_MESSAGES, + safe_json_dumps(payload.input_messages), + ) - def _attributes_from_agent_creation( - self, span_data: Any - ) -> dict[str, Any]: - attributes = self._base_attributes() - attributes[GenAI.GEN_AI_OPERATION_NAME] = ( - GenAI.GenAiOperationNameValues.CREATE_AGENT.value + if ( + self.include_sensitive_data + and self._content_mode.capture_in_span + and self._capture_system_instructions + and payload.system_instructions + ): + yield ( + GEN_AI_SYSTEM_INSTRUCTIONS, + safe_json_dumps(payload.system_instructions), + ) + + if ( + self.include_sensitive_data + and self._content_mode.capture_in_span + and self._capture_messages + and payload.output_messages + ): + yield ( + GEN_AI_OUTPUT_MESSAGES, + safe_json_dumps(payload.output_messages), + ) + + # Output type + yield ( + GEN_AI_OUTPUT_TYPE, + normalize_output_type(self._infer_output_type(span_data)), ) - name = self._agent_name_override or getattr(span_data, 
"name", None) + def _merge_content_sequence( + self, + existing: list[Any], + incoming: Sequence[Any], + ) -> list[Any]: + """Merge normalized message/content lists without duplicating snapshots.""" + if not incoming: + return existing + + incoming_list = [self._clone_message(item) for item in incoming] + + if self.include_sensitive_data: + filtered = [ + msg + for msg in incoming_list + if not self._is_placeholder_message(msg) + ] + if filtered: + incoming_list = filtered + + if not existing: + return incoming_list + + result = [self._clone_message(item) for item in existing] + + for idx, new_msg in enumerate(incoming_list): + if idx < len(result): + if ( + self.include_sensitive_data + and self._is_placeholder_message(new_msg) + and not self._is_placeholder_message(result[idx]) + ): + continue + if result[idx] != new_msg: + result[idx] = self._clone_message(new_msg) + else: + if ( + self.include_sensitive_data + and self._is_placeholder_message(new_msg) + ): + if ( + any( + not self._is_placeholder_message(existing_msg) + for existing_msg in result + ) + or new_msg in result + ): + continue + result.append(self._clone_message(new_msg)) + + return result + + def _clone_message(self, message: Any) -> Any: + if isinstance(message, dict): + return { + key: self._clone_message(value) + if isinstance(value, (dict, list)) + else value + for key, value in message.items() + } + if isinstance(message, list): + return [self._clone_message(item) for item in message] + return message + + def _is_placeholder_message(self, message: Any) -> bool: + if not isinstance(message, dict): + return False + parts = message.get("parts") + if not isinstance(parts, list) or not parts: + return False + for part in parts: + if ( + not isinstance(part, dict) + or part.get("type") != "text" + or part.get("content") != "readacted" + ): + return False + return True + + def _get_attributes_from_agent_span_data( + self, + span_data: AgentSpanData, + agent_content: Optional[Dict[str, list[Any]]] = 
None, + ) -> Iterator[tuple[str, AttributeValue]]: + """Extract attributes from agent span.""" + yield GEN_AI_OPERATION_NAME, self._get_operation_name(span_data) + + name = ( + self.agent_name + or getattr(span_data, "name", None) + or self._agent_name_default + ) if name: - attributes[GenAI.GEN_AI_AGENT_NAME] = name - description = getattr(span_data, "description", None) - if description: - attributes[GenAI.GEN_AI_AGENT_DESCRIPTION] = description - agent_id = getattr(span_data, "agent_id", None) or getattr( - span_data, "id", None + yield GEN_AI_AGENT_NAME, name + + agent_id = ( + self.agent_id + or getattr(span_data, "agent_id", None) + or self._agent_id_default ) if agent_id: - attributes[GenAI.GEN_AI_AGENT_ID] = agent_id + yield GEN_AI_AGENT_ID, agent_id + + description = ( + self.agent_description + or getattr(span_data, "description", None) + or self._agent_description_default + ) + if description: + yield GEN_AI_AGENT_DESCRIPTION, description + model = getattr(span_data, "model", None) + if not model and agent_content: + model = agent_content.get("request_model") if model: - attributes[GenAI.GEN_AI_REQUEST_MODEL] = model + yield GEN_AI_REQUEST_MODEL, model - return attributes + if hasattr(span_data, "conversation_id") and span_data.conversation_id: + yield GEN_AI_CONVERSATION_ID, span_data.conversation_id - def _attributes_from_function(self, span_data: Any) -> dict[str, Any]: - attributes = self._base_attributes() - attributes[GenAI.GEN_AI_OPERATION_NAME] = ( - GenAI.GenAiOperationNameValues.EXECUTE_TOOL.value + # Agent definitions + if self._capture_tool_definitions and hasattr( + span_data, "agent_definitions" + ): + yield ( + GEN_AI_ORCHESTRATOR_AGENT_DEFINITIONS, + safe_json_dumps(span_data.agent_definitions), + ) + + # System instructions from agent definitions + if self._capture_system_instructions and hasattr( + span_data, "agent_definitions" + ): + try: + defs = span_data.agent_definitions + if isinstance(defs, (list, tuple)): + collected: 
list[dict[str, str]] = [] + for d in defs: + if isinstance(d, dict): + msgs = d.get("messages") or d.get( + "system_messages" + ) + if isinstance(msgs, (list, tuple)): + collected.extend( + self._collect_system_instructions(msgs) + ) + if collected: + yield ( + GEN_AI_SYSTEM_INSTRUCTIONS, + safe_json_dumps(collected), + ) + except Exception: + pass + + if ( + self.include_sensitive_data + and self._content_mode.capture_in_span + and self._capture_messages + and agent_content + ): + if agent_content.get("input_messages"): + yield ( + GEN_AI_INPUT_MESSAGES, + safe_json_dumps(agent_content["input_messages"]), + ) + if agent_content.get("output_messages"): + yield ( + GEN_AI_OUTPUT_MESSAGES, + safe_json_dumps(agent_content["output_messages"]), + ) + if ( + self.include_sensitive_data + and self._content_mode.capture_in_span + and self._capture_system_instructions + and agent_content + and agent_content.get("system_instructions") + ): + yield ( + GEN_AI_SYSTEM_INSTRUCTIONS, + safe_json_dumps(agent_content["system_instructions"]), + ) + + yield ( + GEN_AI_OUTPUT_TYPE, + normalize_output_type(self._infer_output_type(span_data)), ) - name = getattr(span_data, "name", None) - if name: - attributes[GenAI.GEN_AI_TOOL_NAME] = name - attributes[GenAI.GEN_AI_TOOL_TYPE] = "function" + def _get_attributes_from_function_span_data( + self, span_data: FunctionSpanData, payload: ContentPayload + ) -> Iterator[tuple[str, AttributeValue]]: + """Extract attributes from function/tool span.""" + yield GEN_AI_OPERATION_NAME, GenAIOperationName.EXECUTE_TOOL + + if span_data.name: + yield GEN_AI_TOOL_NAME, span_data.name + + # Tool type - validate and normalize + tool_type = "function" # Default for function spans + if hasattr(span_data, "tool_type"): + tool_type = span_data.tool_type + yield GEN_AI_TOOL_TYPE, validate_tool_type(tool_type) + + if hasattr(span_data, "call_id") and span_data.call_id: + yield GEN_AI_TOOL_CALL_ID, span_data.call_id + if hasattr(span_data, "description") and 
span_data.description: + yield GEN_AI_TOOL_DESCRIPTION, span_data.description + + # Tool definitions + if self._capture_tool_definitions and hasattr( + span_data, "tool_definitions" + ): + yield ( + GEN_AI_TOOL_DEFINITIONS, + safe_json_dumps(span_data.tool_definitions), + ) + + # Tool input/output (sensitive) + if ( + self.include_sensitive_data + and self._content_mode.capture_in_span + and payload.tool_arguments is not None + ): + yield GEN_AI_TOOL_CALL_ARGUMENTS, payload.tool_arguments - return attributes + if ( + self.include_sensitive_data + and self._content_mode.capture_in_span + and payload.tool_result is not None + ): + yield GEN_AI_TOOL_CALL_RESULT, payload.tool_result - def _attributes_from_generic(self, span_data: Any) -> dict[str, Any]: - attributes = self._base_attributes() - attributes[GenAI.GEN_AI_OPERATION_NAME] = self._operation_name( - span_data + yield ( + GEN_AI_OUTPUT_TYPE, + normalize_output_type(self._infer_output_type(span_data)), ) - return attributes - def _attributes_for_span(self, span_data: Any) -> dict[str, Any]: - span_type = getattr(span_data, "type", None) - if span_type == SPAN_TYPE_GENERATION: - return self._attributes_from_generation(span_data) - if span_type == SPAN_TYPE_RESPONSE: - return self._attributes_from_response(span_data) - if span_type == SPAN_TYPE_AGENT: - operation = getattr(span_data, "operation", None) - if isinstance(operation, str) and operation.strip().lower() in { - "create", - "create_agent", - }: - return self._attributes_from_agent_creation(span_data) - return self._attributes_from_agent(span_data) - if span_type == SPAN_TYPE_AGENT_CREATION: - return self._attributes_from_agent_creation(span_data) - if span_type == SPAN_TYPE_FUNCTION: - return self._attributes_from_function(span_data) - if span_type in { - SPAN_TYPE_GUARDRAIL, - SPAN_TYPE_HANDOFF, - SPAN_TYPE_SPEECH_GROUP, - SPAN_TYPE_SPEECH, - SPAN_TYPE_TRANSCRIPTION, - SPAN_TYPE_MCP_TOOLS, - }: - return self._attributes_from_generic(span_data) - return 
self._base_attributes() - - def on_trace_start(self, trace: AgentsTrace) -> None: - # TODO: Confirm with the GenAI SIG whether emitting a SERVER workflow span - # is the desired long-term shape once the semantic conventions define - # top-level agent spans. - attributes = self._base_attributes() - start_time = ( - _parse_iso8601(getattr(trace, "started_at", None)) or time_ns() - ) - - with self._lock: - span = self._tracer.start_span( - name=trace.name, - kind=SpanKind.SERVER, - attributes=attributes, - start_time=start_time, + def _get_attributes_from_response_span_data( + self, span_data: ResponseSpanData, payload: ContentPayload + ) -> Iterator[tuple[str, AttributeValue]]: + """Extract attributes from response span.""" + yield GEN_AI_OPERATION_NAME, GenAIOperationName.CHAT + + # Response information + if span_data.response: + if hasattr(span_data.response, "id") and span_data.response.id: + yield GEN_AI_RESPONSE_ID, span_data.response.id + + # Model from response + if ( + hasattr(span_data.response, "model") + and span_data.response.model + ): + yield GEN_AI_RESPONSE_MODEL, span_data.response.model + if not getattr(span_data, "model", None): + yield GEN_AI_REQUEST_MODEL, span_data.response.model + + # Finish reasons + finish_reasons = [] + if ( + hasattr(span_data.response, "output") + and span_data.response.output + ): + for part in span_data.response.output: + if isinstance(part, dict): + fr = part.get("finish_reason") or part.get( + "stop_reason" + ) + else: + fr = getattr(part, "finish_reason", None) + if fr: + finish_reasons.append(fr) + if finish_reasons: + yield GEN_AI_RESPONSE_FINISH_REASONS, finish_reasons + + # Usage from response + if ( + hasattr(span_data.response, "usage") + and span_data.response.usage + ): + usage = span_data.response.usage + self._sanitize_usage_payload(usage) + input_tokens = getattr(usage, "input_tokens", None) + if input_tokens is None: + input_tokens = getattr(usage, "prompt_tokens", None) + if input_tokens is not None: + 
yield GEN_AI_USAGE_INPUT_TOKENS, input_tokens + + output_tokens = getattr(usage, "output_tokens", None) + if output_tokens is None: + output_tokens = getattr(usage, "completion_tokens", None) + if output_tokens is not None: + yield GEN_AI_USAGE_OUTPUT_TOKENS, output_tokens + + # Input/output messages + if ( + self.include_sensitive_data + and self._content_mode.capture_in_span + and self._capture_messages + and payload.input_messages + ): + yield ( + GEN_AI_INPUT_MESSAGES, + safe_json_dumps(payload.input_messages), ) - self._root_spans[trace.trace_id] = span - def on_trace_end(self, trace: AgentsTrace) -> None: - end_time = _parse_iso8601(getattr(trace, "ended_at", None)) + if ( + self.include_sensitive_data + and self._content_mode.capture_in_span + and self._capture_system_instructions + and payload.system_instructions + ): + yield ( + GEN_AI_SYSTEM_INSTRUCTIONS, + safe_json_dumps(payload.system_instructions), + ) - with self._lock: - span = self._root_spans.pop(trace.trace_id, None) + if ( + self.include_sensitive_data + and self._content_mode.capture_in_span + and self._capture_messages + and payload.output_messages + ): + yield ( + GEN_AI_OUTPUT_MESSAGES, + safe_json_dumps(payload.output_messages), + ) - if span: - span.end(end_time=end_time) + yield ( + GEN_AI_OUTPUT_TYPE, + normalize_output_type(self._infer_output_type(span_data)), + ) - def on_span_start(self, span: AgentsSpan[Any]) -> None: - span_data = span.span_data - start_time = _parse_iso8601(span.started_at) - attributes = self._attributes_for_span(span_data) - operation = attributes.get(GenAI.GEN_AI_OPERATION_NAME, "operation") - name = self._span_name(operation, attributes) - kind = self._span_kind(span_data) - - with self._lock: - parent_span = None - if span.parent_id and span.parent_id in self._spans: - parent_span = self._spans[span.parent_id].span - elif span.trace_id in self._root_spans: - parent_span = self._root_spans[span.trace_id] - - context = set_span_in_context(parent_span) if 
parent_span else None - otel_span = self._tracer.start_span( - name=name, - kind=kind, - attributes=attributes, - start_time=start_time, - context=context, - ) - self._spans[span.span_id] = _SpanContext(span=otel_span, kind=kind) + def _get_attributes_from_transcription_span_data( + self, span_data: TranscriptionSpanData + ) -> Iterator[tuple[str, AttributeValue]]: + """Extract attributes from transcription span.""" + yield GEN_AI_OPERATION_NAME, GenAIOperationName.TRANSCRIPTION + + if hasattr(span_data, "model") and span_data.model: + yield GEN_AI_REQUEST_MODEL, span_data.model + + # Audio format + if hasattr(span_data, "format") and span_data.format: + yield "gen_ai.audio.input.format", span_data.format + + # Transcript (sensitive) + if ( + self.include_sensitive_data + and self._content_mode.capture_in_span + and self._capture_messages + and hasattr(span_data, "transcript") + ): + yield "gen_ai.transcription.text", span_data.transcript - def on_span_end(self, span: AgentsSpan[Any]) -> None: - end_time = _parse_iso8601(span.ended_at) + yield ( + GEN_AI_OUTPUT_TYPE, + normalize_output_type(self._infer_output_type(span_data)), + ) - with self._lock: - context = self._spans.pop(span.span_id, None) + def _get_attributes_from_speech_span_data( + self, span_data: SpeechSpanData + ) -> Iterator[tuple[str, AttributeValue]]: + """Extract attributes from speech span.""" + yield GEN_AI_OPERATION_NAME, GenAIOperationName.SPEECH - if context is None: - return + if hasattr(span_data, "model") and span_data.model: + yield GEN_AI_REQUEST_MODEL, span_data.model - otel_span = context.span - if otel_span.is_recording(): - attributes = self._attributes_for_span(span.span_data) - for key, value in attributes.items(): - otel_span.set_attribute(key, value) + if hasattr(span_data, "voice") and span_data.voice: + yield "gen_ai.speech.voice", span_data.voice - error = span.error - if error: - description = error.get("message") or "" - otel_span.set_status(Status(StatusCode.ERROR, 
description)) - else: - otel_span.set_status(Status(StatusCode.OK)) + if hasattr(span_data, "format") and span_data.format: + yield "gen_ai.audio.output.format", span_data.format - otel_span.end(end_time=end_time) + # Input text (sensitive) + if ( + self.include_sensitive_data + and self._content_mode.capture_in_span + and self._capture_messages + and hasattr(span_data, "input_text") + ): + yield "gen_ai.speech.input_text", span_data.input_text - def shutdown(self) -> None: - with self._lock: - spans = list(self._spans.values()) - self._spans.clear() - roots = list(self._root_spans.values()) - self._root_spans.clear() + yield ( + GEN_AI_OUTPUT_TYPE, + normalize_output_type(self._infer_output_type(span_data)), + ) - for context in spans: - context.span.set_status(Status(StatusCode.ERROR, "shutdown")) - context.span.end() + def _get_attributes_from_guardrail_span_data( + self, span_data: GuardrailSpanData + ) -> Iterator[tuple[str, AttributeValue]]: + """Extract attributes from guardrail span.""" + yield GEN_AI_OPERATION_NAME, GenAIOperationName.GUARDRAIL - for root in roots: - root.set_status(Status(StatusCode.ERROR, "shutdown")) - root.end() + if span_data.name: + yield GEN_AI_GUARDRAIL_NAME, span_data.name - def force_flush(self) -> None: - # no batching - return + yield GEN_AI_GUARDRAIL_TRIGGERED, span_data.triggered + yield ( + GEN_AI_OUTPUT_TYPE, + normalize_output_type(self._infer_output_type(span_data)), + ) + + def _get_attributes_from_handoff_span_data( + self, span_data: HandoffSpanData + ) -> Iterator[tuple[str, AttributeValue]]: + """Extract attributes from handoff span.""" + yield GEN_AI_OPERATION_NAME, GenAIOperationName.HANDOFF + + if span_data.from_agent: + yield GEN_AI_HANDOFF_FROM_AGENT, span_data.from_agent + if span_data.to_agent: + yield GEN_AI_HANDOFF_TO_AGENT, span_data.to_agent -__all__ = ["_OpenAIAgentsSpanProcessor"] + yield ( + GEN_AI_OUTPUT_TYPE, + normalize_output_type(self._infer_output_type(span_data)), + ) + + def 
_cleanup_spans_for_trace(self, trace_id: str) -> None: + """Clean up spans for a trace to prevent memory leaks.""" + spans_to_remove = [ + span_id + for span_id in self._otel_spans.keys() + if span_id.startswith(trace_id) + ] + for span_id in spans_to_remove: + if otel_span := self._otel_spans.pop(span_id, None): + otel_span.set_status( + Status( + StatusCode.ERROR, "Trace ended before span completion" + ) + ) + otel_span.end() + self._tokens.pop(span_id, None) + + +__all__ = [ + "GenAIProvider", + "GenAIOperationName", + "GenAIToolType", + "GenAIOutputType", + "GenAIEvaluationAttributes", + "ContentCaptureMode", + "ContentPayload", + "GenAISemanticProcessor", + "normalize_provider", + "normalize_output_type", + "validate_tool_type", +] diff --git a/instrumentation-genai/opentelemetry-instrumentation-openai-agents-v2/tests/conftest.py b/instrumentation-genai/opentelemetry-instrumentation-openai-agents-v2/tests/conftest.py index f652ad6ddc..61d8621993 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-openai-agents-v2/tests/conftest.py +++ b/instrumentation-genai/opentelemetry-instrumentation-openai-agents-v2/tests/conftest.py @@ -11,6 +11,7 @@ for path in ( PROJECT_ROOT / "opentelemetry-instrumentation" / "src", GENAI_ROOT / "src", + PROJECT_ROOT / "util" / "opentelemetry-util-genai" / "src", REPO_ROOT / "openai_agents_lib", REPO_ROOT / "openai_lib", TESTS_ROOT / "stubs", diff --git a/instrumentation-genai/opentelemetry-instrumentation-openai-agents-v2/tests/stubs/wrapt.py b/instrumentation-genai/opentelemetry-instrumentation-openai-agents-v2/tests/stubs/wrapt.py new file mode 100644 index 0000000000..ba3b8ab22c --- /dev/null +++ b/instrumentation-genai/opentelemetry-instrumentation-openai-agents-v2/tests/stubs/wrapt.py @@ -0,0 +1,17 @@ +class ObjectProxy: + """Minimal stand-in for wrapt.ObjectProxy used in tests.""" + + def __init__(self, wrapped): + self.__wrapped__ = wrapped + + def __getattr__(self, item): + return getattr(self.__wrapped__, item) 
+ + def __setattr__(self, key, value): + if key == "__wrapped__": + super().__setattr__(key, value) + else: + setattr(self.__wrapped__, key, value) + + def __call__(self, *args, **kwargs): + return self.__wrapped__(*args, **kwargs) diff --git a/instrumentation-genai/opentelemetry-instrumentation-openai-agents-v2/tests/test_tracer.py b/instrumentation-genai/opentelemetry-instrumentation-openai-agents-v2/tests/test_tracer.py index 53661309c6..5ff37de9c4 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-openai-agents-v2/tests/test_tracer.py +++ b/instrumentation-genai/opentelemetry-instrumentation-openai-agents-v2/tests/test_tracer.py @@ -1,7 +1,10 @@ from __future__ import annotations +import json import sys from pathlib import Path +from types import SimpleNamespace +from typing import Any TESTS_ROOT = Path(__file__).resolve().parent stub_path = TESTS_ROOT / "stubs" @@ -11,6 +14,7 @@ sys.modules.pop("agents", None) sys.modules.pop("agents.tracing", None) +import agents.tracing as agents_tracing # noqa: E402 from agents.tracing import ( # noqa: E402 agent_span, function_span, @@ -23,6 +27,10 @@ from opentelemetry.instrumentation.openai_agents import ( # noqa: E402 OpenAIAgentsInstrumentor, ) +from opentelemetry.instrumentation.openai_agents.span_processor import ( # noqa: E402 + ContentPayload, + GenAISemanticProcessor, +) from opentelemetry.sdk.trace import TracerProvider # noqa: E402 try: @@ -46,6 +54,12 @@ from opentelemetry.trace import SpanKind # noqa: E402 GEN_AI_PROVIDER_NAME = GenAI.GEN_AI_PROVIDER_NAME +GEN_AI_INPUT_MESSAGES = getattr( + GenAI, "GEN_AI_INPUT_MESSAGES", "gen_ai.input.messages" +) +GEN_AI_OUTPUT_MESSAGES = getattr( + GenAI, "GEN_AI_OUTPUT_MESSAGES", "gen_ai.output.messages" +) def _instrument_with_provider(**instrument_kwargs): @@ -194,6 +208,214 @@ def test_agent_create_span_records_attributes(): exporter.clear() +def _placeholder_message() -> dict[str, Any]: + return { + "role": "user", + "parts": [{"type": "text", "content": 
"readacted"}], + } + + +def test_normalize_messages_skips_empty_when_sensitive_enabled(): + processor = GenAISemanticProcessor(metrics_enabled=False) + normalized = processor._normalize_messages_to_role_parts( + [{"role": "user", "content": None}] + ) + assert normalized == [] + + +def test_normalize_messages_emits_placeholder_when_sensitive_disabled(): + processor = GenAISemanticProcessor( + include_sensitive_data=False, metrics_enabled=False + ) + normalized = processor._normalize_messages_to_role_parts( + [{"role": "user", "content": None}] + ) + assert normalized == [_placeholder_message()] + + +def test_agent_content_aggregation_skips_duplicate_snapshots(): + processor = GenAISemanticProcessor(metrics_enabled=False) + agent_id = "agent-span" + processor._agent_content[agent_id] = { + "input_messages": [], + "output_messages": [], + "system_instructions": [], + } + + payload = ContentPayload( + input_messages=[ + {"role": "user", "parts": [{"type": "text", "content": "hello"}]}, + { + "role": "user", + "parts": [{"type": "text", "content": "readacted"}], + }, + ] + ) + + processor._update_agent_aggregate( + SimpleNamespace(span_id="child-1", parent_id=agent_id, span_data=None), + payload, + ) + processor._update_agent_aggregate( + SimpleNamespace(span_id="child-2", parent_id=agent_id, span_data=None), + payload, + ) + + aggregated = processor._agent_content[agent_id]["input_messages"] + assert aggregated == [ + {"role": "user", "parts": [{"type": "text", "content": "hello"}]} + ] + # ensure data copied rather than reused to prevent accidental mutation + assert aggregated is not payload.input_messages + + +def test_agent_content_aggregation_filters_placeholder_append_when_sensitive(): + processor = GenAISemanticProcessor(metrics_enabled=False) + agent_id = "agent-span" + processor._agent_content[agent_id] = { + "input_messages": [], + "output_messages": [], + "system_instructions": [], + } + + initial_payload = ContentPayload( + input_messages=[ + {"role": 
"user", "parts": [{"type": "text", "content": "hello"}]} + ] + ) + processor._update_agent_aggregate( + SimpleNamespace(span_id="child-1", parent_id=agent_id, span_data=None), + initial_payload, + ) + + placeholder_payload = ContentPayload( + input_messages=[_placeholder_message()] + ) + processor._update_agent_aggregate( + SimpleNamespace(span_id="child-2", parent_id=agent_id, span_data=None), + placeholder_payload, + ) + + aggregated = processor._agent_content[agent_id]["input_messages"] + assert aggregated == [ + {"role": "user", "parts": [{"type": "text", "content": "hello"}]} + ] + + +def test_agent_content_aggregation_retains_placeholder_when_sensitive_disabled(): + processor = GenAISemanticProcessor( + include_sensitive_data=False, metrics_enabled=False + ) + agent_id = "agent-span" + processor._agent_content[agent_id] = { + "input_messages": [], + "output_messages": [], + "system_instructions": [], + } + + placeholder_payload = ContentPayload( + input_messages=[_placeholder_message()] + ) + processor._update_agent_aggregate( + SimpleNamespace(span_id="child-1", parent_id=agent_id, span_data=None), + placeholder_payload, + ) + + aggregated = processor._agent_content[agent_id]["input_messages"] + assert aggregated == [_placeholder_message()] + + +def test_agent_content_aggregation_appends_new_messages_once(): + processor = GenAISemanticProcessor(metrics_enabled=False) + agent_id = "agent-span" + processor._agent_content[agent_id] = { + "input_messages": [], + "output_messages": [], + "system_instructions": [], + } + + initial_payload = ContentPayload( + input_messages=[ + {"role": "user", "parts": [{"type": "text", "content": "hello"}]} + ] + ) + processor._update_agent_aggregate( + SimpleNamespace(span_id="child-1", parent_id=agent_id, span_data=None), + initial_payload, + ) + + extended_messages = [ + {"role": "user", "parts": [{"type": "text", "content": "hello"}]}, + { + "role": "assistant", + "parts": [{"type": "text", "content": "hi there"}], + }, + ] + 
extended_payload = ContentPayload(input_messages=extended_messages) + processor._update_agent_aggregate( + SimpleNamespace(span_id="child-2", parent_id=agent_id, span_data=None), + extended_payload, + ) + + aggregated = processor._agent_content[agent_id]["input_messages"] + assert aggregated == extended_messages + assert extended_payload.input_messages == extended_messages + + +def test_agent_span_collects_child_messages(): + instrumentor, exporter = _instrument_with_provider() + + try: + provider = agents_tracing.get_trace_provider() + + with trace("workflow") as workflow: + agent_span_obj = provider.create_span( + agents_tracing.AgentSpanData(name="helper"), + parent=workflow, + ) + agent_span_obj.start() + + generation = agents_tracing.GenerationSpanData( + input=[{"role": "user", "content": "hi"}], + output=[{"type": "text", "content": "hello"}], + model="gpt-4o-mini", + ) + gen_span = provider.create_span(generation, parent=agent_span_obj) + gen_span.start() + gen_span.finish() + + agent_span_obj.finish() + + spans = exporter.get_finished_spans() + agent_span = next( + span + for span in spans + if span.attributes.get(GenAI.GEN_AI_OPERATION_NAME) + == GenAI.GenAiOperationNameValues.INVOKE_AGENT.value + ) + + prompt = json.loads(agent_span.attributes[GEN_AI_INPUT_MESSAGES]) + completion = json.loads(agent_span.attributes[GEN_AI_OUTPUT_MESSAGES]) + + assert prompt == [ + { + "role": "user", + "parts": [{"type": "text", "content": "hi"}], + } + ] + assert completion == [ + { + "role": "assistant", + "parts": [{"type": "text", "content": "hello"}], + } + ] + + assert not agent_span.events + finally: + instrumentor.uninstrument() + exporter.clear() + + def test_agent_name_override_applied_to_agent_spans(): instrumentor, exporter = _instrument_with_provider( agent_name="Travel Concierge" @@ -223,6 +445,35 @@ def test_agent_name_override_applied_to_agent_spans(): exporter.clear() +def test_capture_mode_can_be_disabled(): + instrumentor, exporter = 
_instrument_with_provider( + capture_message_content="no_content" + ) + + try: + with trace("workflow"): + with generation_span( + input=[{"role": "user", "content": "hi"}], + output=[{"role": "assistant", "content": "hello"}], + model="gpt-4o-mini", + ): + pass + + spans = exporter.get_finished_spans() + client_span = next( + span for span in spans if span.kind is SpanKind.CLIENT + ) + + assert GEN_AI_INPUT_MESSAGES not in client_span.attributes + assert GEN_AI_OUTPUT_MESSAGES not in client_span.attributes + for event in client_span.events: + assert GEN_AI_INPUT_MESSAGES not in event.attributes + assert GEN_AI_OUTPUT_MESSAGES not in event.attributes + finally: + instrumentor.uninstrument() + exporter.clear() + + def test_response_span_records_response_attributes(): instrumentor, exporter = _instrument_with_provider() diff --git a/instrumentation-genai/opentelemetry-instrumentation-openai-agents-v2/tests/test_z_instrumentor_behaviors.py b/instrumentation-genai/opentelemetry-instrumentation-openai-agents-v2/tests/test_z_instrumentor_behaviors.py index 682b358cff..3aa15521c0 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-openai-agents-v2/tests/test_z_instrumentor_behaviors.py +++ b/instrumentation-genai/opentelemetry-instrumentation-openai-agents-v2/tests/test_z_instrumentor_behaviors.py @@ -42,3 +42,26 @@ def test_double_instrument_is_noop(): def test_instrumentation_dependencies_exposed(): instrumentor = OpenAIAgentsInstrumentor() assert instrumentor.instrumentation_dependencies() == _instruments + + +def test_default_agent_configuration(): + set_trace_processors([]) + provider = TracerProvider() + instrumentor = OpenAIAgentsInstrumentor() + + try: + instrumentor.instrument(tracer_provider=provider) + processor = instrumentor._processor + assert processor is not None + assert getattr(processor, "_agent_name_default") == "OpenAI Agent" + assert getattr(processor, "_agent_id_default") == "agent" + assert ( + getattr(processor, 
"_agent_description_default") + == "OpenAI Agents instrumentation" + ) + assert processor.base_url == "https://api.openai.com" + assert processor.server_address == "api.openai.com" + assert processor.server_port == 443 + finally: + instrumentor.uninstrument() + set_trace_processors([]) diff --git a/instrumentation-genai/opentelemetry-instrumentation-openai-agents-v2/tests/test_z_span_processor_unit.py b/instrumentation-genai/opentelemetry-instrumentation-openai-agents-v2/tests/test_z_span_processor_unit.py index 5c368ec870..f47a4a69e6 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-openai-agents-v2/tests/test_z_span_processor_unit.py +++ b/instrumentation-genai/opentelemetry-instrumentation-openai-agents-v2/tests/test_z_span_processor_unit.py @@ -1,22 +1,83 @@ from __future__ import annotations +import importlib +import json from dataclasses import dataclass +from datetime import datetime, timezone +from enum import Enum from types import SimpleNamespace from typing import Any import pytest +from agents.tracing import ( + AgentSpanData, + FunctionSpanData, + GenerationSpanData, + ResponseSpanData, +) -import opentelemetry.instrumentation.openai_agents.span_processor as sp +import opentelemetry.semconv._incubating.attributes.gen_ai_attributes as _gen_ai_attributes from opentelemetry.sdk.trace import TracerProvider from opentelemetry.semconv._incubating.attributes import ( - gen_ai_attributes as GenAI, -) -from opentelemetry.semconv._incubating.attributes import ( - server_attributes as ServerAttributes, + server_attributes as _server_attributes, ) from opentelemetry.trace import SpanKind from opentelemetry.trace.status import StatusCode + +def _ensure_semconv_enums() -> None: + if not hasattr(_gen_ai_attributes, "GenAiProviderNameValues"): + + class _GenAiProviderNameValues(Enum): + OPENAI = "openai" + GCP_GEN_AI = "gcp.gen_ai" + GCP_VERTEX_AI = "gcp.vertex_ai" + GCP_GEMINI = "gcp.gemini" + ANTHROPIC = "anthropic" + COHERE = "cohere" + 
AZURE_AI_INFERENCE = "azure.ai.inference" + AZURE_AI_OPENAI = "azure.ai.openai" + IBM_WATSONX_AI = "ibm.watsonx.ai" + AWS_BEDROCK = "aws.bedrock" + PERPLEXITY = "perplexity" + X_AI = "x.ai" + DEEPSEEK = "deepseek" + GROQ = "groq" + MISTRAL_AI = "mistral.ai" + + class _GenAiOperationNameValues(Enum): + CHAT = "chat" + GENERATE_CONTENT = "generate_content" + TEXT_COMPLETION = "text_completion" + EMBEDDINGS = "embeddings" + CREATE_AGENT = "create_agent" + INVOKE_AGENT = "invoke_agent" + EXECUTE_TOOL = "execute_tool" + + class _GenAiOutputTypeValues(Enum): + TEXT = "text" + JSON = "json" + IMAGE = "image" + SPEECH = "speech" + + _gen_ai_attributes.GenAiProviderNameValues = _GenAiProviderNameValues + _gen_ai_attributes.GenAiOperationNameValues = _GenAiOperationNameValues + _gen_ai_attributes.GenAiOutputTypeValues = _GenAiOutputTypeValues + + if not hasattr(_server_attributes, "SERVER_ADDRESS"): + _server_attributes.SERVER_ADDRESS = "server.address" + if not hasattr(_server_attributes, "SERVER_PORT"): + _server_attributes.SERVER_PORT = "server.port" + + +_ensure_semconv_enums() + +ServerAttributes = _server_attributes + +sp = importlib.import_module( + "opentelemetry.instrumentation.openai_agents.span_processor" +) + try: from opentelemetry.sdk.trace.export import ( # type: ignore[attr-defined] InMemorySpanExporter, @@ -29,33 +90,39 @@ ) +def _collect(iterator) -> dict[str, Any]: + return {key: value for key, value in iterator} + + @pytest.fixture def processor_setup(): provider = TracerProvider() exporter = InMemorySpanExporter() provider.add_span_processor(SimpleSpanProcessor(exporter)) tracer = provider.get_tracer(__name__) - processor = sp._OpenAIAgentsSpanProcessor(tracer=tracer, system="openai") + processor = sp.GenAISemanticProcessor(tracer=tracer, system_name="openai") yield processor, exporter processor.shutdown() exporter.clear() -def test_parse_iso8601_variants(): - assert sp._parse_iso8601(None) is None - assert sp._parse_iso8601("bad-value") is None - assert 
( - sp._parse_iso8601("2024-01-01T00:00:00Z") == 1704067200 * 1_000_000_000 - ) +def test_time_helpers(): + dt = datetime(2024, 1, 1, tzinfo=timezone.utc) + assert sp._as_utc_nano(dt) == 1704067200 * 1_000_000_000 + class Fallback: + def __str__(self) -> str: + return "fallback" -def test_extract_server_attributes_variants(monkeypatch): - assert sp._extract_server_attributes(None) == {} - assert sp._extract_server_attributes({"base_url": 123}) == {} + assert sp.safe_json_dumps({"foo": "bar"}) == '{"foo":"bar"}' + assert sp.safe_json_dumps(Fallback()) == "fallback" - attrs = sp._extract_server_attributes( - {"base_url": "https://api.example.com:8080/v1"} - ) + +def test_infer_server_attributes_variants(monkeypatch): + assert sp._infer_server_attributes(None) == {} + assert sp._infer_server_attributes(123) == {} + + attrs = sp._infer_server_attributes("https://api.example.com:8080/v1") assert attrs[ServerAttributes.SERVER_ADDRESS] == "api.example.com" assert attrs[ServerAttributes.SERVER_PORT] == 8080 @@ -63,157 +130,157 @@ def boom(_: str): raise ValueError("unparsable url") monkeypatch.setattr(sp, "urlparse", boom) - assert sp._extract_server_attributes({"base_url": "bad"}) == {} - - -def test_chat_helpers_cover_edges(): - assert not sp._looks_like_chat(None) - assert not sp._looks_like_chat([{"content": "only text"}]) - assert sp._looks_like_chat([{"role": "user", "content": "hi"}]) - - reasons = sp._collect_finish_reasons( - [ - {"finish_reason": "stop"}, - {"stop_reason": "length"}, - SimpleNamespace(finish_reason="done"), - ] - ) - assert reasons == ["stop", "length", "done"] - - assert sp._clean_stop_sequences(None) is None - assert sp._clean_stop_sequences("stop") == ["stop"] - assert sp._clean_stop_sequences([None]) is None - assert sp._clean_stop_sequences(["foo", None, 42]) == ["foo", "42"] - assert sp._clean_stop_sequences(123) is None + assert sp._infer_server_attributes("bad") == {} def test_operation_and_span_naming(processor_setup): processor, _ = 
processor_setup - generation = SimpleNamespace( - type=sp.SPAN_TYPE_GENERATION, input=[{"role": "user"}] - ) + generation = GenerationSpanData(input=[{"role": "user"}], model="gpt-4o") assert ( - processor._operation_name(generation) - == GenAI.GenAiOperationNameValues.CHAT.value + processor._get_operation_name(generation) == sp.GenAIOperationName.CHAT ) - agent_create = SimpleNamespace( - type=sp.SPAN_TYPE_AGENT, operation=" CREATE " - ) + completion = GenerationSpanData(input=[]) assert ( - processor._operation_name(agent_create) - == GenAI.GenAiOperationNameValues.CREATE_AGENT.value + processor._get_operation_name(completion) + == sp.GenAIOperationName.TEXT_COMPLETION ) - agent_invoke = SimpleNamespace( - type=sp.SPAN_TYPE_AGENT, operation="invoke_agent" - ) + embeddings = GenerationSpanData(input=None) + setattr(embeddings, "embedding_dimension", 128) assert ( - processor._operation_name(agent_invoke) - == GenAI.GenAiOperationNameValues.INVOKE_AGENT.value + processor._get_operation_name(embeddings) + == sp.GenAIOperationName.EMBEDDINGS ) - agent_default = SimpleNamespace(type=sp.SPAN_TYPE_AGENT, operation=None) + agent_create = AgentSpanData(operation=" CREATE ") assert ( - processor._operation_name(agent_default) - == GenAI.GenAiOperationNameValues.INVOKE_AGENT.value + processor._get_operation_name(agent_create) + == sp.GenAIOperationName.CREATE_AGENT ) - function_data = SimpleNamespace(type=sp.SPAN_TYPE_FUNCTION) + agent_invoke = AgentSpanData(operation="invoke_agent") assert ( - processor._operation_name(function_data) - == GenAI.GenAiOperationNameValues.EXECUTE_TOOL.value + processor._get_operation_name(agent_invoke) + == sp.GenAIOperationName.INVOKE_AGENT ) - response_data = SimpleNamespace(type=sp.SPAN_TYPE_RESPONSE) + agent_default = AgentSpanData(operation=None) assert ( - processor._operation_name(response_data) - == GenAI.GenAiOperationNameValues.CHAT.value + processor._get_operation_name(agent_default) + == sp.GenAIOperationName.INVOKE_AGENT ) - 
unknown = SimpleNamespace(type=None) - assert processor._operation_name(unknown) == "operation" - - agent_creation = SimpleNamespace(type=sp.SPAN_TYPE_AGENT_CREATION) + function_data = FunctionSpanData() assert ( - processor._operation_name(agent_creation) - == GenAI.GenAiOperationNameValues.CREATE_AGENT.value + processor._get_operation_name(function_data) + == sp.GenAIOperationName.EXECUTE_TOOL ) + response_data = ResponseSpanData() assert ( - processor._span_kind(SimpleNamespace(type=sp.SPAN_TYPE_GENERATION)) - == SpanKind.CLIENT - ) - assert ( - processor._span_kind(SimpleNamespace(type="internal")) - is SpanKind.INTERNAL + processor._get_operation_name(response_data) + == sp.GenAIOperationName.CHAT ) - attrs = {GenAI.GEN_AI_REQUEST_MODEL: "gpt-4o"} + class UnknownSpanData: + pass + + unknown = UnknownSpanData() + assert processor._get_operation_name(unknown) == "unknown" + + assert processor._get_span_kind(GenerationSpanData()) is SpanKind.CLIENT + assert processor._get_span_kind(FunctionSpanData()) is SpanKind.INTERNAL + assert ( - processor._span_name(GenAI.GenAiOperationNameValues.CHAT.value, attrs) + sp.get_span_name(sp.GenAIOperationName.CHAT, model="gpt-4o") == "chat gpt-4o" ) assert ( - processor._span_name( - GenAI.GenAiOperationNameValues.EXECUTE_TOOL.value, - {GenAI.GEN_AI_TOOL_NAME: "weather"}, + sp.get_span_name( + sp.GenAIOperationName.EXECUTE_TOOL, tool_name="weather" ) == "execute_tool weather" ) assert ( - processor._span_name( - GenAI.GenAiOperationNameValues.INVOKE_AGENT.value, {} - ) + sp.get_span_name(sp.GenAIOperationName.INVOKE_AGENT, agent_name=None) == "invoke_agent" ) assert ( - processor._span_name( - GenAI.GenAiOperationNameValues.CREATE_AGENT.value, {} - ) + sp.get_span_name(sp.GenAIOperationName.CREATE_AGENT, agent_name=None) == "create_agent" ) - assert processor._span_name("custom_operation", {}) == "custom_operation" def test_attribute_builders(processor_setup): processor, _ = processor_setup - generation_span = SimpleNamespace( 
- type=sp.SPAN_TYPE_GENERATION, + payload = sp.ContentPayload( + input_messages=[ + { + "role": "user", + "parts": [{"type": "text", "content": "hi"}], + } + ], + output_messages=[ + { + "role": "assistant", + "parts": [{"type": "text", "content": "hello"}], + } + ], + system_instructions=[{"type": "text", "content": "be helpful"}], + ) + model_config = { + "base_url": "https://api.openai.com:443/v1", + "temperature": 0.2, + "top_p": 0.9, + "top_k": 3, + "frequency_penalty": 0.1, + "presence_penalty": 0.4, + "seed": 1234, + "n": 2, + "max_tokens": 128, + "stop": ["foo", None, "bar"], + } + generation_span = GenerationSpanData( input=[{"role": "user"}], - model="gpt-4o", output=[{"finish_reason": "stop"}], - model_config={ - "base_url": "https://api.openai.com:443/v1", - "temperature": 0.2, - "top_p": 0.9, - "top_k": 3, - "frequency_penalty": 0.1, - "presence_penalty": 0.4, - "seed": 1234, - "n": 2, - "max_tokens": 128, - "stop": ["foo", None, "bar"], + model="gpt-4o", + model_config=model_config, + usage={ + "prompt_tokens": 10, + "completion_tokens": 3, + "total_tokens": 13, }, - usage={"prompt_tokens": 10, "completion_tokens": 3}, ) - gen_attrs = processor._attributes_from_generation(generation_span) - assert gen_attrs[GenAI.GEN_AI_REQUEST_MODEL] == "gpt-4o" - assert gen_attrs[GenAI.GEN_AI_REQUEST_MAX_TOKENS] == 128 - assert gen_attrs[GenAI.GEN_AI_REQUEST_STOP_SEQUENCES] == ["foo", "bar"] + gen_attrs = _collect( + processor._get_attributes_from_generation_span_data( + generation_span, payload + ) + ) + assert gen_attrs[sp.GEN_AI_REQUEST_MODEL] == "gpt-4o" + assert gen_attrs[sp.GEN_AI_REQUEST_MAX_TOKENS] == 128 + assert gen_attrs[sp.GEN_AI_REQUEST_STOP_SEQUENCES] == [ + "foo", + None, + "bar", + ] assert gen_attrs[ServerAttributes.SERVER_ADDRESS] == "api.openai.com" assert gen_attrs[ServerAttributes.SERVER_PORT] == 443 - assert gen_attrs[GenAI.GEN_AI_USAGE_INPUT_TOKENS] == 10 - assert gen_attrs[GenAI.GEN_AI_USAGE_OUTPUT_TOKENS] == 3 - - empty_response_span = 
SimpleNamespace( - type=sp.SPAN_TYPE_RESPONSE, response=None + assert gen_attrs[sp.GEN_AI_USAGE_INPUT_TOKENS] == 10 + assert gen_attrs[sp.GEN_AI_USAGE_OUTPUT_TOKENS] == 3 + assert gen_attrs[sp.GEN_AI_RESPONSE_FINISH_REASONS] == ["stop"] + assert json.loads(gen_attrs[sp.GEN_AI_INPUT_MESSAGES])[0]["role"] == "user" + assert ( + json.loads(gen_attrs[sp.GEN_AI_OUTPUT_MESSAGES])[0]["role"] + == "assistant" + ) + assert ( + json.loads(gen_attrs[sp.GEN_AI_SYSTEM_INSTRUCTIONS])[0]["content"] + == "be helpful" ) - empty_attrs = processor._attributes_from_response(empty_response_span) - assert empty_attrs[GenAI.GEN_AI_OPERATION_NAME] == "chat" - assert empty_attrs[sp._GEN_AI_PROVIDER_NAME] == "openai" + assert gen_attrs[sp.GEN_AI_OUTPUT_TYPE] == sp.GenAIOutputType.TEXT class _Usage: def __init__(self) -> None: @@ -221,6 +288,7 @@ def __init__(self) -> None: self.prompt_tokens = 7 self.output_tokens = None self.completion_tokens = 2 + self.total_tokens = 9 class _Response: def __init__(self) -> None: @@ -229,117 +297,107 @@ def __init__(self) -> None: self.usage = _Usage() self.output = [{"finish_reason": "stop"}] - response_span = SimpleNamespace( - type=sp.SPAN_TYPE_RESPONSE, response=_Response() - ) - response_attrs = processor._attributes_from_response(response_span) - assert response_attrs[GenAI.GEN_AI_RESPONSE_ID] == "resp-1" - assert response_attrs[GenAI.GEN_AI_RESPONSE_MODEL] == "gpt-4o" - assert response_attrs[GenAI.GEN_AI_USAGE_INPUT_TOKENS] == 7 - assert response_attrs[GenAI.GEN_AI_USAGE_OUTPUT_TOKENS] == 2 - - agent_span = SimpleNamespace( - type=sp.SPAN_TYPE_AGENT, name="helper", output_type="json" + response_span = ResponseSpanData(response=_Response()) + response_attrs = _collect( + processor._get_attributes_from_response_span_data( + response_span, sp.ContentPayload() + ) ) - agent_attrs = processor._attributes_from_agent(agent_span) - assert agent_attrs[GenAI.GEN_AI_AGENT_NAME] == "helper" - assert agent_attrs[GenAI.GEN_AI_OUTPUT_TYPE] == "json" + assert 
response_attrs[sp.GEN_AI_RESPONSE_ID] == "resp-1" + assert response_attrs[sp.GEN_AI_RESPONSE_MODEL] == "gpt-4o" + assert response_attrs[sp.GEN_AI_RESPONSE_FINISH_REASONS] == ["stop"] + assert response_attrs[sp.GEN_AI_USAGE_INPUT_TOKENS] == 7 + assert response_attrs[sp.GEN_AI_USAGE_OUTPUT_TOKENS] == 2 + assert response_attrs[sp.GEN_AI_OUTPUT_TYPE] == sp.GenAIOutputType.TEXT - agent_creation_span = SimpleNamespace( - type=sp.SPAN_TYPE_AGENT_CREATION, - name="builder", + agent_span = AgentSpanData( + name="helper", + output_type="json", description="desc", - agent_id=None, - id="agent-123", + agent_id="agent-123", model="model-x", + operation="invoke_agent", ) - creation_attrs = processor._attributes_from_agent_creation( - agent_creation_span + agent_attrs = _collect( + processor._get_attributes_from_agent_span_data(agent_span, None) ) - assert creation_attrs[GenAI.GEN_AI_AGENT_ID] == "agent-123" - assert creation_attrs[GenAI.GEN_AI_REQUEST_MODEL] == "model-x" + assert agent_attrs[sp.GEN_AI_AGENT_NAME] == "helper" + assert agent_attrs[sp.GEN_AI_AGENT_ID] == "agent-123" + assert agent_attrs[sp.GEN_AI_REQUEST_MODEL] == "model-x" + assert agent_attrs[sp.GEN_AI_OUTPUT_TYPE] == sp.GenAIOutputType.TEXT - function_span = SimpleNamespace( - type=sp.SPAN_TYPE_FUNCTION, name="lookup_weather" + # Fallback to aggregated model when span data lacks it + agent_span_no_model = AgentSpanData( + name="helper-2", + output_type="json", + description="desc", + agent_id="agent-456", + operation="invoke_agent", + ) + agent_content = { + "input_messages": [], + "output_messages": [], + "system_instructions": [], + "request_model": "gpt-fallback", + } + agent_attrs_fallback = _collect( + processor._get_attributes_from_agent_span_data( + agent_span_no_model, agent_content + ) ) - function_attrs = processor._attributes_from_function(function_span) - assert function_attrs[GenAI.GEN_AI_TOOL_NAME] == "lookup_weather" - assert function_attrs[GenAI.GEN_AI_TOOL_TYPE] == "function" - - generic_span = 
SimpleNamespace(type="custom") - generic_attrs = processor._attributes_from_generic(generic_span) - assert generic_attrs[GenAI.GEN_AI_OPERATION_NAME] == "custom" - + assert agent_attrs_fallback[sp.GEN_AI_REQUEST_MODEL] == "gpt-fallback" -def test_attributes_for_span_dispatch(processor_setup): - processor, _ = processor_setup - - generation_span = SimpleNamespace( - type=sp.SPAN_TYPE_GENERATION, - model="gpt", - input=[{"role": "user"}], - output=[], + function_span = FunctionSpanData(name="lookup_weather") + function_span.tool_type = "extension" + function_span.call_id = "call-42" + function_span.description = "desc" + function_payload = sp.ContentPayload( + tool_arguments={"city": "seattle"}, + tool_result={"temperature": 70}, ) - assert GenAI.GEN_AI_OPERATION_NAME in processor._attributes_for_span( - generation_span + function_attrs = _collect( + processor._get_attributes_from_function_span_data( + function_span, function_payload + ) ) + assert function_attrs[sp.GEN_AI_TOOL_NAME] == "lookup_weather" + assert function_attrs[sp.GEN_AI_TOOL_TYPE] == "extension" + assert function_attrs[sp.GEN_AI_TOOL_CALL_ID] == "call-42" + assert function_attrs[sp.GEN_AI_TOOL_DESCRIPTION] == "desc" + assert function_attrs[sp.GEN_AI_TOOL_CALL_ARGUMENTS] == {"city": "seattle"} + assert function_attrs[sp.GEN_AI_TOOL_CALL_RESULT] == {"temperature": 70} + assert function_attrs[sp.GEN_AI_OUTPUT_TYPE] == sp.GenAIOutputType.JSON - response_span = SimpleNamespace( - type=sp.SPAN_TYPE_RESPONSE, response=SimpleNamespace() - ) - assert ( - processor._attributes_for_span(response_span)[ - GenAI.GEN_AI_OPERATION_NAME - ] - == "chat" - ) - agent_create_span = SimpleNamespace( - type=sp.SPAN_TYPE_AGENT, operation="create" - ) - assert ( - processor._attributes_for_span(agent_create_span)[ - GenAI.GEN_AI_OPERATION_NAME - ] - == "create_agent" - ) +def test_extract_genai_attributes_unknown_type(processor_setup): + processor, _ = processor_setup - agent_invoke_span = SimpleNamespace( - 
type=sp.SPAN_TYPE_AGENT, operation="invoke" - ) - assert ( - processor._attributes_for_span(agent_invoke_span)[ - GenAI.GEN_AI_OPERATION_NAME - ] - == "invoke_agent" - ) + class UnknownSpanData: + pass - agent_creation_span = SimpleNamespace(type=sp.SPAN_TYPE_AGENT_CREATION) - assert ( - processor._attributes_for_span(agent_creation_span)[ - GenAI.GEN_AI_OPERATION_NAME - ] - == "create_agent" - ) + class StubSpan: + def __init__(self) -> None: + self.span_data = UnknownSpanData() - function_span = SimpleNamespace(type=sp.SPAN_TYPE_FUNCTION) - assert ( - processor._attributes_for_span(function_span)[GenAI.GEN_AI_TOOL_TYPE] - == "function" + attrs = _collect( + processor._extract_genai_attributes( + StubSpan(), sp.ContentPayload(), None + ) ) + assert attrs[sp.GEN_AI_PROVIDER_NAME] == "openai" + assert attrs[sp.GEN_AI_SYSTEM_KEY] == "openai" + assert sp.GEN_AI_OPERATION_NAME not in attrs - guardrail_span = SimpleNamespace(type=sp.SPAN_TYPE_GUARDRAIL) - assert ( - processor._attributes_for_span(guardrail_span)[ - GenAI.GEN_AI_OPERATION_NAME - ] - == sp.SPAN_TYPE_GUARDRAIL + +def test_span_status_helper(): + status = sp._get_span_status( + SimpleNamespace(error={"message": "boom", "data": "bad"}) ) + assert status.status_code is StatusCode.ERROR + assert status.description == "boom: bad" - unknown_span = SimpleNamespace(type="unknown") - assert processor._attributes_for_span(unknown_span) == { - sp._GEN_AI_PROVIDER_NAME: "openai" - } + ok_status = sp._get_span_status(SimpleNamespace(error=None)) + assert ok_status.status_code is StatusCode.OK @dataclass @@ -375,8 +433,8 @@ def test_span_lifecycle_and_shutdown(processor_setup): parent_span = FakeSpan( trace_id="trace-1", span_id="span-1", - span_data=SimpleNamespace( - type=sp.SPAN_TYPE_AGENT, operation="invoke", name="agent" + span_data=AgentSpanData( + operation="invoke", name="agent", model="gpt-4o" ), started_at="2024-01-01T00:00:00Z", ended_at="2024-01-01T00:00:02Z", @@ -386,7 +444,7 @@ def 
test_span_lifecycle_and_shutdown(processor_setup): missing_span = FakeSpan( trace_id="trace-1", span_id="missing", - span_data=SimpleNamespace(type=sp.SPAN_TYPE_FUNCTION), + span_data=FunctionSpanData(name="lookup"), started_at="2024-01-01T00:00:01Z", ended_at="2024-01-01T00:00:02Z", ) @@ -396,10 +454,10 @@ def test_span_lifecycle_and_shutdown(processor_setup): trace_id="trace-1", span_id="span-2", parent_id="span-1", - span_data=SimpleNamespace(type=sp.SPAN_TYPE_FUNCTION, name="lookup"), + span_data=FunctionSpanData(name="lookup"), started_at="2024-01-01T00:00:02Z", ended_at="2024-01-01T00:00:03Z", - error={"message": "boom"}, + error={"message": "boom", "data": "bad"}, ) processor.on_span_start(child_span) processor.on_span_end(child_span) @@ -416,7 +474,7 @@ def test_span_lifecycle_and_shutdown(processor_setup): linger_span = FakeSpan( trace_id="trace-2", span_id="span-3", - span_data=SimpleNamespace(type=sp.SPAN_TYPE_AGENT, operation=None), + span_data=AgentSpanData(operation=None), started_at="2024-01-01T00:00:06Z", ) processor.on_span_start(linger_span) @@ -427,15 +485,66 @@ def test_span_lifecycle_and_shutdown(processor_setup): finished = exporter.get_finished_spans() statuses = {span.name: span.status for span in finished} - assert any( - status.status_code is StatusCode.ERROR and status.description == "boom" - for status in statuses.values() + assert ( + statuses["execute_tool lookup"].status_code is StatusCode.ERROR + and statuses["execute_tool lookup"].description == "boom: bad" ) - assert any( - status.status_code is StatusCode.OK for status in statuses.values() + assert statuses["invoke_agent agent"].status_code is StatusCode.OK + assert statuses["workflow"].status_code is StatusCode.OK + assert ( + statuses["invoke_agent"].status_code is StatusCode.ERROR + and statuses["invoke_agent"].description == "Application shutdown" + ) + assert ( + statuses["linger"].status_code is StatusCode.ERROR + and statuses["linger"].description == "Application shutdown" 
) - assert any( - status.status_code is StatusCode.ERROR - and status.description == "shutdown" - for status in statuses.values() + workflow_span = next(span for span in finished if span.name == "workflow") + assert ( + workflow_span.attributes[sp.GEN_AI_OPERATION_NAME] + == sp.GenAIOperationName.INVOKE_AGENT ) + + +def test_chat_span_renamed_with_model(processor_setup): + processor, exporter = processor_setup + + trace = FakeTrace(name="workflow", trace_id="trace-rename") + processor.on_trace_start(trace) + + agent = FakeSpan( + trace_id=trace.trace_id, + span_id="agent-span", + span_data=AgentSpanData( + operation="invoke_agent", + name="Agent", + ), + started_at="2025-01-01T00:00:00Z", + ended_at="2025-01-01T00:00:02Z", + ) + processor.on_span_start(agent) + + generation_data = GenerationSpanData( + input=[{"role": "user", "content": "question"}], + output=[{"finish_reason": "stop"}], + usage={"prompt_tokens": 1, "completion_tokens": 1}, + ) + generation_span = FakeSpan( + trace_id=trace.trace_id, + span_id="child-span", + parent_id=agent.span_id, + span_data=generation_data, + started_at="2025-01-01T00:00:00Z", + ended_at="2025-01-01T00:00:01Z", + ) + processor.on_span_start(generation_span) + + # Model becomes available before span end (e.g., once response arrives) + generation_data.model = "gpt-4o" + + processor.on_span_end(generation_span) + processor.on_span_end(agent) + processor.on_trace_end(trace) + + span_names = {span.name for span in exporter.get_finished_spans()} + assert "chat gpt-4o" in span_names diff --git a/uv.lock b/uv.lock index 5150ff87da..76a60b9134 100644 --- a/uv.lock +++ b/uv.lock @@ -3354,6 +3354,7 @@ dependencies = [ { name = "opentelemetry-api" }, { name = "opentelemetry-instrumentation" }, { name = "opentelemetry-semantic-conventions" }, + { name = "opentelemetry-util-genai" }, ] [package.optional-dependencies] @@ -3367,6 +3368,7 @@ requires-dist = [ { name = "opentelemetry-api", git = 
"https://github.com/open-telemetry/opentelemetry-python?subdirectory=opentelemetry-api&branch=main" }, { name = "opentelemetry-instrumentation", editable = "opentelemetry-instrumentation" }, { name = "opentelemetry-semantic-conventions", git = "https://github.com/open-telemetry/opentelemetry-python?subdirectory=opentelemetry-semantic-conventions&branch=main" }, + { name = "opentelemetry-util-genai", editable = "util/opentelemetry-util-genai" }, ] provides-extras = ["instruments"]