diff --git a/agentops/instrumentation/agentic/agno/instrumentor.py b/agentops/instrumentation/agentic/agno/instrumentor.py index 1f5fab321..8c7cdd9ff 100644 --- a/agentops/instrumentation/agentic/agno/instrumentor.py +++ b/agentops/instrumentation/agentic/agno/instrumentor.py @@ -86,37 +86,8 @@ def clear_all(self) -> None: # Methods to wrap for instrumentation -WRAPPED_METHODS: List[WrapConfig] = [ - # Workflow session methods - WrapConfig( - trace_name="agno.workflow.session.load_session", - package="agno.workflow.workflow", - class_name="Workflow", - method_name="load_session", - handler=get_workflow_session_attributes, - ), - WrapConfig( - trace_name="agno.workflow.session.new_session", - package="agno.workflow.workflow", - class_name="Workflow", - method_name="new_session", - handler=get_workflow_session_attributes, - ), - WrapConfig( - trace_name="agno.workflow.session.read_from_storage", - package="agno.workflow.workflow", - class_name="Workflow", - method_name="read_from_storage", - handler=get_workflow_session_attributes, - ), - WrapConfig( - trace_name="agno.workflow.session.write_to_storage", - package="agno.workflow.workflow", - class_name="Workflow", - method_name="write_to_storage", - handler=get_workflow_session_attributes, - ), -] +# Empty list - all wrapping will be done in _custom_wrap to avoid circular imports +WRAPPED_METHODS: List[WrapConfig] = [] class StreamingResultWrapper: @@ -917,113 +888,23 @@ class AgnoInstrumentor(CommonInstrumentor): def __init__(self): """Initialize the Agno instrumentor.""" - # Create instrumentor config + self._streaming_context_manager = StreamingContextManager() + + # Create instrumentor config with populated wrapped methods config = InstrumentorConfig( library_name="agentops.instrumentation.agno", library_version="0.1.0", - wrapped_methods=[], # We'll populate this in _get_wrapped_methods + wrapped_methods=self._get_initial_wrapped_methods(), metrics_enabled=True, dependencies=["agno >= 0.1.0"], ) super().__init__(config) - self._streaming_context_manager = StreamingContextManager() - - def _get_wrapped_methods(self) -> List[WrapConfig]: - """Return list of methods to be wrapped.""" - # Combine standard wrapped methods with custom streaming wraps - wrapped_methods = WRAPPED_METHODS.copy() - - # Add streaming method configurations - wrapped_methods.extend( - [ - # Streaming agent methods - WrapConfig( - trace_name="agno.agent.run.agent", - package="agno.agent", - class_name="Agent", - method_name="run", - handler=self._create_streaming_agent_wrapper, - ), - WrapConfig( - trace_name="agno.agent.run.agent", - package="agno.agent", - class_name="Agent", - method_name="arun", - handler=self._create_streaming_agent_async_wrapper, - ), - # Streaming workflow methods - WrapConfig( - trace_name="agno.workflow.run.workflow", - package="agno.workflow.workflow", - class_name="Workflow", - method_name="run_workflow", - handler=self._create_streaming_workflow_wrapper, - ), - WrapConfig( - trace_name="agno.workflow.run.workflow", - package="agno.workflow.workflow", - class_name="Workflow", - method_name="arun_workflow", - handler=self._create_streaming_workflow_async_wrapper, - ), - # Streaming tool execution - WrapConfig( - trace_name="agno.tool.execute.tool_usage", - package="agno.tools.function", - class_name="FunctionCall", - method_name="execute", - handler=self._create_streaming_tool_wrapper, - ), - # Metrics wrapper - WrapConfig( - trace_name="agno.agent.metrics", - package="agno.agent", - class_name="Agent", - method_name="_set_session_metrics", - handler=self._create_metrics_wrapper, - ), - # Team methods - WrapConfig( - trace_name="agno.team.run.agent", - package="agno.team.team", - class_name="Team", - method_name="run", - handler=self._create_team_wrapper, - ), - WrapConfig( - trace_name="agno.team.run.agent", - package="agno.team.team", - class_name="Team", - method_name="arun", - handler=self._create_team_async_wrapper, - ), - WrapConfig( - trace_name="agno.team.run.agent", - package="agno.team.team", - class_name="Team", - method_name="print_response", - handler=self._create_team_wrapper, - ), - # Team internal methods with special handling - WrapConfig( - trace_name="agno.team.run.workflow", - package="agno.team.team", - class_name="Team", - method_name="_run", - handler=self._create_team_internal_wrapper, - ), - WrapConfig( - trace_name="agno.team.run.workflow", - package="agno.team.team", - class_name="Team", - method_name="_arun", - handler=self._create_team_internal_async_wrapper, - ), - ] - ) - return wrapped_methods + def _get_initial_wrapped_methods(self) -> List[WrapConfig]: + """Return list of methods to be wrapped during initialization.""" + # Only return the standard wrapped methods that don't need custom wrappers + return WRAPPED_METHODS.copy() def _create_metrics(self, meter: Meter) -> Dict[str, Any]: """Create metrics for the instrumentor. @@ -1036,6 +917,112 @@ def _create_metrics(self, meter: Meter) -> Dict[str, Any]: def _initialize(self, **kwargs): """Perform custom initialization.""" logger.info("Agno instrumentation installed successfully") + # Schedule wrapping to happen after imports are complete + import threading + + threading.Timer(0.1, self._delayed_wrap).start() + + def _delayed_wrap(self): + """Perform wrapping after a delay to avoid circular imports.""" + try: + self._perform_wrapping() + except Exception as e: + logger.error(f"Failed to perform delayed wrapping: {e}") + + def _custom_wrap(self, **kwargs): + """Skip custom wrapping during initialization - it will be done in _delayed_wrap.""" + pass + + def _perform_wrapping(self): + """Actually perform the wrapping - called after imports are complete.""" + if not self._tracer: + logger.debug("No tracer available for Agno wrapping") + return + + from agentops.instrumentation.common.wrappers import wrap_function_wrapper, WrapConfig, wrap + + # Import Agno modules now that they should be fully loaded + try: + import agno.agent + import agno.workflow.workflow + import agno.tools.function + import agno.team.team # Noqa: F401 + except ImportError as e: + logger.error(f"Failed to import Agno modules for wrapping: {e}") + return + + # First wrap the standard workflow session methods using the standard wrapper + session_methods = [ + WrapConfig( + trace_name="agno.workflow.session.load_session", + package="agno.workflow.workflow", + class_name="Workflow", + method_name="load_session", + handler=get_workflow_session_attributes, + ), + WrapConfig( + trace_name="agno.workflow.session.new_session", + package="agno.workflow.workflow", + class_name="Workflow", + method_name="new_session", + handler=get_workflow_session_attributes, + ), + WrapConfig( + trace_name="agno.workflow.session.read_from_storage", + package="agno.workflow.workflow", + class_name="Workflow", + method_name="read_from_storage", + handler=get_workflow_session_attributes, + ), + WrapConfig( + trace_name="agno.workflow.session.write_to_storage", + package="agno.workflow.workflow", + class_name="Workflow", + method_name="write_to_storage", + handler=get_workflow_session_attributes, + ), + ] + + wrapped_count = 0 + for wrap_config in session_methods: + try: + wrap(wrap_config, self._tracer) + wrapped_count += 1 + except Exception as e: + logger.debug(f"Failed to wrap {wrap_config}: {e}") + + # Now wrap the streaming methods that need custom wrappers + streaming_methods = [ + # Streaming agent methods + ("agno.agent", "Agent.run", self._create_streaming_agent_wrapper()), + ("agno.agent", "Agent.arun", self._create_streaming_agent_async_wrapper()), + # Streaming workflow methods + ("agno.workflow.workflow", "Workflow.run_workflow", self._create_streaming_workflow_wrapper()), + ("agno.workflow.workflow", "Workflow.arun_workflow", self._create_streaming_workflow_async_wrapper()), + # Streaming tool execution + ("agno.tools.function", "FunctionCall.execute", self._create_streaming_tool_wrapper()), + # Metrics wrapper + ("agno.agent", "Agent._set_session_metrics", self._create_metrics_wrapper()), + # Team methods + ("agno.team.team", "Team.run", self._create_team_wrapper()), + ("agno.team.team", "Team.arun", self._create_team_async_wrapper()), + ("agno.team.team", "Team.print_response", self._create_team_wrapper()), + # Team internal methods with special handling + ("agno.team.team", "Team._run", self._create_team_internal_wrapper()), + ("agno.team.team", "Team._arun", self._create_team_internal_async_wrapper()), + ] + + for package, method, wrapper in streaming_methods: + try: + wrap_function_wrapper(package, method, wrapper) + wrapped_count += 1 + except Exception as e: + logger.debug(f"Failed to wrap {package}.{method}: {e}") + + if wrapped_count > 0: + logger.info(f"Agno instrumentation: Successfully wrapped {wrapped_count} methods") + else: + logger.warning("Agno instrumentation: No methods were successfully wrapped") def _custom_unwrap(self, **kwargs): """Perform custom unwrapping."""