2 changes: 1 addition & 1 deletion agentops/instrumentation/__init__.py
@@ -110,7 +110,7 @@ class InstrumentorConfig(TypedDict):
"agno": {
"module_name": "agentops.instrumentation.agentic.agno",
"class_name": "AgnoInstrumentor",
"min_version": "0.1.0",
"min_version": "1.5.8",
},
"smolagents": {
"module_name": "agentops.instrumentation.agentic.smolagents",
2 changes: 1 addition & 1 deletion agentops/instrumentation/agentic/agno/__init__.py
@@ -8,7 +8,7 @@
 logger = logging.getLogger(__name__)
 
 # Library information
-_library_info = LibraryInfo(name="agno")
+_library_info = LibraryInfo(name="agno", default_version="1.5.8")
 LIBRARY_NAME = _library_info.name
 LIBRARY_VERSION = _library_info.version
 
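The new default_version argument presumably serves as the fallback reported when the installed agno version cannot be resolved at runtime; a minimal sketch of that pattern (illustrative only, not the actual LibraryInfo source):

from importlib.metadata import PackageNotFoundError, version


def resolve_version(package_name: str, default: str) -> str:
    # Prefer the version of the installed package; fall back to the
    # pinned default (here "1.5.8") when the package is not installed.
    try:
        return version(package_name)
    except PackageNotFoundError:
        return default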
10 changes: 8 additions & 2 deletions agentops/instrumentation/agentic/agno/attributes/__init__.py
@@ -1,14 +1,20 @@
"""Agno Agent attributes package for span instrumentation."""
"""Agno instrumentation attribute handlers."""

from .agent import get_agent_run_attributes
from .metrics import get_metrics_attributes
from .team import get_team_run_attributes
from .tool import get_tool_execution_attributes
from .workflow import get_workflow_run_attributes, get_workflow_session_attributes
from .workflow import get_workflow_run_attributes, get_workflow_session_attributes, get_workflow_cache_attributes
from .storage import get_storage_read_attributes, get_storage_write_attributes

__all__ = [
"get_agent_run_attributes",
"get_metrics_attributes",
"get_team_run_attributes",
"get_tool_execution_attributes",
"get_workflow_run_attributes",
"get_workflow_session_attributes",
"get_workflow_cache_attributes",
"get_storage_read_attributes",
"get_storage_write_attributes",
]
226 changes: 93 additions & 133 deletions agentops/instrumentation/agentic/agno/attributes/agent.py

Large diffs are not rendered by default.

27 changes: 5 additions & 22 deletions agentops/instrumentation/agentic/agno/attributes/metrics.py
@@ -28,7 +28,7 @@ def get_metrics_attributes(
     attributes[SpanAttributes.LLM_SYSTEM] = "agno"
     attributes[SpanAttributes.AGENTOPS_ENTITY_NAME] = "LLM"
 
-    # Initialize usage tracking variables (but don't set attributes yet)
+    # Initialize usage tracking variables
    usage_data = {}
 
     # Initialize counters for indexed messages
@@ -66,7 +66,6 @@
         model_class = model.__class__.__name__
         attributes["agno.model.class"] = model_class
 
-    # === EXTRACT CONVERSATION STRUCTURE ===
     if hasattr(run_messages, "messages") and run_messages.messages:
         messages = run_messages.messages
 
@@ -82,12 +81,10 @@
         for i, msg in enumerate(messages):
             # Extract message content for prompts/completions
             if hasattr(msg, "role") and hasattr(msg, "content"):
-                # Only set content if it's not None/empty
+                # Only process messages with actual content
                 if msg.content is not None and str(msg.content).strip() != "" and str(msg.content) != "None":
                     content = str(msg.content)
-                    # Truncate very long content to avoid oversized attributes
-                    if len(content) > 1000:
-                        content = content[:997] + "..."
+                    # No truncation - keep full content for observability
 
                     if msg.role == "user":
                         attributes[f"{SpanAttributes.LLM_PROMPTS}.{prompt_count}.role"] = "user"
@@ -101,17 +98,6 @@
attributes[f"{SpanAttributes.LLM_PROMPTS}.{prompt_count}.role"] = "system"
attributes[f"{SpanAttributes.LLM_PROMPTS}.{prompt_count}.content"] = content
prompt_count += 1
else:
# For messages with None content, still set the role but skip content
if msg.role == "user":
attributes[f"{SpanAttributes.LLM_PROMPTS}.{prompt_count}.role"] = "user"
prompt_count += 1
elif msg.role == "assistant":
attributes[f"{SpanAttributes.LLM_COMPLETIONS}.{completion_count}.role"] = "assistant"
completion_count += 1
elif msg.role == "system":
attributes[f"{SpanAttributes.LLM_PROMPTS}.{prompt_count}.role"] = "system"
prompt_count += 1

# Extract token metrics from message
if hasattr(msg, "metrics") and msg.metrics:
@@ -124,15 +110,15 @@
                     total_completion_tokens += metrics.completion_tokens
                 if hasattr(metrics, "total_tokens") and metrics.total_tokens > 0:
                     total_tokens += metrics.total_tokens
-                # For messages that only have output_tokens (like Anthropic)
+                # For messages that only have output_tokens
                 if hasattr(metrics, "output_tokens") and metrics.output_tokens > 0:
                     total_output_tokens += metrics.output_tokens
                 if hasattr(metrics, "input_tokens") and metrics.input_tokens > 0:
                     total_input_tokens += metrics.input_tokens
                 if hasattr(metrics, "time") and metrics.time:
                     total_time += metrics.time
 
-    # === TOKEN METRICS FROM AGENT SESSION METRICS ===
+    # Token metrics from agent session metrics
     if hasattr(agent, "session_metrics") and agent.session_metrics:
         session_metrics = agent.session_metrics
 
@@ -191,7 +177,6 @@ def get_metrics_attributes(
         if hasattr(session_metrics, "reasoning_tokens") and session_metrics.reasoning_tokens > 0:
             usage_data["reasoning_tokens"] = session_metrics.reasoning_tokens
 
-    # === FALLBACK TO MESSAGE AGGREGATION IF SESSION METRICS ARE EMPTY ===
     # If we don't have token data from session metrics, try message aggregation
     if "total_tokens" not in usage_data:
         # Set aggregated token usage from messages
@@ -207,8 +192,6 @@
         user_msg = run_messages.user_message
         if hasattr(user_msg, "content"):
             content = str(user_msg.content)
-            if len(content) > 1000:
-                content = content[:997] + "..."
             attributes["agno.metrics.user_input"] = content
 
     # Set individual LLM usage attributes only for values we actually have
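For orientation, the indexed prompt/completion layout these loops emit looks roughly like the following (illustrative values; assuming the conventional gen_ai.prompt and gen_ai.completion strings behind SpanAttributes.LLM_PROMPTS and SpanAttributes.LLM_COMPLETIONS):

attributes = {
    "gen_ai.prompt.0.role": "system",
    "gen_ai.prompt.0.content": "You are a helpful agent.",
    "gen_ai.prompt.1.role": "user",
    "gen_ai.prompt.1.content": "Summarize the report.",
    "gen_ai.completion.0.role": "assistant",
    "gen_ai.completion.0.content": "Here is the summary...",
}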
158 changes: 158 additions & 0 deletions agentops/instrumentation/agentic/agno/attributes/storage.py
@@ -0,0 +1,158 @@
"""Storage operation attribute handlers for Agno workflow instrumentation."""

import json
from typing import Any, Dict, Optional, Tuple
from opentelemetry.util.types import AttributeValue

from agentops.semconv.span_attributes import SpanAttributes
from agentops.semconv.span_kinds import SpanKind as AgentOpsSpanKind
from agentops.instrumentation.common.attributes import get_common_attributes


def get_storage_read_attributes(
args: Tuple[Any, ...] = (),
kwargs: Optional[Dict[str, Any]] = None,
return_value: Optional[Any] = None,
) -> Dict[str, AttributeValue]:
"""Extract attributes from storage read operations.

Args:
args: Positional arguments passed to read_from_storage
kwargs: Keyword arguments passed to read_from_storage
return_value: Return value from read_from_storage (the cached data or None)

Returns:
Dictionary of OpenTelemetry attributes for storage read operations
"""
attributes = get_common_attributes()
kwargs = kwargs or {}

# Mark this as a storage operation within workflow context
attributes["storage.operation"] = "read"
attributes[SpanAttributes.AGENTOPS_SPAN_KIND] = AgentOpsSpanKind.WORKFLOW

if args and len(args) > 0:
workflow = args[0]

# Get workflow information
if hasattr(workflow, "workflow_id") and workflow.workflow_id:
attributes["storage.workflow_id"] = str(workflow.workflow_id)
if hasattr(workflow, "session_id") and workflow.session_id:
attributes["storage.session_id"] = str(workflow.session_id)

# Get storage type
if hasattr(workflow, "storage") and workflow.storage:
storage_type = type(workflow.storage).__name__
attributes["storage.backend"] = storage_type

# Get session state info for context
if hasattr(workflow, "session_state") and isinstance(workflow.session_state, dict):
# Get all cache keys
cache_keys = list(workflow.session_state.keys())
attributes["storage.cache_size"] = len(cache_keys)
if cache_keys:
attributes["storage.cache_keys"] = json.dumps(cache_keys)

# Analyze the return value to determine cache hit/miss
if return_value is not None:
# Cache hit
attributes["storage.cache_hit"] = True
attributes["storage.result"] = "hit"

# Get data type and size
data_type = type(return_value).__name__
attributes["storage.data_type"] = data_type

# For dict/list, show structure
if isinstance(return_value, dict):
attributes["storage.data_keys"] = json.dumps(list(return_value.keys()))
attributes["storage.data_size"] = len(return_value)
elif isinstance(return_value, (list, tuple)):
attributes["storage.data_size"] = len(return_value)
elif isinstance(return_value, str):
attributes["storage.data_size"] = len(return_value)
# Show full string data without truncation
attributes["storage.data_preview"] = return_value
else:
# Cache miss
attributes["storage.cache_hit"] = False
attributes["storage.result"] = "miss"

return attributes


def get_storage_write_attributes(
args: Tuple[Any, ...] = (),
kwargs: Optional[Dict[str, Any]] = None,
return_value: Optional[Any] = None,
) -> Dict[str, AttributeValue]:
"""Extract attributes from storage write operations.

Args:
args: Positional arguments passed to write_to_storage
kwargs: Keyword arguments passed to write_to_storage
return_value: Return value from write_to_storage (usually None or success indicator)

Returns:
Dictionary of OpenTelemetry attributes for storage write operations
"""
attributes = get_common_attributes()
kwargs = kwargs or {}

# Mark this as a storage operation within workflow context
attributes["storage.operation"] = "write"
attributes[SpanAttributes.AGENTOPS_SPAN_KIND] = AgentOpsSpanKind.WORKFLOW

if args and len(args) > 0:
workflow = args[0]

# Get workflow information
if hasattr(workflow, "workflow_id") and workflow.workflow_id:
attributes["storage.workflow_id"] = str(workflow.workflow_id)
if hasattr(workflow, "session_id") and workflow.session_id:
attributes["storage.session_id"] = str(workflow.session_id)

# Get storage type
if hasattr(workflow, "storage") and workflow.storage:
storage_type = type(workflow.storage).__name__
attributes["storage.backend"] = storage_type

# Get session state info to see what's being written
if hasattr(workflow, "session_state") and isinstance(workflow.session_state, dict):
# Get cache state after write
cache_keys = list(workflow.session_state.keys())
attributes["storage.cache_size"] = len(cache_keys)
if cache_keys:
attributes["storage.cache_keys"] = json.dumps(cache_keys)

# Try to identify what was written (the newest/changed data)
# This is a heuristic - in practice you might need to track state changes
if cache_keys:
# Show the last key as likely the one just written
last_key = cache_keys[-1]
attributes["storage.written_key"] = last_key

# Get value preview
value = workflow.session_state.get(last_key)
if value is not None:
value_type = type(value).__name__
attributes["storage.written_value_type"] = value_type

if isinstance(value, str):
if len(value) > 100:
attributes["storage.written_value_preview"] = value[:100] + "..."
else:
attributes["storage.written_value_preview"] = value
attributes["storage.written_value_size"] = len(value)
elif isinstance(value, (dict, list)):
attributes["storage.written_value_size"] = len(value)
attributes["storage.written_value_preview"] = f"{value_type} with {len(value)} items"

# Check write result
if return_value is not None:
attributes["storage.write_success"] = True
else:
# Most storage writes return None on success, so this is normal
attributes["storage.write_success"] = True

return attributes
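A minimal sketch of how the read handler classifies hits and misses (the SimpleNamespace workflow below is a stand-in invented for illustration; in the instrumentation, the wrapped read_from_storage call supplies the real agno Workflow as args[0]):

from types import SimpleNamespace

workflow = SimpleNamespace(
    workflow_id="wf-123",
    session_id="sess-456",
    storage=None,  # no storage backend configured in this sketch
    session_state={"report": "cached text"},
)

# Cache hit: read_from_storage returned the cached data
attrs = get_storage_read_attributes(args=(workflow,), return_value="cached text")
assert attrs["storage.cache_hit"] is True and attrs["storage.result"] == "hit"

# Cache miss: read_from_storage returned None
attrs = get_storage_read_attributes(args=(workflow,), return_value=None)
assert attrs["storage.cache_hit"] is False and attrs["storage.result"] == "miss"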