diff --git a/agentops/instrumentation/__init__.py b/agentops/instrumentation/__init__.py
index 732af7666..b9cdcb226 100644
--- a/agentops/instrumentation/__init__.py
+++ b/agentops/instrumentation/__init__.py
@@ -58,7 +58,7 @@ def get_instance(self) -> BaseInstrumentor:
         provider_import_name="openai",
     ),
     InstrumentorLoader(
-        module_name="opentelemetry.instrumentation.anthropic",
+        module_name="agentops.instrumentation.anthropic",
         class_name="AnthropicInstrumentor",
         provider_import_name="anthropic",
     ),
diff --git a/agentops/instrumentation/anthropic/__init__.py b/agentops/instrumentation/anthropic/__init__.py
new file mode 100644
index 000000000..91a197253
--- /dev/null
+++ b/agentops/instrumentation/anthropic/__init__.py
@@ -0,0 +1,38 @@
+"""Anthropic API instrumentation.
+
+This module provides instrumentation for the Anthropic API,
+including chat completions, streaming, and event handling.
+"""
+
+import logging
+from typing import Collection
+
+# Define the logger before get_version() runs at import time below;
+# otherwise the except branch would raise a NameError.
+logger = logging.getLogger(__name__)
+
+def get_version() -> str:
+    """Get the version of the Anthropic SDK, or 'unknown' if not found.
+
+    Attempts to retrieve the installed version of the Anthropic SDK using importlib.metadata.
+    Falls back to 'unknown' if the version cannot be determined.
+
+    Returns:
+        The version string of the Anthropic SDK or 'unknown'
+    """
+    try:
+        from importlib.metadata import version
+        return version("anthropic")
+    except ImportError:
+        logger.debug("Could not find Anthropic SDK version")
+        return "unknown"
+
+LIBRARY_NAME = "anthropic"
+LIBRARY_VERSION: str = get_version()
+
+# Import after defining constants to avoid circular imports
+from agentops.instrumentation.anthropic.instrumentor import AnthropicInstrumentor  # noqa: E402
+
+__all__ = [
+    "LIBRARY_NAME",
+    "LIBRARY_VERSION",
+    "AnthropicInstrumentor",
+]
\ No newline at end of file
diff --git a/agentops/instrumentation/anthropic/attributes/__init__.py b/agentops/instrumentation/anthropic/attributes/__init__.py
new file mode 100644
index 000000000..37b7384ef
--- /dev/null
+++ b/agentops/instrumentation/anthropic/attributes/__init__.py
@@ -0,0 +1,20 @@
+"""Attribute extraction for Anthropic API instrumentation."""
+
+from agentops.instrumentation.anthropic.attributes.common import get_common_instrumentation_attributes
+from agentops.instrumentation.anthropic.attributes.message import get_message_attributes, get_completion_attributes
+from agentops.instrumentation.anthropic.attributes.tools import (
+    extract_tool_definitions,
+    extract_tool_use_blocks,
+    extract_tool_results,
+    get_tool_attributes
+)
+
+__all__ = [
+    "get_common_instrumentation_attributes",
+    "get_message_attributes",
+    "get_completion_attributes",
+    "extract_tool_definitions",
+    "extract_tool_use_blocks",
+    "extract_tool_results",
+    "get_tool_attributes",
+]
\ No newline at end of file
diff --git a/agentops/instrumentation/anthropic/attributes/common.py b/agentops/instrumentation/anthropic/attributes/common.py
new file mode 100644
index 000000000..e6033d9bc
--- /dev/null
+++ b/agentops/instrumentation/anthropic/attributes/common.py
@@ -0,0 +1,62 @@
+"""Common attribute extraction for Anthropic instrumentation."""
+
+from typing import Dict, Any
+
+from agentops.logging import logger
+from agentops.semconv import InstrumentationAttributes, SpanAttributes
+from agentops.instrumentation.common.attributes import AttributeMap, get_common_attributes
+from agentops.instrumentation.anthropic import LIBRARY_NAME, LIBRARY_VERSION
+
+def get_common_instrumentation_attributes() -> AttributeMap:
+    """Get common instrumentation attributes for the Anthropic instrumentation.
+
+    This combines the generic AgentOps attributes with Anthropic specific library attributes.
+
+    Returns:
+        Dictionary of common instrumentation attributes
+    """
+    attributes = get_common_attributes()
+    attributes.update({
+        InstrumentationAttributes.LIBRARY_NAME: LIBRARY_NAME,
+        InstrumentationAttributes.LIBRARY_VERSION: LIBRARY_VERSION,
+    })
+    return attributes
+
+
+def extract_request_attributes(kwargs: Dict[str, Any]) -> AttributeMap:
+    """Extract all request attributes from kwargs.
+
+    This consolidated function extracts all relevant attributes from the request
+    kwargs, including model, system prompt, messages, max_tokens, temperature,
+    and other parameters. It replaces the individual extraction functions with
+    a single comprehensive approach.
+
+    Args:
+        kwargs: Request keyword arguments
+
+    Returns:
+        Dictionary of extracted request attributes
+    """
+    attributes = {}
+
+    # Extract model
+    if "model" in kwargs:
+        attributes[SpanAttributes.LLM_REQUEST_MODEL] = kwargs["model"]
+
+    # Extract max_tokens
+    if "max_tokens" in kwargs:
+        attributes[SpanAttributes.LLM_REQUEST_MAX_TOKENS] = kwargs["max_tokens"]
+
+    # Extract temperature
+    if "temperature" in kwargs:
+        attributes[SpanAttributes.LLM_REQUEST_TEMPERATURE] = kwargs["temperature"]
+
+    # Extract top_p
+    if "top_p" in kwargs:
+        attributes[SpanAttributes.LLM_REQUEST_TOP_P] = kwargs["top_p"]
+
+    # Extract streaming
+    if "stream" in kwargs:
+        attributes[SpanAttributes.LLM_REQUEST_STREAMING] = kwargs["stream"]
+
+    return attributes
\ No newline at end of file
diff --git a/agentops/instrumentation/anthropic/attributes/message.py b/agentops/instrumentation/anthropic/attributes/message.py
new file mode 100644
index 000000000..ee0ea0ae3
--- /dev/null
+++ b/agentops/instrumentation/anthropic/attributes/message.py
@@ -0,0 +1,492 @@
+"""Attribute extraction for Anthropic Message responses."""
+
+import json
+from typing import Dict, Any, Optional, Tuple
+
+from agentops.logging import logger
+from agentops.semconv import (
+    SpanAttributes,
+    LLMRequestTypeValues,
+    MessageAttributes,
+)
+from agentops.instrumentation.common.attributes import AttributeMap
+from agentops.instrumentation.anthropic.attributes.common import (
+    get_common_instrumentation_attributes,
+    extract_request_attributes,
+)
+from agentops.instrumentation.anthropic.attributes.tools import (
+    extract_tool_definitions,
+    get_tool_attributes,
+)
+
+def get_message_attributes(args: Optional[Tuple] = None, kwargs: Optional[Dict] = None,
+                           return_value: Any = None) -> AttributeMap:
+    """Extract attributes from Anthropic message API call.
+
+    This handles both the request parameters (in kwargs) and the response value
+    (in return_value) for comprehensive instrumentation. It serves as the main
+    attribute extraction function for the modern Messages API, handling both
+    synchronous and asynchronous calls in a consistent manner.
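+
+    Example (illustrative only; ``message`` stands in for a hypothetical
+    ``anthropic.types.Message`` instance):
+        attrs = get_message_attributes(
+            kwargs={"model": "claude-3-opus-20240229", "max_tokens": 256},
+            return_value=message,
+        )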
+ + Args: + args: Positional arguments (not used in this handler) + kwargs: Keyword arguments from the API call + return_value: Response object from the API call + + Returns: + Dictionary of attributes extracted from the request/response + """ + attributes = get_common_instrumentation_attributes() + attributes[SpanAttributes.LLM_REQUEST_TYPE] = LLMRequestTypeValues.CHAT.value + + if kwargs: + attributes.update(get_message_request_attributes(kwargs)) + + if return_value: + try: + from anthropic.types import Message, MessageStartEvent, ContentBlockStartEvent, ContentBlockDeltaEvent, MessageStopEvent, MessageStreamEvent + + if isinstance(return_value, Message): + attributes.update(get_message_response_attributes(return_value)) + + if hasattr(return_value, "content"): + attributes.update(get_tool_attributes(return_value.content)) + elif isinstance(return_value, MessageStreamEvent): + attributes.update(get_stream_attributes(return_value)) + elif isinstance(return_value, (MessageStartEvent, ContentBlockStartEvent, ContentBlockDeltaEvent, MessageStopEvent)): + attributes.update(get_stream_event_attributes(return_value)) + else: + logger.debug(f"[agentops.instrumentation.anthropic] Unrecognized return type: {type(return_value)}") + except Exception as e: + logger.debug(f"[agentops.instrumentation.anthropic] Error extracting response attributes: {e}") + + return attributes + + +def get_completion_attributes(args: Optional[Tuple] = None, kwargs: Optional[Dict] = None, + return_value: Any = None) -> AttributeMap: + """Extract attributes from Anthropic completion API call (legacy API). + + This handles both the request parameters (in kwargs) and the response value + (in return_value) for comprehensive instrumentation of the legacy Completions API. + While similar to get_message_attributes, it accounts for the differences in the + request and response formats between the modern and legacy APIs. + + Args: + args: Positional arguments (not used in this handler) + kwargs: Keyword arguments from the API call + return_value: Response object from the API call + + Returns: + Dictionary of attributes extracted from the request/response + """ + attributes = get_common_instrumentation_attributes() + attributes[SpanAttributes.LLM_REQUEST_TYPE] = LLMRequestTypeValues.COMPLETION.value + + if kwargs: + attributes.update(get_completion_request_attributes(kwargs)) + + if return_value: + try: + if hasattr(return_value, "__class__") and return_value.__class__.__name__ == "Completion": + attributes.update(get_completion_response_attributes(return_value)) + elif hasattr(return_value, "__class__") and return_value.__class__.__name__ == "Stream": + attributes.update(get_stream_attributes(return_value)) + else: + logger.debug(f"[agentops.instrumentation.anthropic] Unrecognized completion return type: {type(return_value)}") + except Exception as e: + logger.debug(f"[agentops.instrumentation.anthropic] Error extracting completion response attributes: {e}") + + return attributes + + +def _process_content(content, role, index): + """Helper function to process content and extract attributes. 
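+
+    Example (illustrative; the two content shapes handled below):
+        _process_content("Hello", role="user", index=0)
+        _process_content([{"type": "text", "text": "Hello"}], role="user", index=1)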
+
+    Args:
+        content: The content to process
+        role: The role of the message
+        index: The index of the message
+
+    Returns:
+        Dictionary of attributes for this content
+    """
+    attributes = {}
+
+    if isinstance(content, str):
+        # Plain string content maps directly to a single text prompt entry
+        attributes[MessageAttributes.PROMPT_ROLE.format(i=index)] = role
+        attributes[MessageAttributes.PROMPT_CONTENT.format(i=index)] = content
+        attributes[MessageAttributes.PROMPT_TYPE.format(i=index)] = "text"
+    elif isinstance(content, list):
+        # For list content, create a simplified representation
+        content_str = ""
+        for item in content:
+            if isinstance(item, dict) and "type" in item:
+                if item["type"] == "text" and "text" in item:
+                    content_str += item["text"] + " "
+                elif item["type"] == "tool_result" and "content" in item:
+                    content_str += f"[Tool Result: {str(item['content'])}] "
+            elif hasattr(item, "type"):
+                if item.type == "text" and hasattr(item, "text"):
+                    content_str += item.text + " "
+
+        attributes[MessageAttributes.PROMPT_ROLE.format(i=index)] = role
+        attributes[MessageAttributes.PROMPT_CONTENT.format(i=index)] = content_str.strip()
+        attributes[MessageAttributes.PROMPT_TYPE.format(i=index)] = "text"
+    else:
+        # Other types - try to convert to string
+        try:
+            simple_content = str(content)
+            attributes[MessageAttributes.PROMPT_ROLE.format(i=index)] = role
+            attributes[MessageAttributes.PROMPT_CONTENT.format(i=index)] = simple_content
+            attributes[MessageAttributes.PROMPT_TYPE.format(i=index)] = "text"
+        except Exception:
+            # Ultimate fallback
+            attributes[MessageAttributes.PROMPT_ROLE.format(i=index)] = role
+            attributes[MessageAttributes.PROMPT_CONTENT.format(i=index)] = "(complex content)"
+            attributes[MessageAttributes.PROMPT_TYPE.format(i=index)] = "unknown"
+
+    return attributes
+
+def _create_simplified_message(msg):
+    """Helper function to create a simplified message for LLM_PROMPTS attribute.
+
+    Args:
+        msg: The message to simplify
+
+    Returns:
+        Dictionary with role and content
+    """
+    role = msg.get("role", "user")
+    content = msg.get("content", "")
+
+    if isinstance(content, str):
+        return {"role": role, "content": content}
+    elif isinstance(content, list):
+        content_str = ""
+        for item in content:
+            if isinstance(item, dict) and "type" in item:
+                if item["type"] == "text" and "text" in item:
+                    content_str += item["text"] + " "
+                elif item["type"] == "tool_result" and "content" in item:
+                    content_str += f"[Tool Result: {str(item['content'])}] "
+            elif hasattr(item, "type"):
+                if item.type == "text" and hasattr(item, "text"):
+                    content_str += item.text + " "
+        return {"role": role, "content": content_str.strip()}
+    else:
+        try:
+            return {"role": role, "content": str(content)}
+        except Exception:
+            return {"role": role, "content": "(complex content)"}
+
+def get_message_request_attributes(kwargs: Dict[str, Any]) -> AttributeMap:
+    """Extract attributes from message request parameters.
+
+    This function processes the request parameters for the Messages API call and extracts
+    standardized attributes for telemetry. It handles different message formats including
+    system prompts, user/assistant messages, and tool-using messages.
+
+    It extracts:
+    - System prompt (if present)
+    - User and assistant messages
+    - Tool definitions (if present)
+    - Model parameters (temperature, max_tokens, etc.)
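+
+    Example (illustrative request; the model name and values are placeholders):
+        kwargs = {
+            "model": "claude-3-opus-20240229",
+            "max_tokens": 1024,
+            "system": "You are a helpful assistant.",
+            "messages": [{"role": "user", "content": "Hello"}],
+        }
+        attrs = get_message_request_attributes(kwargs)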
+
+    Args:
+        kwargs: Request keyword arguments
+
+    Returns:
+        Dictionary of extracted attributes
+    """
+    attributes = extract_request_attributes(kwargs=kwargs)
+
+    # Extract system prompt if present; it occupies prompt slot 0
+    system = kwargs.get("system", "")
+    offset = 0
+    if system:
+        attributes[MessageAttributes.PROMPT_ROLE.format(i=0)] = "system"
+        attributes[MessageAttributes.PROMPT_CONTENT.format(i=0)] = system
+        attributes[MessageAttributes.PROMPT_TYPE.format(i=0)] = "text"
+        offset = 1
+
+    # Extract messages, shifting indices by one when a system prompt is set
+    # so the first message does not overwrite the system attributes at index 0
+    messages = kwargs.get("messages", [])
+    for index, msg in enumerate(messages):
+        role = msg.get("role", "user")
+        content = msg.get("content", "")
+
+        # Process content and extract attributes
+        content_attributes = _process_content(content, role, index + offset)
+        attributes.update(content_attributes)
+
+    # Extract tools if present
+    tools = kwargs.get("tools", [])
+    if tools:
+        tool_attributes = extract_tool_definitions(tools)
+        attributes.update(tool_attributes)
+
+    return attributes
+
+
+def get_completion_request_attributes(kwargs: Dict[str, Any]) -> AttributeMap:
+    """Extract attributes from completion request parameters (legacy API).
+
+    This function handles the legacy Completions API format, which differs from
+    the modern Messages API in its structure and parameters. It standardizes
+    the attributes to make them consistent with the OpenTelemetry conventions.
+
+    This is specifically for the older Anthropic API format which used a prompt
+    parameter rather than the messages array format of the newer API.
+
+    Args:
+        kwargs: Keyword arguments from the legacy API call
+
+    Returns:
+        Dictionary of extracted attributes
+    """
+    attributes = extract_request_attributes(kwargs=kwargs)
+
+    prompt = kwargs.get("prompt", "")
+    if prompt:
+        # Use structured prompt attributes
+        attributes[MessageAttributes.PROMPT_ROLE.format(i=0)] = "user"
+        attributes[MessageAttributes.PROMPT_CONTENT.format(i=0)] = prompt
+        attributes[MessageAttributes.PROMPT_TYPE.format(i=0)] = "text"
+
+    return attributes
+
+
+def get_message_response_attributes(response: "Message") -> AttributeMap:
+    """Extract attributes from a Message response.
+
+    This function processes the response from the Messages API call and extracts
+    standardized attributes for telemetry. It handles different response structures
+    including text content, token usage, and tool-using responses.
+
+    It extracts:
+    - Completion content (the assistant's response)
+    - Token usage metrics (input, output, total)
+    - Model information
+    - Content type information
+    - Tool usage information (via related functions)
+
+    Args:
+        response: The Message response object from Anthropic
+
+    Returns:
+        Dictionary of extracted attributes
+    """
+    attributes = {}
+
+    # Extract message ID
+    if hasattr(response, "id"):
+        message_id = response.id
+        attributes[SpanAttributes.LLM_RESPONSE_ID] = message_id
+        # Also add to the completion ID
+        attributes[MessageAttributes.COMPLETION_ID.format(i=0)] = message_id
+
+    # Extract model
+    if hasattr(response, "model"):
+        model = response.model
+        attributes[SpanAttributes.LLM_RESPONSE_MODEL] = model
+
+    # Extract usage information
+    if hasattr(response, "usage"):
+        usage = response.usage
+        if hasattr(usage, "input_tokens"):
+            input_tokens = usage.input_tokens
+            attributes[SpanAttributes.LLM_USAGE_PROMPT_TOKENS] = input_tokens
+
+        if hasattr(usage, "output_tokens"):
+            output_tokens = usage.output_tokens
+            attributes[SpanAttributes.LLM_USAGE_COMPLETION_TOKENS] = output_tokens
+
+        if hasattr(usage, "input_tokens") and hasattr(usage, "output_tokens"):
+            total_tokens = usage.input_tokens + usage.output_tokens
+            attributes[SpanAttributes.LLM_USAGE_TOTAL_TOKENS] = total_tokens
+
+    # Extract stop reason if available
+    if hasattr(response, "stop_reason"):
+        stop_reason = response.stop_reason
+        attributes[SpanAttributes.LLM_RESPONSE_STOP_REASON] = stop_reason
+        attributes[SpanAttributes.LLM_RESPONSE_FINISH_REASON] = stop_reason
+        attributes[MessageAttributes.COMPLETION_FINISH_REASON.format(i=0)] = stop_reason
+
+    # Extract content
+    if hasattr(response, "content"):
+        try:
+            content_list = response.content
+
+            # Set role for all content (assistant for Claude)
+            attributes[MessageAttributes.COMPLETION_ROLE.format(i=0)] = "assistant"
+
+            # Process different content block types
+            tool_calls = []
+
+            for i, block in enumerate(content_list):
+                if hasattr(block, "type") and block.type == "text":
+                    text_content = block.text if hasattr(block, "text") else ""
+                    # Use structured completion attributes
+                    attributes[MessageAttributes.COMPLETION_TYPE.format(i=i)] = "text"
+                    attributes[MessageAttributes.COMPLETION_CONTENT.format(i=i)] = text_content
+
+                elif hasattr(block, "type") and block.type == "tool_use":
+                    # Add as tool call
+                    tool_call = {
+                        "name": block.name if hasattr(block, "name") else "unknown",
+                        "id": block.id if hasattr(block, "id") else "unknown",
+                        "input": block.input if hasattr(block, "input") else {}
+                    }
+                    tool_calls.append(tool_call)
+
+                    # Add structured tool call attributes
+                    j = len(tool_calls) - 1
+                    attributes[MessageAttributes.COMPLETION_TOOL_CALL_NAME.format(i=0, j=j)] = tool_call["name"]
+                    attributes[MessageAttributes.COMPLETION_TOOL_CALL_ID.format(i=0, j=j)] = tool_call["id"]
+                    attributes[MessageAttributes.COMPLETION_TOOL_CALL_TYPE.format(i=0, j=j)] = "function"
+
+                    if isinstance(tool_call["input"], dict):
+                        tool_input = json.dumps(tool_call["input"])
+                    else:
+                        tool_input = str(tool_call["input"])
+
+                    attributes[MessageAttributes.COMPLETION_TOOL_CALL_ARGUMENTS.format(i=0, j=j)] = tool_input
+
+        except Exception as e:
+            logger.debug(f"[agentops.instrumentation.anthropic] Error extracting content: {e}")
+
+    return attributes
+
+
+def get_completion_response_attributes(response: "Completion") -> AttributeMap:
+    """Extract attributes from a Completion 
response (legacy API). + + This function processes the response from the legacy Completions API call + and extracts standardized attributes for telemetry. The structure differs + from the modern Messages API, so this handles the specific format of the + older API responses. + + Args: + response: The Completion response object from Anthropic + + Returns: + Dictionary of extracted attributes + """ + attributes = {} + + # Extract completion ID + if hasattr(response, "id"): + completion_id = response.id + attributes[SpanAttributes.LLM_RESPONSE_ID] = completion_id + attributes[MessageAttributes.COMPLETION_ID.format(i=0)] = completion_id + + # Extract model + if hasattr(response, "model"): + model = response.model + attributes[SpanAttributes.LLM_RESPONSE_MODEL] = model + + # Extract completion + if hasattr(response, "completion"): + completion_text = response.completion + # Add structured completion attributes + attributes[MessageAttributes.COMPLETION_TYPE.format(i=0)] = "text" + attributes[MessageAttributes.COMPLETION_ROLE.format(i=0)] = "assistant" + attributes[MessageAttributes.COMPLETION_CONTENT.format(i=0)] = completion_text + + # For backward compatibility + attributes[SpanAttributes.LLM_COMPLETIONS] = json.dumps([{"type": "text", "text": completion_text}]) + attributes[SpanAttributes.LLM_CONTENT_COMPLETION_CHUNK] = completion_text + + # Extract stop reason if available + if hasattr(response, "stop_reason"): + stop_reason = response.stop_reason + attributes[SpanAttributes.LLM_RESPONSE_STOP_REASON] = stop_reason + attributes[SpanAttributes.LLM_RESPONSE_FINISH_REASON] = stop_reason + attributes[MessageAttributes.COMPLETION_FINISH_REASON.format(i=0)] = stop_reason + + # Extract usage information (newer versions have this) + if hasattr(response, "usage"): + usage = response.usage + if hasattr(usage, "input_tokens"): + input_tokens = usage.input_tokens + attributes[SpanAttributes.LLM_USAGE_PROMPT_TOKENS] = input_tokens + + if hasattr(usage, "output_tokens"): + output_tokens = usage.output_tokens + attributes[SpanAttributes.LLM_USAGE_COMPLETION_TOKENS] = output_tokens + + # Calculate total tokens if we have both input and output + if hasattr(usage, "input_tokens") and hasattr(usage, "output_tokens"): + total_tokens = usage.input_tokens + usage.output_tokens + attributes[SpanAttributes.LLM_USAGE_TOTAL_TOKENS] = total_tokens + + return attributes + + +def get_stream_attributes(stream: Any) -> AttributeMap: + """Extract attributes from a streaming response. + + This function captures available metadata from a streaming response object + before the full content is available. This is typically limited to identifying + information rather than content or token usage which becomes available only + after the stream completes. + + Args: + stream: The stream object from an Anthropic streaming request + + Returns: + Dictionary of available stream metadata attributes + """ + attributes = {} + + attributes[SpanAttributes.LLM_REQUEST_STREAMING] = True + + if hasattr(stream, "model"): + model = stream.model + attributes[SpanAttributes.LLM_REQUEST_MODEL] = model + + return attributes + + +def get_stream_event_attributes(event: Any) -> AttributeMap: + """Extract attributes from a streaming event. + + This function processes individual streaming events from the Anthropic API + and extracts available metadata. Different event types contain different + information, so the function handles various event classes appropriately. 
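+
+    Example (illustrative):
+        # For a MessageStartEvent this yields response id/model attributes;
+        # for a MessageStopEvent it yields stop/finish reason attributes.
+        attrs = get_stream_event_attributes(event)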
+ + Args: + event: A streaming event object from Anthropic + + Returns: + Dictionary of available event attributes + """ + attributes = {} + + # Extract only necessary information from events + event_type = event.__class__.__name__ + + if event_type == "MessageStartEvent": + if hasattr(event, "message"): + if hasattr(event.message, "id"): + message_id = event.message.id + attributes[SpanAttributes.LLM_RESPONSE_ID] = message_id + attributes[MessageAttributes.COMPLETION_ID.format(i=0)] = message_id + + if hasattr(event.message, "model"): + model = event.message.model + attributes[SpanAttributes.LLM_RESPONSE_MODEL] = model + + elif event_type == "MessageStopEvent": + if hasattr(event, "message"): + # Extract stop reason + if hasattr(event.message, "stop_reason"): + stop_reason = event.message.stop_reason + attributes[SpanAttributes.LLM_RESPONSE_STOP_REASON] = stop_reason + attributes[SpanAttributes.LLM_RESPONSE_FINISH_REASON] = stop_reason + attributes[MessageAttributes.COMPLETION_FINISH_REASON.format(i=0)] = stop_reason + + return attributes \ No newline at end of file diff --git a/agentops/instrumentation/anthropic/attributes/tools.py b/agentops/instrumentation/anthropic/attributes/tools.py new file mode 100644 index 000000000..3be951097 --- /dev/null +++ b/agentops/instrumentation/anthropic/attributes/tools.py @@ -0,0 +1,233 @@ +"""Tool-related attribute extraction for Anthropic API.""" + +import json +from typing import Dict, Any, List, Optional + +from agentops.logging import logger +from agentops.semconv import SpanAttributes, MessageAttributes, ToolAttributes, ToolStatus +from agentops.instrumentation.common.attributes import AttributeMap + +def extract_tool_definitions(tools: List[Dict[str, Any]]) -> AttributeMap: + """Extract attributes from tool definitions. + + Processes a list of Anthropic tool definitions and converts them into + standardized attributes for OpenTelemetry instrumentation. This captures + information about each tool's name, description, and input schema. 
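+
+    Example (illustrative tool definition, following the
+    name/description/input_schema shape this function reads):
+        tools = [{
+            "name": "get_weather",
+            "description": "Look up the current weather for a city",
+            "input_schema": {
+                "type": "object",
+                "properties": {"city": {"type": "string"}},
+            },
+        }]
+        attrs = extract_tool_definitions(tools)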
+
+    Args:
+        tools: List of tool definition objects
+
+    Returns:
+        Dictionary of tool-related attributes
+    """
+    attributes = {}
+
+    try:
+        if not tools:
+            return attributes
+
+        for i, tool in enumerate(tools):
+            name = tool.get("name", "unknown")
+            description = tool.get("description", "")
+
+            attributes[MessageAttributes.TOOL_CALL_NAME.format(i=i)] = name
+            attributes[MessageAttributes.TOOL_CALL_TYPE.format(i=i)] = "function"
+
+            if description:
+                attributes[MessageAttributes.TOOL_CALL_DESCRIPTION.format(i=i)] = description
+
+            if "input_schema" in tool:
+                attributes[MessageAttributes.TOOL_CALL_ARGUMENTS.format(i=i)] = json.dumps(tool["input_schema"])
+
+            tool_id = tool.get("id", f"tool-{i}")
+            attributes[MessageAttributes.TOOL_CALL_ID.format(i=i)] = tool_id
+
+        tool_names = [tool.get("name", "unknown") for tool in tools]
+        attributes[SpanAttributes.LLM_REQUEST_FUNCTIONS] = json.dumps(tool_names)
+
+        tool_schemas = []
+        for tool in tools:
+            schema = {
+                "name": tool.get("name", "unknown"),
+                "schema": {}
+            }
+
+            if "description" in tool:
+                schema["schema"]["description"] = tool["description"]
+            if "input_schema" in tool:
+                schema["schema"]["input_schema"] = tool["input_schema"]
+
+            tool_schemas.append(schema)
+
+        attributes["anthropic.tools.schemas"] = json.dumps(tool_schemas)
+
+    except Exception as e:
+        logger.debug(f"[agentops.instrumentation.anthropic] Error extracting tool definitions: {e}")
+
+    return attributes
+
+
+def extract_tool_use_blocks(content_blocks: List[Any]) -> Optional[List[Dict[str, Any]]]:
+    """Extract tool use blocks from message content.
+
+    Analyzes message content blocks to find and extract tool use information.
+    This is used to track which tools the model called and with what parameters.
+
+    Args:
+        content_blocks: List of content blocks from a Message
+
+    Returns:
+        List of tool use information or None if no tools used
+    """
+    if not content_blocks:
+        return None
+
+    try:
+        tool_uses = []
+
+        for block in content_blocks:
+            if hasattr(block, "type") and block.type == "tool_use":
+                tool_use = {
+                    "name": block.name if hasattr(block, "name") else "unknown",
+                    "id": block.id if hasattr(block, "id") else "unknown",
+                }
+
+                if hasattr(block, "input"):
+                    try:
+                        if isinstance(block.input, dict):
+                            tool_use["input"] = block.input
+                        elif isinstance(block.input, str):
+                            tool_use["input"] = json.loads(block.input)
+                        else:
+                            tool_use["input"] = {"raw": str(block.input)}
+                    except Exception:
+                        tool_use["input"] = {"raw": str(block.input)}
+
+                tool_uses.append(tool_use)
+
+        return tool_uses if tool_uses else None
+
+    except Exception as e:
+        logger.debug(f"[agentops.instrumentation.anthropic] Error extracting tool use blocks: {e}")
+        return None
+
+
+def extract_tool_results(content_blocks: List[Any]) -> Optional[List[Dict[str, Any]]]:
+    """Extract tool result blocks from message content.
+
+    Analyzes message content blocks to find and extract tool result information.
+    This is used to track the outputs returned from tool executions.
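+
+    Example (illustrative tool_result block this function looks for;
+    the id and content values are placeholders):
+        {"type": "tool_result", "tool_use_id": "toolu_abc123", "content": "72F"}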
+
+    Args:
+        content_blocks: List of content blocks from a Message
+
+    Returns:
+        List of tool result information or None if no tool results
+    """
+    if not content_blocks:
+        return None
+
+    try:
+        tool_results = []
+
+        for block in content_blocks:
+            if hasattr(block, "type") and block.type == "tool_result":
+                tool_result = {
+                    "tool_use_id": block.tool_use_id if hasattr(block, "tool_use_id") else "unknown",
+                }
+
+                if hasattr(block, "content"):
+                    try:
+                        if isinstance(block.content, dict):
+                            tool_result["content"] = block.content
+                        elif isinstance(block.content, str):
+                            tool_result["content"] = json.loads(block.content)
+                        else:
+                            tool_result["content"] = {"raw": str(block.content)}
+                    except Exception:
+                        tool_result["content"] = {"raw": str(block.content)}
+
+                tool_results.append(tool_result)
+
+        return tool_results if tool_results else None
+
+    except Exception as e:
+        logger.debug(f"[agentops.instrumentation.anthropic] Error extracting tool results: {e}")
+        return None
+
+
+def get_tool_attributes(message_content: List[Any]) -> AttributeMap:
+    """Extract tool-related attributes from message content.
+
+    Processes message content to extract comprehensive information about
+    tool usage, including both tool calls and tool results. This creates a
+    standardized set of attributes representing the tool interaction flow.
+
+    Args:
+        message_content: List of content blocks from a Message
+
+    Returns:
+        Dictionary of tool-related attributes
+    """
+    attributes = {}
+
+    try:
+        tool_uses = extract_tool_use_blocks(message_content)
+        if tool_uses:
+            for j, tool_use in enumerate(tool_uses):
+                tool_name = tool_use.get("name", "unknown")
+                tool_id = tool_use.get("id", f"tool-call-{j}")
+
+                attributes[MessageAttributes.COMPLETION_TOOL_CALL_ID.format(i=0, j=j)] = tool_id
+                attributes[MessageAttributes.COMPLETION_TOOL_CALL_NAME.format(i=0, j=j)] = tool_name
+                attributes[MessageAttributes.COMPLETION_TOOL_CALL_TYPE.format(i=0, j=j)] = "function"
+
+                tool_input = tool_use.get("input", {})
+                if isinstance(tool_input, dict):
+                    input_str = json.dumps(tool_input)
+                else:
+                    input_str = str(tool_input)
+                attributes[MessageAttributes.COMPLETION_TOOL_CALL_ARGUMENTS.format(i=0, j=j)] = input_str
+
+                attributes[MessageAttributes.TOOL_CALL_ID.format(i=j)] = tool_id
+                attributes[MessageAttributes.TOOL_CALL_NAME.format(i=j)] = tool_name
+                attributes[MessageAttributes.TOOL_CALL_ARGUMENTS.format(i=j)] = input_str
+                attributes[f"{ToolAttributes.TOOL_STATUS}.{j}"] = ToolStatus.EXECUTING.value
+
+            attributes["anthropic.tool_calls.count"] = len(tool_uses)
+
+        tool_results = extract_tool_results(message_content)
+        if tool_results:
+            attributes["anthropic.tool_results"] = json.dumps(tool_results)
+            attributes["anthropic.tool_results.count"] = len(tool_results)
+
+            for j, tool_result in enumerate(tool_results):
+                tool_use_id = tool_result.get("tool_use_id", "unknown")
+
+                tool_index = None
+                for k in range(attributes.get("anthropic.tool_calls.count", 0)):
+                    if attributes.get(MessageAttributes.COMPLETION_TOOL_CALL_ID.format(i=0, j=k)) == tool_use_id:
+                        tool_index = k
+                        break
+
+                if tool_index is None:
+                    # No matching tool call was recorded above; skip this result
+                    # rather than emit attribute keys containing "None"
+                    continue
+
+                attributes[MessageAttributes.COMPLETION_TOOL_CALL_STATUS.format(i=0, j=tool_index)] = "complete"
+
+                content = tool_result.get("content", {})
+                if isinstance(content, dict):
+                    content_str = json.dumps(content)
+                else:
+                    content_str = str(content)
+
+                attributes[f"{ToolAttributes.TOOL_STATUS}.{tool_index}"] = ToolStatus.SUCCEEDED.value
+                attributes[f"{ToolAttributes.TOOL_RESULT}.{tool_index}"] = content_str
+
+                
attributes[f"anthropic.tool_result.{tool_index}.content"] = content_str + + except Exception as e: + logger.debug(f"[agentops.instrumentation.anthropic] Error extracting tool attributes: {e}") + + return attributes \ No newline at end of file diff --git a/agentops/instrumentation/anthropic/event_handler_wrapper.py b/agentops/instrumentation/anthropic/event_handler_wrapper.py new file mode 100644 index 000000000..9e8fe8620 --- /dev/null +++ b/agentops/instrumentation/anthropic/event_handler_wrapper.py @@ -0,0 +1,90 @@ +"""Event handler wrapper for Anthropic's streaming API. + +This module provides a wrapper for Anthropic's event handlers to +track events and metrics during streaming. +""" + +import logging +from typing import Any, Dict, Optional + +from opentelemetry.trace import Span +from agentops.semconv import CoreAttributes + +logger = logging.getLogger(__name__) + + +class EventHandleWrapper: + """Wrapper for Anthropic's EventHandler. + + This wrapper forwards all events to the original handler while also + capturing metrics and adding them to the provided span. + """ + + def __init__(self, original_handler: Optional[Any], span: Span): + """Initialize the wrapper with the original handler and a span. + + Args: + original_handler: The original Anthropic event handler (or None) + span: The OpenTelemetry span to record metrics to + """ + self._original_handler = original_handler + self._span = span + + def _forward_event(self, method_name: str, *args, **kwargs) -> None: + """Forward an event to the original handler if it exists. + + Args: + method_name: Name of the method to call on the original handler + *args: Positional arguments to pass to the method + **kwargs: Keyword arguments to pass to the method + """ + try: + if self._original_handler is not None and hasattr(self._original_handler, method_name): + method = getattr(self._original_handler, method_name) + method(*args, **kwargs) + except Exception as e: + logger.debug(f"Error in event handler {method_name}: {e}") + + def on_event(self, event: Dict[str, Any]) -> None: + """Handle any event by forwarding it to the original handler.""" + self._forward_event("on_event", event) + + def on_text_delta(self, delta: Dict[str, Any], snapshot: Dict[str, Any]) -> None: + """Handle a text delta event.""" + self._forward_event("on_text_delta", delta, snapshot) + + def on_content_block_start(self, content_block_start: Dict[str, Any]) -> None: + """Handle a content block start event.""" + self._forward_event("on_content_block_start", content_block_start) + + def on_content_block_delta(self, delta: Dict[str, Any], snapshot: Dict[str, Any]) -> None: + """Handle a content block delta event.""" + self._forward_event("on_content_block_delta", delta, snapshot) + + def on_content_block_stop(self, content_block_stop: Dict[str, Any]) -> None: + """Handle a content block stop event.""" + self._forward_event("on_content_block_stop", content_block_stop) + + def on_message_start(self, message_start: Dict[str, Any]) -> None: + """Handle a message start event.""" + self._forward_event("on_message_start", message_start) + + def on_message_delta(self, delta: Dict[str, Any], snapshot: Dict[str, Any]) -> None: + """Handle a message delta event.""" + self._forward_event("on_message_delta", delta, snapshot) + + def on_message_stop(self, message_stop: Dict[str, Any]) -> None: + """Handle a message stop event.""" + self._forward_event("on_message_stop", message_stop) + + def on_error(self, error: Exception) -> None: + """Handle an error event.""" + try: + 
+            self._span.record_exception(error)
+            self._span.set_attribute(CoreAttributes.ERROR_MESSAGE, str(error))
+            self._span.set_attribute(CoreAttributes.ERROR_TYPE, error.__class__.__name__)
+
+            if self._original_handler is not None and hasattr(self._original_handler, "on_error"):
+                self._original_handler.on_error(error)
+        except Exception as e:
+            logger.debug(f"Error in event handler on_error: {e}")
\ No newline at end of file
diff --git a/agentops/instrumentation/anthropic/instrumentor.py b/agentops/instrumentation/anthropic/instrumentor.py
new file mode 100644
index 000000000..da09e3464
--- /dev/null
+++ b/agentops/instrumentation/anthropic/instrumentor.py
@@ -0,0 +1,194 @@
+"""Anthropic API Instrumentation for AgentOps
+
+This module provides instrumentation for the Anthropic API, implementing OpenTelemetry
+instrumentation for Claude model requests and responses.
+
+We focus on instrumenting the following key endpoints:
+- Client.messages.create - The main completion endpoint
+- Client.messages.stream - Streaming API for messages
+- Client.completions.create - The legacy completion endpoint
+- Streaming responses - Special handling for streaming responses
+- Tool-using completions - Capturing tool usage information
+
+The instrumentation captures:
+1. Request parameters (model, max_tokens, temperature, etc.)
+2. Response data (completion content, token usage, etc.)
+3. Timing information (latency, time to first token, etc.)
+4. Tool usage information (tool calls, tool outputs)
+
+Two wrapping approaches are used:
+
+1. Standard Method Wrapping:
+   - Uses the common wrappers module to wrap methods with tracers
+   - Applies to both sync and async methods
+   - Captures request/response attributes via attribute extractors
+
+2. Streaming Approach:
+   - Special handling for streaming responses
+   - Uses direct wrapt.wrap_function_wrapper for stream methods
+   - Captures events as they arrive rather than waiting for completion
+   - Maintains span context across multiple events
+"""
+from typing import List, Optional, Collection
+from opentelemetry.trace import get_tracer
+from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
+from opentelemetry.metrics import get_meter
+from wrapt import wrap_function_wrapper
+
+from agentops.logging import logger
+from agentops.instrumentation.common.wrappers import WrapConfig, wrap, unwrap
+from agentops.instrumentation.anthropic import LIBRARY_NAME, LIBRARY_VERSION
+from agentops.instrumentation.anthropic.attributes.common import get_common_instrumentation_attributes
+from agentops.instrumentation.anthropic.attributes.message import (
+    get_message_attributes,
+    get_completion_attributes
+)
+from agentops.instrumentation.anthropic.stream_wrapper import (
+    messages_stream_wrapper,
+    messages_stream_async_wrapper,
+)
+from agentops.semconv import Meters
+
+# Methods to wrap for instrumentation
+WRAPPED_METHODS: List[WrapConfig] = [
+    # Main messages.create (modern API)
+    WrapConfig(
+        trace_name="anthropic.messages.create",
+        package="anthropic.resources.messages",
+        class_name="Messages",
+        method_name="create",
+        handler=get_message_attributes,
+    ),
+    # Async variant
+    WrapConfig(
+        trace_name="anthropic.messages.create",
+        package="anthropic.resources.messages",
+        class_name="AsyncMessages",
+        method_name="create",
+        handler=get_message_attributes,
+        is_async=True,
+    ),
+    # Legacy completions API
+    WrapConfig(
+        trace_name="anthropic.completions.create",
+        package="anthropic.resources.completions",
+        class_name="Completions",
+        method_name="create",
+        
handler=get_completion_attributes, + ), + # Async variant of legacy API + WrapConfig( + trace_name="anthropic.completions.create", + package="anthropic.resources.completions", + class_name="AsyncCompletions", + method_name="create", + handler=get_completion_attributes, + is_async=True, + ), +] + + +class AnthropicInstrumentor(BaseInstrumentor): + """An instrumentor for Anthropic's Claude API. + + This class provides instrumentation for Anthropic's Claude API by wrapping key methods + in the client library and capturing telemetry data. It supports both synchronous and + asynchronous API calls, including streaming responses. + + The instrumentor wraps the following methods: + - messages.create: For the modern Messages API + - completions.create: For the legacy Completions API + - messages.stream: For streaming responses + + It captures metrics including token usage, operation duration, and exceptions. + """ + + def instrumentation_dependencies(self) -> Collection[str]: + """Return packages required for instrumentation. + + Returns: + A collection of package specifications required for this instrumentation. + """ + return ["anthropic >= 0.7.0"] + + def _instrument(self, **kwargs): + """Instrument the Anthropic API. + + This method wraps the key methods in the Anthropic client to capture + telemetry data for API calls. It sets up tracers, meters, and wraps the appropriate + methods for instrumentation. + + Args: + **kwargs: Configuration options for instrumentation. + """ + tracer_provider = kwargs.get("tracer_provider") + tracer = get_tracer(LIBRARY_NAME, LIBRARY_VERSION, tracer_provider) + + meter_provider = kwargs.get("meter_provider") + meter = get_meter(LIBRARY_NAME, LIBRARY_VERSION, meter_provider) + + tokens_histogram = meter.create_histogram( + name=Meters.LLM_TOKEN_USAGE, + unit="token", + description="Measures number of input and output tokens used with Anthropic models", + ) + + duration_histogram = meter.create_histogram( + name=Meters.LLM_OPERATION_DURATION, + unit="s", + description="Anthropic API operation duration", + ) + + exception_counter = meter.create_counter( + name=Meters.LLM_COMPLETIONS_EXCEPTIONS, + unit="time", + description="Number of exceptions occurred during Anthropic completions", + ) + + # Standard method wrapping approach + # Uses the common wrappers module to wrap methods with tracers + for wrap_config in WRAPPED_METHODS: + try: + wrap(wrap_config, tracer) + except (AttributeError, ModuleNotFoundError): + logger.debug(f"Could not wrap {wrap_config.package}.{wrap_config.class_name}.{wrap_config.method_name}") + + # Special handling for streaming responses + # Uses direct wrapt.wrap_function_wrapper for stream methods + # This approach captures events as they arrive rather than waiting for completion + try: + wrap_function_wrapper( + "anthropic.resources.messages.messages", + "Messages.stream", + messages_stream_wrapper(tracer), + ) + + wrap_function_wrapper( + "anthropic.resources.messages.messages", + "AsyncMessages.stream", + messages_stream_async_wrapper(tracer), + ) + except (AttributeError, ModuleNotFoundError): + logger.debug("Failed to wrap Anthropic streaming methods") + + def _uninstrument(self, **kwargs): + """Remove instrumentation from Anthropic API. + + This method unwraps all methods that were wrapped during instrumentation, + restoring the original behavior of the Anthropic API. + + Args: + **kwargs: Configuration options for uninstrumentation. 
+ """ + # Unwrap standard methods + for wrap_config in WRAPPED_METHODS: + try: + unwrap(wrap_config) + except Exception: + logger.debug(f"Failed to unwrap {wrap_config.package}.{wrap_config.class_name}.{wrap_config.method_name}") + + # Unwrap streaming methods + try: + from opentelemetry.instrumentation.utils import unwrap as otel_unwrap + otel_unwrap("anthropic.resources.messages.messages", "Messages.stream") + otel_unwrap("anthropic.resources.messages.messages", "AsyncMessages.stream") + except (AttributeError, ModuleNotFoundError): + logger.debug("Failed to unwrap Anthropic streaming methods") \ No newline at end of file diff --git a/agentops/instrumentation/anthropic/stream_wrapper.py b/agentops/instrumentation/anthropic/stream_wrapper.py new file mode 100644 index 000000000..244830386 --- /dev/null +++ b/agentops/instrumentation/anthropic/stream_wrapper.py @@ -0,0 +1,432 @@ +"""Anthropic stream wrapper implementation. + +This module provides wrappers for Anthropic's streaming functionality, +focusing on the MessageStreamManager for both sync and async operations. +It instruments streams to collect telemetry data for monitoring and analysis. +""" + +import logging +from typing import TypeVar, Any, Awaitable + +from opentelemetry import context as context_api +from opentelemetry.trace import SpanKind +from opentelemetry.instrumentation.utils import _SUPPRESS_INSTRUMENTATION_KEY + +from agentops.semconv import SpanAttributes, LLMRequestTypeValues, CoreAttributes, MessageAttributes +from agentops.instrumentation.common.wrappers import _with_tracer_wrapper +from agentops.instrumentation.anthropic.attributes.message import ( + get_message_request_attributes, + get_stream_attributes, +) +from agentops.instrumentation.anthropic.event_handler_wrapper import EventHandleWrapper + +logger = logging.getLogger(__name__) + +T = TypeVar('T') + + +@_with_tracer_wrapper +def messages_stream_wrapper(tracer, wrapped, instance, args, kwargs): + """Wrapper for the Messages.stream method. + + This wrapper creates spans for tracking stream performance and injects + an event handler wrapper to capture streaming events. + + Args: + tracer: The OpenTelemetry tracer to use + wrapped: The original stream method + instance: The instance the method is bound to + args: Positional arguments to the method + kwargs: Keyword arguments to the method + + Returns: + A wrapped stream manager that captures telemetry data + """ + if context_api.get_value(_SUPPRESS_INSTRUMENTATION_KEY): + return wrapped(*args, **kwargs) + + span = tracer.start_span( + "anthropic.messages.stream", + kind=SpanKind.CLIENT, + attributes={SpanAttributes.LLM_REQUEST_TYPE: LLMRequestTypeValues.CHAT.value}, + ) + + request_attributes = get_message_request_attributes(kwargs) + for key, value in request_attributes.items(): + span.set_attribute(key, value) + + span.set_attribute(SpanAttributes.LLM_REQUEST_STREAMING, True) + + original_event_handler = kwargs.get("event_handler") + + if original_event_handler is not None: + wrapped_handler = EventHandleWrapper( + original_handler=original_event_handler, + span=span + ) + kwargs["event_handler"] = wrapped_handler + + try: + class TracedStreamManager: + """A wrapper for Anthropic's MessageStreamManager that adds telemetry. + + This class wraps the original stream manager to capture metrics about + the streaming process, including token counts, content, and errors. + """ + + def __init__(self, original_manager): + """Initialize with the original manager. 
+ + Args: + original_manager: The Anthropic MessageStreamManager to wrap + """ + self.original_manager = original_manager + self.stream = None + + def __enter__(self): + """Context manager entry that initializes stream monitoring. + + Returns: + The original stream with instrumentation added + """ + self.stream = self.original_manager.__enter__() + + try: + stream_attributes = get_stream_attributes(self.stream) + for key, value in stream_attributes.items(): + span.set_attribute(key, value) + except Exception as e: + logger.debug(f"Error getting stream attributes: {e}") + + # Set the event handler on the stream if provided + if original_event_handler is not None: + self.stream.event_handler = kwargs["event_handler"] + else: + try: + original_text_stream = self.stream.text_stream + token_count = 0 + + class InstrumentedTextStream: + """A wrapper for Anthropic's text stream that counts tokens.""" + + def __iter__(self): + """Iterate through text chunks, counting tokens. + + Yields: + Text chunks from the original stream + """ + nonlocal token_count + for text in original_text_stream: + token_count += len(text.split()) + span.set_attribute(SpanAttributes.LLM_USAGE_STREAMING_TOKENS, token_count) + yield text + + self.stream.text_stream = InstrumentedTextStream() + except Exception as e: + logger.debug(f"Error patching text_stream: {e}") + + return self.stream + + def __exit__(self, exc_type, exc_val, exc_tb): + """Context manager exit that records final metrics. + + Args: + exc_type: Exception type, if an exception occurred + exc_val: Exception value, if an exception occurred + exc_tb: Exception traceback, if an exception occurred + + Returns: + Result of the original context manager's __exit__ + """ + try: + if exc_type is not None: + span.record_exception(exc_val) + span.set_attribute(CoreAttributes.ERROR_MESSAGE, str(exc_val)) + span.set_attribute(CoreAttributes.ERROR_TYPE, exc_type.__name__) + + try: + final_message = None + + if hasattr(self.original_manager, "_MessageStreamManager__stream") and \ + hasattr(self.original_manager._MessageStreamManager__stream, "_MessageStream__final_message_snapshot"): + final_message = self.original_manager._MessageStreamManager__stream._MessageStream__final_message_snapshot + + if final_message: + if hasattr(final_message, "content"): + content_text = "" + if isinstance(final_message.content, list): + for content_block in final_message.content: + if hasattr(content_block, "text"): + content_text += content_block.text + + if content_text: + span.set_attribute(MessageAttributes.COMPLETION_TYPE.format(i=0), "text") + span.set_attribute(MessageAttributes.COMPLETION_ROLE.format(i=0), "assistant") + span.set_attribute(MessageAttributes.COMPLETION_CONTENT.format(i=0), content_text) + + if hasattr(final_message, "usage"): + usage = final_message.usage + if hasattr(usage, "input_tokens"): + span.set_attribute(SpanAttributes.LLM_USAGE_PROMPT_TOKENS, usage.input_tokens) + + if hasattr(usage, "output_tokens"): + span.set_attribute(SpanAttributes.LLM_USAGE_COMPLETION_TOKENS, usage.output_tokens) + + if hasattr(usage, "input_tokens") and hasattr(usage, "output_tokens"): + total_tokens = usage.input_tokens + usage.output_tokens + span.set_attribute(SpanAttributes.LLM_USAGE_TOTAL_TOKENS, total_tokens) + except Exception as e: + logger.debug(f"Failed to extract final message data: {e}") + finally: + if span.is_recording(): + span.end() + return self.original_manager.__exit__(exc_type, exc_val, exc_tb) + + stream_manager = wrapped(*args, **kwargs) + + return 
TracedStreamManager(stream_manager) + + except Exception as e: + span.record_exception(e) + span.set_attribute(CoreAttributes.ERROR_MESSAGE, str(e)) + span.set_attribute(CoreAttributes.ERROR_TYPE, e.__class__.__name__) + span.end() + raise + + +class AsyncStreamContextManagerWrapper: + """A wrapper that implements both async context manager and awaitable protocols. + + This wrapper allows the instrumented async stream to be used either with + 'async with' or by awaiting it first, preserving compatibility with + different usage patterns. + """ + + def __init__(self, coro): + """Initialize with a coroutine. + + Args: + coro: The coroutine that will return a stream manager + """ + self._coro = coro + self._stream_manager = None + + def __await__(self): + """Make this wrapper awaitable. + + This allows users to do: + stream_manager = await client.messages.stream(...) + + Returns: + An awaitable that yields the traced stream manager + """ + async def get_stream_manager(): + self._stream_manager = await self._coro + return self._stream_manager + + return get_stream_manager().__await__() + + async def __aenter__(self): + """Async context manager enter. + + This allows users to do: + async with client.messages.stream(...) as stream: + + Returns: + The result of the stream manager's __aenter__ + """ + if self._stream_manager is None: + self._stream_manager = await self._coro + + return await self._stream_manager.__aenter__() + + async def __aexit__(self, exc_type, exc_val, exc_tb): + """Async context manager exit. + + Args: + exc_type: Exception type + exc_val: Exception value + exc_tb: Exception traceback + + Returns: + The result of the stream manager's __aexit__ + """ + if self._stream_manager is not None: + return await self._stream_manager.__aexit__(exc_type, exc_val, exc_tb) + return False + + +@_with_tracer_wrapper +def messages_stream_async_wrapper(tracer, wrapped, instance, args, kwargs): + """Wrapper for the async Messages.stream method. + + This wrapper creates spans for tracking stream performance and injects + an event handler wrapper to capture streaming events in async contexts. + + Args: + tracer: The OpenTelemetry tracer to use + wrapped: The original async stream method + instance: The instance the method is bound to + args: Positional arguments to the method + kwargs: Keyword arguments to the method + + Returns: + An object that can be used with async with or awaited + """ + if context_api.get_value(_SUPPRESS_INSTRUMENTATION_KEY): + return wrapped(*args, **kwargs) + + span = tracer.start_span( + "anthropic.messages.stream", + kind=SpanKind.CLIENT, + attributes={SpanAttributes.LLM_REQUEST_TYPE: LLMRequestTypeValues.CHAT.value}, + ) + + request_attributes = get_message_request_attributes(kwargs) + for key, value in request_attributes.items(): + span.set_attribute(key, value) + + span.set_attribute(SpanAttributes.LLM_REQUEST_STREAMING, True) + + original_event_handler = kwargs.get("event_handler") + + if original_event_handler is not None: + wrapped_handler = EventHandleWrapper( + original_handler=original_event_handler, + span=span + ) + kwargs["event_handler"] = wrapped_handler + + async def _wrapped_stream(): + """Async wrapper function for the stream method. + + Returns: + A traced async stream manager + """ + try: + # Don't await wrapped(*args, **kwargs) - it returns an async context manager, not a coroutine + stream_manager = wrapped(*args, **kwargs) + + class TracedAsyncStreamManager: + """A wrapper for Anthropic's AsyncMessageStreamManager that adds telemetry. 
+ + This class wraps the original async stream manager to capture metrics + about the streaming process, including token counts, content, and errors. + """ + + def __init__(self, original_manager): + """Initialize with the original manager. + + Args: + original_manager: The Anthropic AsyncMessageStreamManager to wrap + """ + self.original_manager = original_manager + self.stream = None + + async def __aenter__(self): + """Async context manager entry that initializes stream monitoring. + + Returns: + The original stream with instrumentation added + """ + self.stream = await self.original_manager.__aenter__() + + try: + stream_attributes = get_stream_attributes(self.stream) + for key, value in stream_attributes.items(): + span.set_attribute(key, value) + except Exception as e: + logger.debug(f"Error getting async stream attributes: {e}") + + if original_event_handler is None: + try: + original_text_stream = self.stream.text_stream + token_count = 0 + + class InstrumentedAsyncTextStream: + """A wrapper for Anthropic's async text stream that counts tokens.""" + + async def __aiter__(self): + """Async iterate through text chunks, counting tokens. + + Yields: + Text chunks from the original async stream + """ + nonlocal token_count + async for text in original_text_stream: + token_count += len(text.split()) + span.set_attribute(SpanAttributes.LLM_USAGE_STREAMING_TOKENS, token_count) + yield text + + self.stream.text_stream = InstrumentedAsyncTextStream() + except Exception as e: + logger.debug(f"Error patching async text_stream: {e}") + + return self.stream + + async def __aexit__(self, exc_type, exc_val, exc_tb): + """Async context manager exit that records final metrics. + + Args: + exc_type: Exception type, if an exception occurred + exc_val: Exception value, if an exception occurred + exc_tb: Exception traceback, if an exception occurred + + Returns: + Result of the original async context manager's __aexit__ + """ + try: + if exc_type is not None: + span.record_exception(exc_val) + span.set_attribute(CoreAttributes.ERROR_MESSAGE, str(exc_val)) + span.set_attribute(CoreAttributes.ERROR_TYPE, exc_type.__name__) + + try: + final_message = None + + if hasattr(self.original_manager, "_AsyncMessageStreamManager__stream") and \ + hasattr(self.original_manager._AsyncMessageStreamManager__stream, "_AsyncMessageStream__final_message_snapshot"): + final_message = self.original_manager._AsyncMessageStreamManager__stream._AsyncMessageStream__final_message_snapshot + + if final_message: + if hasattr(final_message, "content"): + content_text = "" + if isinstance(final_message.content, list): + for content_block in final_message.content: + if hasattr(content_block, "text"): + content_text += content_block.text + + if content_text: + span.set_attribute(MessageAttributes.COMPLETION_TYPE.format(i=0), "text") + span.set_attribute(MessageAttributes.COMPLETION_ROLE.format(i=0), "assistant") + span.set_attribute(MessageAttributes.COMPLETION_CONTENT.format(i=0), content_text) + + if hasattr(final_message, "usage"): + usage = final_message.usage + if hasattr(usage, "input_tokens"): + span.set_attribute(SpanAttributes.LLM_USAGE_PROMPT_TOKENS, usage.input_tokens) + + if hasattr(usage, "output_tokens"): + span.set_attribute(SpanAttributes.LLM_USAGE_COMPLETION_TOKENS, usage.output_tokens) + + if hasattr(usage, "input_tokens") and hasattr(usage, "output_tokens"): + total_tokens = usage.input_tokens + usage.output_tokens + span.set_attribute(SpanAttributes.LLM_USAGE_TOTAL_TOKENS, total_tokens) + except Exception as e: + 
logger.debug(f"Failed to extract final async message data: {e}") + finally: + if span.is_recording(): + span.end() + return await self.original_manager.__aexit__(exc_type, exc_val, exc_tb) + + return TracedAsyncStreamManager(stream_manager) + + except Exception as e: + span.record_exception(e) + span.set_attribute(CoreAttributes.ERROR_MESSAGE, str(e)) + span.set_attribute(CoreAttributes.ERROR_TYPE, e.__class__.__name__) + span.end() + raise + + # Return a wrapper that implements both async context manager and awaitable protocols + return AsyncStreamContextManagerWrapper(_wrapped_stream()) \ No newline at end of file diff --git a/agentops/instrumentation/common/__init__.py b/agentops/instrumentation/common/__init__.py index 1bcf25e1a..b2ca44b6d 100644 --- a/agentops/instrumentation/common/__init__.py +++ b/agentops/instrumentation/common/__init__.py @@ -1,6 +1,8 @@ from .attributes import AttributeMap, _extract_attributes_from_mapping +from .wrappers import _with_tracer_wrapper __all__ = [ "AttributeMap", "_extract_attributes_from_mapping", + "_with_tracer_wrapper" ] \ No newline at end of file diff --git a/agentops/instrumentation/common/wrappers.py b/agentops/instrumentation/common/wrappers.py index 8ef93e191..469aaaea9 100644 --- a/agentops/instrumentation/common/wrappers.py +++ b/agentops/instrumentation/common/wrappers.py @@ -7,6 +7,7 @@ """ from typing import Any, Optional, Tuple, Dict, Callable from dataclasses import dataclass +import logging from wrapt import wrap_function_wrapper # type: ignore from opentelemetry.instrumentation.utils import unwrap as _unwrap from opentelemetry.trace import Tracer @@ -16,6 +17,7 @@ from agentops.instrumentation.common.attributes import AttributeMap +logger = logging.getLogger(__name__) AttributeHandler = Callable[[Optional[Tuple], Optional[Dict], Optional[Any]], AttributeMap] @@ -207,3 +209,25 @@ def unwrap(wrap_config: WrapConfig): f"{wrap_config.package}.{wrap_config.class_name}", wrap_config.method_name, ) + + +def _with_tracer_wrapper(func): + """Wrap a function with a tracer. + + This decorator creates a higher-order function that takes a tracer as its first argument + and returns a function suitable for use with wrapt's wrap_function_wrapper. It's used + to consistently apply OpenTelemetry tracing to SDK functions. 
+ + Args: + func: The instrumentation function to wrap + + Returns: + A decorator function that takes a tracer and returns a wrapt-compatible wrapper + """ + def _with_tracer(tracer): + def wrapper(wrapped, instance, args, kwargs): + return func(tracer, wrapped, instance, args, kwargs) + + return wrapper + + return _with_tracer \ No newline at end of file diff --git a/examples/anthropic_examples/agentops-anthropic-understanding-tools.ipynb b/examples/anthropic_examples/agentops-anthropic-understanding-tools.ipynb index 963da2bf7..00f2bf144 100644 --- a/examples/anthropic_examples/agentops-anthropic-understanding-tools.ipynb +++ b/examples/anthropic_examples/agentops-anthropic-understanding-tools.ipynb @@ -35,7 +35,7 @@ }, { "cell_type": "code", - "execution_count": 1, + "execution_count": null, "metadata": {}, "outputs": [], "source": [ @@ -57,7 +57,7 @@ }, { "cell_type": "code", - "execution_count": 2, + "execution_count": null, "metadata": {}, "outputs": [], "source": [ @@ -85,7 +85,7 @@ }, { "cell_type": "code", - "execution_count": 4, + "execution_count": null, "metadata": {}, "outputs": [], "source": [ @@ -101,7 +101,7 @@ }, { "cell_type": "code", - "execution_count": 5, + "execution_count": null, "metadata": {}, "outputs": [], "source": [ @@ -157,7 +157,7 @@ }, { "cell_type": "code", - "execution_count": 6, + "execution_count": null, "metadata": {}, "outputs": [], "source": [ @@ -212,7 +212,7 @@ }, { "cell_type": "code", - "execution_count": 8, + "execution_count": null, "metadata": {}, "outputs": [], "source": [ @@ -313,7 +313,7 @@ }, { "cell_type": "code", - "execution_count": 12, + "execution_count": null, "metadata": {}, "outputs": [], "source": [ @@ -384,7 +384,7 @@ }, { "cell_type": "code", - "execution_count": 15, + "execution_count": null, "metadata": {}, "outputs": [], "source": [ @@ -441,7 +441,7 @@ }, { "cell_type": "code", - "execution_count": 16, + "execution_count": null, "metadata": {}, "outputs": [], "source": [ @@ -462,7 +462,7 @@ }, { "cell_type": "code", - "execution_count": 17, + "execution_count": null, "metadata": {}, "outputs": [], "source": [ @@ -502,7 +502,7 @@ }, { "cell_type": "code", - "execution_count": 19, + "execution_count": null, "metadata": {}, "outputs": [], "source": [ @@ -595,11 +595,11 @@ }, { "cell_type": "code", - "execution_count": 23, + "execution_count": null, "metadata": {}, "outputs": [], "source": [ - "initial_messages.append({\"role\": \"assistant\", \"content\": f\"{str(response)}\"})" + "initial_messages.append({\"role\": \"assistant\", \"content\": f\"{str(response.content[0].text)}\"})" ] }, { @@ -659,7 +659,7 @@ }, { "cell_type": "code", - "execution_count": 25, + "execution_count": null, "metadata": {}, "outputs": [], "source": [ @@ -728,7 +728,7 @@ "sourceType": "notebook" }, "kernelspec": { - "display_name": "ops", + "display_name": ".venv (3.11.11)", "language": "python", "name": "python3" }, @@ -742,7 +742,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.12.9" + "version": "3.11.11" } }, "nbformat": 4, diff --git a/examples/anthropic_examples/anthropic-example-sync.ipynb b/examples/anthropic_examples/anthropic-example-sync.ipynb index 53c0240f5..2b6ab48cd 100644 --- a/examples/anthropic_examples/anthropic-example-sync.ipynb +++ b/examples/anthropic_examples/anthropic-example-sync.ipynb @@ -132,7 +132,7 @@ ] }, { - "cell_type": "raw", + "cell_type": "markdown", "metadata": {}, "source": [ "Remember that story we made earlier? 
As of writing, claude-3-5-sonnet-20240620 (the version we will be using) has a 200k-token context window (roughly 150k words, or 680k characters) and an 8192-token maximum output length. This is great because we can actually pass in the earlier story as an example for the script! \n",
@@ -321,7 +321,7 @@
   "sourceType": "notebook"
  },
  "kernelspec": {
-  "display_name": "Python 3",
+  "display_name": ".venv (3.11.11)",
   "language": "python",
   "name": "python3"
  },
@@ -335,7 +335,7 @@
  "name": "python",
  "nbconvert_exporter": "python",
  "pygments_lexer": "ipython3",
-  "version": "3.10.14"
+  "version": "3.11.11"
 }
},
"nbformat": 4,
diff --git a/tests/unit/instrumentation/anthropic/conftest.py b/tests/unit/instrumentation/anthropic/conftest.py
new file mode 100644
index 000000000..e4576e317
--- /dev/null
+++ b/tests/unit/instrumentation/anthropic/conftest.py
@@ -0,0 +1,143 @@
+import json
+import pytest
+from pathlib import Path
+from unittest.mock import MagicMock, patch
+from opentelemetry.trace import Span, SpanContext
+from opentelemetry.metrics import Meter
+
+# Load fixture data
+FIXTURES_DIR = Path(__file__).parent.parent / "fixtures"
+
+def load_fixture(filename):
+    """Load a JSON fixture file"""
+    with open(FIXTURES_DIR / filename) as f:
+        return json.load(f)
+
+@pytest.fixture
+def mock_tracer():
+    """Create a mock OpenTelemetry tracer with configured span and context"""
+    tracer = MagicMock()
+    span = MagicMock(spec=Span)
+    span_context = MagicMock(spec=SpanContext)
+    span.get_span_context.return_value = span_context
+    tracer.start_span.return_value = span
+    return tracer
+
+@pytest.fixture
+def mock_meter():
+    """Create a mock OpenTelemetry meter with histogram and counter"""
+    meter = MagicMock(spec=Meter)
+    histogram = MagicMock()
+    counter = MagicMock()
+    meter.create_histogram.return_value = histogram
+    meter.create_counter.return_value = counter
+    return meter
+
+@pytest.fixture
+def mock_anthropic_client():
+    """Create a mock Anthropic client with configured message and stream responses"""
+    client = MagicMock()
+    message_response = load_fixture("anthropic_message.json")
+    client.messages.create.return_value = MagicMock(**message_response)
+
+    stream_response = load_fixture("anthropic_stream.json")
+    stream_manager = MagicMock()
+    stream_manager.__enter__.return_value = MagicMock(
+        text_stream=iter(stream_response["messages"]),
+        _MessageStreamManager__stream=MagicMock(
+            _MessageStream__final_message_snapshot=MagicMock(**stream_response["final_message"])
+        )
+    )
+    client.messages.stream.return_value = stream_manager
+    return client
+
+@pytest.fixture
+def mock_event_handler():
+    """Create a mock event handler with all required event handling methods"""
+    handler = MagicMock()
+    handler.on_event = MagicMock()
+    handler.on_text_delta = MagicMock()
+    handler.on_content_block_start = MagicMock()
+    handler.on_content_block_delta = MagicMock()
+    handler.on_content_block_stop = MagicMock()
+    handler.on_message_start = MagicMock()
+    handler.on_message_delta = MagicMock()
+    handler.on_message_stop = MagicMock()
+    handler.on_error = MagicMock()
+    return handler
+
+@pytest.fixture
+def mock_stream_manager():
+    """Create a mock stream manager that emits events during text streaming"""
+    manager = MagicMock()
+    stream = MagicMock()
+
+    def text_stream_iter():
+        chunks = ["1", "2", "3", "4", "5"]
+        for chunk in chunks:
+            if hasattr(stream, "event_handler") and stream.event_handler is not None:
+                stream.event_handler.on_text_delta({"text": chunk}, {"text": chunk})
+            yield chunk
+
+    stream.text_stream = text_stream_iter()
+    manager.__enter__.return_value = stream
+    return manager
+
+@pytest.fixture
+def mock_async_stream_manager():
+    """Create a mock async stream manager with async iteration support"""
+    manager = MagicMock()
+    stream = MagicMock()
+    stream.text_stream = MagicMock()
+    stream.text_stream.__aiter__.return_value = iter(["1", "2", "3", "4", "5"])
+    manager.__aenter__.return_value = stream
+    return manager
+
+@pytest.fixture
+def mock_stream_event():
+    """Fixture for a mock streaming event."""
+    class MockMessageStartEvent:
+        def __init__(self):
+            self.message = type('obj', (object,), {
+                'id': 'msg_123',
+                'model': 'claude-3-opus-20240229'
+            })
+            self.__class__.__name__ = 'MessageStartEvent'
+    return MockMessageStartEvent()
+
+@pytest.fixture
+def mock_message_stop_event():
+    """Fixture for a mock message stop event."""
+    class MockMessageStopEvent:
+        def __init__(self):
+            self.message = type('obj', (object,), {
+                'stop_reason': 'stop_sequence'
+            })
+            self.__class__.__name__ = 'MessageStopEvent'
+    return MockMessageStopEvent()
+
+@pytest.fixture
+def mock_tool_definition():
+    """Fixture for a mock tool definition."""
+    return [{
+        'name': 'calculator',
+        'description': 'A simple calculator',
+        'input_schema': {
+            'type': 'object',
+            'properties': {
+                'operation': {'type': 'string'},
+                'numbers': {'type': 'array'}
+            }
+        }
+    }]
+
+@pytest.fixture
+def mock_tool_use_content():
+    """Fixture for mock tool use content."""
+    class MockToolUseBlock:
+        def __init__(self):
+            self.type = "tool_use"
+            self.name = "calculator"
+            self.id = "tool_123"
+            self.input = {"operation": "add", "numbers": [1, 2]}
+    return [MockToolUseBlock()]
\ No newline at end of file
diff --git a/tests/unit/instrumentation/anthropic/test_attributes.py b/tests/unit/instrumentation/anthropic/test_attributes.py
new file mode 100644
index 000000000..3dfae5466
--- /dev/null
+++ b/tests/unit/instrumentation/anthropic/test_attributes.py
@@ -0,0 +1,180 @@
+"""Tests for Anthropic attribute extraction functionality."""
+
+import pytest
+from typing import Dict, Any
+
+from agentops.semconv import (
+    InstrumentationAttributes,
+    SpanAttributes,
+    LLMRequestTypeValues,
+    MessageAttributes,
+    ToolAttributes,
+    ToolStatus,
+)
+from agentops.instrumentation.anthropic import LIBRARY_VERSION
+from agentops.instrumentation.anthropic.attributes.common import (
+    get_common_instrumentation_attributes,
+    extract_request_attributes,
+)
+from agentops.instrumentation.anthropic.attributes.message import (
+    get_message_attributes,
+    get_message_request_attributes,
+    get_message_response_attributes,
+    get_stream_attributes,
+    get_stream_event_attributes,
+)
+from agentops.instrumentation.anthropic.attributes.tools import (
+    extract_tool_definitions,
+    extract_tool_use_blocks,
+    get_tool_attributes,
+)
+
+
+# Common Attributes Tests
+def test_get_common_instrumentation_attributes():
+    """Test extraction of common instrumentation attributes."""
+    attributes = get_common_instrumentation_attributes()
+    assert attributes[InstrumentationAttributes.LIBRARY_NAME] == "anthropic"
+    # Compare against the module constant rather than pinning a specific SDK
+    # release, so the test keeps passing when the installed version changes
+    assert attributes[InstrumentationAttributes.LIBRARY_VERSION] == LIBRARY_VERSION
+
+
+def test_extract_request_attributes():
+    """Test extraction of request attributes from kwargs."""
+    kwargs = {
+        'model': 'claude-3-opus-20240229',
+        'max_tokens': 100,
+        'temperature': 0.7,
+        'top_p': 0.9,
+        'stream': True
+    }
+    attributes = extract_request_attributes(kwargs)
+    assert attributes[SpanAttributes.LLM_REQUEST_MODEL] == 'claude-3-opus-20240229'
+    assert attributes[SpanAttributes.LLM_REQUEST_MAX_TOKENS] == 100
+    assert attributes[SpanAttributes.LLM_REQUEST_TEMPERATURE] == 0.7
+    assert
attributes[SpanAttributes.LLM_REQUEST_TOP_P] == 0.9 + assert attributes[SpanAttributes.LLM_REQUEST_STREAMING] is True + + +def test_extract_request_attributes_partial(): + """Test extraction of request attributes with partial kwargs.""" + kwargs = { + 'model': 'claude-3-opus-20240229', + 'temperature': 0.7 + } + attributes = extract_request_attributes(kwargs) + assert attributes[SpanAttributes.LLM_REQUEST_MODEL] == 'claude-3-opus-20240229' + assert attributes[SpanAttributes.LLM_REQUEST_TEMPERATURE] == 0.7 + assert SpanAttributes.LLM_REQUEST_MAX_TOKENS not in attributes + assert SpanAttributes.LLM_REQUEST_TOP_P not in attributes + assert SpanAttributes.LLM_REQUEST_STREAMING not in attributes + + +def test_get_message_request_attributes(): + """Test extraction of message request attributes.""" + kwargs = { + 'model': 'claude-3-opus-20240229', + 'messages': [ + {'role': 'system', 'content': 'You are a helpful assistant'}, + {'role': 'user', 'content': 'Hello'} + ], + 'max_tokens': 100 + } + attributes = get_message_request_attributes(kwargs) + assert attributes[SpanAttributes.LLM_REQUEST_MODEL] == 'claude-3-opus-20240229' + assert attributes[SpanAttributes.LLM_REQUEST_MAX_TOKENS] == 100 + assert MessageAttributes.PROMPT_ROLE.format(i=0) in attributes + assert MessageAttributes.PROMPT_CONTENT.format(i=0) in attributes + assert MessageAttributes.PROMPT_ROLE.format(i=1) in attributes + assert MessageAttributes.PROMPT_CONTENT.format(i=1) in attributes + + +# Stream Attributes Tests +def test_get_stream_attributes(): + """Test extraction of stream attributes.""" + class MockStream: + def __init__(self): + self.model = 'claude-3-opus-20240229' + stream = MockStream() + attributes = get_stream_attributes(stream) + assert attributes[SpanAttributes.LLM_REQUEST_STREAMING] is True + assert attributes[SpanAttributes.LLM_REQUEST_MODEL] == 'claude-3-opus-20240229' + + +def test_get_stream_event_attributes_start(mock_stream_event): + """Test extraction of stream start event attributes.""" + attributes = get_stream_event_attributes(mock_stream_event) + assert attributes[SpanAttributes.LLM_RESPONSE_ID] == 'msg_123' + assert attributes[SpanAttributes.LLM_RESPONSE_MODEL] == 'claude-3-opus-20240229' + assert attributes[MessageAttributes.COMPLETION_ID.format(i=0)] == 'msg_123' + + +def test_get_stream_event_attributes_stop(mock_message_stop_event): + """Test extraction of stream stop event attributes.""" + attributes = get_stream_event_attributes(mock_message_stop_event) + assert attributes[SpanAttributes.LLM_RESPONSE_STOP_REASON] == 'stop_sequence' + assert attributes[SpanAttributes.LLM_RESPONSE_FINISH_REASON] == 'stop_sequence' + assert attributes[MessageAttributes.COMPLETION_FINISH_REASON.format(i=0)] == 'stop_sequence' + + +# Tool Attributes Tests +def test_extract_tool_definitions(mock_tool_definition): + """Test extraction of tool definitions.""" + attributes = extract_tool_definitions(mock_tool_definition) + assert attributes[MessageAttributes.TOOL_CALL_NAME.format(i=0)] == 'calculator' + assert attributes[MessageAttributes.TOOL_CALL_TYPE.format(i=0)] == 'function' + assert attributes[MessageAttributes.TOOL_CALL_DESCRIPTION.format(i=0)] == 'A simple calculator' + tool_args = attributes[MessageAttributes.TOOL_CALL_ARGUMENTS.format(i=0)] + assert isinstance(tool_args, str) + assert 'type' in tool_args + assert 'properties' in tool_args + assert SpanAttributes.LLM_REQUEST_FUNCTIONS in attributes + + +def test_extract_tool_use_blocks(mock_tool_use_content): + """Test extraction of tool use blocks.""" + 
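# mock_tool_use_content (defined in conftest) supplies a single tool_use block
+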
tool_uses = extract_tool_use_blocks(mock_tool_use_content) + assert tool_uses is not None + assert len(tool_uses) == 1 + assert tool_uses[0]['name'] == 'calculator' + assert tool_uses[0]['id'] == 'tool_123' + assert tool_uses[0]['input'] == {'operation': 'add', 'numbers': [1, 2]} + + +def test_get_tool_attributes(mock_tool_use_content): + """Test extraction of tool attributes from content.""" + attributes = get_tool_attributes(mock_tool_use_content) + assert attributes[MessageAttributes.COMPLETION_TOOL_CALL_NAME.format(i=0, j=0)] == 'calculator' + assert attributes[MessageAttributes.COMPLETION_TOOL_CALL_ID.format(i=0, j=0)] == 'tool_123' + assert attributes[MessageAttributes.COMPLETION_TOOL_CALL_TYPE.format(i=0, j=0)] == 'function' + tool_args = attributes[MessageAttributes.COMPLETION_TOOL_CALL_ARGUMENTS.format(i=0, j=0)] + assert isinstance(tool_args, str) + assert 'operation' in tool_args + assert attributes[MessageAttributes.TOOL_CALL_ID.format(i=0)] == 'tool_123' + assert attributes[MessageAttributes.TOOL_CALL_NAME.format(i=0)] == 'calculator' + assert attributes[f"{ToolAttributes.TOOL_STATUS}.0"] == ToolStatus.EXECUTING.value + assert attributes["anthropic.tool_calls.count"] == 1 + + +def test_get_tool_attributes_empty(): + """Test extraction of tool attributes with empty content.""" + attributes = get_tool_attributes([]) + assert not attributes + + +def test_get_tool_attributes_mixed_content(): + """Test extraction of tool attributes with mixed content types.""" + class MockTextBlock: + def __init__(self): + self.type = "text" + self.text = "Hello world" + + class MockToolUseBlock: + def __init__(self): + self.type = "tool_use" + self.name = "calculator" + self.id = "tool_123" + self.input = {"operation": "add", "numbers": [1, 2]} + + content = [MockTextBlock(), MockToolUseBlock()] + attributes = get_tool_attributes(content) + assert MessageAttributes.COMPLETION_TOOL_CALL_NAME.format(i=0, j=0) in attributes + assert attributes[MessageAttributes.COMPLETION_TOOL_CALL_NAME.format(i=0, j=0)] == 'calculator' \ No newline at end of file diff --git a/tests/unit/instrumentation/anthropic/test_event_handler.py b/tests/unit/instrumentation/anthropic/test_event_handler.py new file mode 100644 index 000000000..2655e7ca3 --- /dev/null +++ b/tests/unit/instrumentation/anthropic/test_event_handler.py @@ -0,0 +1,89 @@ +import pytest +from unittest.mock import MagicMock +from opentelemetry.trace import Span + +from agentops.instrumentation.anthropic.event_handler_wrapper import EventHandleWrapper +from agentops.semconv import CoreAttributes + +def test_event_handler_initialization(): + """Test that event handler initializes correctly with a span and no original handler.""" + span = MagicMock(spec=Span) + handler = EventHandleWrapper(None, span) + assert handler._span == span + assert handler._original_handler is None + +def test_event_handler_with_original_handler(): + """Test that event handler properly stores the original handler reference.""" + original_handler = MagicMock() + span = MagicMock(spec=Span) + handler = EventHandleWrapper(original_handler, span) + assert handler._original_handler == original_handler + +def test_event_forwarding(): + """Test that all event types are correctly forwarded to the original handler + while maintaining the original event data.""" + original_handler = MagicMock() + span = MagicMock(spec=Span) + handler = EventHandleWrapper(original_handler, span) + + event = {"type": "test"} + handler.on_event(event) + original_handler.on_event.assert_called_with(event) + + 
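# Text deltas forward both the incremental delta and the accumulated snapshot
+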
delta = {"text": "test"} + snapshot = {"content": "test"} + handler.on_text_delta(delta, snapshot) + original_handler.on_text_delta.assert_called_with(delta, snapshot) + + content_block = {"type": "text"} + handler.on_content_block_start(content_block) + original_handler.on_content_block_start.assert_called_with(content_block) + +def test_event_handler_without_original_handler(): + """Test that event handler gracefully handles events when no original handler + is provided, ensuring no exceptions are raised.""" + span = MagicMock(spec=Span) + handler = EventHandleWrapper(None, span) + + handler.on_event({}) + handler.on_text_delta({}, {}) + handler.on_content_block_start({}) + +def test_error_handling(): + """Test that errors are properly recorded in the span and forwarded to the + original handler with correct error attributes.""" + original_handler = MagicMock() + span = MagicMock(spec=Span) + handler = EventHandleWrapper(original_handler, span) + + error = Exception("Test error") + handler.on_error(error) + + span.record_exception.assert_called_with(error) + span.set_attribute.assert_any_call(CoreAttributes.ERROR_MESSAGE, "Test error") + span.set_attribute.assert_any_call(CoreAttributes.ERROR_TYPE, "Exception") + original_handler.on_error.assert_called_with(error) + +def test_error_handling_without_original_handler(): + """Test that errors are properly recorded in the span even when no original + handler is present.""" + span = MagicMock(spec=Span) + handler = EventHandleWrapper(None, span) + + error = Exception("Test error") + handler.on_error(error) + + span.record_exception.assert_called_with(error) + span.set_attribute.assert_any_call(CoreAttributes.ERROR_MESSAGE, "Test error") + span.set_attribute.assert_any_call(CoreAttributes.ERROR_TYPE, "Exception") + +def test_error_in_original_handler(): + """Test that errors from the original handler are caught and logged without + disrupting the event handling flow.""" + original_handler = MagicMock() + original_handler.on_event.side_effect = Exception("Handler error") + span = MagicMock(spec=Span) + handler = EventHandleWrapper(original_handler, span) + + handler.on_event({}) + assert original_handler.on_event.called \ No newline at end of file diff --git a/tests/unit/instrumentation/anthropic/test_instrumentor.py b/tests/unit/instrumentation/anthropic/test_instrumentor.py new file mode 100644 index 000000000..fce7f30cc --- /dev/null +++ b/tests/unit/instrumentation/anthropic/test_instrumentor.py @@ -0,0 +1,91 @@ +import pytest +from unittest.mock import patch, MagicMock, ANY +from opentelemetry.trace import SpanKind + +from agentops.instrumentation.anthropic.instrumentor import AnthropicInstrumentor +from agentops.instrumentation.anthropic import LIBRARY_NAME, LIBRARY_VERSION +from agentops.semconv import Meters, SpanAttributes, LLMRequestTypeValues + +def test_instrumentor_initialization(): + """Test that the instrumentor initializes with correct dependencies.""" + instrumentor = AnthropicInstrumentor() + assert isinstance(instrumentor, AnthropicInstrumentor) + assert instrumentor.instrumentation_dependencies() == ["anthropic >= 0.7.0"] + +def test_instrumentor_setup(mock_tracer, mock_meter): + """Test that the instrumentor properly sets up tracers and meters with correct + configuration and attributes.""" + instrumentor = AnthropicInstrumentor() + + with patch("agentops.instrumentation.anthropic.instrumentor.get_tracer", return_value=mock_tracer) as mock_get_tracer, \ + patch("agentops.instrumentation.anthropic.instrumentor.get_meter", 
return_value=mock_meter) as mock_get_meter: + + instrumentor._instrument() + + mock_get_tracer.assert_called_with(LIBRARY_NAME, LIBRARY_VERSION, None) + mock_get_meter.assert_called_with(LIBRARY_NAME, LIBRARY_VERSION, None) + +def test_instrumentor_wraps_methods(mock_tracer, mock_meter): + """Test that the instrumentor correctly wraps both standard and streaming methods + with proper instrumentation.""" + instrumentor = AnthropicInstrumentor() + mock_wrap = MagicMock() + + with patch("agentops.instrumentation.anthropic.instrumentor.get_tracer", return_value=mock_tracer), \ + patch("agentops.instrumentation.anthropic.instrumentor.get_meter", return_value=mock_meter), \ + patch("agentops.instrumentation.anthropic.instrumentor.wrap", mock_wrap), \ + patch("agentops.instrumentation.anthropic.instrumentor.wrap_function_wrapper") as mock_wrap_function: + + instrumentor._instrument() + + assert mock_wrap.call_count == 4 + + mock_wrap_function.assert_any_call( + "anthropic.resources.messages.messages", + "Messages.stream", + ANY + ) + mock_wrap_function.assert_any_call( + "anthropic.resources.messages.messages", + "AsyncMessages.stream", + ANY + ) + +def test_instrumentor_uninstrument(mock_tracer, mock_meter): + """Test that the instrumentor properly unwraps all instrumented methods and + cleans up resources.""" + instrumentor = AnthropicInstrumentor() + mock_unwrap = MagicMock() + + with patch("agentops.instrumentation.anthropic.instrumentor.get_tracer", return_value=mock_tracer), \ + patch("agentops.instrumentation.anthropic.instrumentor.get_meter", return_value=mock_meter), \ + patch("agentops.instrumentation.anthropic.instrumentor.unwrap", mock_unwrap), \ + patch("opentelemetry.instrumentation.utils.unwrap") as mock_otel_unwrap: + + instrumentor._uninstrument() + + assert mock_unwrap.call_count == 4 + + mock_otel_unwrap.assert_any_call( + "anthropic.resources.messages.messages", + "Messages.stream" + ) + mock_otel_unwrap.assert_any_call( + "anthropic.resources.messages.messages", + "AsyncMessages.stream" + ) + +def test_instrumentor_handles_missing_methods(mock_tracer, mock_meter): + """Test that the instrumentor gracefully handles missing or inaccessible methods + without raising exceptions.""" + instrumentor = AnthropicInstrumentor() + mock_wrap = MagicMock(side_effect=AttributeError) + mock_wrap_function = MagicMock(side_effect=AttributeError) + + with patch("agentops.instrumentation.anthropic.instrumentor.get_tracer", return_value=mock_tracer), \ + patch("agentops.instrumentation.anthropic.instrumentor.get_meter", return_value=mock_meter), \ + patch("agentops.instrumentation.anthropic.instrumentor.wrap", mock_wrap), \ + patch("wrapt.wrap_function_wrapper", mock_wrap_function): + + instrumentor._instrument() + instrumentor._uninstrument() \ No newline at end of file diff --git a/tests/unit/instrumentation/anthropic/test_stream_wrapper.py b/tests/unit/instrumentation/anthropic/test_stream_wrapper.py new file mode 100644 index 000000000..79fe662b9 --- /dev/null +++ b/tests/unit/instrumentation/anthropic/test_stream_wrapper.py @@ -0,0 +1,122 @@ +import pytest +from unittest.mock import patch, MagicMock, ANY +from opentelemetry.trace import SpanKind + +from agentops.instrumentation.anthropic.stream_wrapper import ( + messages_stream_wrapper, + messages_stream_async_wrapper, + AsyncStreamContextManagerWrapper +) +from agentops.semconv import SpanAttributes, LLMRequestTypeValues, CoreAttributes, MessageAttributes + +def test_sync_stream_wrapper(mock_tracer, mock_stream_manager): + """Test the 
synchronous stream wrapper functionality including span creation, + context manager behavior, and token counting.""" + wrapper = messages_stream_wrapper(mock_tracer) + wrapped = MagicMock(return_value=mock_stream_manager) + result = wrapper(wrapped, None, [], {}) + + assert hasattr(result, "__enter__") + assert hasattr(result, "__exit__") + + mock_tracer.start_span.assert_called_with( + "anthropic.messages.stream", + kind=SpanKind.CLIENT, + attributes={SpanAttributes.LLM_REQUEST_TYPE: LLMRequestTypeValues.CHAT.value} + ) + + span = mock_tracer.start_span.return_value + with result as stream: + assert span.set_attribute.called + text = list(stream.text_stream) + assert len(text) == 5 + assert span.set_attribute.call_count > 0 + +def test_async_stream_wrapper(mock_tracer, mock_async_stream_manager): + """Test the asynchronous stream wrapper functionality including span creation + and proper async context manager setup.""" + wrapper = messages_stream_async_wrapper(mock_tracer) + wrapped = MagicMock(return_value=mock_async_stream_manager) + result = wrapper(wrapped, None, [], {}) + + assert isinstance(result, AsyncStreamContextManagerWrapper) + + mock_tracer.start_span.assert_called_with( + "anthropic.messages.stream", + kind=SpanKind.CLIENT, + attributes={SpanAttributes.LLM_REQUEST_TYPE: LLMRequestTypeValues.CHAT.value} + ) + +@pytest.mark.asyncio +async def test_async_stream_context_manager(mock_tracer, mock_async_stream_manager): + """Test the async stream context manager functionality including token counting + and attribute setting.""" + wrapper = messages_stream_async_wrapper(mock_tracer) + wrapped = MagicMock(return_value=mock_async_stream_manager) + result = wrapper(wrapped, None, [], {}) + + async with result as stream: + span = mock_tracer.start_span.return_value + assert span.set_attribute.called + + text = [] + async for chunk in stream.text_stream: + text.append(chunk) + assert len(text) == 5 + assert span.set_attribute.call_count > 0 + +def test_stream_error_handling(mock_tracer): + """Test error handling in stream wrapper including exception recording and + attribute setting.""" + wrapper = messages_stream_wrapper(mock_tracer) + wrapped = MagicMock(side_effect=Exception("Test error")) + + with pytest.raises(Exception): + wrapper(wrapped, None, [], {}) + + span = mock_tracer.start_span.return_value + span.record_exception.assert_called() + span.set_attribute.assert_any_call(CoreAttributes.ERROR_MESSAGE, "Test error") + span.set_attribute.assert_any_call(CoreAttributes.ERROR_TYPE, "Exception") + span.end.assert_called() + +def test_stream_with_event_handler(mock_tracer, mock_stream_manager, mock_event_handler): + """Test stream wrapper with event handler including proper event forwarding + and handler integration.""" + wrapper = messages_stream_wrapper(mock_tracer) + wrapped = MagicMock(return_value=mock_stream_manager) + result = wrapper(wrapped, None, [], {"event_handler": mock_event_handler}) + + assert hasattr(result, "__enter__") + assert hasattr(result, "__exit__") + + with result as stream: + text = list(stream.text_stream) + assert len(text) == 5 + assert mock_event_handler.on_text_delta.call_count > 0 + +def test_stream_final_message_attributes(mock_tracer, mock_stream_manager): + """Test that final message attributes are properly captured and set on the span.""" + wrapper = messages_stream_wrapper(mock_tracer) + wrapped = MagicMock(return_value=mock_stream_manager) + + final_message = MagicMock() + final_message.content = [MagicMock(text="Final response")] + 
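# Usage on the final message snapshot is what the wrapper converts into
+    # the prompt/completion/total token-count span attributes
+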
final_message.usage = MagicMock( + input_tokens=10, + output_tokens=20 + ) + mock_stream_manager._MessageStreamManager__stream._MessageStream__final_message_snapshot = final_message + + result = wrapper(wrapped, None, [], {}) + + with result as stream: + list(stream.text_stream) + + span = mock_tracer.start_span.return_value + span.set_attribute.assert_any_call(MessageAttributes.COMPLETION_TYPE.format(i=0), "text") + span.set_attribute.assert_any_call(MessageAttributes.COMPLETION_ROLE.format(i=0), "assistant") + span.set_attribute.assert_any_call(MessageAttributes.COMPLETION_CONTENT.format(i=0), "Final response") + span.set_attribute.assert_any_call(SpanAttributes.LLM_USAGE_PROMPT_TOKENS, 10) + span.set_attribute.assert_any_call(SpanAttributes.LLM_USAGE_COMPLETION_TOKENS, 20) + span.set_attribute.assert_any_call(SpanAttributes.LLM_USAGE_TOTAL_TOKENS, 30) \ No newline at end of file diff --git a/tests/unit/instrumentation/fixtures/anthropic_conversation.json b/tests/unit/instrumentation/fixtures/anthropic_conversation.json new file mode 100644 index 000000000..3246e4d72 --- /dev/null +++ b/tests/unit/instrumentation/fixtures/anthropic_conversation.json @@ -0,0 +1,21 @@ +{ + "id": "msg_01NdttFm4EZyLDk6s4hutk9P", + "content": [ + { + "citations": null, + "text": "Japan is an excellent choice for a trip! It has a rich culture, delicious food, and stunning landscapes. Here are a few steps to help you plan your trip:\n\n1. Decide when to go: Japan has four distinct seasons, each offering unique experiences. Cherry blossom season (late March to early April) and autumn foliage season (October to November) are popular times to visit.\n\n2. Choose your destinations: Some popular cities include Tokyo, Kyoto", + "type": "text" + } + ], + "model": "claude-3-opus-20240229", + "role": "assistant", + "stop_reason": "max_tokens", + "stop_sequence": null, + "type": "message", + "usage": { + "cache_creation_input_tokens": 0, + "cache_read_input_tokens": 0, + "input_tokens": 43, + "output_tokens": 100 + } +} \ No newline at end of file diff --git a/tests/unit/instrumentation/fixtures/anthropic_message.json b/tests/unit/instrumentation/fixtures/anthropic_message.json new file mode 100644 index 000000000..3389837d1 --- /dev/null +++ b/tests/unit/instrumentation/fixtures/anthropic_message.json @@ -0,0 +1,21 @@ +{ + "id": "msg_01BivLm8ByQh7vss4xQptv5u", + "content": [ + { + "citations": null, + "text": "The capital of France is Paris.", + "type": "text" + } + ], + "model": "claude-3-opus-20240229", + "role": "assistant", + "stop_reason": "end_turn", + "stop_sequence": null, + "type": "message", + "usage": { + "cache_creation_input_tokens": 0, + "cache_read_input_tokens": 0, + "input_tokens": 14, + "output_tokens": 10 + } +} \ No newline at end of file diff --git a/tests/unit/instrumentation/fixtures/anthropic_stream.json b/tests/unit/instrumentation/fixtures/anthropic_stream.json new file mode 100644 index 000000000..3219cfa59 --- /dev/null +++ b/tests/unit/instrumentation/fixtures/anthropic_stream.json @@ -0,0 +1,41 @@ +{ + "messages": [ + { + "type": "text", + "content": "1..." + }, + { + "type": "text", + "content": "\n2...\n3" + }, + { + "type": "text", + "content": "...\n4..." + }, + { + "type": "text", + "content": "\n5." 
+ } + ], + "final_message": { + "id": "msg_019SYRnQyEzgoDFAbodrKH1o", + "content": [ + { + "citations": null, + "text": "Here is me counting slowly from 1 to 5:\n\n1...\n2...\n3...\n4...\n5.", + "type": "text" + } + ], + "model": "claude-3-opus-20240229", + "role": "assistant", + "stop_reason": "end_turn", + "stop_sequence": null, + "type": "message", + "usage": { + "cache_creation_input_tokens": 0, + "cache_read_input_tokens": 0, + "input_tokens": 18, + "output_tokens": 31 + } + } +} \ No newline at end of file diff --git a/tests/unit/instrumentation/fixtures/anthropic_system_message.json b/tests/unit/instrumentation/fixtures/anthropic_system_message.json new file mode 100644 index 000000000..c402f2058 --- /dev/null +++ b/tests/unit/instrumentation/fixtures/anthropic_system_message.json @@ -0,0 +1,21 @@ +{ + "id": "msg_01B6YbGDPQbBzjnz9F8uXZe3", + "content": [ + { + "citations": null, + "text": "Python is a high-level, general-purpose programming language known for its simplicity, readability, and versatility. It supports multiple programming paradigms, including procedural, object-oriented, and functional programming. Python is widely used for web development, scientific computing, artificial intelligence, data analysis, and automation tasks. Its extensive standard library and vast ecosystem of third-party packages make it a popular choice among developers.", + "type": "text" + } + ], + "model": "claude-3-opus-20240229", + "role": "assistant", + "stop_reason": "end_turn", + "stop_sequence": null, + "type": "message", + "usage": { + "cache_creation_input_tokens": 0, + "cache_read_input_tokens": 0, + "input_tokens": 22, + "output_tokens": 88 + } +} \ No newline at end of file diff --git a/tests/unit/instrumentation/fixtures/generate_anthropic_fixtures.py b/tests/unit/instrumentation/fixtures/generate_anthropic_fixtures.py new file mode 100644 index 000000000..7cf8202a3 --- /dev/null +++ b/tests/unit/instrumentation/fixtures/generate_anthropic_fixtures.py @@ -0,0 +1,105 @@ +import os +import json +from pathlib import Path +from dotenv import load_dotenv +from anthropic import Anthropic + +# Load environment variables +load_dotenv() + +# Initialize Anthropic client +api_key = os.getenv("ANTHROPIC_API_KEY") +if not api_key: + raise ValueError("ANTHROPIC_API_KEY not found in environment variables") + +client = Anthropic() + +# Directory to save fixtures +FIXTURES_DIR = Path(__file__).parent + +def save_fixture(data, filename): + """Save response data as a JSON fixture""" + filepath = FIXTURES_DIR / filename + with open(filepath, 'w') as f: + json.dump(data, f, indent=2) + print(f"Saved fixture: {filepath}") + +def generate_fixtures(): + """Generate various Anthropic API response fixtures""" + + # 1. Basic message completion + message_response = client.messages.create( + model="claude-3-opus-20240229", + max_tokens=100, + messages=[{ + "role": "user", + "content": "What is the capital of France?" + }] + ) + save_fixture(message_response.model_dump(), "anthropic_message.json") + + # 2. Message with system prompt + system_message_response = client.messages.create( + model="claude-3-opus-20240229", + max_tokens=100, + system="You are a helpful assistant that provides concise answers.", + messages=[{ + "role": "user", + "content": "What is Python?" + }] + ) + save_fixture(system_message_response.model_dump(), "anthropic_system_message.json") + + # 3. 
Multi-turn conversation
+    conversation_response = client.messages.create(
+        model="claude-3-opus-20240229",
+        max_tokens=100,
+        messages=[
+            {
+                "role": "user",
+                "content": "Let's plan a trip."
+            },
+            {
+                "role": "assistant",
+                "content": "I'd be happy to help plan a trip. Where would you like to go?"
+            },
+            {
+                "role": "user",
+                "content": "I'm thinking about visiting Japan."
+            }
+        ]
+    )
+    save_fixture(conversation_response.model_dump(), "anthropic_conversation.json")
+
+    # 4. Streaming response
+    stream_messages = []
+    stream = client.messages.stream(
+        model="claude-3-opus-20240229",
+        max_tokens=100,
+        messages=[{
+            "role": "user",
+            "content": "Count from 1 to 5 slowly."
+        }]
+    )
+
+    with stream as response:
+        for text in response.text_stream:
+            stream_messages.append({"type": "text", "content": text})
+        # Get the final accumulated message from the same stream (a second
+        # create() call would return a different completion, leaving the saved
+        # chunks and final_message inconsistent with each other)
+        final_message = response.get_final_message()
+
+    save_fixture({
+        "messages": stream_messages,
+        "final_message": final_message.model_dump()
+    }, "anthropic_stream.json")
+
+if __name__ == "__main__":
+    generate_fixtures()
\ No newline at end of file
diff --git a/third_party/opentelemetry/instrumentation/anthropic/LICENSE b/third_party/opentelemetry/instrumentation/anthropic/LICENSE
deleted file mode 100644
index 0f2a333f0..000000000
--- a/third_party/opentelemetry/instrumentation/anthropic/LICENSE
+++ /dev/null
@@ -1,201 +0,0 @@
-                                 Apache License
-                           Version 2.0, January 2004
-                        http://www.apache.org/licenses/
-
-   TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
-
-   1. Definitions.
-
-      "License" shall mean the terms and conditions for use, reproduction,
-      and distribution as defined by Sections 1 through 9 of this document.
-
-      "Licensor" shall mean the copyright owner or entity authorized by
-      the copyright owner that is granting the License.
-
-      "Legal Entity" shall mean the union of the acting entity and all
-      other entities that control, are controlled by, or are under common
-      control with that entity. For the purposes of this definition,
-      "control" means (i) the power, direct or indirect, to cause the
-      direction or management of such entity, whether by contract or
-      otherwise, or (ii) ownership of fifty percent (50%) or more of the
-      outstanding shares, or (iii) beneficial ownership of such entity.
-
-      "You" (or "Your") shall mean an individual or Legal Entity
-      exercising permissions granted by this License.
-
-      "Source" form shall mean the preferred form for making modifications,
-      including but not limited to software source code, documentation
-      source, and configuration files.
-
-      "Object" form shall mean any form resulting from mechanical
-      transformation or translation of a Source form, including but
-      not limited to compiled object code, generated documentation,
-      and conversions to other media types.
-
-      "Work" shall mean the work of authorship, whether in Source or
-      Object form, made available under the License, as indicated by a
-      copyright notice that is included in or attached to the work
-      (an example is provided in the Appendix below).
-
-      "Derivative Works" shall mean any work, whether in Source or Object
-      form, that is based on (or derived from) the Work and for which the
-      editorial revisions, annotations, elaborations, or other modifications
-      represent, as a whole, an original work of authorship.
For the purposes - of this License, Derivative Works shall not include works that remain - separable from, or merely link (or bind by name) to the interfaces of, - the Work and Derivative Works thereof. - - "Contribution" shall mean any work of authorship, including - the original version of the Work and any modifications or additions - to that Work or Derivative Works thereof, that is intentionally - submitted to Licensor for inclusion in the Work by the copyright owner - or by an individual or Legal Entity authorized to submit on behalf of - the copyright owner. For the purposes of this definition, "submitted" - means any form of electronic, verbal, or written communication sent - to the Licensor or its representatives, including but not limited to - communication on electronic mailing lists, source code control systems, - and issue tracking systems that are managed by, or on behalf of, the - Licensor for the purpose of discussing and improving the Work, but - excluding communication that is conspicuously marked or otherwise - designated in writing by the copyright owner as "Not a Contribution." - - "Contributor" shall mean Licensor and any individual or Legal Entity - on behalf of whom a Contribution has been received by Licensor and - subsequently incorporated within the Work. - - 2. Grant of Copyright License. Subject to the terms and conditions of - this License, each Contributor hereby grants to You a perpetual, - worldwide, non-exclusive, no-charge, royalty-free, irrevocable - copyright license to reproduce, prepare Derivative Works of, - publicly display, publicly perform, sublicense, and distribute the - Work and such Derivative Works in Source or Object form. - - 3. Grant of Patent License. Subject to the terms and conditions of - this License, each Contributor hereby grants to You a perpetual, - worldwide, non-exclusive, no-charge, royalty-free, irrevocable - (except as stated in this section) patent license to make, have made, - use, offer to sell, sell, import, and otherwise transfer the Work, - where such license applies only to those patent claims licensable - by such Contributor that are necessarily infringed by their - Contribution(s) alone or by combination of their Contribution(s) - with the Work to which such Contribution(s) was submitted. If You - institute patent litigation against any entity (including a - cross-claim or counterclaim in a lawsuit) alleging that the Work - or a Contribution incorporated within the Work constitutes direct - or contributory patent infringement, then any patent licenses - granted to You under this License for that Work shall terminate - as of the date such litigation is filed. - - 4. Redistribution. 
You may reproduce and distribute copies of the - Work or Derivative Works thereof in any medium, with or without - modifications, and in Source or Object form, provided that You - meet the following conditions: - - (a) You must give any other recipients of the Work or - Derivative Works a copy of this License; and - - (b) You must cause any modified files to carry prominent notices - stating that You changed the files; and - - (c) You must retain, in the Source form of any Derivative Works - that You distribute, all copyright, patent, trademark, and - attribution notices from the Source form of the Work, - excluding those notices that do not pertain to any part of - the Derivative Works; and - - (d) If the Work includes a "NOTICE" text file as part of its - distribution, then any Derivative Works that You distribute must - include a readable copy of the attribution notices contained - within such NOTICE file, excluding those notices that do not - pertain to any part of the Derivative Works, in at least one - of the following places: within a NOTICE text file distributed - as part of the Derivative Works; within the Source form or - documentation, if provided along with the Derivative Works; or, - within a display generated by the Derivative Works, if and - wherever such third-party notices normally appear. The contents - of the NOTICE file are for informational purposes only and - do not modify the License. You may add Your own attribution - notices within Derivative Works that You distribute, alongside - or as an addendum to the NOTICE text from the Work, provided - that such additional attribution notices cannot be construed - as modifying the License. - - You may add Your own copyright statement to Your modifications and - may provide additional or different license terms and conditions - for use, reproduction, or distribution of Your modifications, or - for any such Derivative Works as a whole, provided Your use, - reproduction, and distribution of the Work otherwise complies with - the conditions stated in this License. - - 5. Submission of Contributions. Unless You explicitly state otherwise, - any Contribution intentionally submitted for inclusion in the Work - by You to the Licensor shall be under the terms and conditions of - this License, without any additional terms or conditions. - Notwithstanding the above, nothing herein shall supersede or modify - the terms of any separate license agreement you may have executed - with Licensor regarding such Contributions. - - 6. Trademarks. This License does not grant permission to use the trade - names, trademarks, service marks, or product names of the Licensor, - except as required for reasonable and customary use in describing the - origin of the Work and reproducing the content of the NOTICE file. - - 7. Disclaimer of Warranty. Unless required by applicable law or - agreed to in writing, Licensor provides the Work (and each - Contributor provides its Contributions) on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or - implied, including, without limitation, any warranties or conditions - of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A - PARTICULAR PURPOSE. You are solely responsible for determining the - appropriateness of using or redistributing the Work and assume any - risks associated with Your exercise of permissions under this License. - - 8. Limitation of Liability. 
In no event and under no legal theory, - whether in tort (including negligence), contract, or otherwise, - unless required by applicable law (such as deliberate and grossly - negligent acts) or agreed to in writing, shall any Contributor be - liable to You for damages, including any direct, indirect, special, - incidental, or consequential damages of any character arising as a - result of this License or out of the use or inability to use the - Work (including but not limited to damages for loss of goodwill, - work stoppage, computer failure or malfunction, or any and all - other commercial damages or losses), even if such Contributor - has been advised of the possibility of such damages. - - 9. Accepting Warranty or Additional Liability. While redistributing - the Work or Derivative Works thereof, You may choose to offer, - and charge a fee for, acceptance of support, warranty, indemnity, - or other liability obligations and/or rights consistent with this - License. However, in accepting such obligations, You may act only - on Your own behalf and on Your sole responsibility, not on behalf - of any other Contributor, and only if You agree to indemnify, - defend, and hold each Contributor harmless for any liability - incurred by, or claims asserted against, such Contributor by reason - of your accepting any such warranty or additional liability. - - END OF TERMS AND CONDITIONS - - APPENDIX: How to apply the Apache License to your work. - - To apply the Apache License to your work, attach the following - boilerplate notice, with the fields enclosed by brackets "[]" - replaced with your own identifying information. (Don't include - the brackets!) The text should be enclosed in the appropriate - comment syntax for the file format. We also recommend that a - file or class name and description of purpose be included on the - same "printed page" as the copyright notice for easier - identification within third-party archives. - - Copyright 2023 openllmetry - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. diff --git a/third_party/opentelemetry/instrumentation/anthropic/NOTICE.md b/third_party/opentelemetry/instrumentation/anthropic/NOTICE.md deleted file mode 100644 index ca711b794..000000000 --- a/third_party/opentelemetry/instrumentation/anthropic/NOTICE.md +++ /dev/null @@ -1,8 +0,0 @@ -This package contains code derived from the OpenLLMetry project, which is licensed under the Apache License, Version 2.0. - -Original repository: https://github.com/traceloop/openllmetry - -Copyright notice from the original project: -Copyright (c) Traceloop (https://traceloop.com) - -The Apache 2.0 license can be found in the LICENSE file in this directory. 
diff --git a/third_party/opentelemetry/instrumentation/anthropic/__init__.py b/third_party/opentelemetry/instrumentation/anthropic/__init__.py deleted file mode 100644 index 0cb718c0e..000000000 --- a/third_party/opentelemetry/instrumentation/anthropic/__init__.py +++ /dev/null @@ -1,800 +0,0 @@ -"""OpenTelemetry Anthropic instrumentation""" - -import json -import logging -import os -import time -from typing import Callable, Collection, Dict, Any, Optional -from typing_extensions import Coroutine - -from anthropic._streaming import AsyncStream, Stream -from opentelemetry import context as context_api -from opentelemetry.instrumentation.anthropic.config import Config -from opentelemetry.instrumentation.anthropic.streaming import ( - abuild_from_streaming_response, - build_from_streaming_response, -) -from opentelemetry.instrumentation.anthropic.utils import ( - acount_prompt_tokens_from_request, - dont_throw, - error_metrics_attributes, - count_prompt_tokens_from_request, - run_async, - set_span_attribute, - shared_metrics_attributes, - should_send_prompts, -) -from opentelemetry.instrumentation.anthropic.version import __version__ -from opentelemetry.instrumentation.instrumentor import BaseInstrumentor -from opentelemetry.instrumentation.utils import _SUPPRESS_INSTRUMENTATION_KEY, unwrap -from opentelemetry.metrics import Counter, Histogram, Meter, get_meter -from opentelemetry.semconv._incubating.attributes.gen_ai_attributes import GEN_AI_RESPONSE_ID -from agentops.semconv import ( - SUPPRESS_LANGUAGE_MODEL_INSTRUMENTATION_KEY, - LLMRequestTypeValues, - SpanAttributes, - Meters, -) -from opentelemetry.trace import SpanKind, Tracer, get_tracer -from opentelemetry.trace.status import Status, StatusCode -from wrapt import wrap_function_wrapper - -logger = logging.getLogger(__name__) - -_instruments = ("anthropic >= 0.3.11",) - -WRAPPED_METHODS = [ - { - "package": "anthropic.resources.completions", - "object": "Completions", - "method": "create", - "span_name": "anthropic.completion", - }, - { - "package": "anthropic.resources.messages", - "object": "Messages", - "method": "create", - "span_name": "anthropic.chat", - }, - { - "package": "anthropic.resources.messages", - "object": "Messages", - "method": "stream", - "span_name": "anthropic.chat", - }, - { - "package": "anthropic.resources.beta.prompt_caching.messages", - "object": "Messages", - "method": "create", - "span_name": "anthropic.chat", - }, - { - "package": "anthropic.resources.beta.prompt_caching.messages", - "object": "Messages", - "method": "stream", - "span_name": "anthropic.chat", - }, -] -WRAPPED_AMETHODS = [ - { - "package": "anthropic.resources.completions", - "object": "AsyncCompletions", - "method": "create", - "span_name": "anthropic.completion", - }, - { - "package": "anthropic.resources.messages", - "object": "AsyncMessages", - "method": "create", - "span_name": "anthropic.chat", - }, - { - "package": "anthropic.resources.messages", - "object": "AsyncMessages", - "method": "stream", - "span_name": "anthropic.chat", - }, - { - "package": "anthropic.resources.beta.prompt_caching.messages", - "object": "AsyncMessages", - "method": "create", - "span_name": "anthropic.chat", - }, - { - "package": "anthropic.resources.beta.prompt_caching.messages", - "object": "AsyncMessages", - "method": "stream", - "span_name": "anthropic.chat", - }, -] - - -def is_streaming_response(response): - return isinstance(response, Stream) or isinstance(response, AsyncStream) - - -async def _process_image_item(item, trace_id, span_id, 
message_index, content_index): - if not Config.upload_base64_image: - return item - - image_format = item.get("source").get("media_type").split("/")[1] - image_name = f"message_{message_index}_content_{content_index}.{image_format}" - base64_string = item.get("source").get("data") - url = await Config.upload_base64_image(trace_id, span_id, image_name, base64_string) - - return {"type": "image_url", "image_url": {"url": url}} - - -async def _dump_content(message_index, content, span): - if isinstance(content, str): - return content - elif isinstance(content, list): - # If the content is a list of text blocks, concatenate them. - # This is more commonly used in prompt caching. - if all([item.get("type") == "text" for item in content]): - return "".join([item.get("text") for item in content]) - - content = [ - ( - await _process_image_item(item, span.context.trace_id, span.context.span_id, message_index, j) - if _is_base64_image(item) - else item - ) - for j, item in enumerate(content) - ] - - return json.dumps(content) - - -@dont_throw -async def _aset_input_attributes(span, kwargs): - set_span_attribute(span, SpanAttributes.LLM_REQUEST_MODEL, kwargs.get("model")) - set_span_attribute(span, SpanAttributes.LLM_REQUEST_MAX_TOKENS, kwargs.get("max_tokens_to_sample")) - set_span_attribute(span, SpanAttributes.LLM_REQUEST_TEMPERATURE, kwargs.get("temperature")) - set_span_attribute(span, SpanAttributes.LLM_REQUEST_TOP_P, kwargs.get("top_p")) - set_span_attribute(span, SpanAttributes.LLM_REQUEST_FREQUENCY_PENALTY, kwargs.get("frequency_penalty")) - set_span_attribute(span, SpanAttributes.LLM_REQUEST_PRESENCE_PENALTY, kwargs.get("presence_penalty")) - set_span_attribute(span, SpanAttributes.LLM_REQUEST_STREAMING, kwargs.get("stream")) - - if should_send_prompts(): - if kwargs.get("prompt") is not None: - set_span_attribute(span, f"{SpanAttributes.LLM_PROMPTS}.0.user", kwargs.get("prompt")) - - elif kwargs.get("messages") is not None: - has_system_message = False - if kwargs.get("system"): - has_system_message = True - set_span_attribute( - span, - f"{SpanAttributes.LLM_PROMPTS}.0.content", - await _dump_content(message_index=0, span=span, content=kwargs.get("system")), - ) - set_span_attribute( - span, - f"{SpanAttributes.LLM_PROMPTS}.0.role", - "system", - ) - for i, message in enumerate(kwargs.get("messages")): - prompt_index = i + (1 if has_system_message else 0) - set_span_attribute( - span, - f"{SpanAttributes.LLM_PROMPTS}.{prompt_index}.content", - await _dump_content(message_index=i, span=span, content=message.get("content")), - ) - set_span_attribute( - span, - f"{SpanAttributes.LLM_PROMPTS}.{prompt_index}.role", - message.get("role"), - ) - - if kwargs.get("tools") is not None: - for i, tool in enumerate(kwargs.get("tools")): - prefix = f"{SpanAttributes.LLM_REQUEST_FUNCTIONS}.{i}" - set_span_attribute(span, f"{prefix}.name", tool.get("name")) - set_span_attribute(span, f"{prefix}.description", tool.get("description")) - input_schema = tool.get("input_schema") - if input_schema is not None: - set_span_attribute(span, f"{prefix}.input_schema", json.dumps(input_schema)) - - -def _set_span_completions(span, response): - index = 0 - prefix = f"{SpanAttributes.LLM_COMPLETIONS}.{index}" - set_span_attribute(span, f"{prefix}.finish_reason", response.get("stop_reason")) - if response.get("role"): - set_span_attribute(span, f"{prefix}.role", response.get("role")) - - if response.get("completion"): - set_span_attribute(span, f"{prefix}.content", response.get("completion")) - elif 
response.get("content"): - tool_call_index = 0 - text = "" - for content in response.get("content"): - content_block_type = content.type - # usually, Antrhopic responds with just one text block, - # but the API allows for multiple text blocks, so concatenate them - if content_block_type == "text": - text += content.text - elif content_block_type == "tool_use": - content = dict(content) - set_span_attribute( - span, - f"{prefix}.tool_calls.{tool_call_index}.id", - content.get("id"), - ) - set_span_attribute( - span, - f"{prefix}.tool_calls.{tool_call_index}.name", - content.get("name"), - ) - tool_arguments = content.get("input") - if tool_arguments is not None: - set_span_attribute( - span, - f"{prefix}.tool_calls.{tool_call_index}.arguments", - json.dumps(tool_arguments), - ) - tool_call_index += 1 - set_span_attribute(span, f"{prefix}.content", text) - - -@dont_throw -async def _aset_token_usage( - span, - anthropic, - request, - response, - metric_attributes: dict = {}, - token_histogram: Histogram = None, - choice_counter: Counter = None, -): - if not isinstance(response, dict): - response = response.__dict__ - - if usage := response.get("usage"): - prompt_tokens = usage.input_tokens - else: - prompt_tokens = await acount_prompt_tokens_from_request(anthropic, request) - - if usage := response.get("usage"): - cache_read_tokens = dict(usage).get("cache_read_input_tokens", 0) - else: - cache_read_tokens = 0 - - if usage := response.get("usage"): - cache_creation_tokens = dict(usage).get("cache_creation_input_tokens", 0) - else: - cache_creation_tokens = 0 - - input_tokens = prompt_tokens + cache_read_tokens + cache_creation_tokens - - if token_histogram and isinstance(input_tokens, int) and input_tokens >= 0: - token_histogram.record( - input_tokens, - attributes={ - **metric_attributes, - SpanAttributes.LLM_TOKEN_TYPE: "input", - }, - ) - - if usage := response.get("usage"): - completion_tokens = usage.output_tokens - else: - completion_tokens = 0 - if hasattr(anthropic, "count_tokens"): - if response.get("completion"): - completion_tokens = await anthropic.count_tokens(response.get("completion")) - elif response.get("content"): - completion_tokens = await anthropic.count_tokens(response.get("content")[0].text) - - if token_histogram and isinstance(completion_tokens, int) and completion_tokens >= 0: - token_histogram.record( - completion_tokens, - attributes={ - **metric_attributes, - SpanAttributes.LLM_TOKEN_TYPE: "output", - }, - ) - - total_tokens = input_tokens + completion_tokens - - choices = 0 - if isinstance(response.get("content"), list): - choices = len(response.get("content")) - elif response.get("completion"): - choices = 1 - - if choices > 0 and choice_counter: - choice_counter.add( - choices, - attributes={ - **metric_attributes, - SpanAttributes.LLM_RESPONSE_STOP_REASON: response.get("stop_reason"), - }, - ) - - set_span_attribute(span, SpanAttributes.LLM_USAGE_PROMPT_TOKENS, input_tokens) - set_span_attribute(span, SpanAttributes.LLM_USAGE_COMPLETION_TOKENS, completion_tokens) - set_span_attribute(span, SpanAttributes.LLM_USAGE_TOTAL_TOKENS, total_tokens) - - set_span_attribute(span, SpanAttributes.LLM_USAGE_CACHE_READ_INPUT_TOKENS, cache_read_tokens) - set_span_attribute(span, SpanAttributes.LLM_USAGE_CACHE_CREATION_INPUT_TOKENS, cache_creation_tokens) - - -@dont_throw -def _set_token_usage( - span, - anthropic, - request, - response, - metric_attributes: dict = {}, - token_histogram: Histogram = None, - choice_counter: Counter = None, -): - if not isinstance(response, 
-@dont_throw
-def _set_token_usage(
-    span,
-    anthropic,
-    request,
-    response,
-    metric_attributes: dict = {},
-    token_histogram: Histogram = None,
-    choice_counter: Counter = None,
-):
-    if not isinstance(response, dict):
-        response = response.__dict__
-
-    if usage := response.get("usage"):
-        prompt_tokens = usage.input_tokens
-    else:
-        prompt_tokens = count_prompt_tokens_from_request(anthropic, request)
-
-    if usage := response.get("usage"):
-        cache_read_tokens = dict(usage).get("cache_read_input_tokens", 0)
-    else:
-        cache_read_tokens = 0
-
-    if usage := response.get("usage"):
-        cache_creation_tokens = dict(usage).get("cache_creation_input_tokens", 0)
-    else:
-        cache_creation_tokens = 0
-
-    input_tokens = prompt_tokens + cache_read_tokens + cache_creation_tokens
-
-    if token_histogram and isinstance(input_tokens, int) and input_tokens >= 0:
-        token_histogram.record(
-            input_tokens,
-            attributes={
-                **metric_attributes,
-                SpanAttributes.LLM_TOKEN_TYPE: "input",
-            },
-        )
-
-    if usage := response.get("usage"):
-        completion_tokens = usage.output_tokens
-    else:
-        completion_tokens = 0
-        if hasattr(anthropic, "count_tokens"):
-            if response.get("completion"):
-                completion_tokens = anthropic.count_tokens(response.get("completion"))
-            elif response.get("content"):
-                completion_tokens = anthropic.count_tokens(response.get("content")[0].text)
-
-    if token_histogram and isinstance(completion_tokens, int) and completion_tokens >= 0:
-        token_histogram.record(
-            completion_tokens,
-            attributes={
-                **metric_attributes,
-                SpanAttributes.LLM_TOKEN_TYPE: "output",
-            },
-        )
-
-    total_tokens = input_tokens + completion_tokens
-
-    choices = 0
-    if isinstance(response.get("content"), list):
-        choices = len(response.get("content"))
-    elif response.get("completion"):
-        choices = 1
-
-    if choices > 0 and choice_counter:
-        choice_counter.add(
-            choices,
-            attributes={
-                **metric_attributes,
-                SpanAttributes.LLM_RESPONSE_STOP_REASON: response.get("stop_reason"),
-            },
-        )
-
-    set_span_attribute(span, SpanAttributes.LLM_USAGE_PROMPT_TOKENS, input_tokens)
-    set_span_attribute(span, SpanAttributes.LLM_USAGE_COMPLETION_TOKENS, completion_tokens)
-    set_span_attribute(span, SpanAttributes.LLM_USAGE_TOTAL_TOKENS, total_tokens)
-
-    set_span_attribute(span, SpanAttributes.LLM_USAGE_CACHE_READ_INPUT_TOKENS, cache_read_tokens)
-    set_span_attribute(span, SpanAttributes.LLM_USAGE_CACHE_CREATION_INPUT_TOKENS, cache_creation_tokens)
-
-
-@dont_throw
-def _set_response_attributes(span, response):
-    if not isinstance(response, dict):
-        response = response.__dict__
-    set_span_attribute(span, SpanAttributes.LLM_RESPONSE_MODEL, response.get("model"))
-    set_span_attribute(span, GEN_AI_RESPONSE_ID, response.get("id"))
-
-    if response.get("usage"):
-        prompt_tokens = response.get("usage").input_tokens
-        completion_tokens = response.get("usage").output_tokens
-        set_span_attribute(span, SpanAttributes.LLM_USAGE_PROMPT_TOKENS, prompt_tokens)
-        set_span_attribute(span, SpanAttributes.LLM_USAGE_COMPLETION_TOKENS, completion_tokens)
-        set_span_attribute(
-            span,
-            SpanAttributes.LLM_USAGE_TOTAL_TOKENS,
-            prompt_tokens + completion_tokens,
-        )
-
-    if should_send_prompts():
-        _set_span_completions(span, response)
-
-
-def _with_tracer_wrapper(func):
-    """Helper for providing tracer for wrapper functions."""
-
-    def _with_tracer(tracer, to_wrap):
-        def wrapper(wrapped, instance, args, kwargs):
-            return func(tracer, to_wrap, wrapped, instance, args, kwargs)
-
-        return wrapper
-
-    return _with_tracer
-
-
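`_with_tracer_wrapper` (and `_with_chat_telemetry_wrapper` below) are decorator factories: calling the decorated function once with its shared dependencies returns a wrapper with the `(wrapped, instance, args, kwargs)` signature that `wrap_function_wrapper` expects. A minimal runnable sketch with illustrative names:

```python
# Decorator-factory pattern: bake dependencies in, get a wrapt-style wrapper out.
def with_tracer_wrapper(func):
    def with_tracer(tracer, to_wrap):
        def wrapper(wrapped, instance, args, kwargs):
            return func(tracer, to_wrap, wrapped, instance, args, kwargs)
        return wrapper
    return with_tracer

@with_tracer_wrapper
def traced_call(tracer, to_wrap, wrapped, instance, args, kwargs):
    # In the real module this would start a span; here we just log.
    print(f"[{tracer}] entering {to_wrap['span_name']}")
    return wrapped(*args, **kwargs)

# traced_call(...) now yields the (wrapped, instance, args, kwargs) callable.
wrapper = traced_call("my-tracer", {"span_name": "anthropic.chat"})
assert wrapper(lambda x: x * 2, None, (21,), {}) == 42
```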
-def _with_chat_telemetry_wrapper(func):
-    """Helper for providing tracer for wrapper functions. Includes metric collectors."""
-
-    def _with_chat_telemetry(
-        tracer,
-        token_histogram,
-        choice_counter,
-        duration_histogram,
-        exception_counter,
-        to_wrap,
-    ):
-        def wrapper(wrapped, instance, args, kwargs):
-            return func(
-                tracer,
-                token_histogram,
-                choice_counter,
-                duration_histogram,
-                exception_counter,
-                to_wrap,
-                wrapped,
-                instance,
-                args,
-                kwargs,
-            )
-
-        return wrapper
-
-    return _with_chat_telemetry
-
-
-def _create_metrics(meter: Meter):
-    token_histogram = meter.create_histogram(
-        name=Meters.LLM_TOKEN_USAGE,
-        unit="token",
-        description="Measures number of input and output tokens used",
-    )
-
-    choice_counter = meter.create_counter(
-        name=Meters.LLM_GENERATION_CHOICES,
-        unit="choice",
-        description="Number of choices returned by chat completions call",
-    )
-
-    duration_histogram = meter.create_histogram(
-        name=Meters.LLM_OPERATION_DURATION,
-        unit="s",
-        description="GenAI operation duration",
-    )
-
-    exception_counter = meter.create_counter(
-        name=Meters.LLM_ANTHROPIC_COMPLETION_EXCEPTIONS,
-        unit="time",
-        description="Number of exceptions occurred during chat completions",
-    )
-
-    return token_histogram, choice_counter, duration_histogram, exception_counter
-
-
-def _is_base64_image(item: Dict[str, Any]) -> bool:
-    if not isinstance(item, dict):
-        return False
-
-    if not isinstance(item.get("source"), dict):
-        return False
-
-    if item.get("type") != "image" or item["source"].get("type") != "base64":
-        return False
-
-    return True
-
-
-@_with_chat_telemetry_wrapper
-def _wrap(
-    tracer: Tracer,
-    token_histogram: Histogram,
-    choice_counter: Counter,
-    duration_histogram: Histogram,
-    exception_counter: Counter,
-    to_wrap,
-    wrapped,
-    instance,
-    args,
-    kwargs,
-):
-    """Instruments and calls every function defined in TO_WRAP."""
-    if context_api.get_value(_SUPPRESS_INSTRUMENTATION_KEY) or context_api.get_value(
-        SUPPRESS_LANGUAGE_MODEL_INSTRUMENTATION_KEY
-    ):
-        return wrapped(*args, **kwargs)
-
-    name = to_wrap.get("span_name")
-    span = tracer.start_span(
-        name,
-        kind=SpanKind.CLIENT,
-        attributes={
-            SpanAttributes.LLM_SYSTEM: "Anthropic",
-            SpanAttributes.LLM_REQUEST_TYPE: LLMRequestTypeValues.COMPLETION.value,
-        },
-    )
-
-    if span.is_recording():
-        run_async(_aset_input_attributes(span, kwargs))
-
-    start_time = time.time()
-    try:
-        response = wrapped(*args, **kwargs)
-    except Exception as e:  # pylint: disable=broad-except
-        end_time = time.time()
-        attributes = error_metrics_attributes(e)
-
-        if duration_histogram:
-            duration = end_time - start_time
-            duration_histogram.record(duration, attributes=attributes)
-
-        if exception_counter:
-            exception_counter.add(1, attributes=attributes)
-
-        raise e
-
-    end_time = time.time()
-
-    if is_streaming_response(response):
-        return build_from_streaming_response(
-            span,
-            response,
-            instance._client,
-            start_time,
-            token_histogram,
-            choice_counter,
-            duration_histogram,
-            exception_counter,
-            kwargs,
-        )
-    elif response:
-        try:
-            metric_attributes = shared_metrics_attributes(response)
-
-            if duration_histogram:
-                duration = time.time() - start_time
-                duration_histogram.record(
-                    duration,
-                    attributes=metric_attributes,
-                )
-
-            if span.is_recording():
-                _set_response_attributes(span, response)
-                _set_token_usage(
-                    span,
-                    instance._client,
-                    kwargs,
-                    response,
-                    metric_attributes,
-                    token_histogram,
-                    choice_counter,
-                )
-
-        except Exception as ex:  # pylint: disable=broad-except
-            logger.warning(
-                "Failed to set response attributes for anthropic span, error: %s",
-                str(ex),
-            )
-    if span.is_recording():
-        span.set_status(Status(StatusCode.OK))
-    span.end()
-    return response
-
-
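For context on `_create_metrics`: the instruments are ordinary OpenTelemetry histograms and counters. A sketch of the same calls against the OTel SDK, assuming `opentelemetry-sdk` is installed and using a shortened metric name in place of the `Meters.*` constants:

```python
# Standard OTel metrics wiring; the metric name is a stand-in, not Meters.*.
from opentelemetry.metrics import get_meter, set_meter_provider
from opentelemetry.sdk.metrics import MeterProvider

set_meter_provider(MeterProvider())
meter = get_meter("anthropic-instrumentation-demo")

token_histogram = meter.create_histogram(
    name="gen_ai.client.token.usage",
    unit="token",
    description="Measures number of input and output tokens used",
)
# Record input and output token counts under distinct attribute sets,
# mirroring how _set_token_usage tags each record above.
token_histogram.record(1650, attributes={"gen_ai.token.type": "input"})
token_histogram.record(250, attributes={"gen_ai.token.type": "output"})
```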
-@_with_chat_telemetry_wrapper
-async def _awrap(
-    tracer,
-    token_histogram: Histogram,
-    choice_counter: Counter,
-    duration_histogram: Histogram,
-    exception_counter: Counter,
-    to_wrap,
-    wrapped,
-    instance,
-    args,
-    kwargs,
-):
-    """Instruments and calls every function defined in TO_WRAP."""
-    if context_api.get_value(_SUPPRESS_INSTRUMENTATION_KEY) or context_api.get_value(
-        SUPPRESS_LANGUAGE_MODEL_INSTRUMENTATION_KEY
-    ):
-        return await wrapped(*args, **kwargs)
-
-    name = to_wrap.get("span_name")
-    span = tracer.start_span(
-        name,
-        kind=SpanKind.CLIENT,
-        attributes={
-            SpanAttributes.LLM_SYSTEM: "Anthropic",
-            SpanAttributes.LLM_REQUEST_TYPE: LLMRequestTypeValues.COMPLETION.value,
-        },
-    )
-    try:
-        if span.is_recording():
-            await _aset_input_attributes(span, kwargs)
-
-    except Exception as ex:  # pylint: disable=broad-except
-        logger.warning("Failed to set input attributes for anthropic span, error: %s", str(ex))
-
-    start_time = time.time()
-    try:
-        response = await wrapped(*args, **kwargs)
-    except Exception as e:  # pylint: disable=broad-except
-        end_time = time.time()
-        attributes = error_metrics_attributes(e)
-
-        if duration_histogram:
-            duration = end_time - start_time
-            duration_histogram.record(duration, attributes=attributes)
-
-        if exception_counter:
-            exception_counter.add(1, attributes=attributes)
-
-        raise e
-
-    if is_streaming_response(response):
-        return abuild_from_streaming_response(
-            span,
-            response,
-            instance._client,
-            start_time,
-            token_histogram,
-            choice_counter,
-            duration_histogram,
-            exception_counter,
-            kwargs,
-        )
-    elif response:
-        metric_attributes = shared_metrics_attributes(response)
-
-        if duration_histogram:
-            duration = time.time() - start_time
-            duration_histogram.record(
-                duration,
-                attributes=metric_attributes,
-            )
-
-        if span.is_recording():
-            _set_response_attributes(span, response)
-            await _aset_token_usage(
-                span,
-                instance._client,
-                kwargs,
-                response,
-                metric_attributes,
-                token_histogram,
-                choice_counter,
-            )
-
-    if span.is_recording():
-        span.set_status(Status(StatusCode.OK))
-    span.end()
-    return response
-
-
-def is_metrics_enabled() -> bool:
-    return (os.getenv("TRACELOOP_METRICS_ENABLED") or "true").lower() == "true"
-
-
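`is_metrics_enabled` is the common default-on environment-flag idiom: an unset or empty variable counts as enabled, and only an explicit `false` (any casing) turns metrics off. A quick demonstration of the same check, reusing the flag name from the code above:

```python
import os

def is_metrics_enabled() -> bool:
    # Unset/empty falls back to "true"; comparison is case-insensitive.
    return (os.getenv("TRACELOOP_METRICS_ENABLED") or "true").lower() == "true"

os.environ["TRACELOOP_METRICS_ENABLED"] = "False"
assert is_metrics_enabled() is False
del os.environ["TRACELOOP_METRICS_ENABLED"]
assert is_metrics_enabled() is True
```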
-class AnthropicInstrumentor(BaseInstrumentor):
-    """An instrumentor for Anthropic's client library."""
-
-    def __init__(
-        self,
-        enrich_token_usage: bool = False,
-        exception_logger=None,
-        get_common_metrics_attributes: Callable[[], dict] = lambda: {},
-        upload_base64_image: Optional[Callable[[str, str, str, str], Coroutine[None, None, str]]] = None,
-    ):
-        super().__init__()
-        Config.exception_logger = exception_logger
-        Config.enrich_token_usage = enrich_token_usage
-        Config.get_common_metrics_attributes = get_common_metrics_attributes
-        Config.upload_base64_image = upload_base64_image
-
-    def instrumentation_dependencies(self) -> Collection[str]:
-        return _instruments
-
-    def _instrument(self, **kwargs):
-        tracer_provider = kwargs.get("tracer_provider")
-        tracer = get_tracer(__name__, __version__, tracer_provider)
-
-        # meter and counters are initialized here
-        meter_provider = kwargs.get("meter_provider")
-        meter = get_meter(__name__, __version__, meter_provider)
-
-        if is_metrics_enabled():
-            (
-                token_histogram,
-                choice_counter,
-                duration_histogram,
-                exception_counter,
-            ) = _create_metrics(meter)
-        else:
-            (
-                token_histogram,
-                choice_counter,
-                duration_histogram,
-                exception_counter,
-            ) = (None, None, None, None)
-
-        for wrapped_method in WRAPPED_METHODS:
-            wrap_package = wrapped_method.get("package")
-            wrap_object = wrapped_method.get("object")
-            wrap_method = wrapped_method.get("method")
-
-            try:
-                wrap_function_wrapper(
-                    wrap_package,
-                    f"{wrap_object}.{wrap_method}",
-                    _wrap(
-                        tracer,
-                        token_histogram,
-                        choice_counter,
-                        duration_histogram,
-                        exception_counter,
-                        wrapped_method,
-                    ),
-                )
-            except ModuleNotFoundError:
-                pass  # that's ok, we don't want to fail if some methods do not exist
-
-        for wrapped_method in WRAPPED_AMETHODS:
-            wrap_package = wrapped_method.get("package")
-            wrap_object = wrapped_method.get("object")
-            wrap_method = wrapped_method.get("method")
-            try:
-                wrap_function_wrapper(
-                    wrap_package,
-                    f"{wrap_object}.{wrap_method}",
-                    _awrap(
-                        tracer,
-                        token_histogram,
-                        choice_counter,
-                        duration_histogram,
-                        exception_counter,
-                        wrapped_method,
-                    ),
-                )
-            except ModuleNotFoundError:
-                pass  # that's ok, we don't want to fail if some methods do not exist
-
-    def _uninstrument(self, **kwargs):
-        for wrapped_method in WRAPPED_METHODS:
-            wrap_package = wrapped_method.get("package")
-            wrap_object = wrapped_method.get("object")
-            unwrap(
-                f"{wrap_package}.{wrap_object}",
-                wrapped_method.get("method"),
-            )
-        for wrapped_method in WRAPPED_AMETHODS:
-            wrap_package = wrapped_method.get("package")
-            wrap_object = wrapped_method.get("object")
-            unwrap(
-                f"{wrap_package}.{wrap_object}",
-                wrapped_method.get("method"),
-            )
diff --git a/third_party/opentelemetry/instrumentation/anthropic/config.py b/third_party/opentelemetry/instrumentation/anthropic/config.py
deleted file mode 100644
index 898b2bad4..000000000
--- a/third_party/opentelemetry/instrumentation/anthropic/config.py
+++ /dev/null
@@ -1,9 +0,0 @@
-from typing import Callable, Optional
-from typing_extensions import Coroutine
-
-
-class Config:
-    enrich_token_usage = False
-    exception_logger = None
-    get_common_metrics_attributes: Callable[[], dict] = lambda: {}  # noqa: E731
-    upload_base64_image: Optional[Callable[[str, str, str, str], Coroutine[None, None, str]]] = None
diff --git a/third_party/opentelemetry/instrumentation/anthropic/streaming.py b/third_party/opentelemetry/instrumentation/anthropic/streaming.py
deleted file mode 100644
index ed839e144..000000000
--- a/third_party/opentelemetry/instrumentation/anthropic/streaming.py
+++ /dev/null
@@ -1,260 +0,0 @@
-import logging
-import time
-
-from opentelemetry.instrumentation.anthropic.config import Config
-from opentelemetry.instrumentation.anthropic.utils import (
-    dont_throw,
-    error_metrics_attributes,
-    count_prompt_tokens_from_request,
-    set_span_attribute,
-    shared_metrics_attributes,
-    should_send_prompts,
-)
-from opentelemetry.metrics import Counter, Histogram
-from opentelemetry.semconv._incubating.attributes.gen_ai_attributes import GEN_AI_RESPONSE_ID
-from agentops.semconv import SpanAttributes
-from opentelemetry.trace.status import Status, StatusCode
-
-logger = logging.getLogger(__name__)
-
-
-@dont_throw
-def _process_response_item(item, complete_response):
-    if item.type == "message_start":
-        complete_response["model"] = item.message.model
-        complete_response["usage"] = dict(item.message.usage)
-        complete_response["id"] = item.message.id
-    elif item.type == "content_block_start":
-        index = item.index
-        if len(complete_response.get("events")) <= index:
-            complete_response["events"].append({"index": index, "text": ""})
-    elif item.type == "content_block_delta" and item.delta.type == "text_delta":
-        index = item.index
-        complete_response.get("events")[index]["text"] += item.delta.text
-    elif item.type == "message_delta":
-        for event in complete_response.get("events", []):
-            event["finish_reason"] = item.delta.stop_reason
-        if item.usage:
-            if "usage" in complete_response:
-                item_output_tokens = dict(item.usage).get("output_tokens", 0)
-                existing_output_tokens = complete_response["usage"].get("output_tokens", 0)
-                complete_response["usage"]["output_tokens"] = item_output_tokens + existing_output_tokens
-            else:
-                complete_response["usage"] = dict(item.usage)
-
-
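To see how `_process_response_item` folds a stream into `complete_response`, here is a toy replay using `SimpleNamespace` stand-ins for the SDK's typed stream events: `message_start` seeds model/usage/id, `content_block_start` opens an event slot, text deltas append, and `message_delta` stamps the finish reason and accumulates output tokens.

```python
from types import SimpleNamespace as NS

complete = {"events": [], "model": "", "usage": {}, "id": ""}
stream = [
    NS(type="message_start", message=NS(model="claude-3", usage={"input_tokens": 10}, id="msg_1")),
    NS(type="content_block_start", index=0),
    NS(type="content_block_delta", index=0, delta=NS(type="text_delta", text="Hel")),
    NS(type="content_block_delta", index=0, delta=NS(type="text_delta", text="lo")),
    NS(type="message_delta", delta=NS(stop_reason="end_turn"), usage={"output_tokens": 2}),
]

for item in stream:
    if item.type == "message_start":
        complete.update(model=item.message.model, usage=dict(item.message.usage), id=item.message.id)
    elif item.type == "content_block_start" and len(complete["events"]) <= item.index:
        complete["events"].append({"index": item.index, "text": ""})
    elif item.type == "content_block_delta" and item.delta.type == "text_delta":
        complete["events"][item.index]["text"] += item.delta.text
    elif item.type == "message_delta":
        for event in complete["events"]:
            event["finish_reason"] = item.delta.stop_reason
        complete["usage"]["output_tokens"] = complete["usage"].get("output_tokens", 0) + item.usage["output_tokens"]

assert complete["events"][0]["text"] == "Hello"
assert complete["events"][0]["finish_reason"] == "end_turn"
```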
-def _set_token_usage(
-    span,
-    complete_response,
-    prompt_tokens,
-    completion_tokens,
-    metric_attributes: dict = {},
-    token_histogram: Histogram = None,
-    choice_counter: Counter = None,
-):
-    cache_read_tokens = complete_response.get("usage", {}).get("cache_read_input_tokens", 0)
-    cache_creation_tokens = complete_response.get("usage", {}).get("cache_creation_input_tokens", 0)
-
-    input_tokens = prompt_tokens + cache_read_tokens + cache_creation_tokens
-    total_tokens = input_tokens + completion_tokens
-
-    set_span_attribute(span, SpanAttributes.LLM_USAGE_PROMPT_TOKENS, input_tokens)
-    set_span_attribute(span, SpanAttributes.LLM_USAGE_COMPLETION_TOKENS, completion_tokens)
-    set_span_attribute(span, SpanAttributes.LLM_USAGE_TOTAL_TOKENS, total_tokens)
-
-    set_span_attribute(span, SpanAttributes.LLM_RESPONSE_MODEL, complete_response.get("model"))
-    set_span_attribute(span, SpanAttributes.LLM_USAGE_CACHE_READ_INPUT_TOKENS, cache_read_tokens)
-    set_span_attribute(span, SpanAttributes.LLM_USAGE_CACHE_CREATION_INPUT_TOKENS, cache_creation_tokens)
-
-    if token_histogram and isinstance(input_tokens, int) and input_tokens >= 0:
-        token_histogram.record(
-            input_tokens,
-            attributes={
-                **metric_attributes,
-                SpanAttributes.LLM_TOKEN_TYPE: "input",
-            },
-        )
-
-    if token_histogram and isinstance(completion_tokens, int) and completion_tokens >= 0:
-        token_histogram.record(
-            completion_tokens,
-            attributes={
-                **metric_attributes,
-                SpanAttributes.LLM_TOKEN_TYPE: "output",
-            },
-        )
-
-    if isinstance(complete_response.get("events"), list) and choice_counter:
-        for event in complete_response.get("events"):
-            choice_counter.add(
-                1,
-                attributes={
-                    **metric_attributes,
-                    SpanAttributes.LLM_RESPONSE_FINISH_REASON: event.get("finish_reason"),
-                },
-            )
-
-
-def _set_completions(span, events):
-    if not span.is_recording() or not events:
-        return
-
-    try:
-        for event in events:
-            index = event.get("index")
-            prefix = f"{SpanAttributes.LLM_COMPLETIONS}.{index}"
-            set_span_attribute(span, f"{prefix}.finish_reason", event.get("finish_reason"))
-            set_span_attribute(span, f"{prefix}.content", event.get("text"))
-    except Exception as e:
-        logger.warning("Failed to set completion attributes, error: %s", str(e))
-
-
-@dont_throw
-def build_from_streaming_response(
-    span,
-    response,
-    instance,
-    start_time,
-    token_histogram: Histogram = None,
-    choice_counter: Counter = None,
-    duration_histogram: Histogram = None,
-    exception_counter: Counter = None,
-    kwargs: dict = {},
-):
-    complete_response = {"events": [], "model": "", "usage": {}, "id": ""}
-    for item in response:
-        try:
-            yield item
-        except Exception as e:
-            attributes = error_metrics_attributes(e)
-            if exception_counter:
-                exception_counter.add(1, attributes=attributes)
-            raise e
-        _process_response_item(item, complete_response)
-
-    metric_attributes = shared_metrics_attributes(complete_response)
-    set_span_attribute(span, GEN_AI_RESPONSE_ID, complete_response.get("id"))
-    if duration_histogram:
-        duration = time.time() - start_time
-        duration_histogram.record(
-            duration,
-            attributes=metric_attributes,
-        )
-
-    # calculate token usage
-    if Config.enrich_token_usage:
-        try:
-            completion_tokens = -1
-            # prompt_usage
-            if usage := complete_response.get("usage"):
-                prompt_tokens = usage.get("input_tokens", 0)
-            else:
-                prompt_tokens = count_prompt_tokens_from_request(instance, kwargs)
-
-            # completion_usage
-            if usage := complete_response.get("usage"):
-                completion_tokens = usage.get("output_tokens", 0)
-            else:
-                completion_content = ""
-                if complete_response.get("events"):
-                    model_name = complete_response.get("model") or None
-                    for event in complete_response.get("events"):  # type: dict
-                        if event.get("text"):
-                            completion_content += event.get("text")
-
-                    if model_name:
-                        completion_tokens = instance.count_tokens(completion_content)
-
-            _set_token_usage(
-                span,
-                complete_response,
-                prompt_tokens,
-                completion_tokens,
-                metric_attributes,
-                token_histogram,
-                choice_counter,
-            )
-        except Exception as e:
-            logger.warning("Failed to set token usage, error: %s", e)
-
-    if should_send_prompts():
-        _set_completions(span, complete_response.get("events"))
-
-    span.set_status(Status(StatusCode.OK))
-    span.end()
-
-
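`build_from_streaming_response` is a pass-through generator: it re-yields each chunk to the caller unchanged, accumulates it on the side, and only finalizes the span once the stream is exhausted. The shape, reduced to its essentials:

```python
def passthrough(stream, sink):
    # Re-yield each chunk unchanged; record it for post-stream bookkeeping.
    for chunk in stream:
        yield chunk
        sink.append(chunk)
    # This code runs only after the caller has fully consumed the stream,
    # which is where the real builder closes its span.
    print("stream finished; saw", len(sink), "chunks")

collected = []
for piece in passthrough(iter(["a", "b", "c"]), collected):
    pass  # the consumer sees every chunk as usual

assert collected == ["a", "b", "c"]
```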
-@dont_throw
-async def abuild_from_streaming_response(
-    span,
-    response,
-    instance,
-    start_time,
-    token_histogram: Histogram = None,
-    choice_counter: Counter = None,
-    duration_histogram: Histogram = None,
-    exception_counter: Counter = None,
-    kwargs: dict = {},
-):
-    complete_response = {"events": [], "model": "", "usage": {}, "id": ""}
-    async for item in response:
-        try:
-            yield item
-        except Exception as e:
-            attributes = error_metrics_attributes(e)
-            if exception_counter:
-                exception_counter.add(1, attributes=attributes)
-            raise e
-        _process_response_item(item, complete_response)
-
-    set_span_attribute(span, GEN_AI_RESPONSE_ID, complete_response.get("id"))
-
-    metric_attributes = shared_metrics_attributes(complete_response)
-
-    if duration_histogram:
-        duration = time.time() - start_time
-        duration_histogram.record(
-            duration,
-            attributes=metric_attributes,
-        )
-
-    # calculate token usage
-    if Config.enrich_token_usage:
-        try:
-            # prompt_usage
-            if usage := complete_response.get("usage"):
-                prompt_tokens = usage.get("input_tokens", 0)
-            else:
-                prompt_tokens = count_prompt_tokens_from_request(instance, kwargs)
-
-            # completion_usage
-            if usage := complete_response.get("usage"):
-                completion_tokens = usage.get("output_tokens", 0)
-            else:
-                completion_content = ""
-                if complete_response.get("events"):
-                    model_name = complete_response.get("model") or None
-                    for event in complete_response.get("events"):  # type: dict
-                        if event.get("text"):
-                            completion_content += event.get("text")
-
-                    if model_name:
-                        completion_tokens = instance.count_tokens(completion_content)
-
-            _set_token_usage(
-                span,
-                complete_response,
-                prompt_tokens,
-                completion_tokens,
-                metric_attributes,
-                token_histogram,
-                choice_counter,
-            )
-        except Exception as e:
-            logger.warning("Failed to set token usage, error: %s", str(e))
-
-    if should_send_prompts():
-        _set_completions(span, complete_response.get("events"))
-
-    span.set_status(Status(StatusCode.OK))
-    span.end()
diff --git a/third_party/opentelemetry/instrumentation/anthropic/utils.py b/third_party/opentelemetry/instrumentation/anthropic/utils.py
deleted file mode 100644
index 8aa210673..000000000
--- a/third_party/opentelemetry/instrumentation/anthropic/utils.py
+++ /dev/null
@@ -1,131 +0,0 @@
-import asyncio
-import os
-import logging
-import threading
-import traceback
-from opentelemetry import context as context_api
-from opentelemetry.instrumentation.anthropic.config import Config
-from agentops.semconv import SpanAttributes
-
-GEN_AI_SYSTEM = "gen_ai.system"
-GEN_AI_SYSTEM_ANTHROPIC = "anthropic"
-
-
-def set_span_attribute(span, name, value):
-    if value is not None:
-        if value != "":
-            span.set_attribute(name, value)
-    return
-
-
-def should_send_prompts():
-    return (os.getenv("TRACELOOP_TRACE_CONTENT") or "true").lower() == "true" or context_api.get_value(
-        "override_enable_content_tracing"
-    )
-
-
-def dont_throw(func):
-    """
-    A decorator that wraps the passed in function and logs exceptions instead of throwing them.
-    Works for both synchronous and asynchronous functions.
-    """
-    logger = logging.getLogger(func.__module__)
-
-    async def async_wrapper(*args, **kwargs):
-        try:
-            return await func(*args, **kwargs)
-        except Exception as e:
-            _handle_exception(e, func, logger)
-
-    def sync_wrapper(*args, **kwargs):
-        try:
-            return func(*args, **kwargs)
-        except Exception as e:
-            _handle_exception(e, func, logger)
-
-    def _handle_exception(e, func, logger):
-        logger.debug(
-            "OpenLLMetry failed to trace in %s, error: %s",
-            func.__name__,
-            traceback.format_exc(),
-        )
-        if Config.exception_logger:
-            Config.exception_logger(e)
-
-    return async_wrapper if asyncio.iscoroutinefunction(func) else sync_wrapper
-
-
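`dont_throw` deliberately swallows instrumentation errors so a tracing bug can never break the user's API call, logging at DEBUG instead. A compact sketch of the same sync/async dispatch (adding `functools.wraps`, which the original omits):

```python
import asyncio
import functools
import logging

def dont_throw(func):
    logger = logging.getLogger(func.__module__)

    @functools.wraps(func)
    def sync_wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception:
            logger.debug("swallowed exception in %s", func.__name__, exc_info=True)

    @functools.wraps(func)
    async def async_wrapper(*args, **kwargs):
        try:
            return await func(*args, **kwargs)
        except Exception:
            logger.debug("swallowed exception in %s", func.__name__, exc_info=True)

    # Pick the wrapper that matches the decorated function's flavor.
    return async_wrapper if asyncio.iscoroutinefunction(func) else sync_wrapper

@dont_throw
def flaky():
    raise RuntimeError("tracing hiccup")

flaky()  # logs at DEBUG and returns None instead of raising
```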
-@dont_throw
-def shared_metrics_attributes(response):
-    if not isinstance(response, dict):
-        response = response.__dict__
-
-    common_attributes = Config.get_common_metrics_attributes()
-
-    return {
-        **common_attributes,
-        GEN_AI_SYSTEM: GEN_AI_SYSTEM_ANTHROPIC,
-        SpanAttributes.LLM_RESPONSE_MODEL: response.get("model"),
-    }
-
-
-@dont_throw
-def error_metrics_attributes(exception):
-    return {
-        GEN_AI_SYSTEM: GEN_AI_SYSTEM_ANTHROPIC,
-        "error.type": exception.__class__.__name__,
-    }
-
-
-@dont_throw
-def count_prompt_tokens_from_request(anthropic, request):
-    prompt_tokens = 0
-    if hasattr(anthropic, "count_tokens"):
-        if request.get("prompt"):
-            prompt_tokens = anthropic.count_tokens(request.get("prompt"))
-        elif messages := request.get("messages"):
-            prompt_tokens = 0
-            for m in messages:
-                content = m.get("content")
-                if isinstance(content, str):
-                    prompt_tokens += anthropic.count_tokens(content)
-                elif isinstance(content, list):
-                    for item in content:
-                        # TODO: handle image and tool tokens
-                        if isinstance(item, dict) and item.get("type") == "text":
-                            prompt_tokens += anthropic.count_tokens(item.get("text", ""))
-    return prompt_tokens
-
-
-@dont_throw
-async def acount_prompt_tokens_from_request(anthropic, request):
-    prompt_tokens = 0
-    if hasattr(anthropic, "count_tokens"):
-        if request.get("prompt"):
-            prompt_tokens = await anthropic.count_tokens(request.get("prompt"))
-        elif messages := request.get("messages"):
-            prompt_tokens = 0
-            for m in messages:
-                content = m.get("content")
-                if isinstance(content, str):
-                    prompt_tokens += await anthropic.count_tokens(content)
-                elif isinstance(content, list):
-                    for item in content:
-                        # TODO: handle image and tool tokens
-                        if isinstance(item, dict) and item.get("type") == "text":
-                            prompt_tokens += await anthropic.count_tokens(item.get("text", ""))
-    return prompt_tokens
-
-
-def run_async(method):
-    try:
-        loop = asyncio.get_running_loop()
-    except RuntimeError:
-        loop = None
-
-    if loop and loop.is_running():
-        thread = threading.Thread(target=lambda: asyncio.run(method))
-        thread.start()
-        thread.join()
-    else:
-        asyncio.run(method)
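`run_async` is the bridge that lets the synchronous `_wrap` drive the async attribute setters: if an event loop is already running, the coroutine is pushed to a throwaway thread with its own loop (blocking until it finishes); otherwise `asyncio.run` drives it directly. A runnable sketch of the same dispatch:

```python
import asyncio
import threading

def run_async(coro):
    try:
        loop = asyncio.get_running_loop()
    except RuntimeError:
        loop = None

    if loop and loop.is_running():
        # Can't block the current loop; run the coroutine on a fresh one.
        thread = threading.Thread(target=lambda: asyncio.run(coro))
        thread.start()
        thread.join()
    else:
        asyncio.run(coro)

async def set_attrs():
    await asyncio.sleep(0)
    print("attributes set")

run_async(set_attrs())  # safe to call from plain synchronous code
```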
diff --git a/third_party/opentelemetry/instrumentation/anthropic/version.py b/third_party/opentelemetry/instrumentation/anthropic/version.py
deleted file mode 100644
index 703f9571b..000000000
--- a/third_party/opentelemetry/instrumentation/anthropic/version.py
+++ /dev/null
@@ -1 +0,0 @@
-__version__ = "0.38.7"