diff --git a/agentops/instrumentation/__init__.py b/agentops/instrumentation/__init__.py index e9c8f2459..e47b6e7fb 100644 --- a/agentops/instrumentation/__init__.py +++ b/agentops/instrumentation/__init__.py @@ -112,6 +112,12 @@ class InstrumentorConfig(TypedDict): "class_name": "LanggraphInstrumentor", "min_version": "0.2.0", }, + "xpander_sdk": { + "module_name": "agentops.instrumentation.agentic.xpander", + "class_name": "XpanderInstrumentor", + "min_version": "1.0.0", + "package_name": "xpander-sdk", + }, } # Combine all target packages for monitoring diff --git a/agentops/instrumentation/agentic/xpander/__init__.py b/agentops/instrumentation/agentic/xpander/__init__.py new file mode 100644 index 000000000..007656dfa --- /dev/null +++ b/agentops/instrumentation/agentic/xpander/__init__.py @@ -0,0 +1,15 @@ +"""Xpander SDK instrumentation for AgentOps.""" + +from agentops.instrumentation.agentic.xpander.instrumentor import XpanderInstrumentor +from agentops.instrumentation.agentic.xpander.trace_probe import ( + wrap_openai_call_for_xpander, + is_xpander_session_active, + get_active_xpander_session, +) + +__all__ = [ + "XpanderInstrumentor", + "wrap_openai_call_for_xpander", + "is_xpander_session_active", + "get_active_xpander_session", +] diff --git a/agentops/instrumentation/agentic/xpander/context.py b/agentops/instrumentation/agentic/xpander/context.py new file mode 100644 index 000000000..abc9583c6 --- /dev/null +++ b/agentops/instrumentation/agentic/xpander/context.py @@ -0,0 +1,112 @@ +"""Xpander context management for session tracking.""" + +import time +import threading +from typing import Any, Dict, Optional + + +class XpanderContext: + """Context manager for Xpander sessions with nested conversation spans.""" + + def __init__(self): + self._sessions = {} # session_id -> session_data + self._workflow_spans = {} # session_id -> active workflow span + self._agent_spans = {} # session_id -> active agent span + self._conversation_spans = {} # session_id -> active conversation span + self._conversation_counters = {} # session_id -> conversation counter + self._lock = threading.Lock() + + def start_session(self, session_id: str, agent_info: Dict[str, Any], workflow_span=None, agent_span=None) -> None: + """Start a new session with agent info.""" + with self._lock: + self._sessions[session_id] = { + "agent_name": agent_info.get("agent_name", "unknown"), + "agent_id": agent_info.get("agent_id", "unknown"), + "task_input": agent_info.get("task_input"), + "phase": "planning", + "step_count": 0, + "total_tokens": 0, + "tools_executed": [], + "start_time": time.time(), + } + if workflow_span: + self._workflow_spans[session_id] = workflow_span + if agent_span: + self._agent_spans[session_id] = agent_span + + # Initialize conversation counter + self._conversation_counters[session_id] = 0 + + def start_conversation(self, session_id: str, conversation_span) -> None: + """Start a new conversation within the session.""" + with self._lock: + self._conversation_spans[session_id] = conversation_span + self._conversation_counters[session_id] = self._conversation_counters.get(session_id, 0) + 1 + + def end_conversation(self, session_id: str) -> None: + """End the current conversation.""" + with self._lock: + if session_id in self._conversation_spans: + del self._conversation_spans[session_id] + + def has_active_conversation(self, session_id: str) -> bool: + """Check if there's an active conversation for this session.""" + with self._lock: + return session_id in self._conversation_spans + + def 
get_conversation_counter(self, session_id: str) -> int: + """Get the current conversation counter.""" + with self._lock: + return self._conversation_counters.get(session_id, 0) + + def get_session(self, session_id: str) -> Optional[Dict[str, Any]]: + """Get session data.""" + with self._lock: + return self._sessions.get(session_id) + + def update_session(self, session_id: str, updates: Dict[str, Any]) -> None: + """Update session data.""" + with self._lock: + if session_id in self._sessions: + self._sessions[session_id].update(updates) + + def end_session(self, session_id: str) -> None: + """End a session.""" + with self._lock: + if session_id in self._sessions: + del self._sessions[session_id] + if session_id in self._workflow_spans: + del self._workflow_spans[session_id] + if session_id in self._agent_spans: + del self._agent_spans[session_id] + if session_id in self._conversation_spans: + del self._conversation_spans[session_id] + if session_id in self._conversation_counters: + del self._conversation_counters[session_id] + + def get_workflow_phase(self, session_id: str) -> str: + """Detect current workflow phase based on state.""" + with self._lock: + session = self._sessions.get(session_id, {}) + + if session.get("tools_executed", []): + return "executing" + elif session.get("step_count", 0) > 0: + return "executing" + else: + return "planning" + + def get_workflow_span(self, session_id: str): + """Get the active workflow span for a session.""" + with self._lock: + return self._workflow_spans.get(session_id) + + def get_agent_span(self, session_id: str): + """Get the active agent span for a session.""" + with self._lock: + return self._agent_spans.get(session_id) + + def get_conversation_span(self, session_id: str): + """Get the active conversation span for a session.""" + with self._lock: + return self._conversation_spans.get(session_id) diff --git a/agentops/instrumentation/agentic/xpander/instrumentor.py b/agentops/instrumentation/agentic/xpander/instrumentor.py new file mode 100644 index 000000000..05d66c6a8 --- /dev/null +++ b/agentops/instrumentation/agentic/xpander/instrumentor.py @@ -0,0 +1,877 @@ +"""Xpander SDK instrumentation for AgentOps. + +This module provides instrumentation for the Xpander SDK, which uses JSII to convert +TypeScript code to Python at runtime. The instrumentation tracks agent sessions, +tool executions, and LLM interactions. + +MODIFIED VERSION: Using existing AgentOps utilities where possible while keeping +runtime-specific instrumentation logic that cannot be replaced. 
+ +REPLACEMENTS MADE: +✅ Span creation: Using tracer.make_span() instead of manual span creation +✅ Error handling: Using _finish_span_success/_finish_span_error utilities +✅ Attribute management: Using existing SpanAttributeManager +✅ Serialization: Using safe_serialize and model_to_dict utilities +✅ Attribute setting: Using _update_span utility + +RUNTIME-SPECIFIC LOGIC KEPT (Cannot be replaced): +❌ Method wrapping: Runtime method creation requires custom hooks +❌ Context persistence: XpanderContext must handle runtime object lifecycle +❌ Agent detection: Custom logic for dynamically created agents +""" + +import logging +import time +import json +from typing import Any, Optional +from opentelemetry.metrics import Meter +from opentelemetry.trace import SpanKind as OTelSpanKind +from opentelemetry import trace + +# Use existing AgentOps utilities +from agentops.instrumentation.common import ( + CommonInstrumentor, + InstrumentorConfig, + StandardMetrics, +) +from agentops.instrumentation.common.span_management import SpanAttributeManager +from agentops.instrumentation.common.wrappers import _finish_span_success, _finish_span_error, _update_span +from agentops.helpers.serialization import safe_serialize, model_to_dict +from agentops.sdk.core import tracer +from agentops.instrumentation.agentic.xpander.context import XpanderContext +from agentops.semconv import SpanAttributes, SpanKind, ToolAttributes +from agentops.semconv.message import MessageAttributes + +# Use existing OpenAI attribute extraction patterns (lazy import to avoid circular imports) +# from agentops.instrumentation.providers.openai.attributes.common import ( +# get_response_attributes, +# ) + +logger = logging.getLogger(__name__) + +_instruments = ("xpander-sdk >= 1.0.0",) + + +# Use existing AgentOps utility instead of custom implementation +def safe_set_attribute(span, key: str, value: Any) -> None: + """Set attribute on span using existing AgentOps utility.""" + try: + _update_span(span, {key: value}) + except Exception as e: + logger.warning(f"Failed to set attribute {key}: {e}") + + +class XpanderInstrumentor(CommonInstrumentor): + """Instrumentor for Xpander SDK interactions.""" + + def __init__(self, config: Optional[InstrumentorConfig] = None): + if config is None: + config = InstrumentorConfig( + library_name="xpander-sdk", library_version="1.0.0", dependencies=_instruments, metrics_enabled=True + ) + super().__init__(config) + self._context = XpanderContext() + self._tracer = None + # Use existing AgentOps attribute manager + self._attribute_manager = SpanAttributeManager("xpander-service", "production") + + def _get_session_id_from_agent(self, agent) -> str: + """Generate consistent session ID from agent.""" + # First try to get memory_thread_id from agent context if available + if hasattr(agent, "memory_thread_id"): + return f"session_{agent.memory_thread_id}" + + # Check for execution context + if hasattr(agent, "execution") and hasattr(agent.execution, "memory_thread_id"): + return f"session_{agent.execution.memory_thread_id}" + + # Fallback to agent-based ID + agent_name = getattr(agent, "name", "unknown") + agent_id = getattr(agent, "id", "unknown") + return f"agent_{agent_name}_{agent_id}" + + def _extract_session_id(self, execution, agent=None) -> str: + """Extract session ID from execution data.""" + if isinstance(execution, dict): + if "memory_thread_id" in execution: + return f"session_{execution['memory_thread_id']}" + elif "thread_id" in execution: + return f"session_{execution['thread_id']}" + elif 
"session_id" in execution: + return f"session_{execution['session_id']}" + + # Fallback to agent-based ID if available + if agent: + return self._get_session_id_from_agent(agent) + + # Last resort fallback + return f"session_{int(time.time())}" + + def _extract_tool_name(self, tool_call) -> str: + """Extract tool name from tool call.""" + # Handle different tool call formats + if hasattr(tool_call, "function_name"): + return tool_call.function_name + elif hasattr(tool_call, "function") and hasattr(tool_call.function, "name"): + return tool_call.function.name + elif hasattr(tool_call, "name"): + return tool_call.name + elif isinstance(tool_call, dict): + if "function" in tool_call: + return tool_call["function"].get("name", "unknown") + elif "function_name" in tool_call: + return tool_call["function_name"] + elif "name" in tool_call: + return tool_call["name"] + + # Try to extract from string representation + import re + + patterns = [ + r'function[\'"]\s*:\s*[\'"]([^\'"]+)[\'"]', + r'name[\'"]\s*:\s*[\'"]([^\'"]+)[\'"]', + r"([a-zA-Z_][a-zA-Z0-9_]*)\.tool", + r'function_name[\'"]\s*:\s*[\'"]([^\'"]+)[\'"]', + ] + + tool_str = str(tool_call) + for pattern in patterns: + match = re.search(pattern, tool_str, re.IGNORECASE) + if match: + return match.group(1) + + return "unknown" + + def _extract_tool_params(self, tool_call) -> dict: + """Extract tool parameters from tool call.""" + # Handle different parameter formats + if hasattr(tool_call, "function") and hasattr(tool_call.function, "arguments"): + try: + args = tool_call.function.arguments + if isinstance(args, str): + return json.loads(args) + elif isinstance(args, dict): + return args + except (json.JSONDecodeError, AttributeError): + pass + elif hasattr(tool_call, "arguments"): + try: + args = tool_call.arguments + if isinstance(args, str): + return json.loads(args) + elif isinstance(args, dict): + return args + except (json.JSONDecodeError, AttributeError): + pass + elif isinstance(tool_call, dict): + if "function" in tool_call: + args = tool_call["function"].get("arguments", "{}") + try: + return json.loads(args) if isinstance(args, str) else args + except json.JSONDecodeError: + pass + elif "arguments" in tool_call: + args = tool_call["arguments"] + try: + return json.loads(args) if isinstance(args, str) else args + except json.JSONDecodeError: + pass + + return {} + + def _extract_llm_data_from_messages(self, messages) -> dict: + """Extract LLM metadata from messages.""" + data = {} + + if isinstance(messages, dict): + # Direct model and usage fields + if "model" in messages: + data["model"] = messages["model"] + if "usage" in messages: + data["usage"] = messages["usage"] + + # Check in choices array (OpenAI format) + if "choices" in messages and messages["choices"]: + choice = messages["choices"][0] + if "message" in choice: + message = choice["message"] + if "model" in message: + data["model"] = message["model"] + + elif isinstance(messages, list): + # Look for assistant messages with metadata + for msg in messages: + if isinstance(msg, dict) and msg.get("role") == "assistant": + if "model" in msg: + data["model"] = msg["model"] + if "usage" in msg: + data["usage"] = msg["usage"] + break + + # Try to extract from any nested structures + if not data and hasattr(messages, "__dict__"): + msg_dict = messages.__dict__ + if "model" in msg_dict: + data["model"] = msg_dict["model"] + if "usage" in msg_dict: + data["usage"] = msg_dict["usage"] + + return data + + def _extract_and_set_openai_message_attributes(self, span, messages, result, 
agent=None): + """Extract and set OpenAI message attributes from messages and response.""" + try: + # Manual extraction since we don't need the OpenAI module for this + # Try to get the agent's current message history for prompts + agent_messages = [] + if agent and hasattr(agent, "messages"): + agent_messages = getattr(agent, "messages", []) + elif agent and hasattr(agent, "conversation_history"): + agent_messages = getattr(agent, "conversation_history", []) + elif agent and hasattr(agent, "history"): + agent_messages = getattr(agent, "history", []) + + # Also try to extract messages from the messages parameter itself + if isinstance(messages, list): + # If messages is a list of messages, use it directly + agent_messages.extend(messages) + elif isinstance(messages, dict) and "messages" in messages: + # If messages contains a messages key + agent_messages.extend(messages.get("messages", [])) + + # Set prompt messages (input to LLM) + prompt_index = 0 + for msg in agent_messages[-10:]: # Get last 10 messages to avoid huge context + if isinstance(msg, dict): + role = msg.get("role", "user") + content = msg.get("content", "") + + # Handle different content formats + if content and isinstance(content, str) and content.strip(): + safe_set_attribute(span, MessageAttributes.PROMPT_ROLE.format(i=prompt_index), role) + safe_set_attribute( + span, MessageAttributes.PROMPT_CONTENT.format(i=prompt_index), content[:2000] + ) + prompt_index += 1 + elif content and isinstance(content, list): + # Handle multi-modal content + content_str = str(content)[:2000] + safe_set_attribute(span, MessageAttributes.PROMPT_ROLE.format(i=prompt_index), role) + safe_set_attribute(span, MessageAttributes.PROMPT_CONTENT.format(i=prompt_index), content_str) + prompt_index += 1 + elif hasattr(msg, "content"): + # Handle object with content attribute + content = getattr(msg, "content", "") + role = getattr(msg, "role", "user") + if content and isinstance(content, str) and content.strip(): + safe_set_attribute(span, MessageAttributes.PROMPT_ROLE.format(i=prompt_index), role) + safe_set_attribute( + span, MessageAttributes.PROMPT_CONTENT.format(i=prompt_index), str(content)[:2000] + ) + prompt_index += 1 + + # Set completion messages (response from LLM) + completion_index = 0 + response_data = result if result else messages + + # Handle different response formats + if isinstance(response_data, dict): + choices = response_data.get("choices", []) + for choice in choices: + message = choice.get("message", {}) + role = message.get("role", "assistant") + content = message.get("content", "") + + if content: + safe_set_attribute(span, MessageAttributes.COMPLETION_ROLE.format(i=completion_index), role) + safe_set_attribute( + span, MessageAttributes.COMPLETION_CONTENT.format(i=completion_index), content[:2000] + ) + + # Handle tool calls in the response + tool_calls = message.get("tool_calls", []) + for j, tool_call in enumerate(tool_calls): + tool_id = tool_call.get("id", "") + tool_name = tool_call.get("function", {}).get("name", "") + tool_args = tool_call.get("function", {}).get("arguments", "") + + if tool_id: + safe_set_attribute( + span, MessageAttributes.COMPLETION_TOOL_CALL_ID.format(i=completion_index, j=j), tool_id + ) + if tool_name: + safe_set_attribute( + span, + MessageAttributes.COMPLETION_TOOL_CALL_NAME.format(i=completion_index, j=j), + tool_name, + ) + if tool_args: + safe_set_attribute( + span, + MessageAttributes.COMPLETION_TOOL_CALL_ARGUMENTS.format(i=completion_index, j=j), + tool_args[:500], + ) + 
safe_set_attribute( + span, + MessageAttributes.COMPLETION_TOOL_CALL_TYPE.format(i=completion_index, j=j), + "function", + ) + + completion_index += 1 + elif hasattr(response_data, "choices"): + # Handle response object with choices attribute + choices = getattr(response_data, "choices", []) + for choice in choices: + message = getattr(choice, "message", None) + if message: + role = getattr(message, "role", "assistant") + content = getattr(message, "content", "") + + if content: + safe_set_attribute(span, MessageAttributes.COMPLETION_ROLE.format(i=completion_index), role) + safe_set_attribute( + span, + MessageAttributes.COMPLETION_CONTENT.format(i=completion_index), + str(content)[:2000], + ) + + # Handle tool calls + tool_calls = getattr(message, "tool_calls", []) + for j, tool_call in enumerate(tool_calls): + tool_id = getattr(tool_call, "id", "") + function = getattr(tool_call, "function", None) + if function: + tool_name = getattr(function, "name", "") + tool_args = getattr(function, "arguments", "") + + if tool_id: + safe_set_attribute( + span, + MessageAttributes.COMPLETION_TOOL_CALL_ID.format(i=completion_index, j=j), + tool_id, + ) + if tool_name: + safe_set_attribute( + span, + MessageAttributes.COMPLETION_TOOL_CALL_NAME.format(i=completion_index, j=j), + tool_name, + ) + if tool_args: + safe_set_attribute( + span, + MessageAttributes.COMPLETION_TOOL_CALL_ARGUMENTS.format( + i=completion_index, j=j + ), + str(tool_args)[:500], + ) + safe_set_attribute( + span, + MessageAttributes.COMPLETION_TOOL_CALL_TYPE.format(i=completion_index, j=j), + "function", + ) + + completion_index += 1 + + except Exception as e: + logger.error(f"Error extracting OpenAI message attributes: {e}") + + def _wrap_init_task(self, original_method): + """Wrap init_task and add_task to create agent span hierarchy.""" + instrumentor = self + + def wrapper(self, execution=None, input=None, **kwargs): + # Normalize parameters - handle both add_task(input=...) and init_task(execution=...) 
+ if execution is None and input is not None: + # add_task call with input parameter - normalize to execution format + if isinstance(input, str): + execution = {"input": {"text": input}} + else: + execution = {"input": input} + elif execution is None: + # Neither execution nor input provided - create empty execution + execution = {} + + # Extract session ID and agent info + session_id = instrumentor._extract_session_id(execution) + agent_name = getattr(self, "name", "unknown") + agent_id = getattr(self, "id", "unknown") + + # Check if session already exists + existing_session = instrumentor._context.get_session(session_id) + if existing_session: + # Session already exists, just continue + # Call with original parameters + if input is not None: + result = original_method(self, input=input, **kwargs) + else: + result = original_method(self, execution) + return result + + # Extract task input + task_input = None + if isinstance(execution, dict): + if "input" in execution: + input_data = execution["input"] + if isinstance(input_data, dict) and "text" in input_data: + task_input = input_data["text"] + elif isinstance(input_data, str): + task_input = input_data + + # Create top-level conversation/session span - this is the ROOT span + conversation_span_attributes = { + SpanAttributes.AGENTOPS_ENTITY_NAME: f"Session - {agent_name}", + "xpander.span.type": "session", + "xpander.session.name": f"Session - {agent_name}", + "xpander.agent.name": agent_name, + "xpander.agent.id": agent_id, + "xpander.session.id": session_id, + } + session_span, session_ctx, session_token = tracer.make_span( + operation_name=f"session.{agent_name}", + span_kind=SpanKind.AGENT, # Use AGENT kind for the root session span + attributes=conversation_span_attributes, + ) + + # Set task input on session span + if task_input: + safe_set_attribute(session_span, SpanAttributes.AGENTOPS_ENTITY_INPUT, task_input[:1000]) + safe_set_attribute(session_span, "xpander.session.initial_input", task_input[:500]) + + # Create workflow span as child of session span (this will be the main execution span) + trace.set_span_in_context(session_span) + workflow_span_attributes = { + "xpander.span.type": "workflow", + "xpander.workflow.phase": "planning", + "xpander.agent.name": agent_name, + "xpander.agent.id": agent_id, + "xpander.session.id": session_id, + "agent.name": agent_name, + "agent.id": agent_id, + } + workflow_span, workflow_ctx, workflow_token = tracer.make_span( + operation_name=f"workflow.{agent_name}", + span_kind=SpanKind.WORKFLOW, + attributes=workflow_span_attributes, + ) + + # No separate agent span - workflow span contains all agent info + + # Initialize workflow state with persistent spans + agent_info = { + "agent_name": agent_name, + "agent_id": agent_id, + "task_input": task_input, + "thread_id": execution.get("memory_thread_id") if isinstance(execution, dict) else None, + } + instrumentor._context.start_session(session_id, agent_info, workflow_span, None) # No agent span + # Store the session span as well + instrumentor._context.start_conversation(session_id, session_span) + + try: + # Execute original method - don't end agent span here, it will be ended in retrieve_execution_result + # Call with original parameters + if input is not None: + result = original_method(self, input=input, **kwargs) + else: + result = original_method(self, execution) + return result + except Exception as e: + # Use existing AgentOps error handling utilities + _finish_span_error(workflow_span, e) + raise + + return wrapper + + def 
_wrap_run_tools(self, original_method): + """Wrap run_tools to create execution phase tool spans.""" + instrumentor = self + + def wrapper(self, tool_calls, payload_extension=None): + session_id = instrumentor._get_session_id_from_agent(self) + current_session = instrumentor._context.get_session(session_id) + + # Update workflow state + step_num = (current_session.get("step_count", 0) + 1) if current_session else 1 + instrumentor._context.update_session( + session_id, + { + "step_count": step_num, + "phase": "executing", + "tools_executed": (current_session.get("tools_executed", []) if current_session else []) + + [instrumentor._extract_tool_name(tc) for tc in tool_calls], + }, + ) + + # Get current span context (should be the LLM span) + current_span = trace.get_current_span() + + # Create execution phase span as child of current LLM span + execution_span_context = trace.set_span_in_context(current_span) if current_span else None + + with instrumentor._tracer.start_as_current_span( + "xpander.execution", + kind=OTelSpanKind.INTERNAL, + context=execution_span_context, + attributes={ + SpanAttributes.AGENTOPS_SPAN_KIND: SpanKind.TASK, + "xpander.span.type": "execution", + "xpander.workflow.phase": "executing", + "xpander.step.number": step_num, + "xpander.step.tool_count": len(tool_calls), + "xpander.session.id": session_id, + }, + ) as execution_span: + # Execute tools and create individual tool spans + results = [] + conversation_finished = False + + for i, tool_call in enumerate(tool_calls): + tool_name = instrumentor._extract_tool_name(tool_call) + tool_params = instrumentor._extract_tool_params(tool_call) + + # Check if this is the conversation finish tool + if tool_name == "xpfinish-agent-execution-finished": + conversation_finished = True + + start_time = time.time() + + # Create tool span as child of execution span + tool_span_context = trace.set_span_in_context(execution_span) + + with instrumentor._tracer.start_as_current_span( + f"tool.{tool_name}", + kind=OTelSpanKind.CLIENT, + context=tool_span_context, + attributes={ + SpanAttributes.AGENTOPS_SPAN_KIND: SpanKind.TOOL, + ToolAttributes.TOOL_NAME: tool_name, + ToolAttributes.TOOL_PARAMETERS: str(tool_params)[:500], + "xpander.span.type": "tool", + "xpander.workflow.phase": "executing", + "xpander.tool.step": step_num, + "xpander.tool.index": i, + }, + ) as tool_span: + # Execute single tool + single_result = original_method(self, [tool_call], payload_extension) + results.extend(single_result) + + # Record tool execution details + execution_time = time.time() - start_time + safe_set_attribute(tool_span, "xpander.tool.execution_time", execution_time) + + # Add tool result if available + if single_result: + result_summary = f"Executed successfully with {len(single_result)} results" + safe_set_attribute(tool_span, "xpander.tool.result_summary", result_summary) + + # Store actual result data using existing AgentOps utilities + try: + result_content = "" + + for i, result_item in enumerate(single_result): + # Handle xpander_sdk.ToolCallResult objects specifically + if hasattr(result_item, "__class__") and "ToolCallResult" in str(type(result_item)): + # Extract the actual result content from ToolCallResult + try: + if hasattr(result_item, "result") and result_item.result is not None: + actual_result = result_item.result + if isinstance(actual_result, str): + result_content += actual_result[:1000] + "\n" + else: + result_content += safe_serialize(actual_result)[:1000] + "\n" + elif hasattr(result_item, "data") and result_item.data is 
not None: + result_content += safe_serialize(result_item.data)[:1000] + "\n" + else: + # Fallback: try to find any content attribute + for attr_name in ["content", "output", "value", "response"]: + if hasattr(result_item, attr_name): + attr_value = getattr(result_item, attr_name) + if attr_value is not None: + result_content += safe_serialize(attr_value)[:1000] + "\n" + break + else: + # If no content attributes found, indicate this + result_content += "ToolCallResult object (no extractable content)\n" + except Exception as attr_e: + logger.debug(f"Error extracting from ToolCallResult: {attr_e}") + result_content += "ToolCallResult object (extraction failed)\n" + + # Handle regular objects and primitives + elif isinstance(result_item, (str, int, float, bool)): + result_content += str(result_item)[:1000] + "\n" + elif hasattr(result_item, "__dict__"): + # Convert objects to dict using existing utility + result_dict = model_to_dict(result_item) + result_content += safe_serialize(result_dict)[:1000] + "\n" + else: + # Use safe_serialize for consistent conversion + result_content += safe_serialize(result_item)[:1000] + "\n" + + if result_content.strip(): + final_content = result_content.strip()[:2000] + safe_set_attribute(tool_span, ToolAttributes.TOOL_RESULT, final_content) + else: + safe_set_attribute( + tool_span, ToolAttributes.TOOL_RESULT, "No extractable content found" + ) + + except Exception as e: + logger.error(f"Error setting tool result: {e}") + safe_set_attribute( + tool_span, ToolAttributes.TOOL_RESULT, f"Error capturing result: {e}" + ) + else: + safe_set_attribute(tool_span, "xpander.tool.result_summary", "No results returned") + + # If conversation is finished, mark for session closure + if conversation_finished: + # Since session span is now the conversation span, we need to close all spans + # when the conversation finishes + pass # Session closure will be handled in retrieve_execution_result + + return results + + return wrapper + + def _wrap_add_messages(self, original_method): + """Wrap add_messages to create LLM spans with proper parent-child relationship.""" + instrumentor = self + + def wrapper(self, messages): + session_id = instrumentor._get_session_id_from_agent(self) + current_session = instrumentor._context.get_session(session_id) + current_phase = instrumentor._context.get_workflow_phase(session_id) + workflow_span = instrumentor._context.get_workflow_span(session_id) + + # Create LLM span as child of workflow span (not conversation span) + # The hierarchy should be: session -> agent/workflow -> LLM -> execution -> tools + llm_span_context = trace.set_span_in_context(workflow_span) if workflow_span else None + + # Call original method first to get the actual OpenAI response + result = original_method(self, messages) + + # Now create a span that captures the LLM interaction with the actual response data + with instrumentor._tracer.start_as_current_span( + f"llm.{current_phase}", + kind=OTelSpanKind.CLIENT, + context=llm_span_context, + attributes={ + SpanAttributes.AGENTOPS_SPAN_KIND: SpanKind.LLM, + "xpander.span.type": "llm", + "xpander.workflow.phase": current_phase, + "xpander.session.id": session_id, + }, + ) as llm_span: + # Extract and set OpenAI message data from the messages and response + instrumentor._extract_and_set_openai_message_attributes(llm_span, messages, result, self) + + # Extract and set LLM metadata from the result if possible + llm_data = instrumentor._extract_llm_data_from_messages(result if result else messages) + if llm_data: + if "model" 
in llm_data: + safe_set_attribute(llm_span, SpanAttributes.LLM_REQUEST_MODEL, llm_data["model"]) + safe_set_attribute(llm_span, SpanAttributes.LLM_RESPONSE_MODEL, llm_data["model"]) + + if "usage" in llm_data: + usage = llm_data["usage"] + if "prompt_tokens" in usage: + safe_set_attribute(llm_span, SpanAttributes.LLM_USAGE_PROMPT_TOKENS, usage["prompt_tokens"]) + if "completion_tokens" in usage: + safe_set_attribute( + llm_span, SpanAttributes.LLM_USAGE_COMPLETION_TOKENS, usage["completion_tokens"] + ) + if "total_tokens" in usage: + safe_set_attribute(llm_span, SpanAttributes.LLM_USAGE_TOTAL_TOKENS, usage["total_tokens"]) + # Update workflow state + instrumentor._context.update_session( + session_id, + { + "total_tokens": (current_session.get("total_tokens", 0) if current_session else 0) + + usage["total_tokens"] + }, + ) + + return result + + return wrapper + + def _wrap_is_finished(self, original_method): + """Wrap is_finished to track workflow completion.""" + instrumentor = self + + def wrapper(self): + result = original_method(self) + + if result: + session_id = instrumentor._get_session_id_from_agent(self) + + # Update session to finished state + instrumentor._context.update_session(session_id, {"phase": "finished", "end_time": time.time()}) + + return result + + return wrapper + + def _wrap_extract_tool_calls(self, original_method): + """Wrap extract_tool_calls to track tool planning.""" + + def wrapper(self, messages): + result = original_method(self, messages) + return result + + return wrapper + + def _wrap_report_execution_metrics(self, original_method): + """Wrap report_execution_metrics to track metrics.""" + + def wrapper(self, llm_tokens=None, ai_model=None): + result = original_method(self, llm_tokens, ai_model) + return result + + return wrapper + + def _wrap_retrieve_execution_result(self, original_method): + """Wrap retrieve_execution_result to finalize agent and workflow spans.""" + instrumentor = self + + def wrapper(self): + session_id = instrumentor._get_session_id_from_agent(self) + current_session = instrumentor._context.get_session(session_id) + workflow_span = instrumentor._context.get_workflow_span(session_id) + session_span = instrumentor._context.get_conversation_span(session_id) # This is now the root session span + + try: + # Execute and capture result + result = original_method(self) + + # Add workflow summary to the persistent workflow span + if workflow_span and current_session: + safe_set_attribute( + workflow_span, "xpander.workflow.total_steps", current_session.get("step_count", 0) + ) + safe_set_attribute( + workflow_span, "xpander.workflow.total_tokens", current_session.get("total_tokens", 0) + ) + safe_set_attribute( + workflow_span, "xpander.workflow.tools_used", len(current_session.get("tools_executed", [])) + ) + + # Calculate total execution time + start_time = current_session.get("start_time", time.time()) + execution_time = time.time() - start_time + safe_set_attribute(workflow_span, "xpander.workflow.execution_time", execution_time) + safe_set_attribute(workflow_span, "xpander.workflow.phase", "completed") + + # Set result details on session and workflow spans + if result: + result_content = "" + if hasattr(result, "result"): + result_content = str(result.result)[:1000] + + # Set on session span (root span) + if session_span and result_content: + safe_set_attribute(session_span, SpanAttributes.AGENTOPS_ENTITY_OUTPUT, result_content) + safe_set_attribute(session_span, "xpander.session.final_result", result_content) + if hasattr(result, 
"memory_thread_id"): + safe_set_attribute(session_span, "xpander.session.thread_id", result.memory_thread_id) + + if workflow_span: + if result_content: + safe_set_attribute(workflow_span, "xpander.result.content", result_content) + if hasattr(result, "memory_thread_id"): + safe_set_attribute(workflow_span, "xpander.result.thread_id", result.memory_thread_id) + + # Add session summary to session span + if session_span and current_session: + safe_set_attribute( + session_span, "xpander.session.total_steps", current_session.get("step_count", 0) + ) + safe_set_attribute( + session_span, "xpander.session.total_tokens", current_session.get("total_tokens", 0) + ) + safe_set_attribute( + session_span, "xpander.session.tools_used", len(current_session.get("tools_executed", [])) + ) + + start_time = current_session.get("start_time", time.time()) + execution_time = time.time() - start_time + safe_set_attribute(session_span, "xpander.session.execution_time", execution_time) + + # Close all spans - session span should be closed last + if workflow_span: + _finish_span_success(workflow_span) + workflow_span.end() + + if session_span: + _finish_span_success(session_span) + session_span.end() + + return result + + except Exception as e: + # Mark spans as failed and close them in proper order + if workflow_span: + _finish_span_error(workflow_span, e) + workflow_span.end() + + if session_span: + _finish_span_error(session_span, e) + session_span.end() + raise + finally: + # Clean up session + instrumentor._context.end_session(session_id) + + return wrapper + + def _instrument(self, **kwargs): + """Instrument the Xpander SDK.""" + try: + # Import xpander modules + from xpander_sdk import Agent + + # Set up tracing using existing AgentOps tracer + self._tracer = tracer.get_tracer() + # Attribute manager already initialized in __init__ + + # Wrap Agent methods + Agent.add_task = self._wrap_init_task(Agent.add_task) + Agent.init_task = self._wrap_init_task(Agent.init_task) # Also wrap init_task for completeness + Agent.run_tools = self._wrap_run_tools(Agent.run_tools) + Agent.add_messages = self._wrap_add_messages(Agent.add_messages) + Agent.is_finished = self._wrap_is_finished(Agent.is_finished) + Agent.extract_tool_calls = self._wrap_extract_tool_calls(Agent.extract_tool_calls) + Agent.report_execution_metrics = self._wrap_report_execution_metrics(Agent.report_execution_metrics) + Agent.retrieve_execution_result = self._wrap_retrieve_execution_result(Agent.retrieve_execution_result) + + except ImportError: + logger.debug("Xpander SDK not available") + except Exception as e: + logger.error(f"Failed to instrument Xpander SDK: {e}") + + def _uninstrument(self, **kwargs): + """Uninstrument the Xpander SDK.""" + pass + + def _create_metrics(self, meter: Meter) -> StandardMetrics: + """Create metrics for Xpander instrumentation.""" + return StandardMetrics( + requests_active=meter.create_up_down_counter( + name="xpander_requests_active", + description="Number of active Xpander requests", + ), + requests_duration=meter.create_histogram( + name="xpander_requests_duration", + description="Duration of Xpander requests", + unit="s", + ), + requests_total=meter.create_counter( + name="xpander_requests_total", + description="Total number of Xpander requests", + ), + requests_error=meter.create_counter( + name="xpander_requests_error", + description="Number of Xpander request errors", + ), + ) diff --git a/agentops/instrumentation/agentic/xpander/trace_probe.py b/agentops/instrumentation/agentic/xpander/trace_probe.py new 
file mode 100644 index 000000000..339352fda --- /dev/null +++ b/agentops/instrumentation/agentic/xpander/trace_probe.py @@ -0,0 +1,86 @@ +"""Xpander trace probe for automatic instrumentation activation. + +This module provides automatic instrumentation for Xpander SDK when imported. +It should be imported early in the application lifecycle to ensure all +Xpander interactions are captured. +""" + +import logging +from agentops.instrumentation.agentic.xpander.instrumentor import XpanderInstrumentor + +logger = logging.getLogger(__name__) + +# Global instrumentor instance +_instrumentor = None + + +def activate_xpander_instrumentation(): + """Activate Xpander instrumentation.""" + global _instrumentor + + if _instrumentor is None: + try: + _instrumentor = XpanderInstrumentor() + _instrumentor.instrument() + logger.info("Xpander instrumentation activated successfully") + except Exception as e: + logger.error(f"Failed to activate Xpander instrumentation: {e}") + _instrumentor = None + + return _instrumentor + + +def deactivate_xpander_instrumentation(): + """Deactivate Xpander instrumentation.""" + global _instrumentor + + if _instrumentor is not None: + try: + _instrumentor.uninstrument() + logger.info("Xpander instrumentation deactivated successfully") + except Exception as e: + logger.error(f"Failed to deactivate Xpander instrumentation: {e}") + finally: + _instrumentor = None + + +def get_instrumentor(): + """Get the active instrumentor instance.""" + return _instrumentor + + +# Stub functions for backward compatibility +def wrap_openai_call_for_xpander(openai_call_func, purpose="general"): + """Backward compatibility stub - functionality now handled by auto-instrumentation.""" + logger.debug(f"wrap_openai_call_for_xpander called with purpose: {purpose}") + return openai_call_func + + +def is_xpander_session_active(): + """Check if xpander session is active.""" + return _instrumentor is not None + + +def get_active_xpander_session(): + """Get active xpander session.""" + return _instrumentor._context if _instrumentor else None + + +# Convenience functions for cleaner OpenAI integration +def wrap_openai_analysis(openai_call_func): + """Wrap OpenAI calls for analysis/reasoning steps.""" + return wrap_openai_call_for_xpander(openai_call_func, "analysis") + + +def wrap_openai_planning(openai_call_func): + """Wrap OpenAI calls for planning steps.""" + return wrap_openai_call_for_xpander(openai_call_func, "planning") + + +def wrap_openai_synthesis(openai_call_func): + """Wrap OpenAI calls for synthesis/summary steps.""" + return wrap_openai_call_for_xpander(openai_call_func, "synthesis") + + +# Note: Auto-activation is now handled by the main AgentOps instrumentation system +# activate_xpander_instrumentation() diff --git a/agentops/instrumentation/agentic/xpander/version.py b/agentops/instrumentation/agentic/xpander/version.py new file mode 100644 index 000000000..60d516f2f --- /dev/null +++ b/agentops/instrumentation/agentic/xpander/version.py @@ -0,0 +1,3 @@ +"""Version information for xpander instrumentation.""" + +__version__ = "1.0.0" diff --git a/docs/mint.json b/docs/mint.json index ac0052bae..c995598b9 100644 --- a/docs/mint.json +++ b/docs/mint.json @@ -28,16 +28,25 @@ "icon": "lines-leaning" }, "anchors": [], - "versions": ["v2", "v1", "v0"], + "versions": [ + "v2", + "v1", + "v0" + ], "navigation": [ { "group": "", - "pages": ["v1/introduction"], + "pages": [ + "v1/introduction" + ], "version": "v1" }, { "group": "Getting Started", - "pages": ["v1/quickstart", "v1/examples/examples"], 
+ "pages": [ + "v1/quickstart", + "v1/examples/examples" + ], "version": "v1" }, { @@ -94,17 +103,26 @@ }, { "group": "Other Info", - "pages": ["v1/concepts/host-env"], + "pages": [ + "v1/concepts/host-env" + ], "version": "v1" }, { "group": "", - "pages": ["v2/introduction"], + "pages": [ + "v2/introduction" + ], "version": "v2" }, { "group": "Getting Started", - "pages": ["v2/quickstart", "v2/examples/examples", "v2/usage/mcp-docs", "v2/usage/mcp-server"], + "pages": [ + "v2/quickstart", + "v2/examples/examples", + "v2/usage/mcp-docs", + "v2/usage/mcp-server" + ], "version": "v2" }, { @@ -138,7 +156,8 @@ "v2/integrations/openai_agents_js", "v2/integrations/smolagents", "v2/integrations/ibm_watsonx_ai", - "v2/integrations/xai" + "v2/integrations/xai", + "v2/integrations/xpander" ], "version": "v2" }, @@ -162,7 +181,9 @@ }, { "group": "Other Info", - "pages": ["v2/concepts/host-env"], + "pages": [ + "v2/concepts/host-env" + ], "version": "v2" } ], @@ -177,4 +198,4 @@ "apiHost": "https://us.i.posthog.com" } } -} +} \ No newline at end of file diff --git a/docs/v2/examples/xpander.mdx b/docs/v2/examples/xpander.mdx new file mode 100644 index 000000000..62d76219a --- /dev/null +++ b/docs/v2/examples/xpander.mdx @@ -0,0 +1,38 @@ +--- +title: 'Xpander' +description: 'Xpander coding agent example with AgentOps' +--- +{/* SOURCE_FILE: examples/xpander/coding_agent.py */} + +_View Python Example on Github_ + +This example demonstrates a complete Xpander coding agent implementation with AgentOps instrumentation using callback handlers. The example shows how to build a single-file agent that combines the MyAgent class and XpanderEventListener for comprehensive monitoring. + +## Key Features + +- **Single-file implementation** combining agent logic and event handling +- **Callback-based instrumentation** using XpanderEventListener +- **Async tool execution** with proper error handling and logging +- **AgentOps integration** with custom trace names and tags + +## Running the Example + +1. Set up your environment variables in a `.env` file or export them: + ```bash + AGENTOPS_API_KEY=your_agentops_api_key + XPANDER_API_KEY=your_xpander_api_key + XPANDER_AGENT_ID=your_agent_id + OPENAI_API_KEY=your_openai_api_key + ``` + +2. Run the agent: + ```bash + python examples/xpander/coding_agent.py + ``` + +The agent will start an interactive session where you can ask coding questions and see the results tracked in your AgentOps dashboard. + + + + + \ No newline at end of file diff --git a/docs/v2/integrations/xpander.mdx b/docs/v2/integrations/xpander.mdx new file mode 100644 index 000000000..5407e5b5f --- /dev/null +++ b/docs/v2/integrations/xpander.mdx @@ -0,0 +1,266 @@ +--- +title: 'Xpander' +description: 'Monitor and analyze your Xpander agent workflows with automatic AgentOps instrumentation' +--- + +[Xpander](https://xpander.ai/) is a powerful platform for building and deploying AI agents with sophisticated workflow management capabilities. AgentOps provides seamless integration with the Xpander SDK, automatically instrumenting all agent activities, tool executions, and LLM interactions without any manual setup. 
+ +## Installation + +Install AgentOps and the Xpander SDK, along with the required dependencies: + + + ```bash pip + pip install agentops xpander-sdk xpander-utils openai python-dotenv loguru + ``` + ```bash poetry + poetry add agentops xpander-sdk xpander-utils openai python-dotenv loguru + ``` + ```bash uv + uv add agentops xpander-sdk xpander-utils openai python-dotenv loguru + ``` + + +## Setting Up API Keys + +You'll need API keys for AgentOps, Xpander, and OpenAI: +- **AGENTOPS_API_KEY**: From your [AgentOps Dashboard](https://app.agentops.ai/) +- **XPANDER_API_KEY**: From your [Xpander Dashboard](https://app.xpander.ai/) +- **XPANDER_AGENT_ID**: The ID of your Xpander agent +- **OPENAI_API_KEY**: From the [OpenAI Platform](https://platform.openai.com/api-keys) + +Set these as environment variables or in a `.env` file: + + + ```bash Export to CLI + export AGENTOPS_API_KEY="your_agentops_api_key_here" + export XPANDER_API_KEY="your_xpander_api_key_here" + export XPANDER_AGENT_ID="your_xpander_agent_id_here" + export OPENAI_API_KEY="your_openai_api_key_here" + ``` + ```txt Set in .env file + AGENTOPS_API_KEY="your_agentops_api_key_here" + XPANDER_API_KEY="your_xpander_api_key_here" + XPANDER_AGENT_ID="your_xpander_agent_id_here" + OPENAI_API_KEY="your_openai_api_key_here" + ``` + + +You can also store your configuration in a `xpander_config.json` file: + +```json +{ + "api_key": "your_xpander_api_key_here", + "agent_id": "your_xpander_agent_id_here" +} +``` + +## Quick Start + +The key to AgentOps + Xpander integration is **initialization order**: Initialize AgentOps **before** importing the Xpander SDK to enable automatic instrumentation. + + +The following example shows the callback-based integration pattern. For a complete working example, see our [Xpander example](/v2/examples/xpander). + + +```python +# ruff: noqa: E402 +import os +import json +import asyncio +from pathlib import Path +from dotenv import load_dotenv + +# Load environment variables first +load_dotenv() + +# 1. Initialize AgentOps FIRST (this enables auto-instrumentation) +import agentops +agentops.init( + api_key=os.getenv("AGENTOPS_API_KEY"), + trace_name="my-xpander-coding-agent-callbacks", + default_tags=["xpander", "coding-agent", "callbacks"], +) + +# 2. 
Now import Xpander SDK (instrumentation will automatically activate) +from xpander_sdk import XpanderClient, LLMProvider, LLMTokens, Tokens, Agent, ExecutionStatus +from xpander_utils.events import XpanderEventListener, AgentExecutionResult, AgentExecution +from openai import AsyncOpenAI + +class MyAgent: + def __init__(self): + # Load config + config_path = Path(__file__).parent / "xpander_config.json" + config = json.loads(config_path.read_text()) + + # Get API keys + xpander_key = config.get("api_key") or os.getenv("XPANDER_API_KEY") + agent_id = config.get("agent_id") or os.getenv("XPANDER_AGENT_ID") + openai_key = os.getenv("OPENAI_API_KEY") + + # Initialize clients + self.openai = AsyncOpenAI(api_key=openai_key) + xpander_client = XpanderClient(api_key=xpander_key) + self.agent_backend: Agent = xpander_client.agents.get(agent_id=agent_id) + self.agent_backend.select_llm_provider(LLMProvider.OPEN_AI) + + async def run(self, user_input: str) -> dict: + tokens = Tokens(worker=LLMTokens(0, 0, 0)) + + while not self.agent_backend.is_finished(): + # Call LLM + response = await self.openai.chat.completions.create( + model="gpt-4", + messages=self.agent_backend.messages, + tools=self.agent_backend.get_tools(), + tool_choice=self.agent_backend.tool_choice, + temperature=0, + ) + + # Track tokens + if hasattr(response, "usage"): + tokens.worker.prompt_tokens += response.usage.prompt_tokens + tokens.worker.completion_tokens += response.usage.completion_tokens + tokens.worker.total_tokens += response.usage.total_tokens + + # Add response to agent context + self.agent_backend.add_messages(response.model_dump()) + self.agent_backend.report_execution_metrics(llm_tokens=tokens, ai_model="gpt-4") + + # Execute any tool calls + tool_calls = self.agent_backend.extract_tool_calls(response.model_dump()) + if tool_calls: + tool_results = await asyncio.to_thread(self.agent_backend.run_tools, tool_calls) + + result = self.agent_backend.retrieve_execution_result() + return {"result": result.result, "thread_id": result.memory_thread_id} + +# Set up event listener with callback handlers +listener = XpanderEventListener( + api_key=os.getenv("XPANDER_API_KEY"), + agent_id=os.getenv("XPANDER_AGENT_ID") +) + +async def on_execution_request(execution_task: AgentExecution) -> AgentExecutionResult: + agent = MyAgent() + agent.agent_backend.init_task(execution=execution_task.model_dump()) + + try: + await agent.run(execution_task.input.text) + execution_result = agent.agent_backend.retrieve_execution_result() + return AgentExecutionResult( + result=execution_result.result, + is_success=execution_result.status == ExecutionStatus.COMPLETED, + ) + except Exception as e: + print(f"Error: {e}") + raise + +# Register the callback +listener.register(on_execution_request=on_execution_request) +``` + +## What's Automatically Tracked + +AgentOps automatically captures comprehensive telemetry from your Xpander agents: + +### 🤖 Agent Activities +- Agent initialization and configuration +- Task lifecycle (start, execution steps, completion) +- Workflow phase transitions (planning → executing → finished) +- Session management and context persistence + +### 🧠 LLM Interactions +- All OpenAI API calls with full request/response data +- Token usage and cost tracking across models +- Conversation history and context management +- Model parameters and settings + +### 🛠️ Tool Executions +- Tool call detection with parameters and arguments +- Tool execution results and success/failure status +- Tool performance metrics and timing +- Tool 
call hierarchies and dependencies + +### 📊 Performance Metrics +- End-to-end execution duration and timing +- Step-by-step workflow progression +- Resource utilization and efficiency metrics +- Error handling and exception tracking + +## Key Features + +### ✅ Zero-Configuration Setup +No manual trace creation or span management required. Simply initialize AgentOps before importing Xpander SDK. + +### ✅ Complete Workflow Visibility +Track the entire agent execution flow from task initiation to completion, including all intermediate steps. + +### ✅ Real-time Monitoring +View your agent activities in real-time on the AgentOps dashboard as they execute. + +### ✅ Tool Execution Insights +Monitor which tools are being called, their parameters, execution time, and results. + +### ✅ Cost Tracking +Automatic token usage tracking for all LLM interactions with cost analysis. + +## Callback Handler Pattern + +The Xpander integration supports two main patterns: + +1. **Direct Integration**: Directly instrument your agent code (shown above) +2. **Callback Handler**: Use XpanderEventListener for webhook-style integration + +The callback handler pattern is particularly useful for: +- Production deployments with centralized monitoring +- Multi-agent orchestration systems +- Event-driven architectures + +## Runtime-Specific Instrumentation + +Xpander SDK uses JSII to create methods at runtime, which requires specialized instrumentation. AgentOps handles this automatically by: + +- **Method Wrapping**: Dynamically wrapping agent methods as they're created +- **Context Persistence**: Maintaining session context across runtime object lifecycle +- **Agent Detection**: Automatically detecting and instrumenting new agent instances +- **Tool Result Extraction**: Properly extracting results from JSII object references + +## Troubleshooting + +### Import Order Issues +If you're not seeing traces, ensure AgentOps is initialized before importing Xpander SDK: + +```python +# ✅ Correct order +import agentops +agentops.init() +from xpander_sdk import XpanderClient + +# ❌ Incorrect order +from xpander_sdk import XpanderClient +import agentops +agentops.init() # Too late - instrumentation won't activate +``` + +### Missing Tool Results +If tool results show `{"__jsii_ref__": "..."}` instead of actual content, ensure you're using the latest version of AgentOps, which includes improved JSII object handling. + +### Import Errors (E402) +If you see linting errors about imports not being at the top of the file, this is expected for Xpander integration. Add `# ruff: noqa: E402` at the top of your file to suppress these warnings, as the import order is required for proper instrumentation. + +## Examples + + + + Complete single-file implementation with callback handlers + + + View the complete source code and configuration files + + + + + + + \ No newline at end of file diff --git a/examples/xpander/coding_agent.py b/examples/xpander/coding_agent.py new file mode 100644 index 000000000..77bf72d03 --- /dev/null +++ b/examples/xpander/coding_agent.py @@ -0,0 +1,178 @@ +""" +Copyright (c) 2025 Xpander, Inc. All rights reserved. +Modified to use AgentOps callback handlers for tool instrumentation. +Single-file implementation combining MyAgent and XpanderEventListener. 
+""" +# ruff: noqa: E402 + +import asyncio +import json +import os +import sys +import time +from pathlib import Path +from dotenv import load_dotenv +from loguru import logger + +load_dotenv() + +import agentops + +print("🔧 Initializing AgentOps...") +agentops.init( + api_key=os.getenv("AGENTOPS_API_KEY"), + trace_name="my-xpander-coding-agent-callbacks", + default_tags=["xpander", "coding-agent", "callbacks"], +) +print("✅ AgentOps initialized") + +print("📦 Importing xpander_sdk...") +from xpander_sdk import XpanderClient, LLMProvider, LLMTokens, Tokens, Agent +from xpander_utils.events import XpanderEventListener, AgentExecutionResult, AgentExecution, ExecutionStatus +from openai import AsyncOpenAI + +# Simple logger setup +logger.remove() +logger.add(sys.stderr, format="{time:HH:mm:ss} | {message}", level="INFO") + + +class MyAgent: + def __init__(self): + logger.info("🚀 Initializing MyAgent...") + + # Load config + config_path = Path(__file__).parent / "xpander_config.json" + config = json.loads(config_path.read_text()) + + # Get API keys + xpander_key = config.get("api_key") or os.getenv("XPANDER_API_KEY") + agent_id = config.get("agent_id") or os.getenv("XPANDER_AGENT_ID") + openai_key = os.getenv("OPENAI_API_KEY") + + if not all([xpander_key, agent_id, openai_key]): + raise ValueError("Missing required API keys") + + # Initialize + self.openai = AsyncOpenAI(api_key=openai_key) + xpander_client = XpanderClient(api_key=xpander_key) + self.agent_backend: Agent = xpander_client.agents.get(agent_id=agent_id) + self.agent_backend.select_llm_provider(LLMProvider.OPEN_AI) + + logger.info(f"Agent: {self.agent_backend.name}") + logger.info(f"Tools: {len(self.agent_backend.tools)} available") + logger.info("✅ Ready!") + + async def run(self, user_txt_input: str) -> dict: + step = 0 + start_time = time.perf_counter() + tokens = Tokens(worker=LLMTokens(0, 0, 0)) + try: + while not self.agent_backend.is_finished(): + step += 1 + logger.info(f"Step {step} - Calling LLM...") + response = await self.openai.chat.completions.create( + model="gpt-4.1", + messages=self.agent_backend.messages, + tools=self.agent_backend.get_tools(), + tool_choice=self.agent_backend.tool_choice, + temperature=0, + ) + if hasattr(response, "usage"): + tokens.worker.prompt_tokens += response.usage.prompt_tokens + tokens.worker.completion_tokens += response.usage.completion_tokens + tokens.worker.total_tokens += response.usage.total_tokens + + self.agent_backend.add_messages(response.model_dump()) + self.agent_backend.report_execution_metrics(llm_tokens=tokens, ai_model="gpt-4.1") + tool_calls = self.agent_backend.extract_tool_calls(response.model_dump()) + + if tool_calls: + logger.info(f"Executing {len(tool_calls)} tools...") + tool_results = await asyncio.to_thread(self.agent_backend.run_tools, tool_calls) + for res in tool_results: + emoji = "✅" if res.is_success else "❌" + logger.info(f"Tool result: {emoji} {res.function_name}") + + duration = time.perf_counter() - start_time + logger.info(f"Done! 
Duration: {duration:.1f}s | Total tokens: {tokens.worker.total_tokens}") + result = self.agent_backend.retrieve_execution_result() + return {"result": result.result, "thread_id": result.memory_thread_id} + except Exception as e: + logger.error(f"Exception: {e}") + raise + + +# === Load Configuration === +logger.info("[xpander_handler] Loading xpander_config.json") +config_path = Path(__file__).parent / "xpander_config.json" +with open(config_path, "r") as config_file: + xpander_config: dict = json.load(config_file) +logger.info(f"[xpander_handler] Loaded config: {xpander_config}") + +# === Initialize Event Listener === +logger.info(f"[xpander_handler] Initializing XpanderEventListener with config: {xpander_config}") +listener = XpanderEventListener(**xpander_config) +logger.info(f"[xpander_handler] Listener initialized: {listener}") + + +# === Define Execution Handler === +async def on_execution_request(execution_task: AgentExecution) -> AgentExecutionResult: + logger.info(f"[on_execution_request] Called with execution_task: {execution_task}") + my_agent = MyAgent() + logger.info(f"[on_execution_request] Instantiated MyAgent: {my_agent}") + + user_info = "" + user = getattr(execution_task.input, "user", None) + + if user: + name = f"{user.first_name} {user.last_name}".strip() + email = getattr(user, "email", "") + user_info = f"👤 From user: {name}\n📧 Email: {email}" + + IncomingEvent = f"\n📨 Incoming message: {execution_task.input.text}\n" f"{user_info}" + + logger.info(f"[on_execution_request] IncomingEvent: {IncomingEvent}") + logger.info(f"[on_execution_request] Calling agent_backend.init_task with execution={execution_task.model_dump()}") + my_agent.agent_backend.init_task(execution=execution_task.model_dump()) + + # extract just the text input for quick start purpose. for more robust use the object + user_txt_input = execution_task.input.text + logger.info(f"[on_execution_request] Running agent with user_txt_input: {user_txt_input}") + try: + await my_agent.run(user_txt_input) + logger.info("[on_execution_request] Agent run completed") + execution_result = my_agent.agent_backend.retrieve_execution_result() + logger.info(f"[on_execution_request] Execution result: {execution_result}") + result_obj = AgentExecutionResult( + result=execution_result.result, + is_success=execution_result.status == ExecutionStatus.COMPLETED, + ) + logger.info(f"[on_execution_request] Returning AgentExecutionResult: {result_obj}") + return result_obj + except Exception as e: + logger.error(f"[on_execution_request] Exception: {e}") + raise + finally: + logger.info("[on_execution_request] Exiting handler") + + +# === Register Callback === +logger.info("[xpander_handler] Registering on_execution_request callback") +listener.register(on_execution_request=on_execution_request) +logger.info("[xpander_handler] Callback registered") + + +# Example usage for direct interaction +if __name__ == "__main__": + + async def main(): + agent = MyAgent() + while True: + task = input("\nAsk Anything (Type exit to end) \nInput: ") + if task.lower() == "exit": + break + agent.agent_backend.add_task(input=task) + result = await agent.run(task) + print(f"\nResult: {result['result']}") + + asyncio.run(main())
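
For reviewers of the `trace_probe` module added above, here is a minimal usage sketch of the helpers it introduces. Every name below comes from files in this diff; invoking them directly, outside the `agentops.init()` auto-discovery path that the PR wires up in `agentops/instrumentation/__init__.py`, is assumed to be a debugging convenience rather than a documented entry point.

```python
# Sketch only: exercising the trace_probe helpers from this PR by hand.
from agentops.instrumentation.agentic.xpander import (
    is_xpander_session_active,
    get_active_xpander_session,
)
from agentops.instrumentation.agentic.xpander.trace_probe import (
    activate_xpander_instrumentation,
    deactivate_xpander_instrumentation,
)

# Wraps xpander_sdk.Agent methods; per the PR, import failures are caught and
# logged, so this is a no-op if xpander-sdk is not installed.
activate_xpander_instrumentation()

print(is_xpander_session_active())   # True while an instrumentor instance is installed
print(get_active_xpander_session())  # the shared XpanderContext used for session/span bookkeeping, or None

# Calls uninstrument() and clears the module-level instance.
deactivate_xpander_instrumentation()
```

In normal use none of this is needed: initializing AgentOps before importing the Xpander SDK, as the integration docs in this PR describe, activates the instrumentor automatically.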