diff --git a/agentops/instrumentation/README.md b/agentops/instrumentation/README.md
index d6fea178b..94d087728 100644
--- a/agentops/instrumentation/README.md
+++ b/agentops/instrumentation/README.md
@@ -1,20 +1,38 @@
-# AgentOps Instrumentation
+# AgentOps Instrumentation Developer Guide
 
-This package provides OpenTelemetry instrumentation for various LLM providers and related services.
+This guide explains how to implement OpenTelemetry instrumentation for LLM providers and related services in AgentOps.
 
-## Available Instrumentors
+## Table of Contents
 
-- OpenAI (`v0.27.0+` and `v1.0.0+`)
+1. [Architectural Overview](#architectural-overview)
+2. [Prerequisites](#prerequisites)
+3. [Available Instrumentors](#available-instrumentors)
+4. [Quick Start Example](#quick-start-example)
+5. [Implementation Guide](#implementation-guide)
+6. [Configuration](#configuration)
+7. [Testing Methodologies](#testing-methodologies)
+8. [Debugging Techniques](#debugging-techniques)
+9. [Error Handling](#error-handling)
+10. [Best Practices](#best-practices)
+
+## Architectural Overview
+
+### Key Components
 
-## Usage
+- **EnhancedBaseInstrumentor**: Abstract base class providing common initialization, metrics, and lifecycle management
+- **Context Management**: Thread-safe context propagation and span relationship management
+- **Attribute Handlers**: Reusable extractors for common patterns (LLM requests, messages, streaming)
+- **Span Lifecycle**: Consistent error handling, timing, and retry mechanisms
+- **Wrapper Utilities**: Standardized method wrapping with OpenTelemetry integration
 
-### OpenAI Instrumentation
+## Quick Start Example
+
+### Using an Existing Instrumentor
 
 ```python
-from opentelemetry.instrumentation.openai import OpenAIInstrumentor
+from agentops.instrumentation.openai import OpenAIInstrumentor
+from agentops import init
 
-from agentops.telemetry import get_tracer_provider()
+# Initialize AgentOps
+init(api_key="your-api-key")
 
 # Initialize and instrument
 instrumentor = OpenAIInstrumentor(
@@ -22,11 +40,565 @@ instrumentor = OpenAIInstrumentor(
     enrich_token_usage=True,  # Include token usage in spans
     enable_trace_context_propagation=True,  # Enable trace context propagation
 )
-instrumentor.instrument(tracer_provider=tracer_provider)  # <-- Uses the global AgentOps TracerProvider
+instrumentor.instrument()  # Uses the global AgentOps TracerProvider
+```
+
+## Implementation Guide
+
+### Step 1: Create Your Instrumentor Class
+
+Create a new file `agentops/instrumentation/your_provider/instrumentor.py`:
+
+```python
+"""YourProvider API Instrumentation for AgentOps
+
+This module provides instrumentation for the YourProvider API, implementing OpenTelemetry
+instrumentation for model requests and responses.
+"""
+
+from typing import List, Collection, Dict, Any, Optional
+from agentops.instrumentation.common import EnhancedBaseInstrumentor, WrapConfig
+from agentops.instrumentation.your_provider import LIBRARY_NAME, LIBRARY_VERSION
+from agentops.instrumentation.your_provider.attributes import (
+    get_chat_attributes,
+    get_completion_attributes,
+)
+from agentops.semconv import Meters
+
+
+class YourProviderInstrumentor(EnhancedBaseInstrumentor):
+    """An instrumentor for YourProvider's API.
+
+    This instrumentor extends the EnhancedBaseInstrumentor to provide
+    provider-specific instrumentation with automatic metric creation,
+    error handling, and lifecycle management.
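+
+    Typical usage (``config_option1`` is a placeholder for your provider's options):
+        >>> instrumentor = YourProviderInstrumentor(config_option1=True)
+        >>> instrumentor.instrument()  # uses the global AgentOps TracerProvider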
+ """ + + def __init__(self, config_option1: bool = True, config_option2: Optional[str] = None): + super().__init__() + self.config_option1 = config_option1 + self.config_option2 = config_option2 + + @property + def library_name(self) -> str: + """Return the library name for tracer creation.""" + return LIBRARY_NAME + + @property + def library_version(self) -> str: + """Return the library version.""" + return LIBRARY_VERSION + + @property + def wrapped_methods(self) -> List[WrapConfig]: + """Define all methods to be wrapped for instrumentation.""" + return [ + # Chat completions + WrapConfig( + trace_name="your_provider.chat.completion", + package="your_provider.resources.chat", + class_name="Chat", + method_name="create", + handler=get_chat_attributes, + ), + # Async variant + WrapConfig( + trace_name="your_provider.chat.completion", + package="your_provider.resources.chat", + class_name="AsyncChat", + method_name="create", + handler=get_chat_attributes, + is_async=True, + ), + # Add more methods as needed + ] + + @property + def supports_streaming(self) -> bool: + """Indicate if this provider supports streaming responses.""" + return True # Set to False if no streaming support + + def get_streaming_wrapper(self, tracer): + """Return the sync streaming wrapper if supported.""" + if self.supports_streaming: + from .stream_wrapper import create_streaming_wrapper + return create_streaming_wrapper(tracer) + return None + + def instrumentation_dependencies(self) -> Collection[str]: + """Specify required package dependencies.""" + return ["your-provider >= 1.0.0"] + + def _create_provider_metrics(self, meter) -> Dict[str, Any]: + """Create provider-specific metrics beyond the common ones.""" + return { + "custom_metric": meter.create_counter( + name="your_provider.custom_metric", + unit="count", + description="Description of your custom metric", + ), + } +``` + +### Step 2: Create Attribute Handlers + +Create `agentops/instrumentation/your_provider/attributes.py`: + +```python +"""Attribute extraction handlers for YourProvider instrumentation.""" + +from typing import Any, Dict, Optional, Tuple +from agentops.instrumentation.common import ( + AttributeMap, + LLMAttributeHandler, + MessageAttributeHandler, + create_composite_handler, +) +from agentops.semconv import SpanAttributes, LLMRequestTypeValues + +# Define provider-specific attribute mappings +YOUR_PROVIDER_REQUEST_ATTRIBUTES: AttributeMap = { + SpanAttributes.LLM_REQUEST_CUSTOM_PARAM: "custom_param", + # Add more mappings +} + +YOUR_PROVIDER_RESPONSE_ATTRIBUTES: AttributeMap = { + SpanAttributes.LLM_RESPONSE_CUSTOM_FIELD: "custom_field", + # Add more mappings +} + + +def _extract_base_attributes( + args: Optional[Tuple] = None, + kwargs: Optional[Dict] = None, + return_value: Optional[Any] = None, +) -> AttributeMap: + """Extract base attributes specific to YourProvider.""" + attributes = { + SpanAttributes.LLM_REQUEST_TYPE: LLMRequestTypeValues.CHAT.value, + SpanAttributes.LLM_SYSTEM: "YourProvider", + } + + # Add provider-specific logic + if kwargs and "streaming" in kwargs: + attributes[SpanAttributes.LLM_REQUEST_STREAMING] = kwargs["streaming"] + + return attributes + + +def _extract_request_attributes( + args: Optional[Tuple] = None, + kwargs: Optional[Dict] = None, + return_value: Optional[Any] = None, +) -> AttributeMap: + """Extract request attributes using common LLM handler.""" + if not kwargs: + return {} + + # Use the common LLM handler with provider-specific mappings + return LLMAttributeHandler.extract_request_attributes( 
+        kwargs,
+        additional_mappings=YOUR_PROVIDER_REQUEST_ATTRIBUTES
+    )
+
+
+def _extract_messages(
+    args: Optional[Tuple] = None,
+    kwargs: Optional[Dict] = None,
+    return_value: Optional[Any] = None,
+) -> AttributeMap:
+    """Extract message content from request and response."""
+    attributes = {}
+
+    # Extract request messages
+    if kwargs and "messages" in kwargs:
+        messages = kwargs["messages"]
+        # Transform to standard format if needed
+        formatted_messages = _format_messages(messages)
+
+        message_attrs = MessageAttributeHandler.extract_messages(
+            formatted_messages,
+            attribute_type="prompt"
+        )
+        attributes.update(message_attrs)
+
+    # Extract response messages
+    if return_value:
+        response_messages = _extract_response_messages(return_value)
+        if response_messages:
+            completion_attrs = MessageAttributeHandler.extract_messages(
+                response_messages,
+                attribute_type="completion"
+            )
+            attributes.update(completion_attrs)
+
+    return attributes
+
+
+def _extract_response_attributes(
+    args: Optional[Tuple] = None,
+    kwargs: Optional[Dict] = None,
+    return_value: Optional[Any] = None,
+) -> AttributeMap:
+    """Extract response attributes using common handler."""
+    if not return_value:
+        return {}
+
+    # Convert response to dict if needed
+    response_dict = _convert_to_dict(return_value)
+
+    return LLMAttributeHandler.extract_response_attributes(
+        response_dict,
+        additional_mappings=YOUR_PROVIDER_RESPONSE_ATTRIBUTES
+    )
+
+
+# Create the main composite handler
+get_chat_attributes = create_composite_handler(
+    _extract_base_attributes,
+    _extract_request_attributes,
+    _extract_messages,
+    _extract_response_attributes,
+)
+```
+
+### Step 3: Handle Streaming (if applicable)
+
+Create `agentops/instrumentation/your_provider/stream_wrapper.py`:
+
+```python
+"""Streaming response wrapper for YourProvider."""
+
+from typing import Any, AsyncIterator, Iterator
+from opentelemetry import trace
+from opentelemetry.trace import SpanKind
+from agentops.instrumentation.common import (
+    SpanLifecycleManager,
+    StreamingAttributeHandler,
+    global_context_manager,
+)
+
+
+def create_streaming_wrapper(tracer: trace.Tracer):
+    """Create a wrapper for streaming responses."""
+
+    def wrapper(wrapped, instance, args, kwargs):
+        # Start the span manually: a `with start_as_current_span(...)` block
+        # would end the span as soon as this function returns, before any
+        # chunks arrive. The StreamWrapper below is responsible for ending it.
+        span = tracer.start_span(
+            "your_provider.chat.stream",
+            kind=SpanKind.CLIENT,
+        )
+
+        # Mark as streaming
+        span.set_attribute("gen_ai.request.streaming", True)
+
+        try:
+            # Get the stream
+            stream = wrapped(*args, **kwargs)
+        except Exception as e:
+            SpanLifecycleManager.record_exception(span, e)
+            span.end()
+            raise
+
+        # Create streaming handler
+        handler = StreamingAttributeHandler.create_streaming_handler(
+            span_attribute_prefix="stream"
+        )
+
+        # Wrap the stream
+        return StreamWrapper(stream, span, handler)
+
+    return wrapper
+
+
+class StreamWrapper:
+    """Wrapper for streaming responses that captures incremental data."""
+
+    def __init__(self, stream, span, handler):
+        self.stream = stream
+        self.span = span
+        self.handler = handler
+        self.chunk_index = 0
+        self.accumulated_content = ""
+
+    def __iter__(self):
+        error = None
+        try:
+            for chunk in self.stream:
+                # Extract and accumulate content
+                content = self._extract_content(chunk)
+                if content:
+                    self.accumulated_content += content
+
+                # Update span with chunk data
+                attributes = self.handler(
+                    chunk,
+                    self.chunk_index,
+                    self.accumulated_content
+                )
+
+                for key, value in attributes.items():
+                    self.span.set_attribute(key, value)
+
+                self.chunk_index += 1
+                yield chunk
+
+        except Exception as e:
+            error = e
+            SpanLifecycleManager.record_exception(self.span, e)
+            raise
+        finally:
+            # Final attributes
+            self.span.set_attribute("stream.total_chunks", self.chunk_index)
+            self.span.set_attribute("stream.final_content", self.accumulated_content)
+            # Only mark success if no error was recorded above; setting OK
+            # here unconditionally would overwrite the ERROR status.
+            if error is None:
+                SpanLifecycleManager.set_success_status(self.span)
+            self.span.end()
+
+    def _extract_content(self, chunk):
+        """Extract content from a chunk - implement based on provider format."""
+        # Example implementation
+        if hasattr(chunk, 'choices') and chunk.choices:
+            delta = chunk.choices[0].delta
+            if hasattr(delta, 'content'):
+                return delta.content
+        return ""
+```
+
+### Step 4: Set Up Module Structure
+
+Create the module structure:
+
+```
+agentops/instrumentation/your_provider/
+├── __init__.py
+├── instrumentor.py
+├── attributes.py
+├── stream_wrapper.py (if streaming is supported)
+└── README.md
+```
+
+`__init__.py`:
+```python
+"""YourProvider API instrumentation."""
+
+from agentops.logging import logger
+
+
+def get_version() -> str:
+    """Get the version of the YourProvider SDK."""
+    try:
+        # PackageNotFoundError (not ImportError) is raised when the SDK is
+        # not installed; ImportError covers Pythons without importlib.metadata
+        from importlib.metadata import PackageNotFoundError, version
+
+        return version("your-provider")
+    except (ImportError, PackageNotFoundError):
+        logger.debug("Could not find YourProvider SDK version")
+        return "unknown"
+
+
+LIBRARY_NAME = "your_provider"
+LIBRARY_VERSION = get_version()
+
+from agentops.instrumentation.your_provider.instrumentor import YourProviderInstrumentor
+
+__all__ = [
+    "LIBRARY_NAME",
+    "LIBRARY_VERSION",
+    "YourProviderInstrumentor",
+]
+```
+
+## Testing Methodologies
+
+### Unit Tests
+
+Create `tests/instrumentation/test_your_provider.py`:
+
+```python
+import pytest
+from unittest.mock import Mock, patch
+from agentops.instrumentation.your_provider import YourProviderInstrumentor
+from agentops.instrumentation.your_provider.attributes import get_chat_attributes
+
+
+class TestYourProviderInstrumentor:
+    """Test suite for YourProvider instrumentor."""
+
+    def test_initialization(self):
+        """Test instrumentor initialization."""
+        # Use the constructor options defined in Step 1
+        instrumentor = YourProviderInstrumentor(
+            config_option1=True,
+            config_option2=None
+        )
+        assert instrumentor.library_name == "your_provider"
+        assert instrumentor.supports_streaming is True
+
+    def test_wrapped_methods(self):
+        """Test wrapped methods configuration."""
+        instrumentor = YourProviderInstrumentor()
+        methods = instrumentor.wrapped_methods
+
+        # Verify expected methods are configured
+        method_names = [(m.class_name, m.method_name) for m in methods]
+        assert ("Chat", "create") in method_names
+        assert ("AsyncChat", "create") in method_names
+
+    def test_attribute_extraction(self):
+        """Test attribute extraction from API calls."""
+        # Mock request
+        kwargs = {
+            "model": "your-model-v1",
+            "messages": [
+                {"role": "user", "content": "Hello"},
+                {"role": "assistant", "content": "Hi there!"}
+            ],
+            "temperature": 0.7,
+            "max_tokens": 100
+        }
+
+        # Mock response
+        response = Mock()
+        response.id = "resp-123"
+        response.model = "your-model-v1"
+        response.usage = Mock(
+            prompt_tokens=10,
+            completion_tokens=5,
+            total_tokens=15
+        )
+
+        # Extract attributes
+        attributes = get_chat_attributes(
+            kwargs=kwargs,
+            return_value=response
+        )
+
+        # Verify request attributes
+        assert attributes["gen_ai.request.model"] == "your-model-v1"
+        assert attributes["gen_ai.request.temperature"] == 0.7
+        assert attributes["gen_ai.request.max_tokens"] == 100
+
+        # Verify response attributes
+        assert attributes["gen_ai.response.id"] == "resp-123"
+        assert attributes["gen_ai.usage.prompt_tokens"] == 10
+        assert attributes["gen_ai.usage.completion_tokens"] == 5
+        assert attributes["gen_ai.usage.total_tokens"] == 15
+
+    @pytest.mark.asyncio
+    async def test_async_instrumentation(self):
+        """Test async method instrumentation."""
+        # Test async wrapped methods work correctly
+        pass
+
+
+class TestStreamingWrapper:
+    """Test streaming functionality."""
+
+    def test_stream_wrapper(self):
+        """Test streaming response wrapper."""
+        # Mock stream chunks
+        chunks = [
+            Mock(choices=[Mock(delta=Mock(content="Hello"))]),
+            Mock(choices=[Mock(delta=Mock(content=" world"))]),
+            Mock(choices=[Mock(delta=Mock(content="!"))]),
+        ]
+
+        # Test wrapper accumulates content correctly
+        # Test span attributes are set
+        # Test error handling in stream
+```
+
+## Best Practices
+
+### 1. Use Common Utilities
+
+Always leverage the common utilities instead of reimplementing:
+
+```python
+# Good - uses common handler
+from agentops.instrumentation.common import LLMAttributeHandler
+
+attributes = LLMAttributeHandler.extract_request_attributes(kwargs)
+
+# Bad - reimplements extraction
+attributes = {}
+if "model" in kwargs:
+    attributes["gen_ai.request.model"] = kwargs["model"]
+# ... etc
+```
+
+### 2. Consistent Attribute Naming
+
+Follow OpenTelemetry semantic conventions:
+
+```python
+# Good - uses semantic conventions
+from agentops.semconv import SpanAttributes
+
+attributes[SpanAttributes.LLM_REQUEST_MODEL] = model
+
+# Bad - custom attribute names
+attributes["model_name"] = model
+```
+
+### 3. Documentation
+
+Document your instrumentor thoroughly:
+
+```python
+class YourProviderInstrumentor(EnhancedBaseInstrumentor):
+    """Instrumentor for YourProvider API.
+
+    This instrumentor provides OpenTelemetry instrumentation for YourProvider,
+    capturing request/response data, token usage, and streaming responses.
+
+    Features:
+    - Automatic span creation for all API calls
+    - Token usage tracking
+    - Streaming response support
+    - Error tracking and retry logic
+    - Context propagation across async calls
+
+    Example:
+        >>> from agentops.instrumentation.your_provider import YourProviderInstrumentor
+        >>> instrumentor = YourProviderInstrumentor(enrich_token_usage=True)
+        >>> instrumentor.instrument()
+        >>>
+        >>> # Your provider will now be instrumented
+        >>> client = YourProvider()
+        >>> response = client.chat.create(model="...", messages=[...])
+
+    Args:
+        enrich_token_usage: Whether to capture token usage metrics
+        enrich_messages: Whether to capture message content
+        capture_streaming: Whether to instrument streaming responses
+    """
+```
+
+## Adding Custom Instrumentation
+
+If you need to add instrumentation outside the standard patterns:
+
+```python
+from opentelemetry import trace
+from agentops.instrumentation.common import SpanLifecycleManager
+
+def custom_operation():
+    """Example of manual instrumentation."""
+    tracer = trace.get_tracer("your_provider", "1.0.0")
+
+    with tracer.start_as_current_span("custom.operation") as span:
+        try:
+            # Set custom attributes
+            span.set_attribute("custom.attribute", "value")
+
+            # Your operation
+            result = do_something()
+
+            # Record success
+            SpanLifecycleManager.set_success_status(span, "Operation completed")
+
+            return result
+
+        except Exception as e:
+            # Record error
+            SpanLifecycleManager.record_exception(span, e)
+            raise
+```
 
-> To add custom instrumentation, please do so in the `third_party/opentelemetry` directory.
+
+## Contributing
+
+When contributing a new instrumentor:
+
+1. Follow the patterns established in this guide
+2. Include comprehensive tests
+3. Update the PROVIDERS dictionary
+4. 
Submit PR with description of the provider and its features +For more information, see the [CONTRIBUTING.md](../../CONTRIBUTING.md) file. diff --git a/agentops/instrumentation/anthropic/instrumentor.py b/agentops/instrumentation/anthropic/instrumentor.py index fdaae4f33..3636e7dd8 100644 --- a/agentops/instrumentation/anthropic/instrumentor.py +++ b/agentops/instrumentation/anthropic/instrumentor.py @@ -28,14 +28,12 @@ - Maintains span context across multiple events """ -from typing import List, Collection -from opentelemetry.trace import get_tracer -from opentelemetry.instrumentation.instrumentor import BaseInstrumentor -from opentelemetry.metrics import get_meter +from typing import List, Collection, Dict, Any, Optional +from opentelemetry.trace import Tracer from wrapt import wrap_function_wrapper from agentops.logging import logger -from agentops.instrumentation.common.wrappers import WrapConfig, wrap, unwrap +from agentops.instrumentation.common import EnhancedBaseInstrumentor, WrapConfig from agentops.instrumentation.anthropic import LIBRARY_NAME, LIBRARY_VERSION from agentops.instrumentation.anthropic.attributes.message import get_message_attributes, get_completion_attributes from agentops.instrumentation.anthropic.stream_wrapper import ( @@ -44,46 +42,8 @@ ) from agentops.semconv import Meters -# Methods to wrap for instrumentation -WRAPPED_METHODS: List[WrapConfig] = [ - # Main messages.create (modern API) - WrapConfig( - trace_name="anthropic.messages.create", - package="anthropic.resources.messages", - class_name="Messages", - method_name="create", - handler=get_message_attributes, - ), - # Async variant - WrapConfig( - trace_name="anthropic.messages.create", - package="anthropic.resources.messages", - class_name="AsyncMessages", - method_name="create", - handler=get_message_attributes, - is_async=True, - ), - # Legacy completions API - WrapConfig( - trace_name="anthropic.completions.create", - package="anthropic.resources.completions", - class_name="Completions", - method_name="create", - handler=get_completion_attributes, - ), - # Async variant of legacy API - WrapConfig( - trace_name="anthropic.completions.create", - package="anthropic.resources.completions", - class_name="AsyncCompletions", - method_name="create", - handler=get_completion_attributes, - is_async=True, - ), -] - - -class AnthropicInstrumentor(BaseInstrumentor): + +class AnthropicInstrumentor(EnhancedBaseInstrumentor): """An instrumentor for Anthropic's Claude API. This class provides instrumentation for Anthropic's Claude API by wrapping key methods @@ -98,6 +58,69 @@ class AnthropicInstrumentor(BaseInstrumentor): It captures metrics including token usage, operation duration, and exceptions. 
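+
+    Example:
+        >>> instrumentor = AnthropicInstrumentor()
+        >>> instrumentor.instrument()  # uses the global AgentOps TracerProvider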
""" + @property + def library_name(self) -> str: + """Return the Anthropic library name.""" + return LIBRARY_NAME + + @property + def library_version(self) -> str: + """Return the Anthropic library version.""" + return LIBRARY_VERSION + + @property + def wrapped_methods(self) -> List[WrapConfig]: + """Return all methods that should be wrapped for Anthropic instrumentation.""" + return [ + # Main messages.create (modern API) + WrapConfig( + trace_name="anthropic.messages.create", + package="anthropic.resources.messages", + class_name="Messages", + method_name="create", + handler=get_message_attributes, + ), + # Async variant + WrapConfig( + trace_name="anthropic.messages.create", + package="anthropic.resources.messages", + class_name="AsyncMessages", + method_name="create", + handler=get_message_attributes, + is_async=True, + ), + # Legacy completions API + WrapConfig( + trace_name="anthropic.completions.create", + package="anthropic.resources.completions", + class_name="Completions", + method_name="create", + handler=get_completion_attributes, + ), + # Async variant of legacy API + WrapConfig( + trace_name="anthropic.completions.create", + package="anthropic.resources.completions", + class_name="AsyncCompletions", + method_name="create", + handler=get_completion_attributes, + is_async=True, + ), + ] + + @property + def supports_streaming(self) -> bool: + """Anthropic supports streaming responses.""" + return True + + def get_streaming_wrapper(self, tracer: Tracer) -> Optional[Any]: + """Return the sync streaming wrapper for Anthropic.""" + return messages_stream_wrapper(tracer) + + def get_async_streaming_wrapper(self, tracer: Tracer) -> Optional[Any]: + """Return the async streaming wrapper for Anthropic.""" + return messages_stream_async_wrapper(tracer) + def instrumentation_dependencies(self) -> Collection[str]: """Return packages required for instrumentation. @@ -106,89 +129,47 @@ def instrumentation_dependencies(self) -> Collection[str]: """ return ["anthropic >= 0.7.0"] - def _instrument(self, **kwargs): - """Instrument the Anthropic API. - - This method wraps the key methods in the Anthropic client to capture - telemetry data for API calls. It sets up tracers, meters, and wraps the appropriate - methods for instrumentation. - - Args: - **kwargs: Configuration options for instrumentation. 
- """ - tracer_provider = kwargs.get("tracer_provider") - tracer = get_tracer(LIBRARY_NAME, LIBRARY_VERSION, tracer_provider) - - meter_provider = kwargs.get("meter_provider") - meter = get_meter(LIBRARY_NAME, LIBRARY_VERSION, meter_provider) - - meter.create_histogram( - name=Meters.LLM_TOKEN_USAGE, - unit="token", - description="Measures number of input and output tokens used with Anthropic models", - ) - - meter.create_histogram( - name=Meters.LLM_OPERATION_DURATION, - unit="s", - description="Anthropic API operation duration", - ) - - meter.create_counter( - name=Meters.LLM_COMPLETIONS_EXCEPTIONS, - unit="time", - description="Number of exceptions occurred during Anthropic completions", - ) - - # Standard method wrapping approach - # Uses the common wrappers module to wrap methods with tracers - for wrap_config in WRAPPED_METHODS: - try: - wrap(wrap_config, tracer) - except (AttributeError, ModuleNotFoundError): - logger.debug(f"Could not wrap {wrap_config.package}.{wrap_config.class_name}.{wrap_config.method_name}") - - # Special handling for streaming responses - # Uses direct wrapt.wrap_function_wrapper for stream methods - # This approach captures events as they arrive rather than waiting for completion + def _create_provider_metrics(self, meter) -> Dict[str, Any]: + """Create Anthropic-specific metrics beyond the common ones.""" + return { + "completion_exception_counter": meter.create_counter( + name=Meters.LLM_ANTHROPIC_COMPLETION_EXCEPTIONS, + unit="time", + description="Number of exceptions occurred during Anthropic completions", + ), + } + + def _apply_streaming_wrappers(self, tracer: Tracer): + """Apply Anthropic-specific streaming wrappers.""" try: - wrap_function_wrapper( - "anthropic.resources.messages.messages", - "Messages.stream", - messages_stream_wrapper(tracer), - ) - - wrap_function_wrapper( - "anthropic.resources.messages.messages", - "AsyncMessages.stream", - messages_stream_async_wrapper(tracer), - ) - except (AttributeError, ModuleNotFoundError): - logger.debug("Failed to wrap Anthropic streaming methods") - - def _uninstrument(self, **kwargs): - """Remove instrumentation from Anthropic API. - - This method unwraps all methods that were wrapped during instrumentation, - restoring the original behavior of the Anthropic API. - - Args: - **kwargs: Configuration options for uninstrumentation. 
- """ - # Unwrap standard methods - for wrap_config in WRAPPED_METHODS: - try: - unwrap(wrap_config) - except Exception: - logger.debug( - f"Failed to unwrap {wrap_config.package}.{wrap_config.class_name}.{wrap_config.method_name}" + # Get the wrappers + sync_wrapper = self.get_streaming_wrapper(tracer) + async_wrapper = self.get_async_streaming_wrapper(tracer) + + # Apply sync streaming wrapper + if sync_wrapper: + wrap_function_wrapper( + "anthropic.resources.messages.messages", + "Messages.stream", + sync_wrapper, + ) + + # Apply async streaming wrapper + if async_wrapper: + wrap_function_wrapper( + "anthropic.resources.messages.messages", + "AsyncMessages.stream", + async_wrapper, ) + except (AttributeError, ModuleNotFoundError) as e: + logger.debug(f"Failed to wrap Anthropic streaming methods: {e}") - # Unwrap streaming methods + def _remove_streaming_wrappers(self): + """Remove Anthropic-specific streaming wrappers.""" try: from opentelemetry.instrumentation.utils import unwrap as otel_unwrap otel_unwrap("anthropic.resources.messages.messages", "Messages.stream") otel_unwrap("anthropic.resources.messages.messages", "AsyncMessages.stream") - except (AttributeError, ModuleNotFoundError): - logger.debug("Failed to unwrap Anthropic streaming methods") + except (AttributeError, ModuleNotFoundError) as e: + logger.debug(f"Failed to unwrap Anthropic streaming methods: {e}") diff --git a/agentops/instrumentation/common/__init__.py b/agentops/instrumentation/common/__init__.py index 144fa48e4..9adea2975 100644 --- a/agentops/instrumentation/common/__init__.py +++ b/agentops/instrumentation/common/__init__.py @@ -1,4 +1,63 @@ +"""Common utilities for OpenTelemetry instrumentation. + +This module provides shared utilities for all AgentOps instrumentors: +- Base instrumentor with common patterns +- Context management and propagation +- Span lifecycle management +- Enhanced attribute handling +- Wrapper utilities +""" + +# Existing exports from .attributes import AttributeMap, _extract_attributes_from_mapping -from .wrappers import _with_tracer_wrapper +from .wrappers import _with_tracer_wrapper, WrapConfig, wrap, unwrap +from .objects import get_uploaded_object_attributes + +# New exports +from .base_instrumentor import EnhancedBaseInstrumentor +from .context import ContextManager, SpanManager, global_context_manager +from .span_lifecycle import ( + SpanLifecycleManager, + TimingManager, + RetryHandler, + span_error_handler, + async_span_error_handler, +) +from .attribute_handlers import ( + AttributeExtractor, + LLMAttributeHandler, + MessageAttributeHandler, + StreamingAttributeHandler, + create_composite_handler, + with_attribute_filter, +) -__all__ = ["AttributeMap", "_extract_attributes_from_mapping", "_with_tracer_wrapper"] +__all__ = [ + # Existing + "AttributeMap", + "_extract_attributes_from_mapping", + "_with_tracer_wrapper", + "WrapConfig", + "wrap", + "unwrap", + "get_uploaded_object_attributes", + # New base instrumentor + "EnhancedBaseInstrumentor", + # Context management + "ContextManager", + "SpanManager", + "global_context_manager", + # Span lifecycle + "SpanLifecycleManager", + "TimingManager", + "RetryHandler", + "span_error_handler", + "async_span_error_handler", + # Attribute handlers + "AttributeExtractor", + "LLMAttributeHandler", + "MessageAttributeHandler", + "StreamingAttributeHandler", + "create_composite_handler", + "with_attribute_filter", +] diff --git a/agentops/instrumentation/common/attribute_handlers.py b/agentops/instrumentation/common/attribute_handlers.py new 
file mode 100644 index 000000000..4edd98781 --- /dev/null +++ b/agentops/instrumentation/common/attribute_handlers.py @@ -0,0 +1,389 @@ +"""Enhanced attribute handling utilities for OpenTelemetry instrumentation. + +This module provides advanced utilities for extracting and formatting attributes +from various data sources, including: +- LLM request/response data +- Tool invocations +- Message content +- Token usage metrics +- Error information +""" + +from typing import Any, Dict, List, Optional, Callable, Tuple +from functools import wraps + +from agentops.instrumentation.common.attributes import AttributeMap, _extract_attributes_from_mapping +from agentops.helpers import safe_serialize +from agentops.semconv import SpanAttributes, MessageAttributes, LLMRequestTypeValues +from agentops.logging import logger + + +class AttributeExtractor: + """Base class for attribute extraction with common patterns.""" + + @staticmethod + def extract_safely( + source: Any, + attribute_map: AttributeMap, + prefix: Optional[str] = None, + serializer: Callable[[Any], str] = safe_serialize, + ) -> AttributeMap: + """Safely extract attributes with error handling. + + Args: + source: The source object to extract from + attribute_map: Mapping of target to source attributes + prefix: Optional prefix to add to all keys + serializer: Function to serialize complex values + + Returns: + Extracted attributes + """ + try: + attributes = _extract_attributes_from_mapping(source, attribute_map) + + if prefix: + attributes = {f"{prefix}.{k}": v for k, v in attributes.items()} + + return attributes + except Exception as e: + logger.debug(f"Error extracting attributes: {e}") + return {} + + @staticmethod + def merge_attributes(*attribute_dicts: AttributeMap) -> AttributeMap: + """Merge multiple attribute dictionaries. + + Args: + *attribute_dicts: Dictionaries to merge + + Returns: + Merged attributes + """ + result = {} + for attrs in attribute_dicts: + if attrs: + result.update(attrs) + return result + + +class LLMAttributeHandler: + """Common attribute handling for LLM requests and responses.""" + + # Common request attribute mappings + REQUEST_ATTRIBUTES: AttributeMap = { + SpanAttributes.LLM_REQUEST_MODEL: "model", + SpanAttributes.LLM_REQUEST_MAX_TOKENS: "max_tokens", + SpanAttributes.LLM_REQUEST_TEMPERATURE: "temperature", + SpanAttributes.LLM_REQUEST_TOP_P: "top_p", + SpanAttributes.LLM_REQUEST_TOP_K: "top_k", + SpanAttributes.LLM_REQUEST_SEED: "seed", + SpanAttributes.LLM_REQUEST_STOP_SEQUENCES: "stop", + SpanAttributes.LLM_REQUEST_FREQUENCY_PENALTY: "frequency_penalty", + SpanAttributes.LLM_REQUEST_PRESENCE_PENALTY: "presence_penalty", + SpanAttributes.LLM_REQUEST_STREAMING: "stream", + } + + # Common response attribute mappings + RESPONSE_ATTRIBUTES: AttributeMap = { + SpanAttributes.LLM_RESPONSE_MODEL: "model", + SpanAttributes.LLM_RESPONSE_ID: "id", + SpanAttributes.LLM_RESPONSE_FINISH_REASON: "finish_reason", + } + + # Common usage attribute mappings + USAGE_ATTRIBUTES: AttributeMap = { + SpanAttributes.LLM_USAGE_PROMPT_TOKENS: "prompt_tokens", + SpanAttributes.LLM_USAGE_COMPLETION_TOKENS: "completion_tokens", + SpanAttributes.LLM_USAGE_TOTAL_TOKENS: "total_tokens", + } + + @classmethod + def extract_request_attributes( + cls, kwargs: Dict[str, Any], additional_mappings: Optional[AttributeMap] = None + ) -> AttributeMap: + """Extract standard LLM request attributes. 
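+
+        Example (values are illustrative):
+            >>> attrs = LLMAttributeHandler.extract_request_attributes(
+            ...     {"model": "gpt-4o", "temperature": 0.7, "messages": []}
+            ... )
+            >>> attrs["gen_ai.request.model"]
+            'gpt-4o'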
+ + Args: + kwargs: Request keyword arguments + additional_mappings: Provider-specific mappings to include + + Returns: + Extracted attributes + """ + # Merge standard and additional mappings + mappings = cls.REQUEST_ATTRIBUTES.copy() + if additional_mappings: + mappings.update(additional_mappings) + + attributes = AttributeExtractor.extract_safely(kwargs, mappings) + + # Determine request type + if "messages" in kwargs: + attributes[SpanAttributes.LLM_REQUEST_TYPE] = LLMRequestTypeValues.CHAT.value + elif "prompt" in kwargs: + attributes[SpanAttributes.LLM_REQUEST_TYPE] = LLMRequestTypeValues.COMPLETION.value + elif "input" in kwargs and "embedding" in str(kwargs.get("model", "")): + attributes[SpanAttributes.LLM_REQUEST_TYPE] = LLMRequestTypeValues.EMBEDDING.value + + return attributes + + @classmethod + def extract_response_attributes( + cls, response: Any, additional_mappings: Optional[AttributeMap] = None + ) -> AttributeMap: + """Extract standard LLM response attributes. + + Args: + response: The LLM response object + additional_mappings: Provider-specific mappings to include + + Returns: + Extracted attributes + """ + # Merge standard and additional mappings + mappings = cls.RESPONSE_ATTRIBUTES.copy() + if additional_mappings: + mappings.update(additional_mappings) + + attributes = AttributeExtractor.extract_safely(response, mappings) + + # Extract usage if available + if hasattr(response, "usage") and response.usage: + usage_attrs = AttributeExtractor.extract_safely(response.usage, cls.USAGE_ATTRIBUTES) + attributes.update(usage_attrs) + + return attributes + + @classmethod + def extract_token_usage(cls, usage_data: Any, additional_mappings: Optional[AttributeMap] = None) -> AttributeMap: + """Extract token usage attributes. + + Args: + usage_data: Usage data object + additional_mappings: Provider-specific usage mappings + + Returns: + Extracted usage attributes + """ + mappings = cls.USAGE_ATTRIBUTES.copy() + if additional_mappings: + mappings.update(additional_mappings) + + return AttributeExtractor.extract_safely(usage_data, mappings) + + +class MessageAttributeHandler: + """Common attribute handling for message content.""" + + @staticmethod + def extract_messages(messages: List[Dict[str, Any]], attribute_type: str = "prompt") -> AttributeMap: + """Extract attributes from message lists. + + Args: + messages: List of message dictionaries + attribute_type: Type of attributes ("prompt" or "completion") + + Returns: + Extracted message attributes + """ + attributes = {} + + for i, message in enumerate(messages): + if attribute_type == "prompt": + base_attrs = { + MessageAttributes.PROMPT_ROLE.format(i=i): "role", + MessageAttributes.PROMPT_CONTENT.format(i=i): "content", + } + else: + base_attrs = { + MessageAttributes.COMPLETION_ROLE.format(i=i): "role", + MessageAttributes.COMPLETION_CONTENT.format(i=i): "content", + } + + msg_attrs = AttributeExtractor.extract_safely(message, base_attrs) + attributes.update(msg_attrs) + + # Handle tool calls if present + if "tool_calls" in message and message["tool_calls"]: + tool_attrs = MessageAttributeHandler._extract_tool_calls(message["tool_calls"], i, attribute_type) + attributes.update(tool_attrs) + + return attributes + + @staticmethod + def _extract_tool_calls(tool_calls: List[Dict[str, Any]], message_index: int, attribute_type: str) -> AttributeMap: + """Extract attributes from tool calls. 
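+
+        Example input shape (OpenAI-style tool calls; illustrative only)::
+
+            [{"id": "call_1", "type": "function",
+              "function": {"name": "get_weather", "arguments": '{"city": "Paris"}'}}]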
+ + Args: + tool_calls: List of tool call dictionaries + message_index: Index of the parent message + attribute_type: Type of attributes + + Returns: + Extracted tool call attributes + """ + attributes = {} + + for j, tool_call in enumerate(tool_calls): + if attribute_type == "prompt": + tool_attrs_map = { + MessageAttributes.TOOL_CALL_ID.format(i=message_index): "id", + MessageAttributes.TOOL_CALL_TYPE.format(i=message_index): "type", + MessageAttributes.TOOL_CALL_NAME.format(i=message_index): "name", + MessageAttributes.TOOL_CALL_ARGUMENTS.format(i=message_index): "arguments", + } + else: + tool_attrs_map = { + MessageAttributes.COMPLETION_TOOL_CALL_ID.format(i=message_index, j=j): "id", + MessageAttributes.COMPLETION_TOOL_CALL_TYPE.format(i=message_index, j=j): "type", + MessageAttributes.COMPLETION_TOOL_CALL_NAME.format(i=message_index, j=j): "name", + MessageAttributes.COMPLETION_TOOL_CALL_ARGUMENTS.format(i=message_index, j=j): "arguments", + } + + tool_attrs = AttributeExtractor.extract_safely(tool_call, tool_attrs_map) + + # Handle function details if present + if "function" in tool_call: + func_attrs = AttributeExtractor.extract_safely( + tool_call["function"], {"name": "name", "arguments": "arguments"} + ) + # Update the attributes with function details + for key, value in func_attrs.items(): + if key == "name" and attribute_type == "completion": + attributes[MessageAttributes.COMPLETION_TOOL_CALL_NAME.format(i=message_index, j=j)] = value + elif key == "arguments" and attribute_type == "completion": + attributes[ + MessageAttributes.COMPLETION_TOOL_CALL_ARGUMENTS.format(i=message_index, j=j) + ] = value + + attributes.update(tool_attrs) + + return attributes + + +class StreamingAttributeHandler: + """Common attribute handling for streaming responses.""" + + @staticmethod + def create_streaming_handler(span_attribute_prefix: str = "stream") -> Callable: + """Create a handler for streaming attributes. + + Args: + span_attribute_prefix: Prefix for streaming attributes + + Returns: + Handler function + """ + + def handler(chunk: Any, chunk_index: int, accumulated_content: str = "") -> AttributeMap: + """Handle attributes from a streaming chunk. + + Args: + chunk: The streaming chunk + chunk_index: Index of this chunk + accumulated_content: Content accumulated so far + + Returns: + Attributes to set on the span + """ + attributes = {} + + # Track chunk metadata + attributes[f"{span_attribute_prefix}.chunk_index"] = chunk_index + + # Extract content from chunk + if hasattr(chunk, "choices") and chunk.choices: + choice = chunk.choices[0] + if hasattr(choice, "delta"): + delta = choice.delta + if hasattr(delta, "content") and delta.content: + attributes[f"{span_attribute_prefix}.chunk_content"] = delta.content + if hasattr(delta, "tool_calls") and delta.tool_calls: + attributes[f"{span_attribute_prefix}.has_tool_calls"] = True + + # Track accumulated content length + if accumulated_content: + attributes[f"{span_attribute_prefix}.accumulated_length"] = len(accumulated_content) + + return attributes + + return handler + + +def create_composite_handler( + *handlers: Callable[[Optional[Tuple], Optional[Dict], Optional[Any]], AttributeMap], +) -> Callable[[Optional[Tuple], Optional[Dict], Optional[Any]], AttributeMap]: + """Create a composite handler that combines multiple attribute handlers. 
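+
+    Example (using the handlers from the implementation guide; ``response``
+    is a placeholder return value):
+        >>> get_chat_attributes = create_composite_handler(
+        ...     _extract_base_attributes,
+        ...     _extract_request_attributes,
+        ... )
+        >>> attrs = get_chat_attributes(kwargs={"model": "m"}, return_value=response)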
+ + Args: + *handlers: Handler functions to combine + + Returns: + Combined handler function + """ + + def composite_handler( + args: Optional[Tuple] = None, kwargs: Optional[Dict] = None, return_value: Optional[Any] = None + ) -> AttributeMap: + """Execute all handlers and merge their results. + + Args: + args: Method arguments + kwargs: Method keyword arguments + return_value: Method return value + + Returns: + Merged attributes from all handlers + """ + all_attributes = {} + + for handler in handlers: + try: + attributes = handler(args=args, kwargs=kwargs, return_value=return_value) + if attributes: + all_attributes.update(attributes) + except Exception as e: + logger.debug(f"Error in composite handler: {e}") + + return all_attributes + + return composite_handler + + +def with_attribute_filter( + handler: Callable, include_patterns: Optional[List[str]] = None, exclude_patterns: Optional[List[str]] = None +) -> Callable: + """Wrap a handler to filter attributes based on patterns. + + Args: + handler: The handler to wrap + include_patterns: Patterns to include (if None, include all) + exclude_patterns: Patterns to exclude + + Returns: + Wrapped handler + """ + + @wraps(handler) + def filtered_handler(*args, **kwargs) -> AttributeMap: + attributes = handler(*args, **kwargs) + + if not attributes: + return attributes + + # Apply include filter + if include_patterns: + filtered = {} + for key, value in attributes.items(): + if any(pattern in key for pattern in include_patterns): + filtered[key] = value + attributes = filtered + + # Apply exclude filter + if exclude_patterns: + attributes = {k: v for k, v in attributes.items() if not any(pattern in k for pattern in exclude_patterns)} + + return attributes + + return filtered_handler diff --git a/agentops/instrumentation/common/base_instrumentor.py b/agentops/instrumentation/common/base_instrumentor.py new file mode 100644 index 000000000..913f01620 --- /dev/null +++ b/agentops/instrumentation/common/base_instrumentor.py @@ -0,0 +1,210 @@ +"""Enhanced base instrumentor with common initialization and lifecycle management. + +This module provides an enhanced base instrumentor class that abstracts common +patterns found across all AgentOps instrumentors, including: +- Tracer and meter initialization +- Common metric definitions +- Method wrapping with error handling +- Streaming support utilities +- Standard uninstrumentation logic +""" + +from typing import List, Dict, Optional, Any, Callable +from abc import abstractmethod +import logging + +from opentelemetry.trace import get_tracer, Tracer +from opentelemetry.metrics import get_meter, Meter +from opentelemetry.instrumentation.instrumentor import BaseInstrumentor + +from agentops.instrumentation.common.wrappers import WrapConfig, wrap, unwrap +from agentops.semconv import Meters + +logger = logging.getLogger(__name__) + + +class EnhancedBaseInstrumentor(BaseInstrumentor): + """Enhanced base instrumentor with common functionality for all AgentOps instrumentors. 
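+
+    A minimal subclass sketch (names are placeholders):
+        >>> class MyInstrumentor(EnhancedBaseInstrumentor):
+        ...     library_name = "my_provider"
+        ...     library_version = "1.0.0"
+        ...     wrapped_methods = []
+        ...
+        ...     def instrumentation_dependencies(self):
+        ...         return ["my-provider >= 1.0.0"]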
+ + This class provides: + - Automatic tracer and meter initialization + - Common metric creation based on provider type + - Standardized method wrapping with error handling + - Built-in streaming support + - Consistent uninstrumentation + + Subclasses must implement: + - library_name: Property returning the library name (e.g., "openai") + - library_version: Property returning the library version + - wrapped_methods: Property returning list of WrapConfig objects + - instrumentation_dependencies: Method returning required packages + """ + + def __init__(self): + """Initialize the enhanced base instrumentor.""" + super().__init__() + self._tracer: Optional[Tracer] = None + self._meter: Optional[Meter] = None + self._metrics: Dict[str, Any] = {} + self._wrapped_methods_cache: Optional[List[WrapConfig]] = None + + @property + @abstractmethod + def library_name(self) -> str: + """Return the name of the library being instrumented.""" + pass + + @property + @abstractmethod + def library_version(self) -> str: + """Return the version of the library being instrumented.""" + pass + + @property + @abstractmethod + def wrapped_methods(self) -> List[WrapConfig]: + """Return list of methods to be wrapped for instrumentation.""" + pass + + @property + def supports_streaming(self) -> bool: + """Whether this instrumentor supports streaming responses.""" + return False + + def get_streaming_wrapper(self, tracer: Tracer) -> Optional[Callable]: + """Return streaming wrapper function if supported.""" + return None + + def get_async_streaming_wrapper(self, tracer: Tracer) -> Optional[Callable]: + """Return async streaming wrapper function if supported.""" + return None + + def _instrument(self, **kwargs): + """Instrument the target library with common initialization.""" + # Initialize tracer + tracer_provider = kwargs.get("tracer_provider") + self._tracer = get_tracer(self.library_name, self.library_version, tracer_provider) + + # Initialize meter and metrics + meter_provider = kwargs.get("meter_provider") + self._meter = get_meter(self.library_name, self.library_version, meter_provider) + self._metrics = self._create_metrics(self._meter) + + # Cache wrapped methods for uninstrumentation + self._wrapped_methods_cache = self.wrapped_methods + + # Apply standard method wrapping + self._wrap_methods(self._wrapped_methods_cache, self._tracer) + + # Apply streaming wrappers if supported + if self.supports_streaming: + self._apply_streaming_wrappers(self._tracer) + + # Call provider-specific initialization if needed + self._instrument_provider(**kwargs) + + def _uninstrument(self, **kwargs): + """Remove instrumentation with common cleanup.""" + # Unwrap standard methods + if self._wrapped_methods_cache: + self._unwrap_methods(self._wrapped_methods_cache) + + # Remove streaming wrappers if supported + if self.supports_streaming: + self._remove_streaming_wrappers() + + # Call provider-specific cleanup if needed + self._uninstrument_provider(**kwargs) + + # Clear references + self._tracer = None + self._meter = None + self._metrics.clear() + self._wrapped_methods_cache = None + + def _create_metrics(self, meter: Meter) -> Dict[str, Any]: + """Create common metrics for LLM instrumentation.""" + metrics = {} + + # Common LLM metrics (OpenTelemetry GenAI standard) + metrics["token_usage_histogram"] = meter.create_histogram( + name=Meters.LLM_TOKEN_USAGE, + unit="token", + description=f"Measures number of input and output tokens used with {self.library_name}", + ) + + metrics["operation_duration_histogram"] = 
meter.create_histogram( + name=Meters.LLM_OPERATION_DURATION, unit="s", description=f"{self.library_name} operation duration" + ) + + metrics["generation_choices_counter"] = meter.create_counter( + name=Meters.LLM_GENERATION_CHOICES, + unit="choice", + description=f"Number of choices returned by {self.library_name} completions", + ) + + # Provider-specific metrics + provider_metrics = self._create_provider_metrics(meter) + metrics.update(provider_metrics) + + return metrics + + def _create_provider_metrics(self, meter: Meter) -> Dict[str, Any]: + """Create provider-specific metrics. Override in subclasses.""" + return {} + + def _wrap_methods(self, methods: List[WrapConfig], tracer: Tracer): + """Wrap methods with consistent error handling.""" + for wrap_config in methods: + try: + wrap(wrap_config, tracer) + logger.debug(f"Successfully wrapped {wrap_config}") + except (AttributeError, ModuleNotFoundError) as e: + logger.debug( + f"Could not wrap {wrap_config.package}.{wrap_config.class_name}." f"{wrap_config.method_name}: {e}" + ) + except Exception as e: + logger.warning(f"Unexpected error wrapping {wrap_config}: {e}") + + def _unwrap_methods(self, methods: List[WrapConfig]): + """Unwrap methods with consistent error handling.""" + for wrap_config in methods: + try: + unwrap(wrap_config) + logger.debug(f"Successfully unwrapped {wrap_config}") + except Exception as e: + logger.debug( + f"Failed to unwrap {wrap_config.package}.{wrap_config.class_name}." + f"{wrap_config.method_name}: {e}" + ) + + def _apply_streaming_wrappers(self, tracer: Tracer): + """Apply streaming-specific wrappers. Override in subclasses that support streaming.""" + pass + + def _remove_streaming_wrappers(self): + """Remove streaming-specific wrappers. Override in subclasses that support streaming.""" + pass + + def _instrument_provider(self, **kwargs): + """Provider-specific instrumentation. Override in subclasses if needed.""" + pass + + def _uninstrument_provider(self, **kwargs): + """Provider-specific uninstrumentation. Override in subclasses if needed.""" + pass + + @property + def tracer(self) -> Optional[Tracer]: + """Get the initialized tracer.""" + return self._tracer + + @property + def meter(self) -> Optional[Meter]: + """Get the initialized meter.""" + return self._meter + + @property + def metrics(self) -> Dict[str, Any]: + """Get the created metrics.""" + return self._metrics diff --git a/agentops/instrumentation/common/context.py b/agentops/instrumentation/common/context.py new file mode 100644 index 000000000..7cb68c65f --- /dev/null +++ b/agentops/instrumentation/common/context.py @@ -0,0 +1,216 @@ +"""Common context management utilities for OpenTelemetry instrumentation. + +This module provides utilities for managing OpenTelemetry context propagation +across different execution contexts, including: +- Context storage and retrieval +- Parent-child span relationships +- Trace continuity across async boundaries +- Context preservation in callbacks +""" + +import weakref +from typing import Optional, Any, Dict +from contextlib import contextmanager + +from opentelemetry import context as context_api +from opentelemetry import trace +from opentelemetry.trace import Span, Context, format_trace_id + +from agentops.logging import logger + + +class ContextManager: + """Manages OpenTelemetry context storage and propagation. + + This class provides thread-safe context management for maintaining + span relationships across different execution contexts. 
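+
+    Example (``request`` stands in for any weak-referenceable key object):
+        >>> manager = ContextManager()
+        >>> ctx = manager.store_span_context(request, span)
+        >>> manager.get_parent_context(request) is ctx
+        True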
+ """ + + def __init__(self): + """Initialize the context manager with weak reference dictionaries.""" + # Use weakref to prevent memory leaks + self._span_contexts: weakref.WeakKeyDictionary = weakref.WeakKeyDictionary() + self._trace_root_contexts: weakref.WeakKeyDictionary = weakref.WeakKeyDictionary() + self._object_spans: weakref.WeakKeyDictionary = weakref.WeakKeyDictionary() + + def store_span_context(self, key: Any, span: Span) -> Context: + """Store a span's context for future reference. + + Args: + key: The object to associate with this context + span: The span whose context to store + + Returns: + The stored context + """ + context = trace.set_span_in_context(span) + self._span_contexts[key] = context + return context + + def store_trace_root_context(self, key: Any, context: Context): + """Store a trace's root context for maintaining trace continuity. + + Args: + key: The trace object to associate with this context + context: The root context to store + """ + self._trace_root_contexts[key] = context + + def get_parent_context(self, key: Any, fallback_to_current: bool = True) -> Optional[Context]: + """Get the parent context for a given key. + + Args: + key: The object whose parent context to retrieve + fallback_to_current: Whether to fallback to current context if not found + + Returns: + The parent context or current context if fallback is True + """ + # First check if this object has a specific context + if key in self._span_contexts: + return self._span_contexts[key] + + # Then check if it has a trace root context + if key in self._trace_root_contexts: + return self._trace_root_contexts[key] + + # Fallback to current context if requested + if fallback_to_current: + return context_api.get_current() + + return None + + def associate_span_with_object(self, obj: Any, span: Span): + """Associate a span with an object for lifecycle tracking. + + Args: + obj: The object to associate with the span + span: The span to associate + """ + self._object_spans[obj] = span + + def get_span_for_object(self, obj: Any) -> Optional[Span]: + """Get the span associated with an object. + + Args: + obj: The object whose span to retrieve + + Returns: + The associated span or None + """ + return self._object_spans.get(obj) + + def clear_context(self, key: Any): + """Clear all stored contexts for a given key. + + Args: + key: The object whose contexts to clear + """ + self._span_contexts.pop(key, None) + self._trace_root_contexts.pop(key, None) + self._object_spans.pop(key, None) + + @contextmanager + def preserve_context(self, context: Optional[Context] = None): + """Context manager to preserve OpenTelemetry context. + + Args: + context: The context to preserve (uses current if None) + + Yields: + The preserved context + """ + if context is None: + context = context_api.get_current() + + token = context_api.attach(context) + try: + yield context + finally: + context_api.detach(token) + + def debug_trace_info(self, span: Optional[Span] = None, label: str = ""): + """Log debug information about the current trace. 
+
+        Args:
+            span: The span to debug (uses current if None)
+            label: A label to include in the debug output
+        """
+        if span is None:
+            span = trace.get_current_span()
+
+        span_context = span.get_span_context()
+        trace_id = format_trace_id(span_context.trace_id)
+        span_id = f"{span_context.span_id:016x}"
+
+        logger.debug(
+            f"Trace Debug {label}: "
+            f"trace_id={trace_id}, "
+            f"span_id={span_id}, "
+            f"is_valid={span_context.is_valid}, "
+            f"is_recording={span.is_recording()}"
+        )
+
+
+class SpanManager:
+    """Utilities for creating and managing spans with consistent patterns."""
+
+    @staticmethod
+    def create_child_span(
+        tracer: trace.Tracer,
+        name: str,
+        parent_context: Optional[Context] = None,
+        kind: trace.SpanKind = trace.SpanKind.CLIENT,
+        attributes: Optional[Dict[str, Any]] = None,
+    ) -> Span:
+        """Create a child span with proper context propagation.
+
+        The span is started but not ended; the caller is responsible for
+        ending it (or using it as a context manager).
+
+        Args:
+            tracer: The tracer to use for span creation
+            name: The name of the span
+            parent_context: The parent context (uses current if None)
+            kind: The kind of span to create
+            attributes: Initial attributes to set on the span
+
+        Returns:
+            The created (still-active) span
+        """
+        if parent_context is None:
+            parent_context = context_api.get_current()
+
+        # Use start_span rather than start_as_current_span: returning from
+        # inside a `with start_as_current_span(...)` block would end the span
+        # before the caller ever sees it.
+        return tracer.start_span(name=name, context=parent_context, kind=kind, attributes=attributes)
+
+    @staticmethod
+    @contextmanager
+    def managed_span(
+        tracer: trace.Tracer,
+        name: str,
+        context_manager: ContextManager,
+        context_key: Any,
+        kind: trace.SpanKind = trace.SpanKind.CLIENT,
+        attributes: Optional[Dict[str, Any]] = None,
+    ):
+        """Create a managed span that automatically handles context storage.
+
+        Args:
+            tracer: The tracer to use for span creation
+            name: The name of the span
+            context_manager: The context manager for storing contexts
+            context_key: The key to associate with this span's context
+            kind: The kind of span to create
+            attributes: Initial attributes to set on the span
+
+        Yields:
+            The created span
+        """
+        parent_context = context_manager.get_parent_context(context_key)
+
+        with tracer.start_as_current_span(name=name, context=parent_context, kind=kind, attributes=attributes) as span:
+            # Store the span's context for future reference
+            context_manager.store_span_context(context_key, span)
+            yield span
+
+
+# Global context manager instance for shared use
+global_context_manager = ContextManager()
diff --git a/agentops/instrumentation/common/span_lifecycle.py b/agentops/instrumentation/common/span_lifecycle.py
new file mode 100644
index 000000000..2c0298c6c
--- /dev/null
+++ b/agentops/instrumentation/common/span_lifecycle.py
@@ -0,0 +1,295 @@
+"""Common span lifecycle management utilities for OpenTelemetry instrumentation.
+
+This module provides utilities for managing span lifecycle events including:
+- Consistent error handling and recording
+- Span status management
+- Event recording patterns
+- Retry and timeout handling
+"""
+
+from typing import Optional, Any, Dict, Callable, TypeVar
+from functools import wraps
+import time
+
+from opentelemetry.trace import Span, Status, StatusCode
+from opentelemetry import trace
+
+from agentops.logging import logger
+from agentops.semconv import CoreAttributes
+
+T = TypeVar("T")
+
+
+class SpanLifecycleManager:
+    """Manages span lifecycle events with consistent patterns."""
+
+    @staticmethod
+    def record_exception(
+        span: Span, exception: Exception, attributes: Optional[Dict[str, Any]] = None, escaped: bool = True
+    ):
+        """Record an exception on a span with consistent formatting.
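+
+        Example (sketch; ``call_api`` is a placeholder):
+            >>> try:
+            ...     call_api()
+            ... except Exception as e:
+            ...     SpanLifecycleManager.record_exception(span, e)
+            ...     raise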
+ + Args: + span: The span to record the exception on + exception: The exception to record + attributes: Additional attributes to record with the exception + escaped: Whether the exception escaped the span scope + """ + # Record the exception with OpenTelemetry + span.record_exception(exception, attributes=attributes, escaped=escaped) + + # Set error attributes following semantic conventions + span.set_attribute(CoreAttributes.ERROR_TYPE, type(exception).__name__) + span.set_attribute(CoreAttributes.ERROR_MESSAGE, str(exception)) + + # Set span status to error + span.set_status(Status(StatusCode.ERROR, str(exception))) + + # Log for debugging + logger.debug(f"Recorded exception on span {span.name}: {exception}") + + @staticmethod + def set_success_status(span: Span, message: Optional[str] = None): + """Set a span's status to success with optional message. + + Args: + span: The span to update + message: Optional success message + """ + if message: + span.set_status(Status(StatusCode.OK, message)) + else: + span.set_status(Status(StatusCode.OK)) + + @staticmethod + def add_event(span: Span, name: str, attributes: Optional[Dict[str, Any]] = None, timestamp: Optional[int] = None): + """Add an event to a span with consistent formatting. + + Args: + span: The span to add the event to + name: The name of the event + attributes: Event attributes + timestamp: Optional timestamp (uses current time if None) + """ + span.add_event(name, attributes=attributes, timestamp=timestamp) + logger.debug(f"Added event '{name}' to span {span.name}") + + @staticmethod + def with_error_handling( + span: Span, operation: Callable[[], T], error_message: str = "Operation failed", reraise: bool = True + ) -> Optional[T]: + """Execute an operation with consistent error handling. + + Args: + span: The span to record errors on + operation: The operation to execute + error_message: Message to use for error status + reraise: Whether to reraise exceptions + + Returns: + The operation result or None if error occurred and reraise=False + """ + try: + result = operation() + return result + except Exception as e: + SpanLifecycleManager.record_exception(span, e) + logger.error(f"{error_message}: {e}") + if reraise: + raise + return None + + @staticmethod + async def with_error_handling_async( + span: Span, operation: Callable[[], T], error_message: str = "Operation failed", reraise: bool = True + ) -> Optional[T]: + """Execute an async operation with consistent error handling. + + Args: + span: The span to record errors on + operation: The async operation to execute + error_message: Message to use for error status + reraise: Whether to reraise exceptions + + Returns: + The operation result or None if error occurred and reraise=False + """ + try: + result = await operation() + return result + except Exception as e: + SpanLifecycleManager.record_exception(span, e) + logger.error(f"{error_message}: {e}") + if reraise: + raise + return None + + +def span_error_handler(error_message: str = "Operation failed", reraise: bool = True, record_on_span: bool = True): + """Decorator for consistent error handling in span operations. 
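+
+    Example (``parse_chunk`` is a placeholder):
+        >>> @span_error_handler(error_message="Chunk handling failed", reraise=False)
+        ... def handle_chunk(chunk):
+        ...     return parse_chunk(chunk)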
+ + Args: + error_message: Base error message + reraise: Whether to reraise exceptions + record_on_span: Whether to record exception on current span + + Returns: + Decorator function + """ + + def decorator(func: Callable[..., T]) -> Callable[..., T]: + @wraps(func) + def wrapper(*args, **kwargs) -> T: + try: + return func(*args, **kwargs) + except Exception as e: + if record_on_span: + current_span = trace.get_current_span() + if current_span and current_span.is_recording(): + SpanLifecycleManager.record_exception(current_span, e) + + logger.error(f"{error_message} in {func.__name__}: {e}") + + if reraise: + raise + return None + + return wrapper + + return decorator + + +def async_span_error_handler( + error_message: str = "Operation failed", reraise: bool = True, record_on_span: bool = True +): + """Async decorator for consistent error handling in span operations. + + Args: + error_message: Base error message + reraise: Whether to reraise exceptions + record_on_span: Whether to record exception on current span + + Returns: + Decorator function + """ + + def decorator(func: Callable[..., T]) -> Callable[..., T]: + @wraps(func) + async def wrapper(*args, **kwargs) -> T: + try: + return await func(*args, **kwargs) + except Exception as e: + if record_on_span: + current_span = trace.get_current_span() + if current_span and current_span.is_recording(): + SpanLifecycleManager.record_exception(current_span, e) + + logger.error(f"{error_message} in {func.__name__}: {e}") + + if reraise: + raise + return None + + return wrapper + + return decorator + + +class TimingManager: + """Utilities for managing timing and performance metrics.""" + + @staticmethod + def measure_duration(span: Span, attribute_name: str): + """Context manager to measure operation duration. + + Args: + span: The span to add the duration attribute to + attribute_name: The name of the duration attribute + """ + + class DurationContext: + def __enter__(self): + self.start_time = time.time() + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + duration = time.time() - self.start_time + span.set_attribute(attribute_name, duration) + return False + + return DurationContext() + + @staticmethod + def add_timing_event(span: Span, event_name: str, start_time: float, end_time: Optional[float] = None): + """Add a timing event to a span. + + Args: + span: The span to add the event to + event_name: The name of the timing event + start_time: The start time of the operation + end_time: The end time (uses current time if None) + """ + if end_time is None: + end_time = time.time() + + duration = end_time - start_time + span.add_event( + event_name, attributes={"duration_ms": duration * 1000, "start_time": start_time, "end_time": end_time} + ) + + +class RetryHandler: + """Utilities for handling retries with OpenTelemetry instrumentation.""" + + @staticmethod + def with_retry( + span: Span, + operation: Callable[[], T], + max_attempts: int = 3, + backoff_factor: float = 2.0, + initial_delay: float = 1.0, + ) -> T: + """Execute an operation with retry logic and span events. 
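+
+        With the defaults, failed attempts back off geometrically: the second
+        attempt runs after a 1.0s delay and the third after a further 2.0s.
+        A minimal call-site sketch (``fetch_completion`` is illustrative)::
+
+            result = RetryHandler.with_retry(
+                span, lambda: fetch_completion(request), max_attempts=3
+            )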
+ + Args: + span: The span to record retry events on + operation: The operation to execute + max_attempts: Maximum number of attempts + backoff_factor: Factor to multiply delay by after each attempt + initial_delay: Initial delay between attempts in seconds + + Returns: + The operation result + + Raises: + The last exception if all attempts fail + """ + delay = initial_delay + last_exception = None + + for attempt in range(max_attempts): + try: + if attempt > 0: + span.add_event(f"Retry attempt {attempt + 1}", attributes={"attempt": attempt + 1, "delay": delay}) + time.sleep(delay) + + result = operation() + + if attempt > 0: + span.add_event("Retry successful", attributes={"attempt": attempt + 1}) + + return result + + except Exception as e: + last_exception = e + span.add_event( + f"Attempt {attempt + 1} failed", + attributes={"attempt": attempt + 1, "error": str(e), "error_type": type(e).__name__}, + ) + + if attempt < max_attempts - 1: + delay *= backoff_factor + else: + SpanLifecycleManager.record_exception(span, e) + + raise last_exception diff --git a/agentops/instrumentation/openai/instrumentor.py b/agentops/instrumentation/openai/instrumentor.py index 63c560d0c..34bedc4c2 100644 --- a/agentops/instrumentation/openai/instrumentor.py +++ b/agentops/instrumentation/openai/instrumentor.py @@ -12,11 +12,8 @@ and distributed tracing. """ -from typing import List, Collection -from opentelemetry.trace import get_tracer -from opentelemetry.instrumentation.instrumentor import BaseInstrumentor - -from agentops.instrumentation.common.wrappers import WrapConfig +from typing import List, Collection, Dict, Any +from agentops.instrumentation.common import EnhancedBaseInstrumentor, WrapConfig from agentops.instrumentation.openai import LIBRARY_NAME, LIBRARY_VERSION from agentops.instrumentation.openai.attributes.common import get_response_attributes from agentops.instrumentation.openai.config import Config @@ -38,8 +35,13 @@ _instruments = ("openai >= 0.27.0",) -class OpenAIInstrumentor(BaseInstrumentor): - """An instrumentor for OpenAI's client library with comprehensive coverage.""" +class OpenAIInstrumentor(EnhancedBaseInstrumentor): + """An instrumentor for OpenAI's client library with comprehensive coverage. + + This instrumentor extends the EnhancedBaseInstrumentor to provide + OpenAI-specific instrumentation with automatic metric creation, + error handling, and lifecycle management. 
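+
+    For openai<1.0 this class delegates to the legacy ``OpenAIV0Instrumentor``
+    via ``_instrument_provider``; for v1 clients all wrapping is driven by the
+    ``wrapped_methods`` property.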
+ """ def __init__( self, @@ -59,104 +61,22 @@ def __init__( Config.upload_base64_image = upload_base64_image Config.enable_trace_context_propagation = enable_trace_context_propagation - def instrumentation_dependencies(self) -> Collection[str]: - return _instruments - - def _instrument(self, **kwargs): - """Instrument the OpenAI API.""" - if not is_openai_v1(): - # For v0, use the legacy instrumentor - OpenAIV0Instrumentor().instrument(**kwargs) - return - - # Get tracer and meter - tracer_provider = kwargs.get("tracer_provider") - tracer = get_tracer(LIBRARY_NAME, LIBRARY_VERSION, tracer_provider) - - # Define all wrapped methods - wrapped_methods = self._get_wrapped_methods() + # Cache for v0 instrumentor if needed + self._v0_instrumentor = None - # Apply all wrappers using the common wrapper infrastructure - from agentops.instrumentation.common.wrappers import wrap - - for wrap_config in wrapped_methods: - try: - wrap(wrap_config, tracer) - except (AttributeError, ModuleNotFoundError): - # Some methods may not be available in all versions - pass - - def _uninstrument(self, **kwargs): - """Remove instrumentation from OpenAI API.""" - if not is_openai_v1(): - OpenAIV0Instrumentor().uninstrument(**kwargs) - return + @property + def library_name(self) -> str: + """Return the OpenAI library name.""" + return LIBRARY_NAME - # Get all wrapped methods - wrapped_methods = self._get_wrapped_methods() - - # Remove all wrappers using the common wrapper infrastructure - from agentops.instrumentation.common.wrappers import unwrap - - for wrap_config in wrapped_methods: - try: - unwrap(wrap_config) - except Exception: - # Some methods may not be wrapped - pass - - def _init_metrics(self, meter): - """Initialize metrics for instrumentation.""" - return { - "tokens_histogram": meter.create_histogram( - name=Meters.LLM_TOKEN_USAGE, - unit="token", - description="Measures number of input and output tokens used", - ), - "chat_choice_counter": meter.create_counter( - name=Meters.LLM_GENERATION_CHOICES, - unit="choice", - description="Number of choices returned by chat completions call", - ), - "duration_histogram": meter.create_histogram( - name=Meters.LLM_OPERATION_DURATION, - unit="s", - description="GenAI operation duration", - ), - "chat_exception_counter": meter.create_counter( - name=Meters.LLM_COMPLETIONS_EXCEPTIONS, - unit="time", - description="Number of exceptions occurred during chat completions", - ), - "streaming_time_to_first_token": meter.create_histogram( - name=Meters.LLM_STREAMING_TIME_TO_FIRST_TOKEN, - unit="s", - description="Time to first token in streaming chat completions", - ), - "streaming_time_to_generate": meter.create_histogram( - name=Meters.LLM_STREAMING_TIME_TO_GENERATE, - unit="s", - description="Time between first token and completion in streaming chat completions", - ), - "embeddings_vector_size_counter": meter.create_counter( - name=Meters.LLM_EMBEDDINGS_VECTOR_SIZE, - unit="element", - description="The size of returned vector", - ), - "embeddings_exception_counter": meter.create_counter( - name=Meters.LLM_EMBEDDINGS_EXCEPTIONS, - unit="time", - description="Number of exceptions occurred during embeddings operation", - ), - "image_gen_exception_counter": meter.create_counter( - name=Meters.LLM_IMAGE_GENERATIONS_EXCEPTIONS, - unit="time", - description="Number of exceptions occurred during image generations operation", - ), - } + @property + def library_version(self) -> str: + """Return the OpenAI library version.""" + return LIBRARY_VERSION - def 
_get_wrapped_methods(self) -> List[WrapConfig]: - """Get all methods that should be wrapped.""" + @property + def wrapped_methods(self) -> List[WrapConfig]: + """Return all methods that should be wrapped for OpenAI instrumentation.""" wrapped_methods = [] # Chat completions @@ -331,3 +251,55 @@ def _get_wrapped_methods(self) -> List[WrapConfig]: ) return wrapped_methods + + def instrumentation_dependencies(self) -> Collection[str]: + """Return the required OpenAI package dependencies.""" + return _instruments + + def _create_provider_metrics(self, meter) -> Dict[str, Any]: + """Create OpenAI-specific metrics beyond the common ones.""" + return { + "chat_exception_counter": meter.create_counter( + name=Meters.LLM_COMPLETIONS_EXCEPTIONS, + unit="time", + description="Number of exceptions occurred during chat completions", + ), + "streaming_time_to_first_token": meter.create_histogram( + name=Meters.LLM_STREAMING_TIME_TO_FIRST_TOKEN, + unit="s", + description="Time to first token in streaming chat completions", + ), + "streaming_time_to_generate": meter.create_histogram( + name=Meters.LLM_STREAMING_TIME_TO_GENERATE, + unit="s", + description="Time between first token and completion in streaming chat completions", + ), + "embeddings_vector_size_counter": meter.create_counter( + name=Meters.LLM_EMBEDDINGS_VECTOR_SIZE, + unit="element", + description="The size of returned vector", + ), + "embeddings_exception_counter": meter.create_counter( + name=Meters.LLM_EMBEDDINGS_EXCEPTIONS, + unit="time", + description="Number of exceptions occurred during embeddings operation", + ), + "image_gen_exception_counter": meter.create_counter( + name=Meters.LLM_IMAGE_GENERATIONS_EXCEPTIONS, + unit="time", + description="Number of exceptions occurred during image generations operation", + ), + } + + def _instrument_provider(self, **kwargs): + """Handle OpenAI-specific instrumentation logic.""" + if not is_openai_v1(): + # For v0, use the legacy instrumentor + self._v0_instrumentor = OpenAIV0Instrumentor() + self._v0_instrumentor.instrument(**kwargs) + + def _uninstrument_provider(self, **kwargs): + """Handle OpenAI-specific uninstrumentation logic.""" + if self._v0_instrumentor: + self._v0_instrumentor.uninstrument(**kwargs) + self._v0_instrumentor = None diff --git a/agentops/instrumentation/openai/wrappers/chat.py b/agentops/instrumentation/openai/wrappers/chat.py index bc2be1b73..59aec8245 100644 --- a/agentops/instrumentation/openai/wrappers/chat.py +++ b/agentops/instrumentation/openai/wrappers/chat.py @@ -13,50 +13,48 @@ model_as_dict, should_send_prompts, ) -from agentops.instrumentation.common.attributes import AttributeMap +from agentops.instrumentation.common import ( + AttributeMap, + LLMAttributeHandler, + MessageAttributeHandler, + create_composite_handler, +) from agentops.semconv import SpanAttributes, LLMRequestTypeValues logger = logging.getLogger(__name__) LLM_REQUEST_TYPE = LLMRequestTypeValues.CHAT +# OpenAI-specific request attribute mappings +OPENAI_REQUEST_ATTRIBUTES: AttributeMap = { + SpanAttributes.LLM_USER: "user", + SpanAttributes.LLM_REQUEST_FUNCTIONS: "functions", +} + +# OpenAI-specific response attribute mappings +OPENAI_RESPONSE_ATTRIBUTES: AttributeMap = { + SpanAttributes.LLM_OPENAI_RESPONSE_SYSTEM_FINGERPRINT: "system_fingerprint", +} + +# OpenAI-specific usage attribute mappings +OPENAI_USAGE_ATTRIBUTES: AttributeMap = { + SpanAttributes.LLM_USAGE_REASONING_TOKENS: "output_tokens_details.reasoning_tokens", +} -def handle_chat_attributes( + +def _extract_base_attributes( args: 
Optional[Tuple] = None, kwargs: Optional[Dict] = None, return_value: Optional[Any] = None, ) -> AttributeMap: - """Extract attributes from chat completion calls. - - This function is designed to work with the common wrapper pattern, - extracting attributes from the method arguments and return value. - """ + """Extract base OpenAI chat attributes.""" attributes = { SpanAttributes.LLM_REQUEST_TYPE: LLM_REQUEST_TYPE.value, SpanAttributes.LLM_SYSTEM: "OpenAI", } - # Extract request attributes from kwargs + # Add streaming attribute if kwargs: - # Model - if "model" in kwargs: - attributes[SpanAttributes.LLM_REQUEST_MODEL] = kwargs["model"] - - # Request parameters - if "max_tokens" in kwargs: - attributes[SpanAttributes.LLM_REQUEST_MAX_TOKENS] = kwargs["max_tokens"] - if "temperature" in kwargs: - attributes[SpanAttributes.LLM_REQUEST_TEMPERATURE] = kwargs["temperature"] - if "top_p" in kwargs: - attributes[SpanAttributes.LLM_REQUEST_TOP_P] = kwargs["top_p"] - if "frequency_penalty" in kwargs: - attributes[SpanAttributes.LLM_REQUEST_FREQUENCY_PENALTY] = kwargs["frequency_penalty"] - if "presence_penalty" in kwargs: - attributes[SpanAttributes.LLM_REQUEST_PRESENCE_PENALTY] = kwargs["presence_penalty"] - if "user" in kwargs: - attributes[SpanAttributes.LLM_USER] = kwargs["user"] - - # Streaming attributes[SpanAttributes.LLM_REQUEST_STREAMING] = kwargs.get("stream", False) # Headers @@ -64,127 +62,141 @@ def handle_chat_attributes( if headers: attributes[SpanAttributes.LLM_REQUEST_HEADERS] = str(headers) - # Messages - if should_send_prompts() and "messages" in kwargs: - messages = kwargs["messages"] - for i, msg in enumerate(messages): - prefix = f"{SpanAttributes.LLM_PROMPTS}.{i}" - if "role" in msg: - attributes[f"{prefix}.role"] = msg["role"] - if "content" in msg: - content = msg["content"] - if isinstance(content, list): - # Handle multi-modal content - content = json.dumps(content) - attributes[f"{prefix}.content"] = content - if "tool_call_id" in msg: - attributes[f"{prefix}.tool_call_id"] = msg["tool_call_id"] - - # Tool calls - if "tool_calls" in msg: - tool_calls = msg["tool_calls"] - for j, tool_call in enumerate(tool_calls): - if is_openai_v1() and hasattr(tool_call, "__dict__"): - tool_call = model_as_dict(tool_call) - function = tool_call.get("function", {}) - attributes[f"{prefix}.tool_calls.{j}.id"] = tool_call.get("id") - attributes[f"{prefix}.tool_calls.{j}.name"] = function.get("name") - attributes[f"{prefix}.tool_calls.{j}.arguments"] = function.get("arguments") - - # Functions - if "functions" in kwargs: - functions = kwargs["functions"] - for i, function in enumerate(functions): - prefix = f"{SpanAttributes.LLM_REQUEST_FUNCTIONS}.{i}" - attributes[f"{prefix}.name"] = function.get("name") - attributes[f"{prefix}.description"] = function.get("description") - attributes[f"{prefix}.parameters"] = json.dumps(function.get("parameters")) - - # Tools - if "tools" in kwargs: - tools = kwargs["tools"] - for i, tool in enumerate(tools): - function = tool.get("function", {}) - prefix = f"{SpanAttributes.LLM_REQUEST_FUNCTIONS}.{i}" - attributes[f"{prefix}.name"] = function.get("name") - attributes[f"{prefix}.description"] = function.get("description") - attributes[f"{prefix}.parameters"] = json.dumps(function.get("parameters")) - - # Extract response attributes from return value - if return_value: - # Note: For streaming responses, return_value might be a generator/stream - # In that case, we won't have the full response data here - - # Convert to dict if needed - response_dict = {} - 
if hasattr(return_value, "__dict__") and not hasattr(return_value, "__iter__"): - response_dict = model_as_dict(return_value) - elif isinstance(return_value, dict): - response_dict = return_value - - # Basic response attributes - if "id" in response_dict: - attributes[SpanAttributes.LLM_RESPONSE_ID] = response_dict["id"] - if "model" in response_dict: - attributes[SpanAttributes.LLM_RESPONSE_MODEL] = response_dict["model"] - if "system_fingerprint" in response_dict: - attributes[SpanAttributes.LLM_OPENAI_RESPONSE_SYSTEM_FINGERPRINT] = response_dict["system_fingerprint"] - - # Usage - usage = response_dict.get("usage", {}) - if usage: - if is_openai_v1() and hasattr(usage, "__dict__"): - usage = usage.__dict__ - if "total_tokens" in usage: - attributes[SpanAttributes.LLM_USAGE_TOTAL_TOKENS] = usage["total_tokens"] - if "prompt_tokens" in usage: - attributes[SpanAttributes.LLM_USAGE_PROMPT_TOKENS] = usage["prompt_tokens"] - if "completion_tokens" in usage: - attributes[SpanAttributes.LLM_USAGE_COMPLETION_TOKENS] = usage["completion_tokens"] - - # Reasoning tokens - output_details = usage.get("output_tokens_details", {}) - if isinstance(output_details, dict) and "reasoning_tokens" in output_details: - attributes[SpanAttributes.LLM_USAGE_REASONING_TOKENS] = output_details["reasoning_tokens"] - - # Choices - if should_send_prompts() and "choices" in response_dict: - choices = response_dict["choices"] - for choice in choices: - index = choice.get("index", 0) - prefix = f"{SpanAttributes.LLM_COMPLETIONS}.{index}" + return attributes - if "finish_reason" in choice: - attributes[f"{prefix}.finish_reason"] = choice["finish_reason"] - # Content filter - if "content_filter_results" in choice: - attributes[f"{prefix}.content_filter_results"] = json.dumps(choice["content_filter_results"]) +def _extract_request_attributes( + args: Optional[Tuple] = None, + kwargs: Optional[Dict] = None, + return_value: Optional[Any] = None, +) -> AttributeMap: + """Extract request attributes using common LLM handler.""" + if not kwargs: + return {} + + # Use the common LLM handler with OpenAI-specific mappings + return LLMAttributeHandler.extract_request_attributes(kwargs, additional_mappings=OPENAI_REQUEST_ATTRIBUTES) + + +def _extract_messages( + args: Optional[Tuple] = None, + kwargs: Optional[Dict] = None, + return_value: Optional[Any] = None, +) -> AttributeMap: + """Extract message attributes from request and response.""" + attributes = {} + + # Extract request messages + if kwargs and should_send_prompts() and "messages" in kwargs: + messages = kwargs["messages"] + + # Convert messages to standard format + formatted_messages = [] + for msg in messages: + formatted_msg = { + "role": msg.get("role"), + "content": msg.get("content"), + } + + # Handle multi-modal content + if isinstance(formatted_msg["content"], list): + formatted_msg["content"] = json.dumps(formatted_msg["content"]) + + # Handle tool call ID + if "tool_call_id" in msg: + formatted_msg["tool_call_id"] = msg["tool_call_id"] + + # Handle tool calls + if "tool_calls" in msg: + tool_calls = [] + for tool_call in msg["tool_calls"]: + if is_openai_v1() and hasattr(tool_call, "__dict__"): + tool_call = model_as_dict(tool_call) + + function = tool_call.get("function", {}) + tool_calls.append( + { + "id": tool_call.get("id"), + "name": function.get("name"), + "arguments": function.get("arguments"), + } + ) + formatted_msg["tool_calls"] = tool_calls - # Message + formatted_messages.append(formatted_msg) + + # Use MessageAttributeHandler to extract attributes + 
message_attrs = MessageAttributeHandler.extract_messages(formatted_messages, attribute_type="prompt") + attributes.update(message_attrs) + + # Extract response messages (choices) + if return_value and should_send_prompts(): + response_dict = _get_response_dict(return_value) + + if "choices" in response_dict: + choices = response_dict["choices"] + + # Convert choices to message format + formatted_messages = [] + for choice in choices: message = choice.get("message", {}) if message: - if "role" in message: - attributes[f"{prefix}.role"] = message["role"] - if "content" in message: - attributes[f"{prefix}.content"] = message["content"] + formatted_msg = { + "role": message.get("role"), + "content": message.get("content"), + } + + # Add finish reason + if "finish_reason" in choice: + formatted_msg["finish_reason"] = choice["finish_reason"] + + # Add refusal if present if "refusal" in message: - attributes[f"{prefix}.refusal"] = message["refusal"] + formatted_msg["refusal"] = message["refusal"] - # Function call + # Handle function call (legacy format) if "function_call" in message: function_call = message["function_call"] - attributes[f"{prefix}.tool_calls.0.name"] = function_call.get("name") - attributes[f"{prefix}.tool_calls.0.arguments"] = function_call.get("arguments") + formatted_msg["tool_calls"] = [ + { + "name": function_call.get("name"), + "arguments": function_call.get("arguments"), + } + ] - # Tool calls - if "tool_calls" in message: - tool_calls = message["tool_calls"] - for i, tool_call in enumerate(tool_calls): + # Handle tool calls + elif "tool_calls" in message: + tool_calls = [] + for tool_call in message["tool_calls"]: function = tool_call.get("function", {}) - attributes[f"{prefix}.tool_calls.{i}.id"] = tool_call.get("id") - attributes[f"{prefix}.tool_calls.{i}.name"] = function.get("name") - attributes[f"{prefix}.tool_calls.{i}.arguments"] = function.get("arguments") + tool_calls.append( + { + "id": tool_call.get("id"), + "name": function.get("name"), + "arguments": function.get("arguments"), + } + ) + formatted_msg["tool_calls"] = tool_calls + + formatted_messages.append(formatted_msg) + + # Extract completion attributes + completion_attrs = MessageAttributeHandler.extract_messages(formatted_messages, attribute_type="completion") + + # Add any extra OpenAI-specific choice attributes + for i, choice in enumerate(choices): + # Content filter results + if "content_filter_results" in choice: + attributes[f"{SpanAttributes.LLM_COMPLETIONS}.{i}.content_filter_results"] = json.dumps( + choice["content_filter_results"] + ) + + # Refusal + message = choice.get("message", {}) + if "refusal" in message: + attributes[f"{SpanAttributes.LLM_COMPLETIONS}.{i}.refusal"] = message["refusal"] + + attributes.update(completion_attrs) # Prompt filter results if "prompt_filter_results" in response_dict: @@ -193,3 +205,84 @@ def handle_chat_attributes( ) return attributes + + +def _extract_tools_and_functions( + args: Optional[Tuple] = None, + kwargs: Optional[Dict] = None, + return_value: Optional[Any] = None, +) -> AttributeMap: + """Extract tools and functions from request.""" + attributes = {} + + if not kwargs: + return attributes + + # Extract functions + if "functions" in kwargs: + functions = kwargs["functions"] + for i, function in enumerate(functions): + prefix = f"{SpanAttributes.LLM_REQUEST_FUNCTIONS}.{i}" + attributes[f"{prefix}.name"] = function.get("name") + attributes[f"{prefix}.description"] = function.get("description") + attributes[f"{prefix}.parameters"] = 
json.dumps(function.get("parameters")) + + # Extract tools (newer format) + if "tools" in kwargs: + tools = kwargs["tools"] + for i, tool in enumerate(tools): + function = tool.get("function", {}) + prefix = f"{SpanAttributes.LLM_REQUEST_FUNCTIONS}.{i}" + attributes[f"{prefix}.name"] = function.get("name") + attributes[f"{prefix}.description"] = function.get("description") + attributes[f"{prefix}.parameters"] = json.dumps(function.get("parameters")) + + return attributes + + +def _extract_response_attributes( + args: Optional[Tuple] = None, + kwargs: Optional[Dict] = None, + return_value: Optional[Any] = None, +) -> AttributeMap: + """Extract response attributes using common LLM handler.""" + if not return_value: + return {} + + response_dict = _get_response_dict(return_value) + if not response_dict: + return {} + + # Use the common LLM handler with OpenAI-specific mappings + attributes = LLMAttributeHandler.extract_response_attributes( + response_dict, additional_mappings=OPENAI_RESPONSE_ATTRIBUTES + ) + + # Handle OpenAI-specific usage attributes + usage = response_dict.get("usage", {}) + if usage: + # Extract reasoning tokens from output details + output_details = usage.get("output_tokens_details", {}) + if isinstance(output_details, dict) and "reasoning_tokens" in output_details: + attributes[SpanAttributes.LLM_USAGE_REASONING_TOKENS] = output_details["reasoning_tokens"] + + return attributes + + +def _get_response_dict(return_value: Any) -> Dict[str, Any]: + """Convert response to dictionary format.""" + if hasattr(return_value, "__dict__") and not hasattr(return_value, "__iter__"): + return model_as_dict(return_value) + elif isinstance(return_value, dict): + return return_value + return {} + + +# Create the main handler by composing individual handlers +handle_chat_attributes = create_composite_handler( + _extract_base_attributes, + _extract_request_attributes, + _extract_messages, + _extract_tools_and_functions, + _extract_response_attributes, +)
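+
+# NOTE: handler order matters. Assuming create_composite_handler merges each
+# extractor's AttributeMap in sequence with dict.update() semantics, later
+# handlers win on key collisions, so response attributes are applied last and
+# take precedence if two extractors emit the same key.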