Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 30 additions & 1 deletion src/backend/app_kernel.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
# app_kernel.py
import logging

from contextlib import asynccontextmanager


from azure.monitor.opentelemetry import configure_azure_monitor
from common.config.app_config import config
from common.models.messages_kernel import UserLanguage
Expand All @@ -16,6 +19,32 @@
# Azure monitoring

# Semantic Kernel imports
from v3.config.agent_registry import agent_registry


@asynccontextmanager
async def lifespan(app: FastAPI):
"""Manage FastAPI application lifecycle - startup and shutdown."""
logger = logging.getLogger(__name__)

# Startup
logger.info("🚀 Starting MACAE application...")
yield

# Shutdown
logger.info("🛑 Shutting down MACAE application...")
try:
# Clean up all agents from Azure AI Foundry when container stops
await agent_registry.cleanup_all_agents()
logger.info("✅ Agent cleanup completed successfully")

except ImportError as ie:
logger.error(f"❌ Could not import agent_registry: {ie}")
except Exception as e:
logger.error(f"❌ Error during shutdown cleanup: {e}")

logger.info("👋 MACAE application shutdown complete")


# Check if the Application Insights Instrumentation Key is set in the environment variables
connection_string = config.APPLICATIONINSIGHTS_CONNECTION_STRING
Expand Down Expand Up @@ -46,7 +75,7 @@
)

# Initialize the FastAPI app
app = FastAPI()
app = FastAPI(lifespan=lifespan)

frontend_url = config.FRONTEND_SITE_NAME

Expand Down
8 changes: 8 additions & 0 deletions src/backend/common/utils/utils_kernel.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
from semantic_kernel.agents.azure_ai.azure_ai_agent import AzureAIAgent
from v3.magentic_agents.foundry_agent import FoundryAgentTemplate

from v3.config.agent_registry import agent_registry

logging.basicConfig(level=logging.INFO)

# Cache for agent instances by session
Expand Down Expand Up @@ -46,6 +48,12 @@ async def create_RAI_agent() -> FoundryAgentTemplate:
)

await agent.open()

try:
agent_registry.register_agent(agent)

except Exception as registry_error:
logging.warning(f"Failed to register agent '{agent.agent_name}' with registry: {registry_error}")
return agent


Expand Down
140 changes: 140 additions & 0 deletions src/backend/v3/config/agent_registry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
# Copyright (c) Microsoft. All rights reserved.
"""Global agent registry for tracking and managing agent lifecycles across the application."""

import asyncio
import logging
import threading
from typing import List, Dict, Any, Optional
from weakref import WeakSet


class AgentRegistry:
"""Global registry for tracking and managing all agent instances across the application."""

def __init__(self):
self.logger = logging.getLogger(__name__)
self._lock = threading.Lock()
self._all_agents: WeakSet = WeakSet()
self._agent_metadata: Dict[int, Dict[str, Any]] = {}

def register_agent(self, agent: Any, user_id: Optional[str] = None) -> None:
"""Register an agent instance for tracking and lifecycle management."""
with self._lock:
try:
self._all_agents.add(agent)
agent_id = id(agent)
self._agent_metadata[agent_id] = {
'type': type(agent).__name__,
'user_id': user_id,
'name': getattr(agent, 'agent_name', getattr(agent, 'name', 'Unknown'))
}
self.logger.info(f"Registered agent: {type(agent).__name__} (ID: {agent_id}, User: {user_id})")
except Exception as e:
self.logger.error(f"Failed to register agent: {e}")

def unregister_agent(self, agent: Any) -> None:
"""Unregister an agent instance."""
with self._lock:
try:
agent_id = id(agent)
self._all_agents.discard(agent)
if agent_id in self._agent_metadata:
metadata = self._agent_metadata.pop(agent_id)
self.logger.info(f"Unregistered agent: {metadata.get('type', 'Unknown')} (ID: {agent_id})")
except Exception as e:
self.logger.error(f"Failed to unregister agent: {e}")

def get_all_agents(self) -> List[Any]:
"""Get all currently registered agents."""
with self._lock:
return list(self._all_agents)

def get_agent_count(self) -> int:
"""Get the total number of registered agents."""
with self._lock:
return len(self._all_agents)

async def cleanup_all_agents(self) -> None:
"""Clean up all registered agents across all users."""
all_agents = self.get_all_agents()

if not all_agents:
self.logger.info("No agents to clean up")
return

self.logger.info(f"🧹 Starting cleanup of {len(all_agents)} total agents")

# Log agent details for debugging
for i, agent in enumerate(all_agents):
agent_name = getattr(agent, 'agent_name', getattr(agent, 'name', type(agent).__name__))
agent_type = type(agent).__name__
has_close = hasattr(agent, 'close')
self.logger.info(f"Agent {i + 1}: {agent_name} (Type: {agent_type}, Has close(): {has_close})")

# Clean up agents concurrently
cleanup_tasks = []
for agent in all_agents:
if hasattr(agent, 'close'):
cleanup_tasks.append(self._safe_close_agent(agent))
else:
agent_name = getattr(agent, 'agent_name', getattr(agent, 'name', type(agent).__name__))
self.logger.warning(f"⚠️ Agent {agent_name} has no close() method - just unregistering from registry")
self.unregister_agent(agent)

if cleanup_tasks:
self.logger.info(f"🔄 Executing {len(cleanup_tasks)} cleanup tasks...")
results = await asyncio.gather(*cleanup_tasks, return_exceptions=True)

# Log any exceptions that occurred during cleanup
success_count = 0
for i, result in enumerate(results):
if isinstance(result, Exception):
self.logger.error(f"❌ Error cleaning up agent {i}: {result}")
else:
success_count += 1

self.logger.info(f"✅ Successfully cleaned up {success_count}/{len(cleanup_tasks)} agents")

# Clear all tracking
with self._lock:
self._all_agents.clear()
self._agent_metadata.clear()

self.logger.info("🎉 Completed cleanup of all agents")

async def _safe_close_agent(self, agent: Any) -> None:
"""Safely close an agent with error handling."""
try:
agent_name = getattr(agent, 'agent_name', getattr(agent, 'name', type(agent).__name__))
self.logger.info(f"Closing agent: {agent_name}")

# Call the agent's close method - it should handle Azure deletion and registry cleanup
if asyncio.iscoroutinefunction(agent.close):
await agent.close()
else:
agent.close()

self.logger.info(f"Successfully closed agent: {agent_name}")

except Exception as e:
agent_name = getattr(agent, 'agent_name', getattr(agent, 'name', type(agent).__name__))
self.logger.error(f"Failed to close agent {agent_name}: {e}")

def get_registry_status(self) -> Dict[str, Any]:
"""Get current status of the agent registry for debugging and monitoring."""
with self._lock:
status = {
'total_agents': len(self._all_agents),
'agent_types': {}
}

# Count agents by type
for agent in self._all_agents:
agent_type = type(agent).__name__
status['agent_types'][agent_type] = status['agent_types'].get(agent_type, 0) + 1

return status


# Global registry instance
agent_registry = AgentRegistry()
30 changes: 29 additions & 1 deletion src/backend/v3/magentic_agents/common/lifecycle.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from semantic_kernel.agents.azure_ai.azure_ai_agent import AzureAIAgent
from semantic_kernel.connectors.mcp import MCPStreamableHttpPlugin
from v3.magentic_agents.models.agent_models import MCPConfig
from v3.config.agent_registry import agent_registry


class MCPEnabledBase:
Expand Down Expand Up @@ -118,5 +119,32 @@ async def open(self) -> "AzureAgentBase":
return self

async def close(self) -> None:
await self.creds.close()
"""
Close the agent and clean up Azure AI Foundry resources.
This method deletes the agent from Azure AI Foundry and closes credentials.
"""

try:
# Delete agent from Azure AI Foundry if we have the necessary information
if hasattr(self, '_agent') and self._agent and hasattr(self._agent, 'definition'):
agent_id = getattr(self._agent.definition, 'id', None)

if agent_id and self.client:
try:
await self.client.agents.delete_agent(agent_id)
except Exception:
pass
# Unregister from agent registry
try:
agent_registry.unregister_agent(self)
except Exception:
pass
except Exception:
pass
# Always close credentials and parent resources
try:
if hasattr(self, 'creds') and self.creds:
await self.creds.close()
except Exception:
pass
await super().close()
76 changes: 76 additions & 0 deletions src/backend/v3/magentic_agents/foundry_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
from v3.magentic_agents.common.lifecycle import AzureAgentBase
from v3.magentic_agents.models.agent_models import MCPConfig, SearchConfig

from v3.config.agent_registry import agent_registry

# from v3.magentic_agents.models.agent_models import (BingConfig, MCPConfig,
# SearchConfig)

Expand Down Expand Up @@ -143,6 +145,15 @@ async def _after_open(self) -> None:

# Try to get existing agent definition from Foundry
definition = await self._get_azure_ai_agent_definition(self.agent_name)

# Check if existing definition uses the same connection name
if definition is not None:
connection_compatible = await self._check_connection_compatibility(definition)
if not connection_compatible:
await self.client.agents.delete_agent(definition.id)
self.logger.info(f"Existing agent '{self.agent_name}' uses different connection. Creating new agent definition.")
definition = None

# If not found in Foundry, create a new one
if definition is None:
# Collect all tools
Expand Down Expand Up @@ -171,6 +182,13 @@ async def _after_open(self) -> None:
self.logger.error("Failed to create AzureAIAgent: %s", ex)
raise

# Register agent with global registry for tracking and cleanup
try:
agent_registry.register_agent(self)
self.logger.info(f"📝 Registered agent '{self.agent_name}' with global registry")
except Exception as registry_error:
self.logger.warning(f"⚠️ Failed to register agent '{self.agent_name}' with registry: {registry_error}")

# # After self._agent creation in _after_open:
# # Diagnostics
# try:
Expand Down Expand Up @@ -205,9 +223,67 @@ async def fetch_run_details(self, thread_id: str, run_id: str):
except Exception as ex:
self.logger.error("Could not fetch run details: %s", ex)

async def _check_connection_compatibility(self, existing_definition) -> bool:
"""
Check if the existing agent definition uses the same connection name as the current configuration.

Args:
existing_definition: The existing agent definition from Azure AI Foundry

Returns:
bool: True if connections are compatible, False otherwise
"""
try:
# Check if we have search configuration to compare
if not self.search or not self.search.connection_name:
self.logger.info("No search configuration to compare")
return True

# Get tool resources from existing definition
if not hasattr(existing_definition, 'tool_resources') or not existing_definition.tool_resources:
self.logger.info("Existing definition has no tool resources")
return not self.search.connection_name # Compatible if we also don't need search

# Check Azure AI Search tool resources
azure_ai_search_resources = existing_definition.tool_resources.get('azure_ai_search', {})
if not azure_ai_search_resources:
self.logger.info("Existing definition has no Azure AI Search resources")
return not self.search.connection_name # Compatible if we also don't need search

# Get connection ID from existing definition
indexes = azure_ai_search_resources.get('indexes')[0]
existing_connection_id = indexes.get('index_connection_id')
if not existing_connection_id:
self.logger.info("Existing definition has no connection ID")
return False

# Get the current connection to compare
try:
current_connection = await self.client.connections.get(name=self.search.connection_name)
current_connection_id = current_connection.id

# Compare connection IDs
is_compatible = existing_connection_id == current_connection_id

if is_compatible:
self.logger.info(f"Connection compatible: existing connection ID {existing_connection_id} matches current connection")
else:
self.logger.info(f"Connection mismatch: existing connection ID {existing_connection_id} != current connection ID {current_connection_id}")

return is_compatible

except Exception as conn_ex:
self.logger.error(f"Failed to get current connection '{self.search.connection_name}': {conn_ex}")
return False

except Exception as ex:
self.logger.error(f"Error checking connection compatibility: {ex}")
return False

async def _get_azure_ai_agent_definition(
self, agent_name: str
) -> Awaitable[Agent | None]:

"""
Gets an Azure AI Agent with the specified name and instructions using AIProjectClient if it is already created.
"""
Expand Down
8 changes: 8 additions & 0 deletions src/backend/v3/magentic_agents/reasoning_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from v3.magentic_agents.common.lifecycle import MCPEnabledBase
from v3.magentic_agents.models.agent_models import MCPConfig, SearchConfig
from v3.magentic_agents.reasoning_search import ReasoningSearch
from v3.config.agent_registry import agent_registry


class ReasoningAgentTemplate(MCPEnabledBase):
Expand Down Expand Up @@ -71,6 +72,13 @@ async def _after_open(self) -> None:
instructions=self.agent_instructions,
)

# Register agent with global registry for tracking and cleanup
try:
agent_registry.register_agent(self)
self.logger.info(f"📝 Registered agent '{self.agent_name}' with global registry")
except Exception as registry_error:
self.logger.warning(f"⚠️ Failed to register agent '{self.agent_name}' with registry: {registry_error}")

async def invoke(self, message: str):
"""Invoke the agent with a message."""
if not self._agent:
Expand Down
Loading