diff --git a/src/backend/app_kernel.py b/src/backend/app_kernel.py index 8a1afebe8..b300d41ba 100644 --- a/src/backend/app_kernel.py +++ b/src/backend/app_kernel.py @@ -1,6 +1,9 @@ # app_kernel.py import logging +from contextlib import asynccontextmanager + + from azure.monitor.opentelemetry import configure_azure_monitor from common.config.app_config import config from common.models.messages_kernel import UserLanguage @@ -16,6 +19,32 @@ # Azure monitoring # Semantic Kernel imports +from v3.config.agent_registry import agent_registry + + +@asynccontextmanager +async def lifespan(app: FastAPI): + """Manage FastAPI application lifecycle - startup and shutdown.""" + logger = logging.getLogger(__name__) + + # Startup + logger.info("๐Ÿš€ Starting MACAE application...") + yield + + # Shutdown + logger.info("๐Ÿ›‘ Shutting down MACAE application...") + try: + # Clean up all agents from Azure AI Foundry when container stops + await agent_registry.cleanup_all_agents() + logger.info("โœ… Agent cleanup completed successfully") + + except ImportError as ie: + logger.error(f"โŒ Could not import agent_registry: {ie}") + except Exception as e: + logger.error(f"โŒ Error during shutdown cleanup: {e}") + + logger.info("๐Ÿ‘‹ MACAE application shutdown complete") + # Check if the Application Insights Instrumentation Key is set in the environment variables connection_string = config.APPLICATIONINSIGHTS_CONNECTION_STRING @@ -46,7 +75,7 @@ ) # Initialize the FastAPI app -app = FastAPI() +app = FastAPI(lifespan=lifespan) frontend_url = config.FRONTEND_SITE_NAME diff --git a/src/backend/common/utils/utils_kernel.py b/src/backend/common/utils/utils_kernel.py index db4a19ea2..85fc2fac0 100644 --- a/src/backend/common/utils/utils_kernel.py +++ b/src/backend/common/utils/utils_kernel.py @@ -7,6 +7,8 @@ from semantic_kernel.agents.azure_ai.azure_ai_agent import AzureAIAgent from v3.magentic_agents.foundry_agent import FoundryAgentTemplate +from v3.config.agent_registry import agent_registry + logging.basicConfig(level=logging.INFO) # Cache for agent instances by session @@ -46,6 +48,12 @@ async def create_RAI_agent() -> FoundryAgentTemplate: ) await agent.open() + + try: + agent_registry.register_agent(agent) + + except Exception as registry_error: + logging.warning(f"Failed to register agent '{agent.agent_name}' with registry: {registry_error}") return agent diff --git a/src/backend/v3/config/agent_registry.py b/src/backend/v3/config/agent_registry.py new file mode 100644 index 000000000..564beb136 --- /dev/null +++ b/src/backend/v3/config/agent_registry.py @@ -0,0 +1,140 @@ +# Copyright (c) Microsoft. All rights reserved. +"""Global agent registry for tracking and managing agent lifecycles across the application.""" + +import asyncio +import logging +import threading +from typing import List, Dict, Any, Optional +from weakref import WeakSet + + +class AgentRegistry: + """Global registry for tracking and managing all agent instances across the application.""" + + def __init__(self): + self.logger = logging.getLogger(__name__) + self._lock = threading.Lock() + self._all_agents: WeakSet = WeakSet() + self._agent_metadata: Dict[int, Dict[str, Any]] = {} + + def register_agent(self, agent: Any, user_id: Optional[str] = None) -> None: + """Register an agent instance for tracking and lifecycle management.""" + with self._lock: + try: + self._all_agents.add(agent) + agent_id = id(agent) + self._agent_metadata[agent_id] = { + 'type': type(agent).__name__, + 'user_id': user_id, + 'name': getattr(agent, 'agent_name', getattr(agent, 'name', 'Unknown')) + } + self.logger.info(f"Registered agent: {type(agent).__name__} (ID: {agent_id}, User: {user_id})") + except Exception as e: + self.logger.error(f"Failed to register agent: {e}") + + def unregister_agent(self, agent: Any) -> None: + """Unregister an agent instance.""" + with self._lock: + try: + agent_id = id(agent) + self._all_agents.discard(agent) + if agent_id in self._agent_metadata: + metadata = self._agent_metadata.pop(agent_id) + self.logger.info(f"Unregistered agent: {metadata.get('type', 'Unknown')} (ID: {agent_id})") + except Exception as e: + self.logger.error(f"Failed to unregister agent: {e}") + + def get_all_agents(self) -> List[Any]: + """Get all currently registered agents.""" + with self._lock: + return list(self._all_agents) + + def get_agent_count(self) -> int: + """Get the total number of registered agents.""" + with self._lock: + return len(self._all_agents) + + async def cleanup_all_agents(self) -> None: + """Clean up all registered agents across all users.""" + all_agents = self.get_all_agents() + + if not all_agents: + self.logger.info("No agents to clean up") + return + + self.logger.info(f"๐Ÿงน Starting cleanup of {len(all_agents)} total agents") + + # Log agent details for debugging + for i, agent in enumerate(all_agents): + agent_name = getattr(agent, 'agent_name', getattr(agent, 'name', type(agent).__name__)) + agent_type = type(agent).__name__ + has_close = hasattr(agent, 'close') + self.logger.info(f"Agent {i + 1}: {agent_name} (Type: {agent_type}, Has close(): {has_close})") + + # Clean up agents concurrently + cleanup_tasks = [] + for agent in all_agents: + if hasattr(agent, 'close'): + cleanup_tasks.append(self._safe_close_agent(agent)) + else: + agent_name = getattr(agent, 'agent_name', getattr(agent, 'name', type(agent).__name__)) + self.logger.warning(f"โš ๏ธ Agent {agent_name} has no close() method - just unregistering from registry") + self.unregister_agent(agent) + + if cleanup_tasks: + self.logger.info(f"๐Ÿ”„ Executing {len(cleanup_tasks)} cleanup tasks...") + results = await asyncio.gather(*cleanup_tasks, return_exceptions=True) + + # Log any exceptions that occurred during cleanup + success_count = 0 + for i, result in enumerate(results): + if isinstance(result, Exception): + self.logger.error(f"โŒ Error cleaning up agent {i}: {result}") + else: + success_count += 1 + + self.logger.info(f"โœ… Successfully cleaned up {success_count}/{len(cleanup_tasks)} agents") + + # Clear all tracking + with self._lock: + self._all_agents.clear() + self._agent_metadata.clear() + + self.logger.info("๐ŸŽ‰ Completed cleanup of all agents") + + async def _safe_close_agent(self, agent: Any) -> None: + """Safely close an agent with error handling.""" + try: + agent_name = getattr(agent, 'agent_name', getattr(agent, 'name', type(agent).__name__)) + self.logger.info(f"Closing agent: {agent_name}") + + # Call the agent's close method - it should handle Azure deletion and registry cleanup + if asyncio.iscoroutinefunction(agent.close): + await agent.close() + else: + agent.close() + + self.logger.info(f"Successfully closed agent: {agent_name}") + + except Exception as e: + agent_name = getattr(agent, 'agent_name', getattr(agent, 'name', type(agent).__name__)) + self.logger.error(f"Failed to close agent {agent_name}: {e}") + + def get_registry_status(self) -> Dict[str, Any]: + """Get current status of the agent registry for debugging and monitoring.""" + with self._lock: + status = { + 'total_agents': len(self._all_agents), + 'agent_types': {} + } + + # Count agents by type + for agent in self._all_agents: + agent_type = type(agent).__name__ + status['agent_types'][agent_type] = status['agent_types'].get(agent_type, 0) + 1 + + return status + + +# Global registry instance +agent_registry = AgentRegistry() diff --git a/src/backend/v3/magentic_agents/common/lifecycle.py b/src/backend/v3/magentic_agents/common/lifecycle.py index 860dd5666..a9d91983c 100644 --- a/src/backend/v3/magentic_agents/common/lifecycle.py +++ b/src/backend/v3/magentic_agents/common/lifecycle.py @@ -6,6 +6,7 @@ from semantic_kernel.agents.azure_ai.azure_ai_agent import AzureAIAgent from semantic_kernel.connectors.mcp import MCPStreamableHttpPlugin from v3.magentic_agents.models.agent_models import MCPConfig +from v3.config.agent_registry import agent_registry class MCPEnabledBase: @@ -118,5 +119,32 @@ async def open(self) -> "AzureAgentBase": return self async def close(self) -> None: - await self.creds.close() + """ + Close the agent and clean up Azure AI Foundry resources. + This method deletes the agent from Azure AI Foundry and closes credentials. + """ + + try: + # Delete agent from Azure AI Foundry if we have the necessary information + if hasattr(self, '_agent') and self._agent and hasattr(self._agent, 'definition'): + agent_id = getattr(self._agent.definition, 'id', None) + + if agent_id and self.client: + try: + await self.client.agents.delete_agent(agent_id) + except Exception: + pass + # Unregister from agent registry + try: + agent_registry.unregister_agent(self) + except Exception: + pass + except Exception: + pass + # Always close credentials and parent resources + try: + if hasattr(self, 'creds') and self.creds: + await self.creds.close() + except Exception: + pass await super().close() diff --git a/src/backend/v3/magentic_agents/foundry_agent.py b/src/backend/v3/magentic_agents/foundry_agent.py index d1bab2336..85620861d 100644 --- a/src/backend/v3/magentic_agents/foundry_agent.py +++ b/src/backend/v3/magentic_agents/foundry_agent.py @@ -8,6 +8,8 @@ from v3.magentic_agents.common.lifecycle import AzureAgentBase from v3.magentic_agents.models.agent_models import MCPConfig, SearchConfig +from v3.config.agent_registry import agent_registry + # from v3.magentic_agents.models.agent_models import (BingConfig, MCPConfig, # SearchConfig) @@ -143,6 +145,15 @@ async def _after_open(self) -> None: # Try to get existing agent definition from Foundry definition = await self._get_azure_ai_agent_definition(self.agent_name) + + # Check if existing definition uses the same connection name + if definition is not None: + connection_compatible = await self._check_connection_compatibility(definition) + if not connection_compatible: + await self.client.agents.delete_agent(definition.id) + self.logger.info(f"Existing agent '{self.agent_name}' uses different connection. Creating new agent definition.") + definition = None + # If not found in Foundry, create a new one if definition is None: # Collect all tools @@ -171,6 +182,13 @@ async def _after_open(self) -> None: self.logger.error("Failed to create AzureAIAgent: %s", ex) raise + # Register agent with global registry for tracking and cleanup + try: + agent_registry.register_agent(self) + self.logger.info(f"๐Ÿ“ Registered agent '{self.agent_name}' with global registry") + except Exception as registry_error: + self.logger.warning(f"โš ๏ธ Failed to register agent '{self.agent_name}' with registry: {registry_error}") + # # After self._agent creation in _after_open: # # Diagnostics # try: @@ -205,9 +223,67 @@ async def fetch_run_details(self, thread_id: str, run_id: str): except Exception as ex: self.logger.error("Could not fetch run details: %s", ex) + async def _check_connection_compatibility(self, existing_definition) -> bool: + """ + Check if the existing agent definition uses the same connection name as the current configuration. + + Args: + existing_definition: The existing agent definition from Azure AI Foundry + + Returns: + bool: True if connections are compatible, False otherwise + """ + try: + # Check if we have search configuration to compare + if not self.search or not self.search.connection_name: + self.logger.info("No search configuration to compare") + return True + + # Get tool resources from existing definition + if not hasattr(existing_definition, 'tool_resources') or not existing_definition.tool_resources: + self.logger.info("Existing definition has no tool resources") + return not self.search.connection_name # Compatible if we also don't need search + + # Check Azure AI Search tool resources + azure_ai_search_resources = existing_definition.tool_resources.get('azure_ai_search', {}) + if not azure_ai_search_resources: + self.logger.info("Existing definition has no Azure AI Search resources") + return not self.search.connection_name # Compatible if we also don't need search + + # Get connection ID from existing definition + indexes = azure_ai_search_resources.get('indexes')[0] + existing_connection_id = indexes.get('index_connection_id') + if not existing_connection_id: + self.logger.info("Existing definition has no connection ID") + return False + + # Get the current connection to compare + try: + current_connection = await self.client.connections.get(name=self.search.connection_name) + current_connection_id = current_connection.id + + # Compare connection IDs + is_compatible = existing_connection_id == current_connection_id + + if is_compatible: + self.logger.info(f"Connection compatible: existing connection ID {existing_connection_id} matches current connection") + else: + self.logger.info(f"Connection mismatch: existing connection ID {existing_connection_id} != current connection ID {current_connection_id}") + + return is_compatible + + except Exception as conn_ex: + self.logger.error(f"Failed to get current connection '{self.search.connection_name}': {conn_ex}") + return False + + except Exception as ex: + self.logger.error(f"Error checking connection compatibility: {ex}") + return False + async def _get_azure_ai_agent_definition( self, agent_name: str ) -> Awaitable[Agent | None]: + """ Gets an Azure AI Agent with the specified name and instructions using AIProjectClient if it is already created. """ diff --git a/src/backend/v3/magentic_agents/reasoning_agent.py b/src/backend/v3/magentic_agents/reasoning_agent.py index 915756cab..b29b002ff 100644 --- a/src/backend/v3/magentic_agents/reasoning_agent.py +++ b/src/backend/v3/magentic_agents/reasoning_agent.py @@ -7,6 +7,7 @@ from v3.magentic_agents.common.lifecycle import MCPEnabledBase from v3.magentic_agents.models.agent_models import MCPConfig, SearchConfig from v3.magentic_agents.reasoning_search import ReasoningSearch +from v3.config.agent_registry import agent_registry class ReasoningAgentTemplate(MCPEnabledBase): @@ -71,6 +72,13 @@ async def _after_open(self) -> None: instructions=self.agent_instructions, ) + # Register agent with global registry for tracking and cleanup + try: + agent_registry.register_agent(self) + self.logger.info(f"๐Ÿ“ Registered agent '{self.agent_name}' with global registry") + except Exception as registry_error: + self.logger.warning(f"โš ๏ธ Failed to register agent '{self.agent_name}' with registry: {registry_error}") + async def invoke(self, message: str): """Invoke the agent with a message.""" if not self._agent: