diff --git a/agentops/instrumentation/__init__.py b/agentops/instrumentation/__init__.py index 2df5c7a9a..f46bc192a 100644 --- a/agentops/instrumentation/__init__.py +++ b/agentops/instrumentation/__init__.py @@ -110,7 +110,7 @@ class InstrumentorConfig(TypedDict): "agno": { "module_name": "agentops.instrumentation.agentic.agno", "class_name": "AgnoInstrumentor", - "min_version": "0.1.0", + "min_version": "1.5.8", }, "smolagents": { "module_name": "agentops.instrumentation.agentic.smolagents", diff --git a/agentops/instrumentation/agentic/agno/__init__.py b/agentops/instrumentation/agentic/agno/__init__.py index 4d56ff728..4cf71d2f5 100644 --- a/agentops/instrumentation/agentic/agno/__init__.py +++ b/agentops/instrumentation/agentic/agno/__init__.py @@ -8,7 +8,7 @@ logger = logging.getLogger(__name__) # Library information -_library_info = LibraryInfo(name="agno") +_library_info = LibraryInfo(name="agno", default_version="1.5.8") LIBRARY_NAME = _library_info.name LIBRARY_VERSION = _library_info.version diff --git a/agentops/instrumentation/agentic/agno/attributes/__init__.py b/agentops/instrumentation/agentic/agno/attributes/__init__.py index 377f465a7..a4fc7f09a 100644 --- a/agentops/instrumentation/agentic/agno/attributes/__init__.py +++ b/agentops/instrumentation/agentic/agno/attributes/__init__.py @@ -1,14 +1,20 @@ -"""Agno Agent attributes package for span instrumentation.""" +"""Agno instrumentation attribute handlers.""" from .agent import get_agent_run_attributes +from .metrics import get_metrics_attributes from .team import get_team_run_attributes from .tool import get_tool_execution_attributes -from .workflow import get_workflow_run_attributes, get_workflow_session_attributes +from .workflow import get_workflow_run_attributes, get_workflow_session_attributes, get_workflow_cache_attributes +from .storage import get_storage_read_attributes, get_storage_write_attributes __all__ = [ "get_agent_run_attributes", + "get_metrics_attributes", 
"get_team_run_attributes", "get_tool_execution_attributes", "get_workflow_run_attributes", "get_workflow_session_attributes", + "get_workflow_cache_attributes", + "get_storage_read_attributes", + "get_storage_write_attributes", ] diff --git a/agentops/instrumentation/agentic/agno/attributes/agent.py b/agentops/instrumentation/agentic/agno/attributes/agent.py index 5d0f3181c..acbd36ac6 100644 --- a/agentops/instrumentation/agentic/agno/attributes/agent.py +++ b/agentops/instrumentation/agentic/agno/attributes/agent.py @@ -1,10 +1,10 @@ """Agno Agent run attributes handler.""" from typing import Optional, Tuple, Dict, Any - from agentops.instrumentation.common.attributes import AttributeMap -from agentops.semconv import SpanAttributes, WorkflowAttributes, AgentAttributes, ToolAttributes +from agentops.semconv import SpanAttributes, AgentAttributes, ToolAttributes from agentops.semconv.span_kinds import SpanKind as AgentOpsSpanKind +import json def get_agent_run_attributes( @@ -28,174 +28,164 @@ def get_agent_run_attributes( agent_name = None # Base attributes - attributes[SpanAttributes.AGENTOPS_SPAN_KIND] = AgentOpsSpanKind.WORKFLOW + attributes[SpanAttributes.AGENTOPS_SPAN_KIND] = AgentOpsSpanKind.AGENT attributes[SpanAttributes.LLM_SYSTEM] = "agno" - attributes[SpanAttributes.LLM_REQUEST_STREAMING] = "False" - # AgentOps entity attributes (matching CrewAI pattern) - attributes[SpanAttributes.AGENTOPS_ENTITY_NAME] = "Agent" + # AgentOps entity attributes + attributes[SpanAttributes.AGENTOPS_ENTITY_NAME] = "agent" # Extract agent information from args[0] (self) if args and len(args) >= 1: agent = args[0] - # Core agent identification using AgentAttributes + # Core agent identification - set directly at root level if hasattr(agent, "agent_id") and agent.agent_id: agent_id = str(agent.agent_id) attributes[AgentAttributes.AGENT_ID] = agent_id - attributes["agno.agent.id"] = agent_id if hasattr(agent, "name") and agent.name: agent_name = str(agent.name) 
attributes[AgentAttributes.AGENT_NAME] = agent_name - attributes["agno.agent.name"] = agent_name if hasattr(agent, "role") and agent.role: agent_role = str(agent.role) attributes[AgentAttributes.AGENT_ROLE] = agent_role - attributes["agno.agent.role"] = agent_role # Check if agent is part of a team if hasattr(agent, "_team") and agent._team: team = agent._team if hasattr(team, "name") and team.name: - attributes["agno.agent.parent_team"] = str(team.name) - attributes["agno.agent.parent_team_display"] = f"Under {team.name}" + attributes["agent.parent_team"] = str(team.name) + attributes["agent.parent_team_display"] = f"Under {team.name}" if hasattr(team, "team_id") and team.team_id: - attributes["agno.agent.parent_team_id"] = str(team.team_id) + attributes["agent.parent_team_id"] = str(team.team_id) - # Model information using AgentAttributes + # Model information - if hasattr(agent, "model") and agent.model: model = agent.model if hasattr(model, "id"): model_id = str(model.id) - attributes[AgentAttributes.AGENT_MODELS] = model_id - attributes["agno.agent.model_id"] = model_id + attributes[SpanAttributes.LLM_REQUEST_MODEL] = model_id attributes[SpanAttributes.LLM_RESPONSE_MODEL] = model_id if hasattr(model, "provider"): model_provider = str(model.provider) - attributes["agno.agent.model_provider"] = model_provider - attributes[SpanAttributes.LLM_REQUEST_MODEL] = model_id if hasattr(model, "id") else "unknown" - - # Agent configuration details - agent_config = {} + attributes["agent.model_provider"] = model_provider + # Agent configuration details - set directly at root level if hasattr(agent, "description") and agent.description: - agent_config["description"] = str(agent.description)[:500] # Limit length - + attributes["agent.description"] = str(agent.description) if hasattr(agent, "goal") and agent.goal: - agent_config["goal"] = str(agent.goal)[:500] # Limit length + attributes["agent.goal"] = str(agent.goal) if hasattr(agent, "instructions") and 
agent.instructions: if isinstance(agent.instructions, list): - agent_config["instructions"] = " | ".join(str(i) for i in agent.instructions[:3]) # First 3 + attributes["agent.instruction"] = " | ".join(str(i) for i in agent.instructions) else: - agent_config["instructions"] = str(agent.instructions)[:500] - + attributes["agent.instruction"] = str(agent.instructions) if hasattr(agent, "expected_output") and agent.expected_output: - agent_config["expected_output"] = str(agent.expected_output)[:300] + attributes["agent.expected_output"] = str(agent.expected_output) if hasattr(agent, "markdown"): - agent_config["markdown"] = str(agent.markdown) + attributes["agent.markdown"] = str(agent.markdown) if hasattr(agent, "reasoning"): - agent_config["reasoning"] = str(agent.reasoning) + attributes[AgentAttributes.AGENT_REASONING] = str(agent.reasoning) if hasattr(agent, "stream"): - agent_config["stream"] = str(agent.stream) - - if hasattr(agent, "retries"): - agent_config["max_retry_limit"] = str(agent.retries) - - if hasattr(agent, "response_model") and agent.response_model: - agent_config[SpanAttributes.LLM_RESPONSE_MODEL] = str(agent.response_model.__name__) + attributes["agent.stream"] = str(agent.stream) if hasattr(agent, "show_tool_calls"): - agent_config["show_tool_calls"] = str(agent.show_tool_calls) + attributes["agent.show_tool_calls"] = str(agent.show_tool_calls) if hasattr(agent, "tool_call_limit") and agent.tool_call_limit: - agent_config["tool_call_limit"] = str(agent.tool_call_limit) - - # Add agent config to attributes - for key, value in agent_config.items(): - attributes[f"agno.agent.{key}"] = value + attributes["agent.tool_call_limit"] = str(agent.tool_call_limit) # Tools information if hasattr(agent, "tools") and agent.tools: - tools_info = [] - tool_names = [] + # Set tool count based on actual number of tools + attributes["agent.tools_count"] = str(len(agent.tools)) - for tool in agent.tools: - tool_dict = {} + # Collect all tool names for the 
AGENT_TOOLS attribute + tool_names = [] + if len(agent.tools) == 1: + # Single tool - set directly at root level + tool = agent.tools[0] + tool_name = None if hasattr(tool, "name"): tool_name = str(tool.name) - tool_dict["name"] = tool_name - tool_names.append(tool_name) + attributes[ToolAttributes.TOOL_NAME] = tool_name elif hasattr(tool, "__name__"): tool_name = str(tool.__name__) - tool_dict["name"] = tool_name - tool_names.append(tool_name) + attributes[ToolAttributes.TOOL_NAME] = tool_name elif callable(tool): tool_name = getattr(tool, "__name__", "unknown_tool") - tool_dict["name"] = tool_name - tool_names.append(tool_name) - - if hasattr(tool, "description"): - description = str(tool.description) - if len(description) > 200: - description = description[:197] + "..." - tool_dict["description"] = description + attributes[ToolAttributes.TOOL_NAME] = tool_name - if tool_dict: # Only add if we have some info - tools_info.append(tool_dict) + if tool_name: + tool_names.append(tool_name) - # Set tool attributes + if hasattr(tool, "description") and tool.description: + attributes[ToolAttributes.TOOL_DESCRIPTION] = str(tool.description) + elif hasattr(tool, "__doc__") and tool.__doc__: + # Fallback to docstring if no description attribute + attributes[ToolAttributes.TOOL_DESCRIPTION] = str(tool.__doc__).strip() + else: + # Multiple tools - use indexed format + for i, tool in enumerate(agent.tools): + tool_name = None + if hasattr(tool, "name"): + tool_name = str(tool.name) + attributes[f"tool.{i}.name"] = tool_name + elif hasattr(tool, "__name__"): + tool_name = str(tool.__name__) + attributes[f"tool.{i}.name"] = tool_name + elif callable(tool): + tool_name = getattr(tool, "__name__", "unknown_tool") + attributes[f"tool.{i}.name"] = tool_name + + if tool_name: + tool_names.append(tool_name) + + if hasattr(tool, "description") and tool.description: + attributes[f"tool.{i}.description"] = str(tool.description) + elif hasattr(tool, "__doc__") and tool.__doc__: + # 
Fallback to docstring if no description attribute + attributes[f"tool.{i}.description"] = str(tool.__doc__).strip() + + # Set the AGENT_TOOLS attribute with all tool names if tool_names: - attributes["agno.agent.tools_count"] = str(len(tool_names)) - - if tools_info: - # Instead of storing as JSON blob, set individual tool attributes - for i, tool in enumerate(tools_info): - prefix = f"agent.tool.{i}" - if "name" in tool: - attributes[f"{prefix}.{ToolAttributes.TOOL_NAME}"] = tool["name"] - if "description" in tool: - attributes[f"{prefix}.{ToolAttributes.TOOL_DESCRIPTION}"] = tool["description"] - - # Memory and knowledge information - if hasattr(agent, "memory") and agent.memory: - memory_type = type(agent.memory).__name__ - attributes["agno.agent.memory_type"] = memory_type + attributes[AgentAttributes.AGENT_TOOLS] = json.dumps(tool_names) if hasattr(agent, "knowledge") and agent.knowledge: knowledge_type = type(agent.knowledge).__name__ - attributes["agno.agent.knowledge_type"] = knowledge_type + attributes["agent.knowledge_type"] = knowledge_type if hasattr(agent, "storage") and agent.storage: storage_type = type(agent.storage).__name__ - attributes["agno.agent.storage_type"] = storage_type + attributes["agent.storage_type"] = storage_type # Session information if hasattr(agent, "session_id") and agent.session_id: session_id = str(agent.session_id) - attributes["agno.agent.session_id"] = session_id + attributes["agent.session_id"] = session_id if hasattr(agent, "user_id") and agent.user_id: user_id = str(agent.user_id) - attributes["agno.agent.user_id"] = user_id + attributes["agent.user_id"] = user_id + + # Output key if present + if hasattr(agent, "output_key") and agent.output_key: + attributes["agent.output_key"] = str(agent.output_key) # Extract run input information if args and len(args) >= 2: message = args[1] # The message argument if message: message_str = str(message) - if len(message_str) > 500: - message_str = message_str[:497] + "..." 
- attributes[WorkflowAttributes.WORKFLOW_INPUT] = message_str - attributes["agno.agent.input"] = message_str - # AgentOps entity input (matching CrewAI pattern) + attributes["agent.input"] = message_str + # AgentOps entity input attributes[SpanAttributes.AGENTOPS_ENTITY_INPUT] = message_str # Extract kwargs information @@ -204,87 +194,57 @@ def get_agent_run_attributes( attributes[SpanAttributes.LLM_REQUEST_STREAMING] = str(kwargs["stream"]) if kwargs.get("session_id"): - attributes["agno.agent.run_session_id"] = str(kwargs["session_id"]) + attributes["agent.run_session_id"] = str(kwargs["session_id"]) if kwargs.get("user_id"): - attributes["agno.agent.run_user_id"] = str(kwargs["user_id"]) + attributes["agent.run_user_id"] = str(kwargs["user_id"]) # Extract return value information if return_value: if hasattr(return_value, "run_id") and return_value.run_id: run_id = str(return_value.run_id) - attributes["agno.agent.run_id"] = run_id - - if hasattr(return_value, "session_id") and return_value.session_id: - session_id = str(return_value.session_id) - attributes["agno.agent.response_session_id"] = session_id - - if hasattr(return_value, "agent_id") and return_value.agent_id: - agent_id = str(return_value.agent_id) - attributes["agno.agent.response_agent_id"] = agent_id + attributes["agent.run_id"] = run_id if hasattr(return_value, "content") and return_value.content: content = str(return_value.content) - if len(content) > 500: - content = content[:497] + "..." 
- attributes[WorkflowAttributes.WORKFLOW_OUTPUT] = content - attributes["agno.agent.output"] = content + attributes["agent.output"] = content if hasattr(return_value, "event") and return_value.event: event = str(return_value.event) - attributes["agno.agent.event"] = event + attributes["agent.event"] = event # Tool executions from the response if hasattr(return_value, "tools") and return_value.tools: - tool_executions = [] - for tool_exec in return_value.tools: - tool_exec_dict = {} + # Track the number of tool executions + attributes["agent.tool_executions_count"] = str(len(return_value.tools)) + for i, tool_exec in enumerate(return_value.tools): # No limit - show all tools if hasattr(tool_exec, "tool_name") and tool_exec.tool_name: - tool_exec_dict["name"] = str(tool_exec.tool_name) + attributes[f"tool.{i}.name"] = str(tool_exec.tool_name) if hasattr(tool_exec, "tool_args") and tool_exec.tool_args: try: - import json - args_str = json.dumps(tool_exec.tool_args) - if len(args_str) > 200: - args_str = args_str[:197] + "..." - tool_exec_dict["parameters"] = args_str + attributes[f"tool.{i}.parameters"] = args_str except: - tool_exec_dict["parameters"] = str(tool_exec.tool_args) + attributes[f"tool.{i}.parameters"] = str(tool_exec.tool_args) if hasattr(tool_exec, "result") and tool_exec.result: result_str = str(tool_exec.result) - if len(result_str) > 200: - result_str = result_str[:197] + "..." 
- tool_exec_dict["result"] = result_str + attributes[f"tool.{i}.result"] = result_str if hasattr(tool_exec, "tool_call_error") and tool_exec.tool_call_error: - tool_exec_dict["error"] = str(tool_exec.tool_call_error) - - tool_exec_dict["status"] = "success" # Default to success - - if tool_exec_dict: - tool_executions.append(tool_exec_dict) - - if tool_executions: - # Add tool executions (limit to first 3) - limited_executions = tool_executions[:3] - for i, tool_exec in enumerate(limited_executions): - for key, value in tool_exec.items(): - attributes[f"agno.agent.tool_execution.{i}.{key}"] = value + attributes[f"tool.{i}.error"] = str(tool_exec.tool_call_error) - # Workflow type - attributes[WorkflowAttributes.WORKFLOW_TYPE] = "agent_run" + attributes[f"tool.{i}.status"] = "success" # Default to success # Add display name for better UI visualization if agent_name: # Check if we have parent team info - parent_team = attributes.get("agno.agent.parent_team") + parent_team = attributes.get("agent.parent_team") if parent_team: - attributes["agno.agent.display_name"] = f"{agent_name} (Agent under {parent_team})" + attributes["agent.display_name"] = f"{agent_name} (Agent under {parent_team})" else: - attributes["agno.agent.display_name"] = f"{agent_name} (Agent)" + attributes["agent.display_name"] = f"{agent_name}" return attributes diff --git a/agentops/instrumentation/agentic/agno/attributes/metrics.py b/agentops/instrumentation/agentic/agno/attributes/metrics.py index b8d3a9ac1..f60c27e82 100644 --- a/agentops/instrumentation/agentic/agno/attributes/metrics.py +++ b/agentops/instrumentation/agentic/agno/attributes/metrics.py @@ -28,7 +28,7 @@ def get_metrics_attributes( attributes[SpanAttributes.LLM_SYSTEM] = "agno" attributes[SpanAttributes.AGENTOPS_ENTITY_NAME] = "LLM" - # Initialize usage tracking variables (but don't set attributes yet) + # Initialize usage tracking variables usage_data = {} # Initialize counters for indexed messages @@ -66,7 +66,6 @@ def 
get_metrics_attributes( model_class = model.__class__.__name__ attributes["agno.model.class"] = model_class - # === EXTRACT CONVERSATION STRUCTURE === if hasattr(run_messages, "messages") and run_messages.messages: messages = run_messages.messages @@ -82,12 +81,10 @@ def get_metrics_attributes( for i, msg in enumerate(messages): # Extract message content for prompts/completions if hasattr(msg, "role") and hasattr(msg, "content"): - # Only set content if it's not None/empty + # Only process messages with actual content if msg.content is not None and str(msg.content).strip() != "" and str(msg.content) != "None": content = str(msg.content) - # Truncate very long content to avoid oversized attributes - if len(content) > 1000: - content = content[:997] + "..." + # No truncation - keep full content for observability if msg.role == "user": attributes[f"{SpanAttributes.LLM_PROMPTS}.{prompt_count}.role"] = "user" @@ -101,17 +98,6 @@ def get_metrics_attributes( attributes[f"{SpanAttributes.LLM_PROMPTS}.{prompt_count}.role"] = "system" attributes[f"{SpanAttributes.LLM_PROMPTS}.{prompt_count}.content"] = content prompt_count += 1 - else: - # For messages with None content, still set the role but skip content - if msg.role == "user": - attributes[f"{SpanAttributes.LLM_PROMPTS}.{prompt_count}.role"] = "user" - prompt_count += 1 - elif msg.role == "assistant": - attributes[f"{SpanAttributes.LLM_COMPLETIONS}.{completion_count}.role"] = "assistant" - completion_count += 1 - elif msg.role == "system": - attributes[f"{SpanAttributes.LLM_PROMPTS}.{prompt_count}.role"] = "system" - prompt_count += 1 # Extract token metrics from message if hasattr(msg, "metrics") and msg.metrics: @@ -124,7 +110,7 @@ def get_metrics_attributes( total_completion_tokens += metrics.completion_tokens if hasattr(metrics, "total_tokens") and metrics.total_tokens > 0: total_tokens += metrics.total_tokens - # For messages that only have output_tokens (like Anthropic) + # For messages that only have output_tokens 
if hasattr(metrics, "output_tokens") and metrics.output_tokens > 0: total_output_tokens += metrics.output_tokens if hasattr(metrics, "input_tokens") and metrics.input_tokens > 0: @@ -132,7 +118,7 @@ def get_metrics_attributes( if hasattr(metrics, "time") and metrics.time: total_time += metrics.time - # === TOKEN METRICS FROM AGENT SESSION METRICS === + # Token metrics from agent session metrics if hasattr(agent, "session_metrics") and agent.session_metrics: session_metrics = agent.session_metrics @@ -191,7 +177,6 @@ def get_metrics_attributes( if hasattr(session_metrics, "reasoning_tokens") and session_metrics.reasoning_tokens > 0: usage_data["reasoning_tokens"] = session_metrics.reasoning_tokens - # === FALLBACK TO MESSAGE AGGREGATION IF SESSION METRICS ARE EMPTY === # If we don't have token data from session metrics, try message aggregation if "total_tokens" not in usage_data: # Set aggregated token usage from messages @@ -207,8 +192,6 @@ def get_metrics_attributes( user_msg = run_messages.user_message if hasattr(user_msg, "content"): content = str(user_msg.content) - if len(content) > 1000: - content = content[:997] + "..." 
attributes["agno.metrics.user_input"] = content # Set individual LLM usage attributes only for values we actually have diff --git a/agentops/instrumentation/agentic/agno/attributes/storage.py b/agentops/instrumentation/agentic/agno/attributes/storage.py new file mode 100644 index 000000000..ef9b80514 --- /dev/null +++ b/agentops/instrumentation/agentic/agno/attributes/storage.py @@ -0,0 +1,158 @@ +"""Storage operation attribute handlers for Agno workflow instrumentation.""" + +import json +from typing import Any, Dict, Optional, Tuple +from opentelemetry.util.types import AttributeValue + +from agentops.semconv.span_attributes import SpanAttributes +from agentops.semconv.span_kinds import SpanKind as AgentOpsSpanKind +from agentops.instrumentation.common.attributes import get_common_attributes + + +def get_storage_read_attributes( + args: Tuple[Any, ...] = (), + kwargs: Optional[Dict[str, Any]] = None, + return_value: Optional[Any] = None, +) -> Dict[str, AttributeValue]: + """Extract attributes from storage read operations. 
+ + Args: + args: Positional arguments passed to read_from_storage + kwargs: Keyword arguments passed to read_from_storage + return_value: Return value from read_from_storage (the cached data or None) + + Returns: + Dictionary of OpenTelemetry attributes for storage read operations + """ + attributes = get_common_attributes() + kwargs = kwargs or {} + + # Mark this as a storage operation within workflow context + attributes["storage.operation"] = "read" + attributes[SpanAttributes.AGENTOPS_SPAN_KIND] = AgentOpsSpanKind.WORKFLOW + + if args and len(args) > 0: + workflow = args[0] + + # Get workflow information + if hasattr(workflow, "workflow_id") and workflow.workflow_id: + attributes["storage.workflow_id"] = str(workflow.workflow_id) + if hasattr(workflow, "session_id") and workflow.session_id: + attributes["storage.session_id"] = str(workflow.session_id) + + # Get storage type + if hasattr(workflow, "storage") and workflow.storage: + storage_type = type(workflow.storage).__name__ + attributes["storage.backend"] = storage_type + + # Get session state info for context + if hasattr(workflow, "session_state") and isinstance(workflow.session_state, dict): + # Get all cache keys + cache_keys = list(workflow.session_state.keys()) + attributes["storage.cache_size"] = len(cache_keys) + if cache_keys: + attributes["storage.cache_keys"] = json.dumps(cache_keys) + + # Analyze the return value to determine cache hit/miss + if return_value is not None: + # Cache hit + attributes["storage.cache_hit"] = True + attributes["storage.result"] = "hit" + + # Get data type and size + data_type = type(return_value).__name__ + attributes["storage.data_type"] = data_type + + # For dict/list, show structure + if isinstance(return_value, dict): + attributes["storage.data_keys"] = json.dumps(list(return_value.keys())) + attributes["storage.data_size"] = len(return_value) + elif isinstance(return_value, (list, tuple)): + attributes["storage.data_size"] = len(return_value) + elif 
isinstance(return_value, str): + attributes["storage.data_size"] = len(return_value) + # Show full string data without truncation + attributes["storage.data_preview"] = return_value + else: + # Cache miss + attributes["storage.cache_hit"] = False + attributes["storage.result"] = "miss" + + return attributes + + +def get_storage_write_attributes( + args: Tuple[Any, ...] = (), + kwargs: Optional[Dict[str, Any]] = None, + return_value: Optional[Any] = None, +) -> Dict[str, AttributeValue]: + """Extract attributes from storage write operations. + + Args: + args: Positional arguments passed to write_to_storage + kwargs: Keyword arguments passed to write_to_storage + return_value: Return value from write_to_storage (usually None or success indicator) + + Returns: + Dictionary of OpenTelemetry attributes for storage write operations + """ + attributes = get_common_attributes() + kwargs = kwargs or {} + + # Mark this as a storage operation within workflow context + attributes["storage.operation"] = "write" + attributes[SpanAttributes.AGENTOPS_SPAN_KIND] = AgentOpsSpanKind.WORKFLOW + + if args and len(args) > 0: + workflow = args[0] + + # Get workflow information + if hasattr(workflow, "workflow_id") and workflow.workflow_id: + attributes["storage.workflow_id"] = str(workflow.workflow_id) + if hasattr(workflow, "session_id") and workflow.session_id: + attributes["storage.session_id"] = str(workflow.session_id) + + # Get storage type + if hasattr(workflow, "storage") and workflow.storage: + storage_type = type(workflow.storage).__name__ + attributes["storage.backend"] = storage_type + + # Get session state info to see what's being written + if hasattr(workflow, "session_state") and isinstance(workflow.session_state, dict): + # Get cache state after write + cache_keys = list(workflow.session_state.keys()) + attributes["storage.cache_size"] = len(cache_keys) + if cache_keys: + attributes["storage.cache_keys"] = json.dumps(cache_keys) + + # Try to identify what was written (the 
newest/changed data) + # This is a heuristic - in practice you might need to track state changes + if cache_keys: + # Show the last key as likely the one just written + last_key = cache_keys[-1] + attributes["storage.written_key"] = last_key + + # Get value preview + value = workflow.session_state.get(last_key) + if value is not None: + value_type = type(value).__name__ + attributes["storage.written_value_type"] = value_type + + if isinstance(value, str): + if len(value) > 100: + attributes["storage.written_value_preview"] = value[:100] + "..." + else: + attributes["storage.written_value_preview"] = value + attributes["storage.written_value_size"] = len(value) + elif isinstance(value, (dict, list)): + attributes["storage.written_value_size"] = len(value) + attributes["storage.written_value_preview"] = f"{value_type} with {len(value)} items" + + # Check write result + if return_value is not None: + attributes["storage.write_success"] = True + else: + # Most storage writes return None on success, so this is normal + attributes["storage.write_success"] = True + + return attributes diff --git a/agentops/instrumentation/agentic/agno/attributes/team.py b/agentops/instrumentation/agentic/agno/attributes/team.py index b3cc68081..da08a1776 100644 --- a/agentops/instrumentation/agentic/agno/attributes/team.py +++ b/agentops/instrumentation/agentic/agno/attributes/team.py @@ -1,7 +1,7 @@ """Agno Team run attributes handler.""" from typing import Optional, Tuple, Dict, Any - +import json from agentops.instrumentation.common.attributes import AttributeMap from agentops.semconv import SpanAttributes, WorkflowAttributes from agentops.semconv.span_kinds import SpanKind as AgentOpsSpanKind @@ -12,12 +12,12 @@ def get_team_run_attributes( kwargs: Optional[Dict] = None, return_value: Optional[Any] = None, ) -> AttributeMap: - """Extract span attributes for Team._run method calls. + """Extract span attributes for Team method calls (both internal and public). 
Args: - args: Positional arguments passed to the Team._run method - kwargs: Keyword arguments passed to the Team._run method - return_value: The return value from the Team._run method + args: Positional arguments passed to the Team method + kwargs: Keyword arguments passed to the Team method + return_value: The return value from the Team method Returns: A dictionary of span attributes to be set on the workflow span @@ -35,17 +35,17 @@ def get_team_run_attributes( # Team identification if hasattr(team, "name") and team.name: - attributes["agno.team.name"] = str(team.name) - attributes["agno.team.display_name"] = f"{team.name} (Team)" + attributes["team.name"] = str(team.name) + attributes["team.display_name"] = f"{team.name} (Team)" if hasattr(team, "team_id") and team.team_id: - attributes["agno.team.team_id"] = str(team.team_id) + attributes["team.team_id"] = str(team.team_id) if hasattr(team, "mode") and team.mode: - attributes["agno.team.mode"] = str(team.mode) + attributes["team.mode"] = str(team.mode) if hasattr(team, "members") and team.members: - attributes["agno.team.members_count"] = str(len(team.members)) + attributes["team.members_count"] = str(len(team.members)) # Add detailed member information member_agents = [] @@ -67,25 +67,26 @@ def get_team_run_attributes( # Also add individual member attributes for key, value in member_info.items(): - attributes[f"agno.team.member.{i}.{key}"] = value + attributes[f"team.member.{i}.{key}"] = value # Add aggregated member list if member_agents: - import json - try: - attributes["agno.team.members"] = json.dumps(member_agents) + attributes["team.members"] = json.dumps(member_agents) # Also add a simple list of member names member_names = [m.get("name", "Unknown") for m in member_agents] - attributes["agno.team.member_names"] = ", ".join(member_names) + attributes["team.member_names"] = ", ".join(member_names) except: - attributes["agno.team.members"] = str(member_agents) + attributes["team.members"] = 
str(member_agents) - # Process input arguments from the run_messages parameter + # Process input arguments - handle both internal and public method signatures if args and len(args) >= 2: - # args[0] is run_response, args[1] is run_messages - run_messages = args[1] - if hasattr(run_messages, "messages") and run_messages.messages: + input_arg = args[1] + + # Check if it's internal method (has run_messages) or public method (direct message) + if hasattr(input_arg, "messages"): + # Internal method: args[1] is run_messages + run_messages = input_arg # Get the user message for workflow input user_messages = [msg for msg in run_messages.messages if hasattr(msg, "role") and msg.role == "user"] if user_messages: @@ -93,9 +94,23 @@ def get_team_run_attributes( if hasattr(last_user_msg, "content"): attributes[WorkflowAttributes.WORKFLOW_INPUT] = str(last_user_msg.content) attributes[WorkflowAttributes.WORKFLOW_INPUT_TYPE] = "message" - # Count total messages - attributes["agno.team.messages_count"] = str(len(run_messages.messages)) + attributes["team.messages_count"] = str(len(run_messages.messages)) + else: + # Public method: args[1] is the message directly + message = input_arg + if message is not None: + if isinstance(message, str): + message_content = message + elif hasattr(message, "content"): + message_content = str(message.content) + elif hasattr(message, "get_content_string"): + message_content = message.get_content_string() + else: + message_content = str(message) + + attributes[WorkflowAttributes.WORKFLOW_INPUT] = message_content + attributes[WorkflowAttributes.WORKFLOW_INPUT_TYPE] = "message" # Process keyword arguments if kwargs: @@ -103,178 +118,32 @@ def get_team_run_attributes( attributes[SpanAttributes.LLM_USER] = kwargs["user_id"] if kwargs.get("session_id"): - attributes["agno.team.session_id"] = kwargs["session_id"] + attributes["team.session_id"] = kwargs["session_id"] if kwargs.get("response_format"): - attributes["agno.team.response_format"] = 
str(type(kwargs["response_format"]).__name__) - - # Process return value (TeamRunResponse) - if return_value: - if hasattr(return_value, "content"): - content = str(return_value.content) - # Truncate if too long - if len(content) > 1000: - content = content[:997] + "..." - attributes[WorkflowAttributes.WORKFLOW_OUTPUT] = content - attributes[WorkflowAttributes.WORKFLOW_OUTPUT_TYPE] = "team_run_response" - else: - output = str(return_value) - if len(output) > 1000: - output = output[:997] + "..." - attributes[WorkflowAttributes.WORKFLOW_OUTPUT] = output - attributes[WorkflowAttributes.WORKFLOW_OUTPUT_TYPE] = type(return_value).__name__ - - # Set additional team response attributes - if hasattr(return_value, "run_id"): - attributes["agno.team.run_id"] = str(return_value.run_id) - - if hasattr(return_value, "session_id"): - attributes["agno.team.response_session_id"] = str(return_value.session_id) - - if hasattr(return_value, "team_id"): - attributes["agno.team.response_team_id"] = str(return_value.team_id) - - if hasattr(return_value, "model"): - attributes[SpanAttributes.LLM_RESPONSE_MODEL] = str(return_value.model) - - if hasattr(return_value, "model_provider"): - attributes["agno.team.model_provider"] = str(return_value.model_provider) - - if hasattr(return_value, "event"): - attributes["agno.team.event"] = str(return_value.event) - - # Team-specific attributes - if hasattr(return_value, "content_type"): - attributes["agno.team.response_content_type"] = str(return_value.content_type) - - return attributes - - -def get_team_public_run_attributes( - args: Optional[Tuple] = None, - kwargs: Optional[Dict] = None, - return_value: Optional[Any] = None, -) -> AttributeMap: - """Extract span attributes for Team.run method calls (public API). - - Args: - args: Positional arguments passed to the Team.run method (self, message, ...) 
- kwargs: Keyword arguments passed to the Team.run method - return_value: The return value from the Team.run method - - Returns: - A dictionary of span attributes to be set on the workflow span - """ - attributes: AttributeMap = {} - - # Base attributes - attributes[SpanAttributes.AGENTOPS_SPAN_KIND] = AgentOpsSpanKind.WORKFLOW - attributes[SpanAttributes.LLM_SYSTEM] = "agno" - attributes[WorkflowAttributes.WORKFLOW_TYPE] = "team_run" - - # Extract team information from instance - if args and len(args) > 0: - team = args[0] # self (Team instance) - - # Team identification - if hasattr(team, "name") and team.name: - attributes["agno.team.name"] = str(team.name) - attributes["agno.team.display_name"] = f"{team.name} (Team)" - - if hasattr(team, "team_id") and team.team_id: - attributes["agno.team.team_id"] = str(team.team_id) - - if hasattr(team, "mode") and team.mode: - attributes["agno.team.mode"] = str(team.mode) - - if hasattr(team, "members") and team.members: - attributes["agno.team.members_count"] = str(len(team.members)) - - # Add detailed member information - member_agents = [] - for i, member in enumerate(team.members): - member_info = {} - if hasattr(member, "name") and member.name: - member_info["name"] = str(member.name) - if hasattr(member, "agent_id") and member.agent_id: - member_info["id"] = str(member.agent_id) - if hasattr(member, "role") and member.role: - member_info["role"] = str(member.role) - if hasattr(member, "model") and member.model: - if hasattr(member.model, "id"): - member_info["model"] = str(member.model.id) - - # Add member info to list - if member_info: - member_agents.append(member_info) - - # Also add individual member attributes - for key, value in member_info.items(): - attributes[f"agno.team.member.{i}.{key}"] = value - - # Add aggregated member list - if member_agents: - import json - - try: - attributes["agno.team.members"] = json.dumps(member_agents) - # Also add a simple list of member names - member_names = [m.get("name", 
"Unknown") for m in member_agents] - attributes["agno.team.member_names"] = ", ".join(member_names) - except: - attributes["agno.team.members"] = str(member_agents) - - # Process input arguments from Team.run() method - if args and len(args) >= 2: - # args[0] is self (Team instance), args[1] is message - message = args[1] - - # Extract workflow input from message - if message is not None: - if isinstance(message, str): - message_content = message - elif hasattr(message, "content"): - message_content = str(message.content) - elif hasattr(message, "get_content_string"): - message_content = message.get_content_string() - else: - message_content = str(message) - - # Truncate if too long - if len(message_content) > 1000: - message_content = message_content[:997] + "..." - attributes[WorkflowAttributes.WORKFLOW_INPUT] = message_content - attributes[WorkflowAttributes.WORKFLOW_INPUT_TYPE] = "message" - - # Process keyword arguments - if kwargs: - if kwargs.get("user_id"): - attributes[SpanAttributes.LLM_USER] = kwargs["user_id"] - - if kwargs.get("session_id"): - attributes["agno.team.session_id"] = kwargs["session_id"] + attributes["team.response_format"] = str(type(kwargs["response_format"]).__name__) if kwargs.get("stream"): - attributes["agno.team.streaming"] = str(kwargs["stream"]) + attributes["team.streaming"] = str(kwargs["stream"]) if kwargs.get("stream_intermediate_steps"): - attributes["agno.team.stream_intermediate_steps"] = str(kwargs["stream_intermediate_steps"]) + attributes["team.stream_intermediate_steps"] = str(kwargs["stream_intermediate_steps"]) if kwargs.get("retries"): - attributes["agno.team.retries"] = str(kwargs["retries"]) + attributes["team.retries"] = str(kwargs["retries"]) # Media attachments if kwargs.get("audio"): - attributes["agno.team.has_audio"] = "true" + attributes["team.has_audio"] = "true" if kwargs.get("images"): - attributes["agno.team.has_images"] = "true" + attributes["team.has_images"] = "true" if kwargs.get("videos"): - 
attributes["agno.team.has_videos"] = "true" + attributes["team.has_videos"] = "true" if kwargs.get("files"): - attributes["agno.team.has_files"] = "true" + attributes["team.has_files"] = "true" if kwargs.get("knowledge_filters"): - attributes["agno.team.has_knowledge_filters"] = "true" + attributes["team.has_knowledge_filters"] = "true" # Process return value (TeamRunResponse or Iterator) if return_value: @@ -282,44 +151,45 @@ def get_team_public_run_attributes( if hasattr(return_value, "__iter__") and not isinstance(return_value, str): # It's an iterator for streaming attributes[WorkflowAttributes.WORKFLOW_OUTPUT_TYPE] = "team_run_response_stream" - attributes["agno.team.is_streaming"] = "true" - elif hasattr(return_value, "content"): - # It's a TeamRunResponse - content = str(return_value.content) - # Truncate if too long - if len(content) > 1000: - content = content[:997] + "..." - attributes[WorkflowAttributes.WORKFLOW_OUTPUT] = content - attributes[WorkflowAttributes.WORKFLOW_OUTPUT_TYPE] = "team_run_response" - - # Set additional team response attributes - if hasattr(return_value, "run_id"): - attributes["agno.team.run_id"] = str(return_value.run_id) - - if hasattr(return_value, "session_id"): - attributes["agno.team.response_session_id"] = str(return_value.session_id) - - if hasattr(return_value, "team_id"): - attributes["agno.team.response_team_id"] = str(return_value.team_id) - - if hasattr(return_value, "model"): - attributes[SpanAttributes.LLM_RESPONSE_MODEL] = str(return_value.model) - - if hasattr(return_value, "model_provider"): - attributes["agno.team.model_provider"] = str(return_value.model_provider) - - if hasattr(return_value, "event"): - attributes["agno.team.event"] = str(return_value.event) - - # Team-specific attributes - if hasattr(return_value, "content_type"): - attributes["agno.team.response_content_type"] = str(return_value.content_type) + attributes["team.is_streaming"] = "true" else: - # Unknown return type - output = str(return_value) 
- if len(output) > 1000: - output = output[:997] + "..." - attributes[WorkflowAttributes.WORKFLOW_OUTPUT] = output - attributes[WorkflowAttributes.WORKFLOW_OUTPUT_TYPE] = type(return_value).__name__ + # Non-streaming response + if hasattr(return_value, "content"): + # It's a TeamRunResponse with content + content = str(return_value.content) + attributes[WorkflowAttributes.WORKFLOW_OUTPUT] = content + attributes[WorkflowAttributes.WORKFLOW_OUTPUT_TYPE] = "team_run_response" + else: + # Unknown return type or response without content + output = str(return_value) + attributes[WorkflowAttributes.WORKFLOW_OUTPUT] = output + attributes[WorkflowAttributes.WORKFLOW_OUTPUT_TYPE] = type(return_value).__name__ + + # Set additional team response attributes (for both streaming and non-streaming) + if hasattr(return_value, "run_id"): + attributes["team.run_id"] = str(return_value.run_id) + + if hasattr(return_value, "session_id"): + attributes["team.response_session_id"] = str(return_value.session_id) + + if hasattr(return_value, "team_id"): + attributes["team.response_team_id"] = str(return_value.team_id) + + if hasattr(return_value, "model"): + attributes[SpanAttributes.LLM_RESPONSE_MODEL] = str(return_value.model) + + if hasattr(return_value, "model_provider"): + attributes["team.model_provider"] = str(return_value.model_provider) + + if hasattr(return_value, "event"): + attributes["team.event"] = str(return_value.event) + + # Team-specific attributes + if hasattr(return_value, "content_type"): + attributes["team.response_content_type"] = str(return_value.content_type) return attributes + + +# Keep the public function as an alias for backward compatibility +get_team_public_run_attributes = get_team_run_attributes diff --git a/agentops/instrumentation/agentic/agno/attributes/tool.py b/agentops/instrumentation/agentic/agno/attributes/tool.py index 92f9e1ad9..03a2e40ef 100644 --- a/agentops/instrumentation/agentic/agno/attributes/tool.py +++ 
b/agentops/instrumentation/agentic/agno/attributes/tool.py @@ -2,7 +2,7 @@ import json from typing import Optional, Tuple, Dict, Any - +import time from agentops.instrumentation.common.attributes import AttributeMap from agentops.semconv import SpanAttributes from agentops.semconv.span_kinds import SpanKind as AgentOpsSpanKind @@ -29,14 +29,16 @@ def get_tool_execution_attributes( # Base attributes attributes[SpanAttributes.AGENTOPS_SPAN_KIND] = AgentOpsSpanKind.TOOL attributes[SpanAttributes.LLM_SYSTEM] = "agno" - attributes["agno.tool.operation"] = "execute" + + # AgentOps entity attributes + attributes[SpanAttributes.AGENTOPS_ENTITY_NAME] = "tool" # Process the FunctionCall object (self in execute method) if args and len(args) > 0: function_call = args[0] # Add detailed function call information - attributes["agno.tool.function_call_type"] = str(type(function_call).__name__) + attributes["tool.function_call_type"] = str(type(function_call).__name__) # Extract tool information if hasattr(function_call, "function") and function_call.function: @@ -45,39 +47,39 @@ def get_tool_execution_attributes( # Get function name and add display name if hasattr(function, "__name__"): func_name = function.__name__ - attributes["agno.tool.function_name"] = func_name - attributes["agno.tool.display_name"] = f"{func_name} (Tool)" + attributes["tool.function_name"] = func_name + attributes["tool.display_name"] = f"{func_name} (Tool)" tool_name = getattr(function, "name", "unknown_tool") # Set span attributes for the tool execution span attributes[ToolAttributes.TOOL_NAME] = tool_name - attributes["agno.tool.function_name"] = tool_name + attributes["tool.function_name"] = tool_name # Function details and context if hasattr(function, "description"): description = getattr(function, "description", "") if description: attributes[ToolAttributes.TOOL_DESCRIPTION] = description - attributes["agno.tool.function_description"] = description + attributes["tool.function_description"] = 
description # Function source information if hasattr(function, "entrypoint") and function.entrypoint: entrypoint = function.entrypoint if hasattr(entrypoint, "__module__"): - attributes["agno.tool.function_module"] = str(entrypoint.__module__) + attributes["tool.function_module"] = str(entrypoint.__module__) if hasattr(entrypoint, "__name__"): - attributes["agno.tool.function_method"] = str(entrypoint.__name__) + attributes["tool.function_method"] = str(entrypoint.__name__) if hasattr(entrypoint, "__qualname__"): - attributes["agno.tool.function_qualname"] = str(entrypoint.__qualname__) + attributes["tool.function_qualname"] = str(entrypoint.__qualname__) # Tool capabilities if hasattr(function, "requires_confirmation"): - attributes["agno.tool.requires_confirmation"] = str(function.requires_confirmation) + attributes["tool.requires_confirmation"] = str(function.requires_confirmation) if hasattr(function, "show_result"): - attributes["agno.tool.show_result"] = str(function.show_result) + attributes["tool.show_result"] = str(function.show_result) if hasattr(function, "stop_after_tool_call"): - attributes["agno.tool.stop_after_tool_call"] = str(function.stop_after_tool_call) + attributes["tool.stop_after_tool_call"] = str(function.stop_after_tool_call) # Extract tool arguments with better formatting if hasattr(function_call, "arguments") and function_call.arguments: @@ -94,45 +96,43 @@ def get_tool_execution_attributes( formatted_args.append(f"{key}={value_str}") attributes[ToolAttributes.TOOL_PARAMETERS] = json.dumps(args_dict) - attributes["agno.tool.formatted_args"] = ", ".join(formatted_args) - attributes["agno.tool.args_count"] = str(len(args_dict)) + attributes["tool.formatted_args"] = ", ".join(formatted_args) + attributes["tool.args_count"] = str(len(args_dict)) except Exception as e: attributes[ToolAttributes.TOOL_PARAMETERS] = str(function_call.arguments) - attributes["agno.tool.args_parse_error"] = str(e) + attributes["tool.args_parse_error"] = str(e) # 
Extract call ID and metadata if hasattr(function_call, "tool_call_id"): - attributes["agno.tool.call_id"] = str(function_call.tool_call_id) + attributes["tool.call_id"] = str(function_call.tool_call_id) # Check for any agent context if hasattr(function_call, "_agent") and function_call._agent: agent = function_call._agent if hasattr(agent, "name"): - attributes["agno.tool.calling_agent_name"] = str(agent.name) + attributes["tool.calling_agent_name"] = str(agent.name) if hasattr(agent, "agent_id"): - attributes["agno.tool.calling_agent_id"] = str(agent.agent_id) + attributes["tool.calling_agent_id"] = str(agent.agent_id) # Process return value if return_value is not None: # Add timing information - import time - - attributes["agno.tool.execution_timestamp"] = str(int(time.time() * 1000)) + attributes["tool.execution_timestamp"] = str(int(time.time() * 1000)) # Determine execution status and result information if hasattr(return_value, "value"): # FunctionExecutionResult with value result_value = return_value.value - attributes["agno.tool.execution_status"] = "success" + attributes["tool.execution_status"] = "success" else: # Direct return value result_value = return_value - attributes["agno.tool.execution_status"] = "success" + attributes["tool.execution_status"] = "success" # Process result value if result_value is not None: result_type = type(result_value).__name__ - attributes["agno.tool.execution_result_status"] = str(result_type) + attributes["tool.execution_result_status"] = str(result_type) # Handle FunctionExecutionResult objects specifically if hasattr(result_value, "status") and hasattr(result_value, "result"): @@ -141,20 +141,20 @@ def get_tool_execution_attributes( actual_result = getattr(result_value, "result", None) error = getattr(result_value, "error", None) - attributes["agno.tool.execution_result_status"] = str(status) + attributes["tool.execution_result_status"] = str(status) attributes[ToolAttributes.TOOL_STATUS] = str(status) if error: - 
attributes["agno.tool.execution_error"] = str(error) + attributes["tool.execution_error"] = str(error) attributes["tool.error"] = str(error) if actual_result is not None: actual_result_type = type(actual_result).__name__ - attributes["agno.tool.actual_result_type"] = actual_result_type + attributes["tool.actual_result_type"] = actual_result_type # Enhanced generator handling if hasattr(actual_result, "__iter__") and hasattr(actual_result, "__next__"): - attributes["agno.tool.result_is_generator"] = "true" + attributes["tool.result_is_generator"] = "true" # Try to get more meaningful information about the generator generator_info = [] @@ -162,40 +162,9 @@ def get_tool_execution_attributes( # Get function name from the generator if hasattr(actual_result, "gi_code"): func_name = actual_result.gi_code.co_name - attributes["agno.tool.generator_function"] = func_name + attributes["tool.generator_function"] = func_name generator_info.append(f"function={func_name}") - # Get local variables from generator frame for context - if hasattr(actual_result, "gi_frame") and actual_result.gi_frame: - try: - locals_dict = actual_result.gi_frame.f_locals - # Look for interesting variables that give context - context_vars = [ - "task_description", - "expected_output", - "member_agent", - "agent_name", - "team", - "message", - ] - for var_name in context_vars: - if var_name in locals_dict: - value = str(locals_dict[var_name]) - generator_info.append(f"{var_name}={value}") - attributes[f"agno.tool.generator_{var_name}"] = value - - # Count total local variables for debugging - attributes["agno.tool.generator_locals_count"] = str(len(locals_dict)) - except Exception as e: - attributes["agno.tool.generator_locals_error"] = str(e) - - # Try to identify what type of transfer this is - generator_str = str(actual_result) - if "transfer_task_to_member" in generator_str: - attributes["agno.tool.transfer_type"] = "task_to_member" - elif "transfer" in generator_str.lower(): - 
attributes["agno.tool.transfer_type"] = "general_transfer" - if generator_info: result_str = f"Generator<{actual_result_type}>({', '.join(generator_info)})" else: @@ -209,11 +178,11 @@ def get_tool_execution_attributes( # Not a FunctionExecutionResult, handle as direct result if hasattr(result_value, "__iter__") and hasattr(result_value, "__next__"): # It's a generator - attributes["agno.tool.result_is_generator"] = "true" + attributes["tool.result_is_generator"] = "true" if hasattr(result_value, "gi_code"): func_name = result_value.gi_code.co_name - attributes["agno.tool.generator_function"] = func_name + attributes["tool.generator_function"] = func_name result_str = f"Generator<{result_type}> function={func_name} - {str(result_value)}" else: result_str = f"Generator<{result_type}> - {str(result_value)}" @@ -227,7 +196,7 @@ def get_tool_execution_attributes( attributes[ToolAttributes.TOOL_RESULT] = result_str # Add additional analysis attributes - attributes["agno.tool.result_length"] = str(len(result_str)) + attributes["tool.result_length"] = str(len(result_str)) # Set final execution status if not attributes.get(ToolAttributes.TOOL_STATUS): @@ -235,7 +204,7 @@ def get_tool_execution_attributes( # Add execution summary for debugging tool_name = attributes.get(ToolAttributes.TOOL_NAME, "unknown") - call_type = attributes.get("agno.tool.transfer_type", "unknown") - attributes["agno.tool.execution_summary"] = f"Tool '{tool_name}' executed with type '{call_type}'" + call_type = attributes.get("tool.transfer_type", "unknown") + attributes["tool.execution_summary"] = f"Tool '{tool_name}' executed with type '{call_type}'" return attributes diff --git a/agentops/instrumentation/agentic/agno/attributes/workflow.py b/agentops/instrumentation/agentic/agno/attributes/workflow.py index 384cb616b..c848f64c9 100644 --- a/agentops/instrumentation/agentic/agno/attributes/workflow.py +++ b/agentops/instrumentation/agentic/agno/attributes/workflow.py @@ -2,6 +2,7 @@ from typing 
import Any, Dict, Optional, Tuple from opentelemetry.util.types import AttributeValue +import json from agentops.semconv.instrumentation import InstrumentationAttributes from agentops.semconv.span_kinds import SpanKind as AgentOpsSpanKind @@ -196,3 +197,58 @@ def get_workflow_session_attributes( attributes[InstrumentationAttributes.INSTRUMENTATION_TYPE] = AgentOpsSpanKind.WORKFLOW return attributes + + +def get_workflow_cache_attributes( + args: Tuple[Any, ...] = (), + kwargs: Optional[Dict[str, Any]] = None, + return_value: Optional[Any] = None, +) -> Dict[str, AttributeValue]: + """Extract attributes from workflow cache operations. + + Args: + args: Positional arguments passed to the cache method + kwargs: Keyword arguments passed to the cache method + return_value: Return value from the cache method + + Returns: + Dictionary of OpenTelemetry attributes for cache operations + """ + attributes = get_common_attributes() + kwargs = kwargs or {} + + if args and len(args) > 0: + workflow = args[0] + + # Get workflow information + if hasattr(workflow, "workflow_id") and workflow.workflow_id: + attributes["cache.workflow_id"] = str(workflow.workflow_id) + if hasattr(workflow, "session_id") and workflow.session_id: + attributes["cache.session_id"] = str(workflow.session_id) + + # Get cache state + if hasattr(workflow, "session_state") and isinstance(workflow.session_state, dict): + attributes["cache.size"] = len(workflow.session_state) + attributes["cache.keys"] = json.dumps(list(workflow.session_state.keys())) + + # Determine cache operation type and result + if len(args) > 1: + cache_key = str(args[1]) + attributes["cache.key"] = cache_key + + if return_value is not None: + attributes["cache.hit"] = True + attributes["cache.result"] = "hit" + + # Add value info + if isinstance(return_value, str): + attributes["cache.value_size"] = len(return_value) + if len(return_value) <= 100: + attributes["cache.value"] = return_value + else: + attributes["cache.value_preview"] = 
return_value[:100] + "..." + else: + attributes["cache.hit"] = False + attributes["cache.result"] = "miss" + + return attributes diff --git a/agentops/instrumentation/agentic/agno/instrumentor.py b/agentops/instrumentation/agentic/agno/instrumentor.py index 8c7cdd9ff..4e2cd44a4 100644 --- a/agentops/instrumentation/agentic/agno/instrumentor.py +++ b/agentops/instrumentation/agentic/agno/instrumentor.py @@ -3,16 +3,6 @@ This module provides instrumentation for the Agno Agent library, implementing OpenTelemetry instrumentation for agent workflows and LLM model calls. -We focus on instrumenting the following key endpoints: -- Agent.run/arun - Main agent workflow execution (sync/async) -- Team._run/_arun - Team workflow execution (sync/async) -- Team._run_stream/_arun_stream - Team streaming workflow execution (sync/async) -- FunctionCall.execute/aexecute - Tool execution when agents call tools (sync/async) -- Agent._run_tool/_arun_tool - Agent internal tool execution (sync/async) -- Agent._set_session_metrics - Session metrics capture for token usage and timing -- Workflow.run_workflow/arun_workflow - Workflow execution (sync/async) -- Workflow session management methods - Session lifecycle operations - This provides clean visibility into agent workflows and actual tool usage with proper parent-child span relationships. 
""" @@ -22,6 +12,7 @@ from opentelemetry.trace import Status, StatusCode from opentelemetry.metrics import Meter import threading +import json from agentops.logging import logger from agentops.instrumentation.common import ( @@ -32,13 +23,15 @@ from agentops.instrumentation.common.wrappers import WrapConfig # Import attribute handlers -from agentops.instrumentation.agentic.agno.attributes.agent import get_agent_run_attributes -from agentops.instrumentation.agentic.agno.attributes.team import get_team_run_attributes -from agentops.instrumentation.agentic.agno.attributes.tool import get_tool_execution_attributes -from agentops.instrumentation.agentic.agno.attributes.metrics import get_metrics_attributes -from agentops.instrumentation.agentic.agno.attributes.workflow import ( +from agentops.instrumentation.agentic.agno.attributes import ( + get_agent_run_attributes, + get_metrics_attributes, + get_team_run_attributes, + get_tool_execution_attributes, get_workflow_run_attributes, get_workflow_session_attributes, + get_storage_read_attributes, + get_storage_write_attributes, ) @@ -130,12 +123,16 @@ def wrapper(wrapped, instance, args, kwargs): workflow_id = getattr(instance, "workflow_id", None) or getattr(instance, "id", None) or id(instance) workflow_id = str(workflow_id) + # Get workflow name for span naming + workflow_name = getattr(instance, "name", None) or type(instance).__name__ + span_name = f"{workflow_name}.agno.workflow.run.workflow" if workflow_name else "agno.workflow.run.workflow" + # Check if streaming is enabled is_streaming = kwargs.get("stream", getattr(instance, "stream", False)) # For streaming, manually manage span lifecycle if is_streaming: - span = tracer.start_span("agno.workflow.run.workflow") + span = tracer.start_span(span_name) try: # Set workflow attributes @@ -176,7 +173,7 @@ def wrapper(wrapped, instance, args, kwargs): raise else: # For non-streaming, use normal context manager - with 
tracer.start_as_current_span("agno.workflow.run.workflow") as span: + with tracer.start_as_current_span(span_name) as span: try: # Set workflow attributes attributes = get_workflow_run_attributes(args=(instance,) + args, kwargs=kwargs) @@ -213,12 +210,16 @@ async def wrapper(wrapped, instance, args, kwargs): workflow_id = getattr(instance, "workflow_id", None) or getattr(instance, "id", None) or id(instance) workflow_id = str(workflow_id) + # Get workflow name for span naming + workflow_name = getattr(instance, "name", None) or type(instance).__name__ + span_name = f"{workflow_name}.agno.workflow.run.workflow" if workflow_name else "agno.workflow.run.workflow" + # Check if streaming is enabled is_streaming = kwargs.get("stream", getattr(instance, "stream", False)) # For streaming, manually manage span lifecycle if is_streaming: - span = tracer.start_span("agno.workflow.run.workflow") + span = tracer.start_span(span_name) try: # Set workflow attributes @@ -259,7 +260,7 @@ async def wrapper(wrapped, instance, args, kwargs): raise else: # For non-streaming, use normal context manager - with tracer.start_as_current_span("agno.workflow.run.workflow") as span: + with tracer.start_as_current_span(span_name) as span: try: # Set workflow attributes attributes = get_workflow_run_attributes(args=(instance,) + args, kwargs=kwargs) @@ -299,12 +300,16 @@ def wrapper(wrapped, instance, args, kwargs): # Get session ID for context mapping session_id = getattr(instance, "session_id", None) + # Get agent name for span naming + agent_name = getattr(instance, "name", None) + span_name = f"{agent_name}.agno.agent.run.agent" if agent_name else "agno.agent.run.agent" + # Check if streaming is enabled is_streaming = kwargs.get("stream", getattr(instance, "stream", False)) # For streaming, manually manage span lifecycle if is_streaming: - span = tracer.start_span("agno.agent.run.agent") + span = tracer.start_span(span_name) try: # Set agent attributes @@ -354,7 +359,7 @@ def 
wrapper(wrapped, instance, args, kwargs): raise else: # For non-streaming, use normal context manager - with tracer.start_as_current_span("agno.agent.run.agent") as span: + with tracer.start_as_current_span(span_name) as span: try: # Set agent attributes attributes = get_agent_run_attributes(args=(instance,) + args, kwargs=kwargs) @@ -394,12 +399,16 @@ async def wrapper(wrapped, instance, args, kwargs): # Get session ID for context mapping session_id = getattr(instance, "session_id", None) + # Get agent name for span naming + agent_name = getattr(instance, "name", None) + span_name = f"{agent_name}.agno.agent.run.agent" if agent_name else "agno.agent.run.agent" + # Check if streaming is enabled is_streaming = kwargs.get("stream", getattr(instance, "stream", False)) # For streaming, manually manage span lifecycle if is_streaming: - span = tracer.start_span("agno.agent.run.agent") + span = tracer.start_span(span_name) try: # Set agent attributes @@ -449,7 +458,7 @@ async def wrapper(wrapped, instance, args, kwargs): raise else: # For non-streaming, use normal context manager - with tracer.start_as_current_span("agno.agent.run.agent") as span: + with tracer.start_as_current_span(span_name) as span: try: # Set agent attributes attributes = get_agent_run_attributes(args=(instance,) + args, kwargs=kwargs) @@ -620,6 +629,10 @@ def wrapper(wrapped, instance, args, kwargs): team_id = getattr(instance, "team_id", None) or getattr(instance, "id", None) or id(instance) team_id = str(team_id) + # Get team name for span naming + team_name = getattr(instance, "name", None) + span_name = f"{team_name}.agno.team.run.workflow" if team_name else "agno.team.run.workflow" + # Check if we already have a team context (from print_response) existing_context = streaming_context_manager.get_context(team_id) @@ -630,7 +643,7 @@ def wrapper(wrapped, instance, args, kwargs): # Execute within the existing team context context_token = otel_context.attach(parent_context) try: - with 
tracer.start_as_current_span("agno.team.run.workflow") as span: + with tracer.start_as_current_span(span_name) as span: try: # Set workflow attributes attributes = get_team_run_attributes(args=(instance,) + args, kwargs=kwargs) @@ -656,7 +669,7 @@ def wrapper(wrapped, instance, args, kwargs): otel_context.detach(context_token) else: # Direct call to _run, create new team span - with tracer.start_as_current_span("agno.team.run.workflow") as span: + with tracer.start_as_current_span(span_name) as span: try: # Set workflow attributes attributes = get_team_run_attributes(args=(instance,) + args, kwargs=kwargs) @@ -685,6 +698,10 @@ async def wrapper(wrapped, instance, args, kwargs): team_id = getattr(instance, "team_id", None) or getattr(instance, "id", None) or id(instance) team_id = str(team_id) + # Get team name for span naming + team_name = getattr(instance, "name", None) + span_name = f"{team_name}.agno.team.run.workflow" if team_name else "agno.team.run.workflow" + # Check if we already have a team context (from print_response) existing_context = streaming_context_manager.get_context(team_id) @@ -695,7 +712,7 @@ async def wrapper(wrapped, instance, args, kwargs): # Execute within the existing team context context_token = otel_context.attach(parent_context) try: - with tracer.start_as_current_span("agno.team.run.workflow") as span: + with tracer.start_as_current_span(span_name) as span: try: # Set workflow attributes attributes = get_team_run_attributes(args=(instance,) + args, kwargs=kwargs) @@ -721,7 +738,7 @@ async def wrapper(wrapped, instance, args, kwargs): otel_context.detach(context_token) else: # Direct call to _arun, create new team span - with tracer.start_as_current_span("agno.team.run.workflow") as span: + with tracer.start_as_current_span(span_name) as span: try: # Set workflow attributes attributes = get_team_run_attributes(args=(instance,) + args, kwargs=kwargs) @@ -753,11 +770,15 @@ def wrapper(wrapped, instance, args, kwargs): # Check if streaming 
is enabled is_streaming = kwargs.get("stream", getattr(instance, "stream", False)) + # Get team name for span naming + team_name = getattr(instance, "name", None) + base_span_name = f"{team_name}.agno.team.run.workflow" if team_name else "agno.team.run.workflow" + # For print_response, we need to wrap the internal _run method instead # because print_response returns immediately if wrapped.__name__ == "print_response": # Create team span but don't manage it here - span = tracer.start_span("agno.team.run.agent") + span = tracer.start_span(base_span_name) try: # Set team attributes @@ -782,7 +803,7 @@ def wrapper(wrapped, instance, args, kwargs): raise else: # For run/arun methods, use standard span management - span = tracer.start_span("agno.team.run.agent") + span = tracer.start_span(base_span_name) try: # Set team attributes @@ -832,8 +853,12 @@ async def wrapper(wrapped, instance, args, kwargs): # Check if streaming is enabled is_streaming = kwargs.get("stream", getattr(instance, "stream", False)) + # Get team name for span naming + team_name = getattr(instance, "name", None) + span_name = f"{team_name}.agno.team.run.workflow" if team_name else "agno.team.run.workflow" + # Create team span - span = tracer.start_span("agno.team.run.agent") + span = tracer.start_span(span_name) try: # Set team attributes @@ -869,6 +894,233 @@ async def wrapper(wrapped, instance, args, kwargs): return wrapper +def create_storage_read_wrapper(tracer, streaming_context_manager): + """Create a wrapper for storage read operations with cache-aware span naming.""" + + def wrapper(wrapped, instance, args, kwargs): + # Start with a basic span name + span_name = "agno.workflow.storage.read" + + with tracer.start_as_current_span(span_name) as span: + try: + # Set flag to indicate we're in a storage operation + SessionStateProxy._set_storage_operation(True) + + # Set initial attributes + attributes = get_storage_read_attributes(args=(instance,) + args, kwargs=kwargs) + for key, value in 
attributes.items(): + span.set_attribute(key, value) + + # Execute the original function + result = wrapped(*args, **kwargs) + + # Set result attributes including cache hit/miss + result_attributes = get_storage_read_attributes( + args=(instance,) + args, kwargs=kwargs, return_value=result + ) + for key, value in result_attributes.items(): + if key not in attributes: # Avoid duplicates + span.set_attribute(key, value) + + # Update span name based on result and cache state + if hasattr(instance, "session_state") and isinstance(instance.session_state, dict): + cache_size = len(instance.session_state) + if result is not None: + span.update_name(f"Storage.Read.Hit[cache:{cache_size}]") + else: + span.update_name(f"Storage.Read.Miss[cache:{cache_size}]") + else: + # No cache info available + if result is not None: + span.update_name("Storage.Read.Hit") + else: + span.update_name("Storage.Read.Miss") + + span.set_status(Status(StatusCode.OK)) + return result + + except Exception as e: + span.set_status(Status(StatusCode.ERROR, str(e))) + span.record_exception(e) + raise + finally: + # Clear the flag when done + SessionStateProxy._set_storage_operation(False) + + return wrapper + + +def create_storage_write_wrapper(tracer, streaming_context_manager): + """Create a wrapper for storage write operations with descriptive span naming.""" + + def wrapper(wrapped, instance, args, kwargs): + # Start with a basic span name + span_name = "agno.workflow.storage.write" + + with tracer.start_as_current_span(span_name) as span: + try: + # Set flag to indicate we're in a storage operation + SessionStateProxy._set_storage_operation(True) + + # Set initial attributes + attributes = get_storage_write_attributes(args=(instance,) + args, kwargs=kwargs) + for key, value in attributes.items(): + span.set_attribute(key, value) + + # Execute the original function + result = wrapped(*args, **kwargs) + + # Set result attributes + result_attributes = get_storage_write_attributes( + args=(instance,) 
+ args, kwargs=kwargs, return_value=result + ) + for key, value in result_attributes.items(): + if key not in attributes: # Avoid duplicates + span.set_attribute(key, value) + + # Update span name to show cache state after write + if hasattr(instance, "session_state") and isinstance(instance.session_state, dict): + cache_size = len(instance.session_state) + span.update_name(f"Storage.Write[cache:{cache_size}]") + else: + span.update_name("Storage.Write") + + span.set_status(Status(StatusCode.OK)) + return result + + except Exception as e: + span.set_status(Status(StatusCode.ERROR, str(e))) + span.record_exception(e) + raise + finally: + # Clear the flag when done + SessionStateProxy._set_storage_operation(False) + + return wrapper + + +class SessionStateProxy(dict): + """Proxy class for session_state that instruments cache operations.""" + + # Thread-local storage to track if we're in a storage operation + _thread_local = threading.local() + + def __init__(self, original_dict, workflow, tracer): + super().__init__(original_dict) + self._workflow = workflow + self._tracer = tracer + + @classmethod + def _in_storage_operation(cls): + """Check if we're currently in a storage operation.""" + return getattr(cls._thread_local, "in_storage_operation", False) + + @classmethod + def _set_storage_operation(cls, value): + """Set whether we're in a storage operation.""" + cls._thread_local.in_storage_operation = value + + def get(self, key, default=None): + """Instrumented get method for cache checking.""" + # Check if we're already in a storage operation to avoid nested spans + if self._in_storage_operation(): + # We're inside a storage operation, skip instrumentation + return super().get(key, default) + + span_name = "Cache.Check" + + with self._tracer.start_as_current_span(span_name) as span: + # Set cache attributes + span.set_attribute("cache.key", str(key)) + span.set_attribute("cache.size", len(self)) + span.set_attribute("cache.keys", json.dumps(list(self.keys()))) + + 
# Get workflow info + if hasattr(self._workflow, "workflow_id") and self._workflow.workflow_id: + span.set_attribute("cache.workflow_id", str(self._workflow.workflow_id)) + if hasattr(self._workflow, "session_id") and self._workflow.session_id: + span.set_attribute("cache.session_id", str(self._workflow.session_id)) + + # Call the original method + result = super().get(key, default) + + # Update span based on result + if result is not None and result != default: + span.set_attribute("cache.hit", True) + span.set_attribute("cache.result", "hit") + span.update_name(f"Cache.Hit[{len(self)} entries]") + + # Add value info + if isinstance(result, str): + span.set_attribute("cache.value_size", len(result)) + if len(result) <= 100: + span.set_attribute("cache.value", result) + else: + span.set_attribute("cache.value_preview", result[:100] + "...") + else: + span.set_attribute("cache.hit", False) + span.set_attribute("cache.result", "miss") + span.update_name(f"Cache.Miss[{len(self)} entries]") + + span.set_status(Status(StatusCode.OK)) + return result + + def __setitem__(self, key, value): + """Instrumented setitem method for cache storing.""" + # Check if we're already in a storage operation to avoid nested spans + if self._in_storage_operation(): + # We're inside a storage operation, skip instrumentation + return super().__setitem__(key, value) + + span_name = "Cache.Store" + + with self._tracer.start_as_current_span(span_name) as span: + # Set cache attributes + span.set_attribute("cache.key", str(key)) + + # Get workflow info + if hasattr(self._workflow, "workflow_id") and self._workflow.workflow_id: + span.set_attribute("cache.workflow_id", str(self._workflow.workflow_id)) + if hasattr(self._workflow, "session_id") and self._workflow.session_id: + span.set_attribute("cache.session_id", str(self._workflow.session_id)) + + # Call the original method + super().__setitem__(key, value) + + # Set post-store attributes + span.set_attribute("cache.size", len(self)) + 
span.set_attribute("cache.keys", json.dumps(list(self.keys()))) + + # Add value info + if isinstance(value, str): + span.set_attribute("cache.value_size", len(value)) + if len(value) <= 100: + span.set_attribute("cache.value", value) + else: + span.set_attribute("cache.value_preview", value[:100] + "...") + + span.update_name(f"Cache.Store[{len(self)} entries]") + span.set_status(Status(StatusCode.OK)) + + +def create_workflow_init_wrapper(tracer): + """Wrapper to instrument workflow initialization and wrap session_state.""" + + def wrapper(wrapped, instance, args, kwargs): + # Call the original __init__ + result = wrapped(*args, **kwargs) + + # Wrap session_state if it exists + if hasattr(instance, "session_state") and isinstance(instance.session_state, dict): + # Replace session_state with our proxy + original_state = instance.session_state + instance.session_state = SessionStateProxy(original_state, instance, tracer) + + return result + + return wrapper + + def get_agent_context_for_llm(): """Helper function for LLM instrumentation to get current agent context.""" current_context = otel_context.get_current() @@ -916,21 +1168,16 @@ def _create_metrics(self, meter: Meter) -> Dict[str, Any]: def _initialize(self, **kwargs): """Perform custom initialization.""" - logger.info("Agno instrumentation installed successfully") - # Schedule wrapping to happen after imports are complete - import threading - - threading.Timer(0.1, self._delayed_wrap).start() - - def _delayed_wrap(self): - """Perform wrapping after a delay to avoid circular imports.""" + logger.info("Agno instrumentation: Beginning immediate instrumentation") + # Perform wrapping immediately instead of with a delay try: self._perform_wrapping() + logger.info("Agno instrumentation: Immediate instrumentation completed successfully") except Exception as e: - logger.error(f"Failed to perform delayed wrapping: {e}") + logger.error(f"Failed to perform immediate wrapping: {e}") def _custom_wrap(self, **kwargs): - 
"""Skip custom wrapping during initialization - it will be done in _delayed_wrap.""" + """Skip custom wrapping during initialization - it's done in _initialize.""" pass def _perform_wrapping(self): @@ -967,20 +1214,7 @@ def _perform_wrapping(self): method_name="new_session", handler=get_workflow_session_attributes, ), - WrapConfig( - trace_name="agno.workflow.session.read_from_storage", - package="agno.workflow.workflow", - class_name="Workflow", - method_name="read_from_storage", - handler=get_workflow_session_attributes, - ), - WrapConfig( - trace_name="agno.workflow.session.write_to_storage", - package="agno.workflow.workflow", - class_name="Workflow", - method_name="write_to_storage", - handler=get_workflow_session_attributes, - ), + # Note: read_from_storage and write_to_storage use custom wrappers below ] wrapped_count = 0 @@ -1003,13 +1237,18 @@ def _perform_wrapping(self): ("agno.tools.function", "FunctionCall.execute", self._create_streaming_tool_wrapper()), # Metrics wrapper ("agno.agent", "Agent._set_session_metrics", self._create_metrics_wrapper()), - # Team methods + # Team methods - wrap all public and internal methods + ("agno.team.team", "Team.print_response", self._create_team_wrapper()), ("agno.team.team", "Team.run", self._create_team_wrapper()), ("agno.team.team", "Team.arun", self._create_team_async_wrapper()), - ("agno.team.team", "Team.print_response", self._create_team_wrapper()), # Team internal methods with special handling ("agno.team.team", "Team._run", self._create_team_internal_wrapper()), ("agno.team.team", "Team._arun", self._create_team_internal_async_wrapper()), + # Storage methods with custom wrappers for cache-aware naming + ("agno.workflow.workflow", "Workflow.read_from_storage", self._create_storage_read_wrapper()), + ("agno.workflow.workflow", "Workflow.write_to_storage", self._create_storage_write_wrapper()), + # Workflow init wrapper to instrument session_state + ("agno.workflow.workflow", "Workflow.__init__", 
self._create_workflow_init_wrapper()), ] for package, method, wrapper in streaming_methods: @@ -1070,3 +1309,15 @@ def _create_team_internal_wrapper(self, args=None, kwargs=None, return_value=Non def _create_team_internal_async_wrapper(self, args=None, kwargs=None, return_value=None): """Wrapper function for async team internal methods.""" return create_team_internal_async_wrapper(self._tracer, self._streaming_context_manager) + + def _create_storage_read_wrapper(self, args=None, kwargs=None, return_value=None): + """Wrapper function for storage read operations.""" + return create_storage_read_wrapper(self._tracer, self._streaming_context_manager) + + def _create_storage_write_wrapper(self, args=None, kwargs=None, return_value=None): + """Wrapper function for storage write operations.""" + return create_storage_write_wrapper(self._tracer, self._streaming_context_manager) + + def _create_workflow_init_wrapper(self, args=None, kwargs=None, return_value=None): + """Wrapper function for workflow initialization to instrument session_state.""" + return create_workflow_init_wrapper(self._tracer) diff --git a/agentops/semconv/workflow.py b/agentops/semconv/workflow.py index d0d506d8d..95b9666b3 100644 --- a/agentops/semconv/workflow.py +++ b/agentops/semconv/workflow.py @@ -25,20 +25,30 @@ class WorkflowAttributes: # Configuration WORKFLOW_MAX_TURNS = "workflow.max_turns" # Maximum number of turns in a workflow WORKFLOW_DEBUG_MODE = "workflow.debug_mode" # Whether debug mode is enabled + WORKFLOW_MONITORING = "workflow.monitoring" # Whether monitoring is enabled + WORKFLOW_TELEMETRY = "workflow.telemetry" # Whether telemetry is enabled + + # Memory and Storage + WORKFLOW_MEMORY_TYPE = "workflow.memory_type" # Type of memory used by the workflow + WORKFLOW_STORAGE_TYPE = "workflow.storage_type" # Type of storage used by the workflow # Session context (simplified) WORKFLOW_SESSION_ID = "workflow.session_id" # Session ID for the workflow execution + WORKFLOW_SESSION_NAME = 
"workflow.session_name" # Session name for the workflow WORKFLOW_USER_ID = "workflow.user_id" # User ID associated with the workflow WORKFLOW_APP_ID = "workflow.app_id" # Application ID associated with the workflow # Input metadata WORKFLOW_INPUT_PARAMETER_COUNT = "workflow.input.parameter_count" # Number of input parameters + WORKFLOW_INPUT_PARAMETER_KEYS = "workflow.input.parameter_keys" # Keys of input parameters WORKFLOW_METHOD_PARAMETER_COUNT = "workflow.method.parameter_count" # Number of method parameters WORKFLOW_METHOD_RETURN_TYPE = "workflow.method.return_type" # Return type of the workflow method # Output metadata (commonly used) WORKFLOW_OUTPUT_CONTENT_TYPE = "workflow.output.content_type" # Content type of the output + WORKFLOW_OUTPUT_EVENT = "workflow.output.event" # Event type of the output WORKFLOW_OUTPUT_MODEL = "workflow.output.model" # Model used for the output + WORKFLOW_OUTPUT_MODEL_PROVIDER = "workflow.output.model_provider" # Model provider for the output WORKFLOW_OUTPUT_MESSAGE_COUNT = "workflow.output.message_count" # Number of messages in output WORKFLOW_OUTPUT_TOOL_COUNT = "workflow.output.tool_count" # Number of tools in output WORKFLOW_OUTPUT_IS_STREAMING = "workflow.output.is_streaming" # Whether output is streaming @@ -51,3 +61,9 @@ class WorkflowAttributes: # Session-specific attributes (used by agno) WORKFLOW_SESSION_WORKFLOW_ID = "workflow.session.workflow_id" # Workflow ID in session context WORKFLOW_SESSION_USER_ID = "workflow.session.user_id" # User ID in session context + WORKFLOW_SESSION_STATE_KEYS = "workflow.session.state_keys" # Keys in session state + WORKFLOW_SESSION_STATE_SIZE = "workflow.session.state_size" # Size of session state + WORKFLOW_SESSION_STORAGE_TYPE = "workflow.session.storage_type" # Storage type for session + WORKFLOW_SESSION_RETURNED_SESSION_ID = "workflow.session.returned_session_id" # Returned session ID + WORKFLOW_SESSION_CREATED_AT = "workflow.session.created_at" # Session creation timestamp + 
WORKFLOW_SESSION_UPDATED_AT = "workflow.session.updated_at"  # Timestamp of the most recent update to the session