diff --git a/agentops/instrumentation/providers/openai/instrumentor.py b/agentops/instrumentation/providers/openai/instrumentor.py index 37883bf7a..9497d7252 100644 --- a/agentops/instrumentation/providers/openai/instrumentor.py +++ b/agentops/instrumentation/providers/openai/instrumentor.py @@ -13,7 +13,11 @@ """ from typing import Dict, Any +from wrapt import wrap_function_wrapper +from opentelemetry.metrics import Meter + +from agentops.logging import logger from agentops.instrumentation.common import ( CommonInstrumentor, InstrumentorConfig, @@ -22,11 +26,9 @@ MetricsRecorder, ) from agentops.instrumentation.providers.openai import LIBRARY_NAME, LIBRARY_VERSION -from agentops.instrumentation.providers.openai.attributes.common import get_response_attributes from agentops.instrumentation.providers.openai.config import Config from agentops.instrumentation.providers.openai.utils import is_openai_v1 from agentops.instrumentation.providers.openai.wrappers import ( - handle_chat_attributes, handle_completion_attributes, handle_embeddings_attributes, handle_image_gen_attributes, @@ -36,9 +38,14 @@ handle_run_stream_attributes, handle_messages_attributes, ) +from agentops.instrumentation.providers.openai.stream_wrapper import ( + chat_completion_stream_wrapper, + async_chat_completion_stream_wrapper, + responses_stream_wrapper, + async_responses_stream_wrapper, +) from agentops.instrumentation.providers.openai.v0 import OpenAIV0Instrumentor from agentops.semconv import Meters -from opentelemetry.metrics import Meter _instruments = ("openai >= 0.27.0",) @@ -82,6 +89,59 @@ def _initialize(self, **kwargs): # Skip normal instrumentation self.config.wrapped_methods = [] + def _custom_wrap(self, **kwargs): + """Add custom wrappers for streaming functionality.""" + if is_openai_v1() and self._tracer: + # from wrapt import wrap_function_wrapper + # # Add streaming wrappers for v1 + try: + # Chat completion streaming wrappers + + wrap_function_wrapper( + "openai.resources.chat.completions", + "Completions.create", + chat_completion_stream_wrapper(self._tracer), + ) + + wrap_function_wrapper( + "openai.resources.chat.completions", + "AsyncCompletions.create", + async_chat_completion_stream_wrapper(self._tracer), + ) + + # Beta chat completion streaming wrappers + wrap_function_wrapper( + "openai.resources.beta.chat.completions", + "Completions.parse", + chat_completion_stream_wrapper(self._tracer), + ) + + wrap_function_wrapper( + "openai.resources.beta.chat.completions", + "AsyncCompletions.parse", + async_chat_completion_stream_wrapper(self._tracer), + ) + + # Responses API streaming wrappers + wrap_function_wrapper( + "openai.resources.responses", + "Responses.create", + responses_stream_wrapper(self._tracer), + ) + + wrap_function_wrapper( + "openai.resources.responses", + "AsyncResponses.create", + async_responses_stream_wrapper(self._tracer), + ) + except Exception as e: + logger.warning(f"[OPENAI INSTRUMENTOR] Error setting up OpenAI streaming wrappers: {e}") + else: + if not is_openai_v1(): + logger.debug("[OPENAI INSTRUMENTOR] Skipping custom wrapping - not using OpenAI v1") + if not self._tracer: + logger.debug("[OPENAI INSTRUMENTOR] Skipping custom wrapping - no tracer available") + def _create_metrics(self, meter: Meter) -> Dict[str, Any]: """Create metrics for OpenAI instrumentation.""" metrics = StandardMetrics.create_standard_metrics(meter) @@ -130,29 +190,12 @@ def _custom_unwrap(self, **kwargs): OpenAIV0Instrumentor().uninstrument(**kwargs) def _get_wrapped_methods(self) -> 
list[WrapConfig]: - """Get all methods that should be wrapped.""" - wrapped_methods = [] + """Get all methods that should be wrapped. - # Chat completions - wrapped_methods.extend( - [ - WrapConfig( - trace_name="openai.chat.completion", - package="openai.resources.chat.completions", - class_name="Completions", - method_name="create", - handler=handle_chat_attributes, - ), - WrapConfig( - trace_name="openai.chat.completion", - package="openai.resources.chat.completions", - class_name="AsyncCompletions", - method_name="create", - handler=handle_chat_attributes, - is_async=True, - ), - ] - ) + Note: Chat completions and Responses API methods are NOT included here + as they are wrapped directly in _custom_wrap to support streaming. + """ + wrapped_methods = [] # Regular completions wrapped_methods.extend( @@ -221,27 +264,6 @@ def _get_wrapped_methods(self) -> list[WrapConfig]: ) ) - # Chat parse methods - beta_methods.extend( - [ - WrapConfig( - trace_name="openai.chat.completion", - package="openai.resources.beta.chat.completions", - class_name="Completions", - method_name="parse", - handler=handle_chat_attributes, - ), - WrapConfig( - trace_name="openai.chat.completion", - package="openai.resources.beta.chat.completions", - class_name="AsyncCompletions", - method_name="parse", - handler=handle_chat_attributes, - is_async=True, - ), - ] - ) - # Runs beta_methods.extend( [ @@ -283,27 +305,6 @@ def _get_wrapped_methods(self) -> list[WrapConfig]: # Add beta methods to wrapped methods (they might fail) wrapped_methods.extend(beta_methods) - # Responses API (Agents SDK) - our custom addition - wrapped_methods.extend( - [ - WrapConfig( - trace_name="openai.responses.create", - package="openai.resources.responses", - class_name="Responses", - method_name="create", - handler=get_response_attributes, - ), - WrapConfig( - trace_name="openai.responses.create", - package="openai.resources.responses", - class_name="AsyncResponses", - method_name="create", - handler=get_response_attributes, - is_async=True, - ), - ] - ) - return wrapped_methods def get_metrics_recorder(self) -> MetricsRecorder: diff --git a/agentops/instrumentation/providers/openai/stream_wrapper.py b/agentops/instrumentation/providers/openai/stream_wrapper.py new file mode 100644 index 000000000..15e541d82 --- /dev/null +++ b/agentops/instrumentation/providers/openai/stream_wrapper.py @@ -0,0 +1,766 @@ +"""OpenAI streaming response wrapper implementation. + +This module provides wrappers for OpenAI's streaming functionality, +handling both Chat Completions API and Responses API streaming. +It instruments streams to collect telemetry data for monitoring and analysis. +""" + +import time +from typing import Any, AsyncIterator, Iterator + +from opentelemetry import context as context_api +from opentelemetry.trace import Span, SpanKind, Status, StatusCode, set_span_in_context +from opentelemetry.instrumentation.utils import _SUPPRESS_INSTRUMENTATION_KEY + +from agentops.logging import logger +from agentops.instrumentation.common.wrappers import _with_tracer_wrapper +from agentops.instrumentation.providers.openai.utils import is_metrics_enabled +from agentops.instrumentation.providers.openai.wrappers.chat import handle_chat_attributes +from agentops.semconv import SpanAttributes, LLMRequestTypeValues, MessageAttributes + + +class OpenaiStreamWrapper: + """Wrapper for OpenAI Chat Completions streaming responses. 
+ + This wrapper intercepts streaming chunks to collect telemetry data including: + - Time to first token + - Total generation time + - Content aggregation + - Token usage (if available) + - Chunk statistics + """ + + def __init__(self, stream: Any, span: Span, request_kwargs: dict): + """Initialize the stream wrapper. + + Args: + stream: The original OpenAI stream object + span: The OpenTelemetry span for tracking + request_kwargs: Original request parameters for context + """ + self._stream = stream + self._span = span + self._request_kwargs = request_kwargs + self._start_time = time.time() + self._first_token_time = None + self._chunk_count = 0 + self._content_chunks = [] + self._finish_reason = None + self._model = None + self._response_id = None + self._usage = None + self._tool_calls = {} + self._current_tool_call_index = None + + # Make sure the span is attached to the current context + current_context = context_api.get_current() + self._token = context_api.attach(set_span_in_context(span, current_context)) + + def __iter__(self) -> Iterator[Any]: + """Return iterator for sync streaming.""" + return self + + def __next__(self) -> Any: + """Process the next chunk from the stream.""" + try: + chunk = next(self._stream) + self._process_chunk(chunk) + return chunk + except StopIteration: + self._finalize_stream() + raise + + def __enter__(self): + """Support context manager protocol.""" + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + """Clean up on context manager exit.""" + if exc_type is not None: + self._span.record_exception(exc_val) + self._span.set_status(Status(StatusCode.ERROR, str(exc_val))) + + self._span.end() + context_api.detach(self._token) + return False + + def _process_chunk(self, chunk: Any) -> None: + """Process a single chunk from the stream. 
+ + Args: + chunk: A chunk from the OpenAI streaming response + """ + self._chunk_count += 1 + + # Usage (may be in final chunk with a different structure) + if hasattr(chunk, "usage"): + self._usage = chunk.usage + # Check if this is a usage-only chunk (often the final chunk when stream_options.include_usage=true) + is_usage_only_chunk = not (hasattr(chunk, "choices") and chunk.choices) + + # If this is a usage-only chunk, we don't need to process it as a content chunk + if is_usage_only_chunk: + return + + # Skip processing if no choices are present + if not hasattr(chunk, "choices") or not chunk.choices: + return + + # Track first token timing + if self._first_token_time is None: + if any(choice.delta.content for choice in chunk.choices if hasattr(choice.delta, "content")): + self._first_token_time = time.time() + time_to_first_token = self._first_token_time - self._start_time + self._span.set_attribute(SpanAttributes.LLM_STREAMING_TIME_TO_FIRST_TOKEN, time_to_first_token) + self._span.add_event("first_token_received", {"time_elapsed": time_to_first_token}) + # Also check for tool_calls as first tokens + elif any( + choice.delta.tool_calls + for choice in chunk.choices + if hasattr(choice.delta, "tool_calls") and choice.delta.tool_calls + ): + self._first_token_time = time.time() + time_to_first_token = self._first_token_time - self._start_time + self._span.set_attribute(SpanAttributes.LLM_STREAMING_TIME_TO_FIRST_TOKEN, time_to_first_token) + self._span.add_event("first_tool_call_token_received", {"time_elapsed": time_to_first_token}) + + # Extract chunk data + if hasattr(chunk, "id") and chunk.id and not self._response_id: + self._response_id = chunk.id + if self._response_id is not None: + self._span.set_attribute(SpanAttributes.LLM_RESPONSE_ID, self._response_id) + + if hasattr(chunk, "model") and chunk.model and not self._model: + self._model = chunk.model + if self._model is not None: + self._span.set_attribute(SpanAttributes.LLM_RESPONSE_MODEL, self._model) + + # Process choices + for choice in chunk.choices: + if not hasattr(choice, "delta"): + continue + + delta = choice.delta + + # Content + if hasattr(delta, "content") and delta.content is not None: + self._content_chunks.append(delta.content) + + # Tool calls + if hasattr(delta, "tool_calls") and delta.tool_calls: + for tool_call in delta.tool_calls: + if hasattr(tool_call, "index"): + idx = tool_call.index + if idx not in self._tool_calls: + self._tool_calls[idx] = { + "id": "", + "type": "function", + "function": {"name": "", "arguments": ""}, + } + + if hasattr(tool_call, "id") and tool_call.id: + self._tool_calls[idx]["id"] = tool_call.id + + if hasattr(tool_call, "function"): + if hasattr(tool_call.function, "name") and tool_call.function.name: + self._tool_calls[idx]["function"]["name"] = tool_call.function.name + if hasattr(tool_call.function, "arguments") and tool_call.function.arguments: + self._tool_calls[idx]["function"]["arguments"] += tool_call.function.arguments + + # Finish reason + if hasattr(choice, "finish_reason") and choice.finish_reason: + self._finish_reason = choice.finish_reason + + def _finalize_stream(self) -> None: + """Finalize the stream and set final attributes on the span.""" + total_time = time.time() - self._start_time + + # Aggregate content + full_content = "".join(self._content_chunks) + + # Set generation time + if self._first_token_time: + generation_time = total_time - (self._first_token_time - self._start_time) + self._span.set_attribute(SpanAttributes.LLM_STREAMING_TIME_TO_GENERATE, 
generation_time) + + # Add content attributes + if full_content: + self._span.set_attribute(MessageAttributes.COMPLETION_CONTENT.format(i=0), full_content) + self._span.set_attribute(MessageAttributes.COMPLETION_ROLE.format(i=0), "assistant") + + # Set finish reason + if self._finish_reason: + self._span.set_attribute(MessageAttributes.COMPLETION_FINISH_REASON.format(i=0), self._finish_reason) + + # Set tool calls + if len(self._tool_calls) > 0: + for idx, tool_call in self._tool_calls.items(): + # Only set attributes if values are not None + if tool_call["id"] is not None: + self._span.set_attribute( + MessageAttributes.COMPLETION_TOOL_CALL_ID.format(i=0, j=idx), tool_call["id"] + ) + + if tool_call["type"] is not None: + self._span.set_attribute( + MessageAttributes.COMPLETION_TOOL_CALL_TYPE.format(i=0, j=idx), tool_call["type"] + ) + + if tool_call["function"]["name"] is not None: + self._span.set_attribute( + MessageAttributes.COMPLETION_TOOL_CALL_NAME.format(i=0, j=idx), tool_call["function"]["name"] + ) + + if tool_call["function"]["arguments"] is not None: + self._span.set_attribute( + MessageAttributes.COMPLETION_TOOL_CALL_ARGUMENTS.format(i=0, j=idx), + tool_call["function"]["arguments"], + ) + + # Set usage if available from the API + if self._usage is not None: + # Only set token attributes if they exist and have non-None values + if hasattr(self._usage, "prompt_tokens") and self._usage.prompt_tokens is not None: + self._span.set_attribute(SpanAttributes.LLM_USAGE_PROMPT_TOKENS, int(self._usage.prompt_tokens)) + + if hasattr(self._usage, "completion_tokens") and self._usage.completion_tokens is not None: + self._span.set_attribute(SpanAttributes.LLM_USAGE_COMPLETION_TOKENS, int(self._usage.completion_tokens)) + + if hasattr(self._usage, "total_tokens") and self._usage.total_tokens is not None: + self._span.set_attribute(SpanAttributes.LLM_USAGE_TOTAL_TOKENS, int(self._usage.total_tokens)) + + # Stream statistics + self._span.set_attribute("llm.openai.stream.chunk_count", self._chunk_count) + self._span.set_attribute("llm.openai.stream.content_length", len(full_content)) + self._span.set_attribute("llm.openai.stream.total_duration", total_time) + + # Add completion event + self._span.add_event( + "stream_completed", + { + "chunks_received": self._chunk_count, + "total_content_length": len(full_content), + "duration": total_time, + "had_tool_calls": len(self._tool_calls) > 0, + }, + ) + + # Finalize span and context + self._span.set_status(Status(StatusCode.OK)) + self._span.end() + context_api.detach(self._token) + + +class OpenAIAsyncStreamWrapper: + """Async wrapper for OpenAI Chat Completions streaming responses.""" + + def __init__(self, stream: Any, span: Span, request_kwargs: dict): + """Initialize the async stream wrapper. 
+ + Args: + stream: The original OpenAI async stream object + span: The OpenTelemetry span for tracking + request_kwargs: Original request parameters for context + """ + self._stream = stream + self._span = span + self._request_kwargs = request_kwargs + self._start_time = time.time() + self._first_token_time = None + self._chunk_count = 0 + self._content_chunks = [] + self._finish_reason = None + self._model = None + self._response_id = None + self._usage = None + self._tool_calls = {} + + # Make sure the span is attached to the current context + current_context = context_api.get_current() + self._token = context_api.attach(set_span_in_context(span, current_context)) + + def __aiter__(self) -> AsyncIterator[Any]: + """Return async iterator for async streaming.""" + return self + + async def __anext__(self) -> Any: + """Process the next chunk from the async stream.""" + try: + if not hasattr(self, "_aiter_debug_logged"): + self._aiter_debug_logged = True + + chunk = await self._stream.__anext__() + + # Reuse the synchronous implementation + OpenaiStreamWrapper._process_chunk(self, chunk) + return chunk + except StopAsyncIteration: + OpenaiStreamWrapper._finalize_stream(self) + raise + except Exception as e: + logger.error(f"[OPENAI ASYNC WRAPPER] Error in __anext__: {e}") + # Make sure span is ended in case of error + self._span.record_exception(e) + self._span.set_status(Status(StatusCode.ERROR, str(e))) + self._span.end() + context_api.detach(self._token) + raise + + async def __aenter__(self): + """Support async context manager protocol.""" + return self + + async def __aexit__(self, exc_type, exc_val, exc_tb): + """Clean up on async context manager exit.""" + if exc_type is not None: + self._span.record_exception(exc_val) + self._span.set_status(Status(StatusCode.ERROR, str(exc_val))) + + self._span.end() + context_api.detach(self._token) + return False + + +@_with_tracer_wrapper +def chat_completion_stream_wrapper(tracer, wrapped, instance, args, kwargs): + """Wrapper for chat completions (both streaming and non-streaming). + + This wrapper handles both streaming and non-streaming responses, + wrapping streams with telemetry collection while maintaining the original interface. 
+ """ + if context_api.get_value(_SUPPRESS_INSTRUMENTATION_KEY): + return wrapped(*args, **kwargs) + + # Check if streaming is enabled + is_streaming = kwargs.get("stream", False) + + # Start the span + span = tracer.start_span( + "openai.chat.completion", + kind=SpanKind.CLIENT, + attributes={SpanAttributes.LLM_REQUEST_TYPE: LLMRequestTypeValues.CHAT.value}, + ) + + # Make sure span is linked to the current trace context + current_context = context_api.get_current() + token = context_api.attach(set_span_in_context(span, current_context)) + + try: + # Extract and set request attributes + request_attributes = handle_chat_attributes(kwargs=kwargs) + + for key, value in request_attributes.items(): + span.set_attribute(key, value) + + # Add include_usage to get token counts for streaming responses + if is_streaming and is_metrics_enabled(): + # Add stream_options if it doesn't exist + if "stream_options" not in kwargs: + kwargs["stream_options"] = {"include_usage": True} + logger.debug("[OPENAI WRAPPER] Adding stream_options.include_usage=True to get token counts") + # If stream_options exists but doesn't have include_usage, add it + elif isinstance(kwargs["stream_options"], dict) and "include_usage" not in kwargs["stream_options"]: + kwargs["stream_options"]["include_usage"] = True + logger.debug( + "[OPENAI WRAPPER] Adding include_usage=True to existing stream_options to get token counts" + ) + + # Call the original method + response = wrapped(*args, **kwargs) + + if is_streaming: + # Wrap the stream + context_api.detach(token) + return OpenaiStreamWrapper(response, span, kwargs) + else: + # Handle non-streaming response + response_attributes = handle_chat_attributes(kwargs=kwargs, return_value=response) + + for key, value in response_attributes.items(): + if key not in request_attributes: # Avoid overwriting request attributes + span.set_attribute(key, value) + + span.set_status(Status(StatusCode.OK)) + span.end() + context_api.detach(token) + return response + + except Exception as e: + logger.error(f"[OPENAI WRAPPER] Error in chat_completion_stream_wrapper: {e}") + span.record_exception(e) + span.set_status(Status(StatusCode.ERROR, str(e))) + span.end() + context_api.detach(token) + raise + + +@_with_tracer_wrapper +async def async_chat_completion_stream_wrapper(tracer, wrapped, instance, args, kwargs): + """Async wrapper for chat completions (both streaming and non-streaming).""" + if context_api.get_value(_SUPPRESS_INSTRUMENTATION_KEY): + return await wrapped(*args, **kwargs) + + # Check if streaming is enabled + is_streaming = kwargs.get("stream", False) + + # Start the span + span = tracer.start_span( + "openai.chat.completion", + kind=SpanKind.CLIENT, + attributes={SpanAttributes.LLM_REQUEST_TYPE: LLMRequestTypeValues.CHAT.value}, + ) + + # Make sure span is linked to the current trace context + current_context = context_api.get_current() + token = context_api.attach(set_span_in_context(span, current_context)) + + try: + # Extract and set request attributes + request_attributes = handle_chat_attributes(kwargs=kwargs) + + for key, value in request_attributes.items(): + span.set_attribute(key, value) + + # Add include_usage to get token counts for streaming responses + if is_streaming and is_metrics_enabled(): + # Add stream_options if it doesn't exist + if "stream_options" not in kwargs: + kwargs["stream_options"] = {"include_usage": True} + # If stream_options exists but doesn't have include_usage, add it + elif isinstance(kwargs["stream_options"], dict) and "include_usage" not in 
kwargs["stream_options"]: + kwargs["stream_options"]["include_usage"] = True + + # Call the original method + response = await wrapped(*args, **kwargs) + + if is_streaming: + # Wrap the stream + context_api.detach(token) + return OpenAIAsyncStreamWrapper(response, span, kwargs) + else: + # Handle non-streaming response + response_attributes = handle_chat_attributes(kwargs=kwargs, return_value=response) + + for key, value in response_attributes.items(): + if key not in request_attributes: # Avoid overwriting request attributes + span.set_attribute(key, value) + + span.set_status(Status(StatusCode.OK)) + span.end() + context_api.detach(token) + return response + + except Exception as e: + logger.error(f"[OPENAI WRAPPER] Error in async_chat_completion_stream_wrapper: {e}") + span.record_exception(e) + span.set_status(Status(StatusCode.ERROR, str(e))) + span.end() + context_api.detach(token) + raise + + +class ResponsesAPIStreamWrapper: + """Wrapper for OpenAI Responses API streaming. + + The Responses API uses event-based streaming with typed events + like 'response.output_text.delta' instead of generic chunks. + """ + + def __init__(self, stream: Any, span: Span, request_kwargs: dict): + """Initialize the Responses API stream wrapper.""" + self._stream = stream + self._span = span + self._request_kwargs = request_kwargs + self._start_time = time.time() + self._first_token_time = None + self._event_count = 0 + self._content_chunks = [] + self._response_id = None + self._model = None + self._usage = None + + # Make sure the span is attached to the current context + current_context = context_api.get_current() + self._token = context_api.attach(set_span_in_context(span, current_context)) + + def __iter__(self) -> Iterator[Any]: + """Return iterator for sync streaming.""" + return self + + def __next__(self) -> Any: + """Process the next event from the stream.""" + try: + event = next(self._stream) + self._process_event(event) + return event + except StopIteration: + self._finalize_stream() + raise + + # Add async iterator support + def __aiter__(self) -> AsyncIterator[Any]: + """Return async iterator for async streaming.""" + return self + + async def __anext__(self) -> Any: + """Process the next event from the async stream.""" + try: + # If the underlying stream is async + if hasattr(self._stream, "__anext__"): + event = await self._stream.__anext__() + # If the underlying stream is sync but we're in an async context + else: + try: + event = next(self._stream) + except StopIteration: + self._finalize_stream() + raise StopAsyncIteration + + self._process_event(event) + return event + except StopAsyncIteration: + self._finalize_stream() + raise + except Exception as e: + logger.error(f"[RESPONSES API WRAPPER] Error in __anext__: {e}") + # Make sure span is ended in case of error + self._span.record_exception(e) + self._span.set_status(Status(StatusCode.ERROR, str(e))) + self._span.end() + context_api.detach(self._token) + raise + + def _process_event(self, event: Any) -> None: + """Process a single event from the Responses API stream.""" + self._event_count += 1 + + # Track first content event + if self._first_token_time is None and hasattr(event, "type"): + if event.type == "response.output_text.delta": + self._first_token_time = time.time() + time_to_first_token = self._first_token_time - self._start_time + self._span.set_attribute(SpanAttributes.LLM_STREAMING_TIME_TO_FIRST_TOKEN, time_to_first_token) + + # Process different event types + if hasattr(event, "type"): + if event.type == 
"response.created": + if hasattr(event, "response"): + response = event.response + if hasattr(response, "id"): + self._response_id = response.id + self._span.set_attribute(SpanAttributes.LLM_RESPONSE_ID, self._response_id) + if hasattr(response, "model"): + self._model = response.model + self._span.set_attribute(SpanAttributes.LLM_RESPONSE_MODEL, self._model) + + elif event.type == "response.output_text.delta": + if hasattr(event, "delta"): + self._content_chunks.append(event.delta) + + elif event.type == "response.done": + if hasattr(event, "response") and hasattr(event.response, "usage"): + self._usage = event.response.usage + + # Add event tracking + self._span.add_event( + "responses_api_event", + {"event_type": event.type if hasattr(event, "type") else "unknown", "event_number": self._event_count}, + ) + + def _finalize_stream(self) -> None: + """Finalize the Responses API stream.""" + total_time = time.time() - self._start_time + + # Aggregate content + full_content = "".join(self._content_chunks) + if full_content: + self._span.set_attribute(MessageAttributes.COMPLETION_CONTENT.format(i=0), full_content) + + # Set timing + if self._first_token_time: + generation_time = total_time - (self._first_token_time - self._start_time) + self._span.set_attribute(SpanAttributes.LLM_STREAMING_TIME_TO_GENERATE, generation_time) + + # Set usage if available from the API + if self._usage is not None: + # Only set token attributes if they exist and have non-None values + if hasattr(self._usage, "input_tokens") and self._usage.input_tokens is not None: + self._span.set_attribute(SpanAttributes.LLM_USAGE_PROMPT_TOKENS, int(self._usage.input_tokens)) + + if hasattr(self._usage, "output_tokens") and self._usage.output_tokens is not None: + self._span.set_attribute(SpanAttributes.LLM_USAGE_COMPLETION_TOKENS, int(self._usage.output_tokens)) + + if hasattr(self._usage, "total_tokens") and self._usage.total_tokens is not None: + self._span.set_attribute(SpanAttributes.LLM_USAGE_TOTAL_TOKENS, int(self._usage.total_tokens)) + + else: + logger.debug( + f"[RESPONSES API] No usage provided by API. 
" + f"content_length={len(full_content)}, " + f"event_count={self._event_count}" + ) + + # Stream statistics + self._span.set_attribute("llm.openai.responses.event_count", self._event_count) + self._span.set_attribute("llm.openai.responses.content_length", len(full_content)) + self._span.set_attribute("llm.openai.responses.total_duration", total_time) + + # Finalize span and context + self._span.set_status(Status(StatusCode.OK)) + self._span.end() + context_api.detach(self._token) + + +@_with_tracer_wrapper +def responses_stream_wrapper(tracer, wrapped, instance, args, kwargs): + """Wrapper for Responses API (both streaming and non-streaming).""" + if context_api.get_value(_SUPPRESS_INSTRUMENTATION_KEY): + return wrapped(*args, **kwargs) + + # Check if streaming is enabled + is_streaming = kwargs.get("stream", False) + + # If not streaming, just call the wrapped method directly + # The normal instrumentation will handle it + if not is_streaming: + logger.debug("[RESPONSES API WRAPPER] Non-streaming call, delegating to normal instrumentation") + return wrapped(*args, **kwargs) + + # Only create span for streaming responses + span = tracer.start_span( + "openai.responses.create", + kind=SpanKind.CLIENT, + attributes={SpanAttributes.LLM_REQUEST_TYPE: LLMRequestTypeValues.CHAT.value}, + ) + + try: + # Extract and set request attributes + span.set_attribute(SpanAttributes.LLM_SYSTEM, "OpenAI") + span.set_attribute(SpanAttributes.LLM_REQUEST_STREAMING, is_streaming) + + if "model" in kwargs: + span.set_attribute(SpanAttributes.LLM_REQUEST_MODEL, kwargs["model"]) + if "messages" in kwargs: + # Set messages as prompts for consistency + messages = kwargs["messages"] + for i, msg in enumerate(messages): + prefix = f"{SpanAttributes.LLM_PROMPTS}.{i}" + if isinstance(msg, dict): + if "role" in msg: + span.set_attribute(f"{prefix}.role", msg["role"]) + if "content" in msg: + span.set_attribute(f"{prefix}.content", msg["content"]) + + # Tools + if "tools" in kwargs: + tools = kwargs["tools"] + if tools: + for i, tool in enumerate(tools): + if isinstance(tool, dict) and "function" in tool: + function = tool["function"] + prefix = f"{SpanAttributes.LLM_REQUEST_FUNCTIONS}.{i}" + if "name" in function: + span.set_attribute(f"{prefix}.name", function["name"]) + if "description" in function: + span.set_attribute(f"{prefix}.description", function["description"]) + if "parameters" in function: + import json + + span.set_attribute(f"{prefix}.parameters", json.dumps(function["parameters"])) + + # Temperature and other parameters + if "temperature" in kwargs: + span.set_attribute(SpanAttributes.LLM_REQUEST_TEMPERATURE, kwargs["temperature"]) + if "top_p" in kwargs: + span.set_attribute(SpanAttributes.LLM_REQUEST_TOP_P, kwargs["top_p"]) + + # Call the original method + response = wrapped(*args, **kwargs) + + # For streaming, wrap the stream + return ResponsesAPIStreamWrapper(response, span, kwargs) + + except Exception as e: + span.record_exception(e) + span.set_status(Status(StatusCode.ERROR, str(e))) + span.end() + raise + + +@_with_tracer_wrapper +async def async_responses_stream_wrapper(tracer, wrapped, instance, args, kwargs): + """Async wrapper for Responses API (both streaming and non-streaming).""" + if context_api.get_value(_SUPPRESS_INSTRUMENTATION_KEY): + return await wrapped(*args, **kwargs) + + # Check if streaming is enabled + is_streaming = kwargs.get("stream", False) + + # If not streaming, just call the wrapped method directly + # The normal instrumentation will handle it + if not is_streaming: + 
logger.debug("[RESPONSES API WRAPPER] Non-streaming call, delegating to normal instrumentation") + return await wrapped(*args, **kwargs) + + # Only create span for streaming responses + span = tracer.start_span( + "openai.responses.create", + kind=SpanKind.CLIENT, + attributes={SpanAttributes.LLM_REQUEST_TYPE: LLMRequestTypeValues.CHAT.value}, + ) + + try: + # Extract and set request attributes + span.set_attribute(SpanAttributes.LLM_SYSTEM, "OpenAI") + span.set_attribute(SpanAttributes.LLM_REQUEST_STREAMING, is_streaming) + + if "model" in kwargs: + span.set_attribute(SpanAttributes.LLM_REQUEST_MODEL, kwargs["model"]) + + if "messages" in kwargs: + # Set messages as prompts for consistency + messages = kwargs["messages"] + for i, msg in enumerate(messages): + prefix = f"{SpanAttributes.LLM_PROMPTS}.{i}" + if isinstance(msg, dict): + if "role" in msg: + span.set_attribute(f"{prefix}.role", msg["role"]) + if "content" in msg: + span.set_attribute(f"{prefix}.content", msg["content"]) + + # Tools + if "tools" in kwargs: + tools = kwargs["tools"] + if tools: + for i, tool in enumerate(tools): + if isinstance(tool, dict) and "function" in tool: + function = tool["function"] + prefix = f"{SpanAttributes.LLM_REQUEST_FUNCTIONS}.{i}" + if "name" in function: + span.set_attribute(f"{prefix}.name", function["name"]) + if "description" in function: + span.set_attribute(f"{prefix}.description", function["description"]) + if "parameters" in function: + import json + + span.set_attribute(f"{prefix}.parameters", json.dumps(function["parameters"])) + + # Temperature and other parameters + if "temperature" in kwargs: + span.set_attribute(SpanAttributes.LLM_REQUEST_TEMPERATURE, kwargs["temperature"]) + if "top_p" in kwargs: + span.set_attribute(SpanAttributes.LLM_REQUEST_TOP_P, kwargs["top_p"]) + + # Call the original method + response = await wrapped(*args, **kwargs) + + # For streaming, wrap the stream + logger.debug("[RESPONSES API WRAPPER] Wrapping streaming response with ResponsesAPIStreamWrapper") + wrapped_stream = ResponsesAPIStreamWrapper(response, span, kwargs) + return wrapped_stream + + except Exception as e: + span.record_exception(e) + span.set_status(Status(StatusCode.ERROR, str(e))) + span.end() + raise diff --git a/agentops/instrumentation/providers/openai/wrappers/chat.py b/agentops/instrumentation/providers/openai/wrappers/chat.py index 65b5e9181..5a40501ce 100644 --- a/agentops/instrumentation/providers/openai/wrappers/chat.py +++ b/agentops/instrumentation/providers/openai/wrappers/chat.py @@ -137,7 +137,7 @@ def handle_chat_attributes( attributes[SpanAttributes.LLM_RESPONSE_ID] = response_dict["id"] if "model" in response_dict: attributes[SpanAttributes.LLM_RESPONSE_MODEL] = response_dict["model"] - if "system_fingerprint" in response_dict: + if "system_fingerprint" in response_dict and response_dict["system_fingerprint"] is not None: attributes[SpanAttributes.LLM_OPENAI_RESPONSE_SYSTEM_FINGERPRINT] = response_dict["system_fingerprint"] # Usage @@ -176,9 +176,9 @@ def handle_chat_attributes( if message: if "role" in message: attributes[f"{prefix}.role"] = message["role"] - if "content" in message: + if "content" in message and message["content"] is not None: attributes[f"{prefix}.content"] = message["content"] - if "refusal" in message: + if "refusal" in message and message["refusal"] is not None: attributes[f"{prefix}.refusal"] = message["refusal"] # Function call diff --git a/agentops/semconv/span_attributes.py b/agentops/semconv/span_attributes.py index 0daf0ddfc..a6be4132a 100644 
--- a/agentops/semconv/span_attributes.py +++ b/agentops/semconv/span_attributes.py @@ -98,3 +98,9 @@ class SpanAttributes: # Session/Trace attributes AGENTOPS_SESSION_END_STATE = "agentops.session.end_state" + + # Streaming-specific attributes + LLM_STREAMING_TIME_TO_FIRST_TOKEN = "gen_ai.streaming.time_to_first_token" + LLM_STREAMING_TIME_TO_GENERATE = "gen_ai.streaming.time_to_generate" + LLM_STREAMING_DURATION = "gen_ai.streaming_duration" + LLM_STREAMING_CHUNK_COUNT = "gen_ai.streaming.chunk_count" diff --git a/examples/openai/openai_example_async.py b/examples/openai/openai_example_async.py index 002893172..1e32f52b4 100644 --- a/examples/openai/openai_example_async.py +++ b/examples/openai/openai_example_async.py @@ -12,8 +12,8 @@ from openai import AsyncOpenAI import agentops import os -from dotenv import load_dotenv import asyncio +from dotenv import load_dotenv # Next, we'll grab our API keys. You can use dotenv like below or however else you like to load environment variables load_dotenv() @@ -71,7 +71,8 @@ async def main_stream(): stream=True, ) async for chunk in stream: - print(chunk.choices[0].delta.content or "", end="") + if chunk.choices and len(chunk.choices) > 0: + print(chunk.choices[0].delta.content or "", end="") asyncio.run(main_stream()) diff --git a/examples/openai/openai_example_sync.py b/examples/openai/openai_example_sync.py index 617439e93..6b535251b 100644 --- a/examples/openai/openai_example_sync.py +++ b/examples/openai/openai_example_sync.py @@ -59,7 +59,8 @@ ) for chunk in stream: - print(chunk.choices[0].delta.content or "", end="") + if chunk.choices and len(chunk.choices) > 0: + print(chunk.choices[0].delta.content or "", end="") agentops.end_trace(tracer, end_state="Success") diff --git a/tests/unit/instrumentation/openai_core/test_instrumentor.py b/tests/unit/instrumentation/openai_core/test_instrumentor.py index 52fd6f81d..7a17152c4 100644 --- a/tests/unit/instrumentation/openai_core/test_instrumentor.py +++ b/tests/unit/instrumentation/openai_core/test_instrumentor.py @@ -16,7 +16,6 @@ from agentops.instrumentation.providers.openai.instrumentor import OpenaiInstrumentor -from agentops.instrumentation.common.wrappers import WrapConfig # Utility function to load fixtures @@ -41,44 +40,52 @@ def instrumentor(self): # Create patches for tracer and meter with patch("agentops.instrumentation.common.instrumentor.get_tracer") as mock_get_tracer: with patch("agentops.instrumentation.common.instrumentor.get_meter") as mock_get_meter: - # Set up mock tracer and meter - mock_tracer = MagicMock() - mock_meter = MagicMock() - mock_get_tracer.return_value = mock_tracer - mock_get_meter.return_value = mock_meter - - # Create a real instrumentation setup for testing - mock_tracer_provider = MagicMock() - instrumentor = OpenaiInstrumentor() - - # To avoid timing issues with the fixture, we need to ensure patch - # objects are created before being used in the test - mock_wrap = patch("agentops.instrumentation.common.instrumentor.wrap").start() - mock_unwrap = patch("agentops.instrumentation.common.instrumentor.unwrap").start() - mock_instrument = patch.object(instrumentor, "_instrument", wraps=instrumentor._instrument).start() - mock_uninstrument = patch.object( - instrumentor, "_uninstrument", wraps=instrumentor._uninstrument - ).start() - - # Instrument - instrumentor._instrument(tracer_provider=mock_tracer_provider) - - yield { - "instrumentor": instrumentor, - "tracer_provider": mock_tracer_provider, - "mock_wrap": mock_wrap, - "mock_unwrap": mock_unwrap, - 
"mock_instrument": mock_instrument, - "mock_uninstrument": mock_uninstrument, - "mock_tracer": mock_tracer, - "mock_meter": mock_meter, - } - - # Uninstrument - must happen before stopping patches - instrumentor._uninstrument() - - # Stop patches - patch.stopall() + with patch("agentops.instrumentation.providers.openai.utils.is_openai_v1", return_value=True): + with patch( + "agentops.instrumentation.providers.openai.instrumentor.is_openai_v1", return_value=True + ): + # Set up mock tracer and meter + mock_tracer = MagicMock() + mock_meter = MagicMock() + mock_get_tracer.return_value = mock_tracer + mock_get_meter.return_value = mock_meter + + # Create a real instrumentation setup for testing + mock_tracer_provider = MagicMock() + instrumentor = OpenaiInstrumentor() + + # To avoid timing issues with the fixture, we need to ensure patch + # objects are created before being used in the test + mock_wrap = patch("agentops.instrumentation.common.instrumentor.wrap").start() + mock_unwrap = patch("agentops.instrumentation.common.instrumentor.unwrap").start() + mock_wrap_function_wrapper = patch("wrapt.wrap_function_wrapper").start() + mock_instrument = patch.object( + instrumentor, "_instrument", wraps=instrumentor._instrument + ).start() + mock_uninstrument = patch.object( + instrumentor, "_uninstrument", wraps=instrumentor._uninstrument + ).start() + + # Instrument + instrumentor._instrument(tracer_provider=mock_tracer_provider) + + yield { + "instrumentor": instrumentor, + "tracer_provider": mock_tracer_provider, + "mock_wrap": mock_wrap, + "mock_unwrap": mock_unwrap, + "mock_wrap_function_wrapper": mock_wrap_function_wrapper, + "mock_instrument": mock_instrument, + "mock_uninstrument": mock_uninstrument, + "mock_tracer": mock_tracer, + "mock_meter": mock_meter, + } + + # Uninstrument - must happen before stopping patches + instrumentor._uninstrument() + + # Stop patches + patch.stopall() def test_instrumentor_initialization(self): """Test instrumentor is initialized with correct configuration""" @@ -92,32 +99,36 @@ def test_instrumentor_initialization(self): def test_instrument_method_wraps_response_api(self, instrumentor): """Test the _instrument method wraps the Response API methods""" - mock_wrap = instrumentor["mock_wrap"] - - # Verify wrap was called multiple times (we wrap many methods) - assert mock_wrap.call_count > 0 - - # Find Response API calls in the wrapped methods - response_api_calls = [] - for call in mock_wrap.call_args_list: - wrap_config = call[0][0] - if isinstance(wrap_config, WrapConfig) and wrap_config.package == "openai.resources.responses": - response_api_calls.append(wrap_config) - - # Verify we have both sync and async Response API methods - assert len(response_api_calls) == 2 - - # Check sync Responses.create - sync_response = next((cfg for cfg in response_api_calls if cfg.class_name == "Responses"), None) - assert sync_response is not None - assert sync_response.trace_name == "openai.responses.create" - assert sync_response.method_name == "create" - - # Check async AsyncResponses.create - async_response = next((cfg for cfg in response_api_calls if cfg.class_name == "AsyncResponses"), None) - assert async_response is not None - assert async_response.trace_name == "openai.responses.create" - assert async_response.method_name == "create" + instrumentor_obj = instrumentor["instrumentor"] + + # Create a new mock for wrap_function_wrapper that we control + with patch("agentops.instrumentation.providers.openai.instrumentor.wrap_function_wrapper") as mock_wfw: + # Call 
_custom_wrap directly to test Response API wrapping + instrumentor_obj._custom_wrap() + + # Verify wrap_function_wrapper was called for Response API methods + assert ( + mock_wfw.call_count >= 2 + ), f"Expected at least 2 calls to wrap_function_wrapper, got {mock_wfw.call_count}" + + # Find Response API calls + response_api_calls = [] + for call in mock_wfw.call_args_list: + if len(call[0]) >= 2 and "openai.resources.responses" in call[0][0]: + response_api_calls.append( + { + "module": call[0][0], + "method": call[0][1], + } + ) + + # Verify we have both sync and async Response API methods + assert len(response_api_calls) == 2, f"Expected 2 Response API calls, got {len(response_api_calls)}" + + # Check that we have both Responses.create and AsyncResponses.create + methods = [call["method"] for call in response_api_calls] + assert "Responses.create" in methods + assert "AsyncResponses.create" in methods def test_uninstrument_method_unwraps_response_api(self, instrumentor): """Test the _uninstrument method unwraps the Response API methods"""
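
Illustrative caller-side sketch (not part of the patch): a minimal example of how the new streaming instrumentation is expected to behave, assuming AgentOps has been initialized via agentops.init() and OPENAI_API_KEY is set; the model name is illustrative. When metrics are enabled, chat_completion_stream_wrapper may inject stream_options={"include_usage": True}, so the final chunk can arrive with an empty choices list and a populated usage field — callers should guard chunk.choices, exactly as the updated examples do.

    import agentops
    from openai import OpenAI

    agentops.init()
    client = OpenAI()

    stream = client.chat.completions.create(
        model="gpt-4o-mini",  # illustrative model name
        messages=[{"role": "user", "content": "Write one line about telemetry."}],
        stream=True,
    )

    for chunk in stream:
        # The instrumentor may have added stream_options={"include_usage": True};
        # in that case the last chunk has no choices but carries token usage.
        if chunk.choices:
            print(chunk.choices[0].delta.content or "", end="")
        elif getattr(chunk, "usage", None) is not None:
            usage = chunk.usage
            print(f"\nprompt_tokens={usage.prompt_tokens}, completion_tokens={usage.completion_tokens}")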