Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
245 changes: 116 additions & 129 deletions agentops/instrumentation/agentic/agno/instrumentor.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,37 +86,8 @@ def clear_all(self) -> None:


# Methods to wrap for instrumentation
WRAPPED_METHODS: List[WrapConfig] = [
# Workflow session methods
WrapConfig(
trace_name="agno.workflow.session.load_session",
package="agno.workflow.workflow",
class_name="Workflow",
method_name="load_session",
handler=get_workflow_session_attributes,
),
WrapConfig(
trace_name="agno.workflow.session.new_session",
package="agno.workflow.workflow",
class_name="Workflow",
method_name="new_session",
handler=get_workflow_session_attributes,
),
WrapConfig(
trace_name="agno.workflow.session.read_from_storage",
package="agno.workflow.workflow",
class_name="Workflow",
method_name="read_from_storage",
handler=get_workflow_session_attributes,
),
WrapConfig(
trace_name="agno.workflow.session.write_to_storage",
package="agno.workflow.workflow",
class_name="Workflow",
method_name="write_to_storage",
handler=get_workflow_session_attributes,
),
]
# Empty list - all wrapping will be done in _custom_wrap to avoid circular imports
WRAPPED_METHODS: List[WrapConfig] = []


class StreamingResultWrapper:
Expand Down Expand Up @@ -917,113 +888,23 @@ class AgnoInstrumentor(CommonInstrumentor):

def __init__(self):
"""Initialize the Agno instrumentor."""
# Create instrumentor config
self._streaming_context_manager = StreamingContextManager()

# Create instrumentor config with populated wrapped methods
config = InstrumentorConfig(
library_name="agentops.instrumentation.agno",
library_version="0.1.0",
wrapped_methods=[], # We'll populate this in _get_wrapped_methods
wrapped_methods=self._get_initial_wrapped_methods(),
metrics_enabled=True,
dependencies=["agno >= 0.1.0"],
)

super().__init__(config)
self._streaming_context_manager = StreamingContextManager()

def _get_wrapped_methods(self) -> List[WrapConfig]:
"""Return list of methods to be wrapped."""
# Combine standard wrapped methods with custom streaming wraps
wrapped_methods = WRAPPED_METHODS.copy()

# Add streaming method configurations
wrapped_methods.extend(
[
# Streaming agent methods
WrapConfig(
trace_name="agno.agent.run.agent",
package="agno.agent",
class_name="Agent",
method_name="run",
handler=self._create_streaming_agent_wrapper,
),
WrapConfig(
trace_name="agno.agent.run.agent",
package="agno.agent",
class_name="Agent",
method_name="arun",
handler=self._create_streaming_agent_async_wrapper,
),
# Streaming workflow methods
WrapConfig(
trace_name="agno.workflow.run.workflow",
package="agno.workflow.workflow",
class_name="Workflow",
method_name="run_workflow",
handler=self._create_streaming_workflow_wrapper,
),
WrapConfig(
trace_name="agno.workflow.run.workflow",
package="agno.workflow.workflow",
class_name="Workflow",
method_name="arun_workflow",
handler=self._create_streaming_workflow_async_wrapper,
),
# Streaming tool execution
WrapConfig(
trace_name="agno.tool.execute.tool_usage",
package="agno.tools.function",
class_name="FunctionCall",
method_name="execute",
handler=self._create_streaming_tool_wrapper,
),
# Metrics wrapper
WrapConfig(
trace_name="agno.agent.metrics",
package="agno.agent",
class_name="Agent",
method_name="_set_session_metrics",
handler=self._create_metrics_wrapper,
),
# Team methods
WrapConfig(
trace_name="agno.team.run.agent",
package="agno.team.team",
class_name="Team",
method_name="run",
handler=self._create_team_wrapper,
),
WrapConfig(
trace_name="agno.team.run.agent",
package="agno.team.team",
class_name="Team",
method_name="arun",
handler=self._create_team_async_wrapper,
),
WrapConfig(
trace_name="agno.team.run.agent",
package="agno.team.team",
class_name="Team",
method_name="print_response",
handler=self._create_team_wrapper,
),
# Team internal methods with special handling
WrapConfig(
trace_name="agno.team.run.workflow",
package="agno.team.team",
class_name="Team",
method_name="_run",
handler=self._create_team_internal_wrapper,
),
WrapConfig(
trace_name="agno.team.run.workflow",
package="agno.team.team",
class_name="Team",
method_name="_arun",
handler=self._create_team_internal_async_wrapper,
),
]
)

return wrapped_methods
def _get_initial_wrapped_methods(self) -> List[WrapConfig]:
"""Return list of methods to be wrapped during initialization."""
# Only return the standard wrapped methods that don't need custom wrappers
return WRAPPED_METHODS.copy()

def _create_metrics(self, meter: Meter) -> Dict[str, Any]:
"""Create metrics for the instrumentor.
Expand All @@ -1036,6 +917,112 @@ def _create_metrics(self, meter: Meter) -> Dict[str, Any]:
def _initialize(self, **kwargs):
"""Perform custom initialization."""
logger.info("Agno instrumentation installed successfully")
# Schedule wrapping to happen after imports are complete
import threading

threading.Timer(0.1, self._delayed_wrap).start()

def _delayed_wrap(self):
"""Perform wrapping after a delay to avoid circular imports."""
try:
self._perform_wrapping()
except Exception as e:
logger.error(f"Failed to perform delayed wrapping: {e}")

def _custom_wrap(self, **kwargs):
"""Skip custom wrapping during initialization - it will be done in _delayed_wrap."""
pass

def _perform_wrapping(self):
"""Actually perform the wrapping - called after imports are complete."""
if not self._tracer:
logger.debug("No tracer available for Agno wrapping")
return

from agentops.instrumentation.common.wrappers import wrap_function_wrapper, WrapConfig, wrap

# Import Agno modules now that they should be fully loaded
try:
import agno.agent
import agno.workflow.workflow
import agno.tools.function
import agno.team.team # Noqa: F401
except ImportError as e:
logger.error(f"Failed to import Agno modules for wrapping: {e}")
return

# First wrap the standard workflow session methods using the standard wrapper
session_methods = [
WrapConfig(
trace_name="agno.workflow.session.load_session",
package="agno.workflow.workflow",
class_name="Workflow",
method_name="load_session",
handler=get_workflow_session_attributes,
),
WrapConfig(
trace_name="agno.workflow.session.new_session",
package="agno.workflow.workflow",
class_name="Workflow",
method_name="new_session",
handler=get_workflow_session_attributes,
),
WrapConfig(
trace_name="agno.workflow.session.read_from_storage",
package="agno.workflow.workflow",
class_name="Workflow",
method_name="read_from_storage",
handler=get_workflow_session_attributes,
),
WrapConfig(
trace_name="agno.workflow.session.write_to_storage",
package="agno.workflow.workflow",
class_name="Workflow",
method_name="write_to_storage",
handler=get_workflow_session_attributes,
),
]

wrapped_count = 0
for wrap_config in session_methods:
try:
wrap(wrap_config, self._tracer)
wrapped_count += 1
except Exception as e:
logger.debug(f"Failed to wrap {wrap_config}: {e}")

# Now wrap the streaming methods that need custom wrappers
streaming_methods = [
# Streaming agent methods
("agno.agent", "Agent.run", self._create_streaming_agent_wrapper()),
("agno.agent", "Agent.arun", self._create_streaming_agent_async_wrapper()),
# Streaming workflow methods
("agno.workflow.workflow", "Workflow.run_workflow", self._create_streaming_workflow_wrapper()),
("agno.workflow.workflow", "Workflow.arun_workflow", self._create_streaming_workflow_async_wrapper()),
# Streaming tool execution
("agno.tools.function", "FunctionCall.execute", self._create_streaming_tool_wrapper()),
# Metrics wrapper
("agno.agent", "Agent._set_session_metrics", self._create_metrics_wrapper()),
# Team methods
("agno.team.team", "Team.run", self._create_team_wrapper()),
("agno.team.team", "Team.arun", self._create_team_async_wrapper()),
("agno.team.team", "Team.print_response", self._create_team_wrapper()),
# Team internal methods with special handling
("agno.team.team", "Team._run", self._create_team_internal_wrapper()),
("agno.team.team", "Team._arun", self._create_team_internal_async_wrapper()),
]

for package, method, wrapper in streaming_methods:
try:
wrap_function_wrapper(package, method, wrapper)
wrapped_count += 1
except Exception as e:
logger.debug(f"Failed to wrap {package}.{method}: {e}")

if wrapped_count > 0:
logger.info(f"Agno instrumentation: Successfully wrapped {wrapped_count} methods")
else:
logger.warning("Agno instrumentation: No methods were successfully wrapped")

def _custom_unwrap(self, **kwargs):
"""Perform custom unwrapping."""
Expand Down
Loading