diff --git a/.github/workflows/examples-integration-test.yml b/.github/workflows/examples-integration-test.yml index 0458311f8..510330b86 100644 --- a/.github/workflows/examples-integration-test.yml +++ b/.github/workflows/examples-integration-test.yml @@ -36,6 +36,7 @@ jobs: - { path: 'examples/openai/openai_example_async.py', name: 'OpenAI Async' } - { path: 'examples/openai/multi_tool_orchestration.py', name: 'OpenAI Multi-Tool' } - { path: 'examples/openai/web_search.py', name: 'OpenAI Web Search' } + - { path: 'examples/openai/o3_responses_example.py', name: 'OpenAI o3 Responses' } # Anthropic examples - { path: 'examples/anthropic/anthropic-example-sync.py', name: 'Anthropic Sync' } diff --git a/agentops/instrumentation/providers/openai/stream_wrapper.py b/agentops/instrumentation/providers/openai/stream_wrapper.py index 1d8c18c12..4d23c5b35 100644 --- a/agentops/instrumentation/providers/openai/stream_wrapper.py +++ b/agentops/instrumentation/providers/openai/stream_wrapper.py @@ -10,7 +10,7 @@ from opentelemetry import context as context_api from opentelemetry.trace import Span, SpanKind, Status, StatusCode, set_span_in_context -from opentelemetry.instrumentation.utils import _SUPPRESS_INSTRUMENTATION_KEY +from opentelemetry.context import _SUPPRESS_INSTRUMENTATION_KEY from agentops.logging import logger from agentops.instrumentation.common.wrappers import _with_tracer_wrapper @@ -272,11 +272,11 @@ async def __anext__(self) -> Any: chunk = await self._stream.__anext__() - # Reuse the synchronous implementation - OpenaiStreamWrapper._process_chunk(self, chunk) + # Process the chunk + self._process_chunk(chunk) return chunk except StopAsyncIteration: - OpenaiStreamWrapper._finalize_stream(self) + self._finalize_stream() raise except Exception as e: logger.error(f"[OPENAI ASYNC WRAPPER] Error in __anext__: {e}") @@ -301,6 +301,153 @@ async def __aexit__(self, exc_type, exc_val, exc_tb): context_api.detach(self._token) return False + def _process_chunk(self, chunk: Any) -> None: + """Process a single chunk from the stream. 
+ + Args: + chunk: A chunk from the OpenAI streaming response + """ + self._chunk_count += 1 + + # Usage (may be in final chunk with a different structure) + if hasattr(chunk, "usage"): + self._usage = chunk.usage + # Check if this is a usage-only chunk (often the final chunk when stream_options.include_usage=true) + is_usage_only_chunk = not (hasattr(chunk, "choices") and chunk.choices) + + # If this is a usage-only chunk, we don't need to process it as a content chunk + if is_usage_only_chunk: + return + + # Skip processing if no choices are present + if not hasattr(chunk, "choices") or not chunk.choices: + return + + # Track first token timing + if self._first_token_time is None: + if any(choice.delta.content for choice in chunk.choices if hasattr(choice.delta, "content")): + self._first_token_time = time.time() + time_to_first_token = self._first_token_time - self._start_time + self._span.set_attribute(SpanAttributes.LLM_STREAMING_TIME_TO_FIRST_TOKEN, time_to_first_token) + self._span.add_event("first_token_received", {"time_elapsed": time_to_first_token}) + # Also check for tool_calls as first tokens + elif any( + choice.delta.tool_calls + for choice in chunk.choices + if hasattr(choice.delta, "tool_calls") and choice.delta.tool_calls + ): + self._first_token_time = time.time() + time_to_first_token = self._first_token_time - self._start_time + self._span.set_attribute(SpanAttributes.LLM_STREAMING_TIME_TO_FIRST_TOKEN, time_to_first_token) + self._span.add_event("first_tool_call_token_received", {"time_elapsed": time_to_first_token}) + + # Extract chunk data + if hasattr(chunk, "id") and chunk.id and not self._response_id: + self._response_id = chunk.id + if self._response_id is not None: + self._span.set_attribute(SpanAttributes.LLM_RESPONSE_ID, self._response_id) + + if hasattr(chunk, "model") and chunk.model and not self._model: + self._model = chunk.model + if self._model is not None: + self._span.set_attribute(SpanAttributes.LLM_RESPONSE_MODEL, self._model) + + # Process choices + for choice in chunk.choices: + if not hasattr(choice, "delta"): + continue + + delta = choice.delta + + # Content + if hasattr(delta, "content") and delta.content is not None: + self._content_chunks.append(delta.content) + + # Tool calls + if hasattr(delta, "tool_calls") and delta.tool_calls: + for tool_call in delta.tool_calls: + if hasattr(tool_call, "index"): + idx = tool_call.index + if idx not in self._tool_calls: + self._tool_calls[idx] = { + "id": "", + "type": "function", + "function": {"name": "", "arguments": ""}, + } + + if hasattr(tool_call, "id") and tool_call.id: + self._tool_calls[idx]["id"] = tool_call.id + + if hasattr(tool_call, "function"): + if hasattr(tool_call.function, "name") and tool_call.function.name: + self._tool_calls[idx]["function"]["name"] = tool_call.function.name + if hasattr(tool_call.function, "arguments") and tool_call.function.arguments: + self._tool_calls[idx]["function"]["arguments"] += tool_call.function.arguments + + # Finish reason + if hasattr(choice, "finish_reason") and choice.finish_reason: + self._finish_reason = choice.finish_reason + + def _finalize_stream(self) -> None: + """Finalize the stream and set final attributes on the span.""" + total_time = time.time() - self._start_time + + # Aggregate content + full_content = "".join(self._content_chunks) + + # Set generation time + if self._first_token_time: + generation_time = total_time - (self._first_token_time - self._start_time) + self._span.set_attribute(SpanAttributes.LLM_STREAMING_TIME_TO_GENERATE, 
generation_time) + + # Add content attributes + if full_content: + self._span.set_attribute(MessageAttributes.COMPLETION_CONTENT.format(i=0), full_content) + self._span.set_attribute(MessageAttributes.COMPLETION_ROLE.format(i=0), "assistant") + + # Set finish reason + if self._finish_reason: + self._span.set_attribute(MessageAttributes.COMPLETION_FINISH_REASON.format(i=0), self._finish_reason) + + # Create tool spans for each tool call + if len(self._tool_calls) > 0: + for idx, tool_call in self._tool_calls.items(): + # Create a child span for this tool call + _create_tool_span(self._span, tool_call) + + # Set usage if available from the API + if self._usage is not None: + # Only set token attributes if they exist and have non-None values + if hasattr(self._usage, "prompt_tokens") and self._usage.prompt_tokens is not None: + self._span.set_attribute(SpanAttributes.LLM_USAGE_PROMPT_TOKENS, int(self._usage.prompt_tokens)) + + if hasattr(self._usage, "completion_tokens") and self._usage.completion_tokens is not None: + self._span.set_attribute(SpanAttributes.LLM_USAGE_COMPLETION_TOKENS, int(self._usage.completion_tokens)) + + if hasattr(self._usage, "total_tokens") and self._usage.total_tokens is not None: + self._span.set_attribute(SpanAttributes.LLM_USAGE_TOTAL_TOKENS, int(self._usage.total_tokens)) + + # Stream statistics + self._span.set_attribute("llm.openai.stream.chunk_count", self._chunk_count) + self._span.set_attribute("llm.openai.stream.content_length", len(full_content)) + self._span.set_attribute("llm.openai.stream.total_duration", total_time) + + # Add completion event + self._span.add_event( + "stream_completed", + { + "chunks_received": self._chunk_count, + "total_content_length": len(full_content), + "duration": total_time, + "had_tool_calls": len(self._tool_calls) > 0, + }, + ) + + # Finalize span and context + self._span.set_status(Status(StatusCode.OK)) + self._span.end() + context_api.detach(self._token) + @_with_tracer_wrapper def chat_completion_stream_wrapper(tracer, wrapped, instance, args, kwargs): @@ -456,9 +603,13 @@ def __init__(self, stream: Any, span: Span, request_kwargs: dict): self._first_token_time = None self._event_count = 0 self._content_chunks = [] + self._function_call_chunks = [] + self._reasoning_chunks = [] self._response_id = None self._model = None self._usage = None + self._output_items = [] + self._current_function_args = "" # Make sure the span is attached to the current context current_context = context_api.get_current() @@ -517,7 +668,7 @@ def _process_event(self, event: Any) -> None: # Track first content event if self._first_token_time is None and hasattr(event, "type"): - if event.type == "response.output_text.delta": + if event.type in ["response.output_text.delta", "response.function_call_arguments.delta"]: self._first_token_time = time.time() time_to_first_token = self._first_token_time - self._start_time self._span.set_attribute(SpanAttributes.LLM_STREAMING_TIME_TO_FIRST_TOKEN, time_to_first_token) @@ -538,24 +689,91 @@ def _process_event(self, event: Any) -> None: if hasattr(event, "delta"): self._content_chunks.append(event.delta) - elif event.type == "response.done": - if hasattr(event, "response") and hasattr(event.response, "usage"): - self._usage = event.response.usage + elif event.type == "response.function_call_arguments.delta": + # Accumulate function call arguments + if hasattr(event, "delta"): + self._current_function_args += event.delta - # Add event tracking - self._span.add_event( - "responses_api_event", - {"event_type": 
event.type if hasattr(event, "type") else "unknown", "event_number": self._event_count}, - ) + elif event.type == "response.completed": + # Process the final response which contains all output items + if hasattr(event, "response"): + response = event.response + if hasattr(response, "usage"): + self._usage = response.usage + + # Extract output items from the completed response + if hasattr(response, "output"): + for output_item in response.output: + if hasattr(output_item, "type"): + if output_item.type == "function_call" and hasattr(output_item, "arguments"): + self._function_call_chunks.append(output_item.arguments) + elif output_item.type == "reasoning": + # Extract reasoning text - could be in summary or content + if hasattr(output_item, "summary"): + self._reasoning_chunks.append(str(output_item.summary)) + elif hasattr(output_item, "content"): + # content might be a list of text items + if isinstance(output_item.content, list): + for content_item in output_item.content: + if hasattr(content_item, "text"): + self._reasoning_chunks.append(str(content_item.text)) + else: + self._reasoning_chunks.append(str(output_item.content)) + elif output_item.type == "message" and hasattr(output_item, "content"): + # Extract text content from message items + if isinstance(output_item.content, list): + for content in output_item.content: + if ( + hasattr(content, "type") + and content.type == "text" + and hasattr(content, "text") + ): + self._content_chunks.append(str(content.text)) + else: + self._content_chunks.append(str(output_item.content)) + + # Only add significant events, not every delta + if hasattr(event, "type") and event.type in [ + "response.created", + "response.completed", + "response.output_item.added", + ]: + self._span.add_event( + "responses_api_event", + {"event_type": event.type, "event_number": self._event_count}, + ) def _finalize_stream(self) -> None: """Finalize the Responses API stream.""" total_time = time.time() - self._start_time - # Aggregate content - full_content = "".join(self._content_chunks) + # Aggregate different types of content + text_content = "".join(self._content_chunks) + function_content = self._current_function_args or "".join(self._function_call_chunks) + reasoning_content = "".join(self._reasoning_chunks) + + # Combine all content types for the completion + full_content = "" + if reasoning_content: + full_content = f"Reasoning: {reasoning_content}" + if function_content: + if full_content: + full_content += f"\nFunction Call: {function_content}" + else: + full_content = f"Function Call: {function_content}" + if text_content: + if full_content: + full_content += f"\nResponse: {text_content}" + else: + full_content = text_content + if full_content: self._span.set_attribute(MessageAttributes.COMPLETION_CONTENT.format(i=0), full_content) + logger.debug( + f"[RESPONSES API] Setting completion content: {full_content[:100]}..." 
+ if len(full_content) > 100 + else f"[RESPONSES API] Setting completion content: {full_content}" + ) # Set timing if self._first_token_time: @@ -586,10 +804,26 @@ def _finalize_stream(self) -> None: self._span.set_attribute("llm.openai.responses.content_length", len(full_content)) self._span.set_attribute("llm.openai.responses.total_duration", total_time) + # Add completion event with summary + self._span.add_event( + "stream_completed", + { + "event_count": self._event_count, + "total_content_length": len(full_content), + "duration": total_time, + "had_function_calls": bool(function_content), + "had_reasoning": bool(reasoning_content), + "had_text": bool(text_content), + }, + ) + # Finalize span and context self._span.set_status(Status(StatusCode.OK)) self._span.end() context_api.detach(self._token) + logger.debug( + f"[RESPONSES API] Finalized streaming span after {self._event_count} events. Content length: {len(full_content)}" + ) @_with_tracer_wrapper @@ -601,70 +835,51 @@ def responses_stream_wrapper(tracer, wrapped, instance, args, kwargs): # Check if streaming is enabled is_streaming = kwargs.get("stream", False) - # If not streaming, just call the wrapped method directly - # The normal instrumentation will handle it - if not is_streaming: - logger.debug("[RESPONSES API WRAPPER] Non-streaming call, delegating to normal instrumentation") - return wrapped(*args, **kwargs) - - # Only create span for streaming responses + # Create span for both streaming and non-streaming span = tracer.start_span( "openai.responses.create", kind=SpanKind.CLIENT, attributes={SpanAttributes.LLM_REQUEST_TYPE: LLMRequestTypeValues.CHAT.value}, ) + logger.debug(f"[RESPONSES API WRAPPER] Created span for {'streaming' if is_streaming else 'non-streaming'} call") + + # Make sure span is linked to the current trace context + current_context = context_api.get_current() + token = context_api.attach(set_span_in_context(span, current_context)) try: # Extract and set request attributes - span.set_attribute(SpanAttributes.LLM_SYSTEM, "OpenAI") - span.set_attribute(SpanAttributes.LLM_REQUEST_STREAMING, is_streaming) - - if "model" in kwargs: - span.set_attribute(SpanAttributes.LLM_REQUEST_MODEL, kwargs["model"]) - if "messages" in kwargs: - # Set messages as prompts for consistency - messages = kwargs["messages"] - for i, msg in enumerate(messages): - prefix = f"{SpanAttributes.LLM_PROMPTS}.{i}" - if isinstance(msg, dict): - if "role" in msg: - span.set_attribute(f"{prefix}.role", msg["role"]) - if "content" in msg: - span.set_attribute(f"{prefix}.content", msg["content"]) - - # Tools - if "tools" in kwargs: - tools = kwargs["tools"] - if tools: - for i, tool in enumerate(tools): - if isinstance(tool, dict) and "function" in tool: - function = tool["function"] - prefix = f"{SpanAttributes.LLM_REQUEST_FUNCTIONS}.{i}" - if "name" in function: - span.set_attribute(f"{prefix}.name", function["name"]) - if "description" in function: - span.set_attribute(f"{prefix}.description", function["description"]) - if "parameters" in function: - import json - - span.set_attribute(f"{prefix}.parameters", json.dumps(function["parameters"])) - - # Temperature and other parameters - if "temperature" in kwargs: - span.set_attribute(SpanAttributes.LLM_REQUEST_TEMPERATURE, kwargs["temperature"]) - if "top_p" in kwargs: - span.set_attribute(SpanAttributes.LLM_REQUEST_TOP_P, kwargs["top_p"]) + from agentops.instrumentation.providers.openai.wrappers.responses import handle_responses_attributes + + request_attributes = 
handle_responses_attributes(kwargs=kwargs) + for key, value in request_attributes.items(): + span.set_attribute(key, value) # Call the original method response = wrapped(*args, **kwargs) - # For streaming, wrap the stream - return ResponsesAPIStreamWrapper(response, span, kwargs) + if is_streaming: + # For streaming, wrap the stream + context_api.detach(token) + return ResponsesAPIStreamWrapper(response, span, kwargs) + else: + # For non-streaming, handle response attributes and close span + response_attributes = handle_responses_attributes(kwargs=kwargs, return_value=response) + for key, value in response_attributes.items(): + if key not in request_attributes: # Avoid overwriting request attributes + span.set_attribute(key, value) + + span.set_status(Status(StatusCode.OK)) + span.end() + context_api.detach(token) + logger.debug("[RESPONSES API WRAPPER] Ended non-streaming span") + return response except Exception as e: span.record_exception(e) span.set_status(Status(StatusCode.ERROR, str(e))) span.end() + context_api.detach(token) raise @@ -677,71 +892,50 @@ async def async_responses_stream_wrapper(tracer, wrapped, instance, args, kwargs # Check if streaming is enabled is_streaming = kwargs.get("stream", False) - # If not streaming, just call the wrapped method directly - # The normal instrumentation will handle it - if not is_streaming: - logger.debug("[RESPONSES API WRAPPER] Non-streaming call, delegating to normal instrumentation") - return await wrapped(*args, **kwargs) - - # Only create span for streaming responses + # Create span for both streaming and non-streaming span = tracer.start_span( "openai.responses.create", kind=SpanKind.CLIENT, attributes={SpanAttributes.LLM_REQUEST_TYPE: LLMRequestTypeValues.CHAT.value}, ) + logger.debug(f"[RESPONSES API WRAPPER] Created span for {'streaming' if is_streaming else 'non-streaming'} call") + + # Make sure span is linked to the current trace context + current_context = context_api.get_current() + token = context_api.attach(set_span_in_context(span, current_context)) try: # Extract and set request attributes - span.set_attribute(SpanAttributes.LLM_SYSTEM, "OpenAI") - span.set_attribute(SpanAttributes.LLM_REQUEST_STREAMING, is_streaming) - - if "model" in kwargs: - span.set_attribute(SpanAttributes.LLM_REQUEST_MODEL, kwargs["model"]) - - if "messages" in kwargs: - # Set messages as prompts for consistency - messages = kwargs["messages"] - for i, msg in enumerate(messages): - prefix = f"{SpanAttributes.LLM_PROMPTS}.{i}" - if isinstance(msg, dict): - if "role" in msg: - span.set_attribute(f"{prefix}.role", msg["role"]) - if "content" in msg: - span.set_attribute(f"{prefix}.content", msg["content"]) - - # Tools - if "tools" in kwargs: - tools = kwargs["tools"] - if tools: - for i, tool in enumerate(tools): - if isinstance(tool, dict) and "function" in tool: - function = tool["function"] - prefix = f"{SpanAttributes.LLM_REQUEST_FUNCTIONS}.{i}" - if "name" in function: - span.set_attribute(f"{prefix}.name", function["name"]) - if "description" in function: - span.set_attribute(f"{prefix}.description", function["description"]) - if "parameters" in function: - import json - - span.set_attribute(f"{prefix}.parameters", json.dumps(function["parameters"])) - - # Temperature and other parameters - if "temperature" in kwargs: - span.set_attribute(SpanAttributes.LLM_REQUEST_TEMPERATURE, kwargs["temperature"]) - if "top_p" in kwargs: - span.set_attribute(SpanAttributes.LLM_REQUEST_TOP_P, kwargs["top_p"]) + from 
agentops.instrumentation.providers.openai.wrappers.responses import handle_responses_attributes + + request_attributes = handle_responses_attributes(kwargs=kwargs) + for key, value in request_attributes.items(): + span.set_attribute(key, value) # Call the original method response = await wrapped(*args, **kwargs) - # For streaming, wrap the stream - logger.debug("[RESPONSES API WRAPPER] Wrapping streaming response with ResponsesAPIStreamWrapper") - wrapped_stream = ResponsesAPIStreamWrapper(response, span, kwargs) - return wrapped_stream + if is_streaming: + # For streaming, wrap the stream + context_api.detach(token) + logger.debug("[RESPONSES API WRAPPER] Wrapping streaming response with ResponsesAPIStreamWrapper") + return ResponsesAPIStreamWrapper(response, span, kwargs) + else: + # For non-streaming, handle response attributes and close span + response_attributes = handle_responses_attributes(kwargs=kwargs, return_value=response) + for key, value in response_attributes.items(): + if key not in request_attributes: # Avoid overwriting request attributes + span.set_attribute(key, value) + + span.set_status(Status(StatusCode.OK)) + span.end() + context_api.detach(token) + logger.debug("[RESPONSES API WRAPPER] Ended async non-streaming span") + return response except Exception as e: span.record_exception(e) span.set_status(Status(StatusCode.ERROR, str(e))) span.end() + context_api.detach(token) raise diff --git a/agentops/instrumentation/providers/openai/wrappers/__init__.py b/agentops/instrumentation/providers/openai/wrappers/__init__.py index 5348bd91d..b90b8f201 100644 --- a/agentops/instrumentation/providers/openai/wrappers/__init__.py +++ b/agentops/instrumentation/providers/openai/wrappers/__init__.py @@ -14,6 +14,7 @@ handle_run_stream_attributes, handle_messages_attributes, ) +from agentops.instrumentation.providers.openai.wrappers.responses import handle_responses_attributes __all__ = [ "handle_chat_attributes", @@ -25,4 +26,5 @@ "handle_run_retrieve_attributes", "handle_run_stream_attributes", "handle_messages_attributes", + "handle_responses_attributes", ] diff --git a/agentops/instrumentation/providers/openai/wrappers/responses.py b/agentops/instrumentation/providers/openai/wrappers/responses.py new file mode 100644 index 000000000..ebd116cca --- /dev/null +++ b/agentops/instrumentation/providers/openai/wrappers/responses.py @@ -0,0 +1,191 @@ +"""Responses API wrapper for OpenAI instrumentation. + +This module provides attribute extraction for OpenAI Responses API endpoints. 
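+
+It extracts request attributes (model, sampling parameters, input messages, tools)
+from the call kwargs and response attributes (response id, model, usage, and output
+items such as message, function_call and reasoning) from the return value, mapping
+the Responses API usage fields (input_tokens/output_tokens) onto the prompt/completion
+token span attributes.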
+""" + +import json +import logging +from typing import Any, Dict, Optional, Tuple + +from agentops.instrumentation.providers.openai.utils import is_openai_v1 +from agentops.instrumentation.providers.openai.wrappers.shared import ( + model_as_dict, + should_send_prompts, +) +from agentops.instrumentation.common.attributes import AttributeMap +from agentops.semconv import SpanAttributes, LLMRequestTypeValues + +logger = logging.getLogger(__name__) + + +def handle_responses_attributes( + args: Optional[Tuple] = None, + kwargs: Optional[Dict] = None, + return_value: Optional[Any] = None, +) -> AttributeMap: + """Extract attributes from responses API calls.""" + attributes = { + SpanAttributes.LLM_SYSTEM: "OpenAI", + SpanAttributes.LLM_REQUEST_TYPE: LLMRequestTypeValues.CHAT.value, + } + + # Extract request attributes from kwargs + if kwargs: + # Model + if "model" in kwargs: + attributes[SpanAttributes.LLM_REQUEST_MODEL] = kwargs["model"] + + # Request parameters + if "max_tokens" in kwargs: + attributes[SpanAttributes.LLM_REQUEST_MAX_TOKENS] = kwargs["max_tokens"] + if "temperature" in kwargs: + attributes[SpanAttributes.LLM_REQUEST_TEMPERATURE] = kwargs["temperature"] + if "top_p" in kwargs: + attributes[SpanAttributes.LLM_REQUEST_TOP_P] = kwargs["top_p"] + if "frequency_penalty" in kwargs: + attributes[SpanAttributes.LLM_REQUEST_FREQUENCY_PENALTY] = kwargs["frequency_penalty"] + if "presence_penalty" in kwargs: + attributes[SpanAttributes.LLM_REQUEST_PRESENCE_PENALTY] = kwargs["presence_penalty"] + if "user" in kwargs: + attributes[SpanAttributes.LLM_USER] = kwargs["user"] + + # Streaming + attributes[SpanAttributes.LLM_REQUEST_STREAMING] = kwargs.get("stream", False) + + # Input messages + if should_send_prompts() and "input" in kwargs: + messages = kwargs["input"] + for i, msg in enumerate(messages): + prefix = f"{SpanAttributes.LLM_PROMPTS}.{i}" + if isinstance(msg, dict): + if "role" in msg: + attributes[f"{prefix}.role"] = msg["role"] + if "content" in msg: + content = msg["content"] + if isinstance(content, list): + content = json.dumps(content) + attributes[f"{prefix}.content"] = content + + # Tools + if "tools" in kwargs: + tools = kwargs["tools"] + if tools: + for i, tool in enumerate(tools): + if isinstance(tool, dict) and "function" in tool: + function = tool["function"] + prefix = f"{SpanAttributes.LLM_REQUEST_FUNCTIONS}.{i}" + if "name" in function: + attributes[f"{prefix}.name"] = function["name"] + if "description" in function: + attributes[f"{prefix}.description"] = function["description"] + if "parameters" in function: + attributes[f"{prefix}.parameters"] = json.dumps(function["parameters"]) + + # Extract response attributes from return value + if return_value: + # Convert to dict if needed + response_dict = {} + if hasattr(return_value, "__dict__") and not hasattr(return_value, "__iter__"): + response_dict = model_as_dict(return_value) + elif isinstance(return_value, dict): + response_dict = return_value + elif hasattr(return_value, "model_dump"): + response_dict = return_value.model_dump() + + # Basic response attributes + if "id" in response_dict: + attributes[SpanAttributes.LLM_RESPONSE_ID] = response_dict["id"] + if "model" in response_dict: + attributes[SpanAttributes.LLM_RESPONSE_MODEL] = response_dict["model"] + + # Usage + usage = response_dict.get("usage", {}) + if usage: + if is_openai_v1() and hasattr(usage, "__dict__"): + usage = usage.__dict__ + if "total_tokens" in usage: + attributes[SpanAttributes.LLM_USAGE_TOTAL_TOKENS] = usage["total_tokens"] + # 
Responses API uses input_tokens/output_tokens instead of prompt_tokens/completion_tokens + if "input_tokens" in usage: + attributes[SpanAttributes.LLM_USAGE_PROMPT_TOKENS] = usage["input_tokens"] + if "output_tokens" in usage: + attributes[SpanAttributes.LLM_USAGE_COMPLETION_TOKENS] = usage["output_tokens"] + + # Reasoning tokens + output_details = usage.get("output_tokens_details", {}) + if isinstance(output_details, dict) and "reasoning_tokens" in output_details: + attributes[SpanAttributes.LLM_USAGE_REASONING_TOKENS] = output_details["reasoning_tokens"] + + # Output items + if should_send_prompts() and "output" in response_dict: + output_items = response_dict["output"] + completion_idx = 0 + for i, output_item in enumerate(output_items): + # Handle dictionary format + if isinstance(output_item, dict): + item_type = output_item.get("type") + # Handle object format (Pydantic models) + elif hasattr(output_item, "type"): + item_type = output_item.type + output_item_dict = model_as_dict(output_item) + if output_item_dict and isinstance(output_item_dict, dict): + output_item = output_item_dict + else: + continue + else: + continue + + if item_type == "message": + # Extract message content + if isinstance(output_item, dict): + content = output_item.get("content", []) + if isinstance(content, list): + # Aggregate all text content + text_parts = [] + for content_item in content: + if isinstance(content_item, dict) and content_item.get("type") == "text": + text = content_item.get("text", "") + if text: + text_parts.append(text) + if text_parts: + full_text = "".join(text_parts) + attributes[f"{SpanAttributes.LLM_COMPLETIONS}.{completion_idx}.content"] = full_text + attributes[f"{SpanAttributes.LLM_COMPLETIONS}.{completion_idx}.role"] = "assistant" + completion_idx += 1 + elif isinstance(content, str): + # Simple string content + attributes[f"{SpanAttributes.LLM_COMPLETIONS}.{completion_idx}.content"] = content + attributes[f"{SpanAttributes.LLM_COMPLETIONS}.{completion_idx}.role"] = "assistant" + completion_idx += 1 + + elif item_type == "function_call" and isinstance(output_item, dict): + # Handle function calls + # The arguments contain the actual response content for function calls + args_str = output_item.get("arguments", "") + if args_str: + try: + args = json.loads(args_str) + # Extract reasoning if present (common in o3 models) + reasoning = args.get("reasoning", "") + if reasoning: + attributes[f"{SpanAttributes.LLM_COMPLETIONS}.{completion_idx}.content"] = reasoning + attributes[f"{SpanAttributes.LLM_COMPLETIONS}.{completion_idx}.role"] = "assistant" + completion_idx += 1 + except json.JSONDecodeError: + pass + + # Also store tool call details + attributes[f"{SpanAttributes.LLM_COMPLETIONS}.{i}.tool_calls.0.id"] = output_item.get("id", "") + attributes[f"{SpanAttributes.LLM_COMPLETIONS}.{i}.tool_calls.0.name"] = output_item.get("name", "") + attributes[f"{SpanAttributes.LLM_COMPLETIONS}.{i}.tool_calls.0.arguments"] = args_str + + elif item_type == "reasoning" and isinstance(output_item, dict): + # Handle reasoning items (o3 models provide these) + summary = output_item.get("summary", "") + if summary: + attributes[f"{SpanAttributes.LLM_COMPLETIONS}.{completion_idx}.content"] = summary + attributes[f"{SpanAttributes.LLM_COMPLETIONS}.{completion_idx}.role"] = "assistant" + attributes[f"{SpanAttributes.LLM_COMPLETIONS}.{completion_idx}.type"] = "reasoning" + completion_idx += 1 + + return attributes diff --git a/examples/agno/agno_async_operations.ipynb 
b/examples/agno/agno_async_operations.ipynb index 576d55d53..b37fe6461 100644 --- a/examples/agno/agno_async_operations.ipynb +++ b/examples/agno/agno_async_operations.ipynb @@ -46,7 +46,6 @@ "\n", "import agentops\n", "from agno.agent import Agent\n", - "from agno.team import Team\n", "from agno.models.openai import OpenAIChat" ] }, diff --git a/examples/langgraph/langgraph_example.ipynb b/examples/langgraph/langgraph_example.ipynb index 3272c2308..8ab1e8caa 100644 --- a/examples/langgraph/langgraph_example.ipynb +++ b/examples/langgraph/langgraph_example.ipynb @@ -45,7 +45,7 @@ "from langgraph.graph import StateGraph, END\n", "from langgraph.graph.message import add_messages\n", "from langchain_openai import ChatOpenAI\n", - "from langchain_core.messages import HumanMessage, AIMessage, ToolMessage\n", + "from langchain_core.messages import HumanMessage, ToolMessage\n", "from langchain_core.tools import tool\n", "import agentops\n", "from dotenv import load_dotenv\n", diff --git a/examples/mem0/mem0_memory_example.ipynb b/examples/mem0/mem0_memory_example.ipynb index 2c7e6bbae..d04313065 100644 --- a/examples/mem0/mem0_memory_example.ipynb +++ b/examples/mem0/mem0_memory_example.ipynb @@ -55,8 +55,6 @@ "from mem0 import Memory, AsyncMemory\n", "import os\n", "import asyncio\n", - "import logging\n", - "from dotenv import load_dotenv\n", "import agentops" ] }, @@ -189,7 +187,7 @@ " print(f\"Delete all result: {delete_all_result}\")\n", "\n", " agentops.end_trace(end_state=\"success\")\n", - " except Exception as e:\n", + " except Exception:\n", " agentops.end_trace(end_state=\"error\")" ] }, @@ -263,7 +261,7 @@ "\n", " agentops.end_trace(end_state=\"success\")\n", "\n", - " except Exception as e:\n", + " except Exception:\n", " agentops.end_trace(end_state=\"error\")" ] }, diff --git a/examples/mem0/mem0_memoryclient_example.ipynb b/examples/mem0/mem0_memoryclient_example.ipynb index 1e8130c1b..0ccc8b199 100644 --- a/examples/mem0/mem0_memoryclient_example.ipynb +++ b/examples/mem0/mem0_memoryclient_example.ipynb @@ -199,7 +199,7 @@ " delete_all_result = client.delete_all(user_id=user_id)\n", " print(f\"Delete all result: {delete_all_result}\")\n", " agentops.end_trace(end_state=\"success\")\n", - " except Exception as e:\n", + " except Exception:\n", " agentops.end_trace(end_state=\"error\")" ] }, @@ -279,7 +279,7 @@ "\n", " agentops.end_trace(end_state=\"success\")\n", "\n", - " except Exception as e:\n", + " except Exception:\n", " agentops.end_trace(end_state=\"error\")" ] }, diff --git a/examples/openai/README.md b/examples/openai/README.md index 598fa80e4..bcec79ee5 100644 --- a/examples/openai/README.md +++ b/examples/openai/README.md @@ -39,6 +39,16 @@ Example: `web_search` This example demonstrates: - Web search functionality +### 5. o3 Responses API + +Example: `o3_responses_example` + +This example demonstrates: +- OpenAI's o3 reasoning model with the Responses API +- Tool calls and structured reasoning +- Complex decision-making scenarios +- AgentOps integration with reasoning models + ## AgentOps Integration These examples show how to use AgentOps to monitor and analyze your AI applications. AgentOps automatically instruments your OpenAI calls to provide insights into performance, usage patterns, and model behavior. 
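For quick orientation, here is a condensed sketch of the call pattern the new o3 example (added below) exercises: initialize AgentOps, issue a Responses API call with a function tool, and read the tool call back from the output items. The model name, tool schema, and trace setup mirror the full script that follows; treat this as an illustration under those assumptions, not a substitute for the example itself.

# Minimal sketch of the o3 Responses API pattern used by o3_responses_example.py.
# Assumes OPENAI_API_KEY and AGENTOPS_API_KEY are set in the environment.
import json
import openai
import agentops

agentops.init(auto_start_session=False)
tracer = agentops.start_trace(trace_name="o3 Responses API Example", tags=["o3", "responses-api"])
client = openai.OpenAI()

tools = [{
    "type": "function",
    "name": "select_action",
    "description": "Select the best action from the available options.",
    "parameters": {
        "type": "object",
        "properties": {
            "action": {"type": "string", "description": "The selected action"},
            "reasoning": {"type": "string", "description": "Why this action was chosen"},
        },
        "required": ["action", "reasoning"],
        "additionalProperties": False,
    },
}]

response = client.responses.create(
    model="o3-mini",
    input=[{"role": "user", "content": "Choose: focus_on_critical_path or extend_deadline?"}],
    tools=tools,
    tool_choice="required",
)

# Non-streaming responses expose output items directly; function_call items carry the arguments.
for item in response.output:
    if item.type == "function_call":
        print(json.loads(item.arguments))

agentops.end_trace(tracer, end_state="Success")
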
diff --git a/examples/openai/o3_responses_example.py b/examples/openai/o3_responses_example.py new file mode 100644 index 000000000..ea4f96af8 --- /dev/null +++ b/examples/openai/o3_responses_example.py @@ -0,0 +1,450 @@ +# OpenAI o3 Responses API Example +# +# This example demonstrates AgentOps integration with OpenAI's o3 reasoning model +# through the Responses API. The o3 model excels at complex problem solving and +# multi-step reasoning with tool calls. +# +# This example tests both streaming and non-streaming modes, as well as async versions. + +import openai +import agentops +import json +import os +import asyncio +from dotenv import load_dotenv +from agentops.sdk.decorators import agent +from typing import List, Dict, Any + +# Load environment variables +load_dotenv() +os.environ["OPENAI_API_KEY"] = os.getenv("OPENAI_API_KEY", "your_openai_api_key_here") +os.environ["AGENTOPS_API_KEY"] = os.getenv("AGENTOPS_API_KEY", "your_api_key_here") + +# Initialize AgentOps +agentops.init( + trace_name="o3-responses-example", + tags=["o3", "responses-api"], + auto_start_session=False, +) +tracer = agentops.start_trace(trace_name="o3 Responses API Example", tags=["o3", "responses-api"]) + +# Initialize OpenAI client +client = openai.OpenAI() +async_client = openai.AsyncOpenAI() + +# ANSI escape codes for colors +LIGHT_BLUE = "\033[94m" +YELLOW = "\033[93m" +GREEN = "\033[92m" +RESET_COLOR = "\033[0m" + + +def create_decision_prompt(scenario: str, available_actions: List[str]) -> str: + """Create a prompt for decision making.""" + return f""" +You are a strategic decision-making agent. You need to analyze the current scenario and choose the best action from the available options. + +Current Scenario: +{scenario} + +Available Actions: +{chr(10).join(f"- {action}" for action in available_actions)} + +Your goal is to make the best strategic decision based on the scenario. Consider: +1. The immediate benefits of each action +2. Potential long-term consequences +3. Risk vs reward trade-offs +4. Strategic positioning + +Reason carefully about the best action to take and explain your reasoning. +""" + + +@agent +class O3DecisionAgent: + """A decision-making agent that uses OpenAI's o3 model with the Responses API.""" + + def __init__(self, model: str = "o3-mini", color: str = LIGHT_BLUE): + self.model = model + self.color = color + + def make_decision_sync(self, scenario: str, available_actions: List[str], stream: bool = False) -> Dict[str, Any]: + """ + Make a decision using the o3 model synchronously. + + Args: + scenario: Description of the current situation + available_actions: List of possible actions to choose from + stream: Whether to use streaming mode + + Returns: + Dictionary containing the chosen action and reasoning + """ + + # Define the tool for action selection + tools = [ + { + "type": "function", + "name": "select_action", + "description": "Select the best action from the available options.", + "parameters": { + "type": "object", + "properties": { + "action": {"type": "string", "description": "The selected action from the available options"}, + "reasoning": { + "type": "string", + "description": "Detailed reasoning for why this action was chosen", + }, + }, + "required": ["action", "reasoning"], + "additionalProperties": False, + }, + } + ] + + # Create the prompt + system_prompt = create_decision_prompt(scenario, available_actions) + user_message = f"Select the best action from these options: {available_actions}. Provide your reasoning and make your choice." 
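+
+        # NOTE: in streaming mode the Responses API emits incremental events
+        # (response.output_text.delta, response.function_call_arguments.delta, ...);
+        # this example skips the per-delta events and reads the final tool call from
+        # the response.completed event, which carries the complete output items.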
+ + mode_desc = "streaming" if stream else "non-streaming" + print(f"{self.color}Making decision with o3 model ({mode_desc})...{RESET_COLOR}") + print(f"{self.color}Scenario: {scenario}{RESET_COLOR}") + print(f"{self.color}Available actions: {available_actions}{RESET_COLOR}") + + # Make the API call using the Responses API + if stream: + response = client.responses.create( + model=self.model, + input=[{"role": "system", "content": system_prompt}, {"role": "user", "content": user_message}], + tools=tools, # type: ignore + tool_choice="required", + stream=True, + ) + + # Process streaming response + tool_call = None + + for event in response: + if hasattr(event, "type"): + if event.type == "response.output_text.delta": + # Handle text deltas (if any) + pass + elif event.type == "response.function_call_arguments.delta": + # Tool arguments are accumulated by the API + pass + elif event.type == "response.output_item.added": + # New tool call started + if hasattr(event, "output_item") and event.output_item.type == "function_call": + pass # Tool call tracking handled elsewhere + elif event.type == "response.completed": + # Process final response + if hasattr(event, "response") and hasattr(event.response, "output"): + for output_item in event.response.output: + if output_item.type == "function_call": + tool_call = output_item + break + + if tool_call: + args = json.loads(tool_call.arguments) + chosen_action = args["action"] + reasoning = args["reasoning"] + + print(f"{self.color}Chosen action: {chosen_action}{RESET_COLOR}") + print(f"{self.color}Tool reasoning: {reasoning}{RESET_COLOR}") + + return { + "action": chosen_action, + "reasoning": reasoning, + "available_actions": available_actions, + "scenario": scenario, + "mode": "sync_streaming", + } + else: + response = client.responses.create( + model=self.model, + input=[{"role": "system", "content": system_prompt}, {"role": "user", "content": user_message}], + tools=tools, # type: ignore + tool_choice="required", + ) + + # Process non-streaming response + tool_call = None + reasoning_text = "" + + for output_item in response.output: + if output_item.type == "function_call": + tool_call = output_item + elif output_item.type == "message" and hasattr(output_item, "content"): + for content in output_item.content: + if hasattr(content, "type"): + if content.type == "text" and hasattr(content, "text"): + reasoning_text += content.text + print(f"{self.color}Reasoning: {content.text}{RESET_COLOR}") + elif content.type == "refusal" and hasattr(content, "refusal"): + print(f"{self.color}Refusal: {content.refusal}{RESET_COLOR}") + + if tool_call: + args = json.loads(tool_call.arguments) + chosen_action = args["action"] + reasoning = args["reasoning"] + + print(f"{self.color}Chosen action: {chosen_action}{RESET_COLOR}") + print(f"{self.color}Tool reasoning: {reasoning}{RESET_COLOR}") + + return { + "action": chosen_action, + "reasoning": reasoning, + "full_reasoning": reasoning_text, + "available_actions": available_actions, + "scenario": scenario, + "mode": "sync_non_streaming", + } + + # Fallback + print(f"{self.color}No tool call found, using fallback{RESET_COLOR}") + return { + "action": available_actions[0] if available_actions else "no_action", + "reasoning": "Fallback: No tool call received", + "available_actions": available_actions, + "scenario": scenario, + "mode": f"sync_{mode_desc}_fallback", + } + + async def make_decision_async( + self, scenario: str, available_actions: List[str], stream: bool = False + ) -> Dict[str, Any]: + """ + Make a 
decision using the o3 model asynchronously. + + Args: + scenario: Description of the current situation + available_actions: List of possible actions to choose from + stream: Whether to use streaming mode + + Returns: + Dictionary containing the chosen action and reasoning + """ + + # Define the tool for action selection + tools = [ + { + "type": "function", + "name": "select_action", + "description": "Select the best action from the available options.", + "parameters": { + "type": "object", + "properties": { + "action": {"type": "string", "description": "The selected action from the available options"}, + "reasoning": { + "type": "string", + "description": "Detailed reasoning for why this action was chosen", + }, + }, + "required": ["action", "reasoning"], + "additionalProperties": False, + }, + } + ] + + # Create the prompt + system_prompt = create_decision_prompt(scenario, available_actions) + user_message = f"Select the best action from these options: {available_actions}. Provide your reasoning and make your choice." + + mode_desc = "streaming" if stream else "non-streaming" + print(f"{self.color}Making async decision with o3 model ({mode_desc})...{RESET_COLOR}") + print(f"{self.color}Scenario: {scenario}{RESET_COLOR}") + print(f"{self.color}Available actions: {available_actions}{RESET_COLOR}") + + # Make the API call using the Responses API + if stream: + response = await async_client.responses.create( + model=self.model, + input=[{"role": "system", "content": system_prompt}, {"role": "user", "content": user_message}], + tools=tools, # type: ignore + tool_choice="required", + stream=True, + ) + + # Process streaming response + tool_call = None + + async for event in response: + if hasattr(event, "type"): + if event.type == "response.output_text.delta": + # Handle text deltas (if any) + pass + elif event.type == "response.function_call_arguments.delta": + # Tool arguments are accumulated by the API + pass + elif event.type == "response.output_item.added": + # New tool call started + if hasattr(event, "output_item") and event.output_item.type == "function_call": + pass # Tool call tracking handled elsewhere + elif event.type == "response.completed": + # Process final response + if hasattr(event, "response") and hasattr(event.response, "output"): + for output_item in event.response.output: + if output_item.type == "function_call": + tool_call = output_item + break + + if tool_call: + args = json.loads(tool_call.arguments) + chosen_action = args["action"] + reasoning = args["reasoning"] + + print(f"{self.color}Chosen action: {chosen_action}{RESET_COLOR}") + print(f"{self.color}Tool reasoning: {reasoning}{RESET_COLOR}") + + return { + "action": chosen_action, + "reasoning": reasoning, + "available_actions": available_actions, + "scenario": scenario, + "mode": "async_streaming", + } + else: + response = await async_client.responses.create( + model=self.model, + input=[{"role": "system", "content": system_prompt}, {"role": "user", "content": user_message}], + tools=tools, # type: ignore + tool_choice="required", + ) + + # Process non-streaming response + tool_call = None + reasoning_text = "" + + for output_item in response.output: + if output_item.type == "function_call": + tool_call = output_item + elif output_item.type == "message" and hasattr(output_item, "content"): + for content in output_item.content: + if hasattr(content, "type"): + if content.type == "text" and hasattr(content, "text"): + reasoning_text += content.text + print(f"{self.color}Reasoning: {content.text}{RESET_COLOR}") + 
elif content.type == "refusal" and hasattr(content, "refusal"): + print(f"{self.color}Refusal: {content.refusal}{RESET_COLOR}") + + if tool_call: + args = json.loads(tool_call.arguments) + chosen_action = args["action"] + reasoning = args["reasoning"] + + print(f"{self.color}Chosen action: {chosen_action}{RESET_COLOR}") + print(f"{self.color}Tool reasoning: {reasoning}{RESET_COLOR}") + + return { + "action": chosen_action, + "reasoning": reasoning, + "full_reasoning": reasoning_text, + "available_actions": available_actions, + "scenario": scenario, + "mode": "async_non_streaming", + } + + # Fallback + print(f"{self.color}No tool call found, using fallback{RESET_COLOR}") + return { + "action": available_actions[0] if available_actions else "no_action", + "reasoning": "Fallback: No tool call received", + "available_actions": available_actions, + "scenario": scenario, + "mode": f"async_{mode_desc}_fallback", + } + + +async def run_example(): + """Run the example with multiple scenarios in different modes.""" + + # Create agents with different colors for different modes + sync_agent = O3DecisionAgent(model="o3-mini", color=LIGHT_BLUE) + + # Test scenario + scenario = { + "scenario": "You're managing a project with limited resources and need to prioritize tasks.", + "actions": ["focus_on_critical_path", "distribute_resources_evenly", "outsource_some_tasks", "extend_deadline"], + } + + results = [] + + # Test 2: Sync streaming + print(f"\n{'=' * 60}") + print(f"{LIGHT_BLUE}Test: Synchronous Streaming{RESET_COLOR}") + print(f"{'=' * 60}") + result = sync_agent.make_decision_sync( + scenario=scenario["scenario"], available_actions=scenario["actions"], stream=True + ) + results.append(result) + + # Test 3: Sync non-streaming + print(f"\n{'=' * 60}") + print(f"{LIGHT_BLUE}Test: Synchronous Non-Streaming{RESET_COLOR}") + print(f"{'=' * 60}") + result = sync_agent.make_decision_sync( + scenario=scenario["scenario"], available_actions=scenario["actions"], stream=False + ) + results.append(result) + + # Test 4: Async streaming + print(f"\n{'=' * 60}") + print(f"{LIGHT_BLUE}Test: Asynchronous Streaming{RESET_COLOR}") + print(f"{'=' * 60}") + result = await sync_agent.make_decision_async( + scenario=scenario["scenario"], available_actions=scenario["actions"], stream=True + ) + results.append(result) + + # Test 5: Async non-streaming + print(f"\n{'=' * 60}") + print(f"{LIGHT_BLUE}Test: Asynchronous Non-Streaming{RESET_COLOR}") + print(f"{'=' * 60}") + result = await sync_agent.make_decision_async( + scenario=scenario["scenario"], available_actions=scenario["actions"], stream=False + ) + results.append(result) + + return results + + +def main(): + """Main function to run the example.""" + print("Starting OpenAI o3 Responses API Example (All Modes)") + print("=" * 60) + + try: + # Run async example + results = asyncio.run(run_example()) + + print(f"\n{'=' * 60}") + print("Example Summary") + print(f"{'=' * 60}") + + for i, result in enumerate(results, 1): + print(f"Test {i} ({result.get('mode', 'unknown')}): {result['action']}") + + # End the trace + agentops.end_trace(tracer, end_state="Success") + + # Validate the trace + print(f"\n{'=' * 60}") + print("Validating AgentOps Trace") + print(f"{'=' * 60}") + + try: + validation_result = agentops.validate_trace_spans(trace_context=tracer) + agentops.print_validation_summary(validation_result) + print("✅ Example completed successfully!") + except agentops.ValidationError as e: + print(f"❌ Error validating spans: {e}") + raise + + except Exception as e: + 
print(f"❌ Example failed: {e}") + agentops.end_trace(tracer, end_state="Error") + raise + + +if __name__ == "__main__": + main() diff --git a/examples/smolagents/multi_smolagents_system.ipynb b/examples/smolagents/multi_smolagents_system.ipynb index 637842c01..d01c6d6d1 100644 --- a/examples/smolagents/multi_smolagents_system.ipynb +++ b/examples/smolagents/multi_smolagents_system.ipynb @@ -96,7 +96,6 @@ "metadata": {}, "outputs": [], "source": [ - "from smolagents import LiteLLMModel, tool, CodeAgent, ToolCallingAgent, DuckDuckGoSearchTool\n", "\n", "agentops.init(auto_start_session=False)\n", "tracer = agentops.start_trace(\n",