Skip to content

Commit ddf8c44

Browse files
Merge pull request microsoft#554 from microsoft/psl-bug-expissue
fix: added the lifespan of the agent on container stop and checked the already existed agents are not using the previous resource group connection
2 parents d8d6248 + cc6eeee commit ddf8c44

File tree

6 files changed

+291
-2
lines changed

6 files changed

+291
-2
lines changed

src/backend/app_kernel.py

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
# app_kernel.py
22
import logging
33

4+
from contextlib import asynccontextmanager
5+
6+
47
from azure.monitor.opentelemetry import configure_azure_monitor
58
from common.config.app_config import config
69
from common.models.messages_kernel import UserLanguage
@@ -16,6 +19,32 @@
1619
# Azure monitoring
1720

1821
# Semantic Kernel imports
22+
from v3.config.agent_registry import agent_registry
23+
24+
25+
@asynccontextmanager
26+
async def lifespan(app: FastAPI):
27+
"""Manage FastAPI application lifecycle - startup and shutdown."""
28+
logger = logging.getLogger(__name__)
29+
30+
# Startup
31+
logger.info("🚀 Starting MACAE application...")
32+
yield
33+
34+
# Shutdown
35+
logger.info("🛑 Shutting down MACAE application...")
36+
try:
37+
# Clean up all agents from Azure AI Foundry when container stops
38+
await agent_registry.cleanup_all_agents()
39+
logger.info("✅ Agent cleanup completed successfully")
40+
41+
except ImportError as ie:
42+
logger.error(f"❌ Could not import agent_registry: {ie}")
43+
except Exception as e:
44+
logger.error(f"❌ Error during shutdown cleanup: {e}")
45+
46+
logger.info("👋 MACAE application shutdown complete")
47+
1948

2049
# Check if the Application Insights Instrumentation Key is set in the environment variables
2150
connection_string = config.APPLICATIONINSIGHTS_CONNECTION_STRING
@@ -46,7 +75,7 @@
4675
)
4776

4877
# Initialize the FastAPI app
49-
app = FastAPI()
78+
app = FastAPI(lifespan=lifespan)
5079

5180
frontend_url = config.FRONTEND_SITE_NAME
5281

src/backend/common/utils/utils_kernel.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@
77
from semantic_kernel.agents.azure_ai.azure_ai_agent import AzureAIAgent
88
from v3.magentic_agents.foundry_agent import FoundryAgentTemplate
99

10+
from v3.config.agent_registry import agent_registry
11+
1012
logging.basicConfig(level=logging.INFO)
1113

1214
# Cache for agent instances by session
@@ -46,6 +48,12 @@ async def create_RAI_agent() -> FoundryAgentTemplate:
4648
)
4749

4850
await agent.open()
51+
52+
try:
53+
agent_registry.register_agent(agent)
54+
55+
except Exception as registry_error:
56+
logging.warning(f"Failed to register agent '{agent.agent_name}' with registry: {registry_error}")
4957
return agent
5058

5159

Lines changed: 140 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,140 @@
1+
# Copyright (c) Microsoft. All rights reserved.
2+
"""Global agent registry for tracking and managing agent lifecycles across the application."""
3+
4+
import asyncio
5+
import logging
6+
import threading
7+
from typing import List, Dict, Any, Optional
8+
from weakref import WeakSet
9+
10+
11+
class AgentRegistry:
12+
"""Global registry for tracking and managing all agent instances across the application."""
13+
14+
def __init__(self):
15+
self.logger = logging.getLogger(__name__)
16+
self._lock = threading.Lock()
17+
self._all_agents: WeakSet = WeakSet()
18+
self._agent_metadata: Dict[int, Dict[str, Any]] = {}
19+
20+
def register_agent(self, agent: Any, user_id: Optional[str] = None) -> None:
21+
"""Register an agent instance for tracking and lifecycle management."""
22+
with self._lock:
23+
try:
24+
self._all_agents.add(agent)
25+
agent_id = id(agent)
26+
self._agent_metadata[agent_id] = {
27+
'type': type(agent).__name__,
28+
'user_id': user_id,
29+
'name': getattr(agent, 'agent_name', getattr(agent, 'name', 'Unknown'))
30+
}
31+
self.logger.info(f"Registered agent: {type(agent).__name__} (ID: {agent_id}, User: {user_id})")
32+
except Exception as e:
33+
self.logger.error(f"Failed to register agent: {e}")
34+
35+
def unregister_agent(self, agent: Any) -> None:
36+
"""Unregister an agent instance."""
37+
with self._lock:
38+
try:
39+
agent_id = id(agent)
40+
self._all_agents.discard(agent)
41+
if agent_id in self._agent_metadata:
42+
metadata = self._agent_metadata.pop(agent_id)
43+
self.logger.info(f"Unregistered agent: {metadata.get('type', 'Unknown')} (ID: {agent_id})")
44+
except Exception as e:
45+
self.logger.error(f"Failed to unregister agent: {e}")
46+
47+
def get_all_agents(self) -> List[Any]:
48+
"""Get all currently registered agents."""
49+
with self._lock:
50+
return list(self._all_agents)
51+
52+
def get_agent_count(self) -> int:
53+
"""Get the total number of registered agents."""
54+
with self._lock:
55+
return len(self._all_agents)
56+
57+
async def cleanup_all_agents(self) -> None:
58+
"""Clean up all registered agents across all users."""
59+
all_agents = self.get_all_agents()
60+
61+
if not all_agents:
62+
self.logger.info("No agents to clean up")
63+
return
64+
65+
self.logger.info(f"🧹 Starting cleanup of {len(all_agents)} total agents")
66+
67+
# Log agent details for debugging
68+
for i, agent in enumerate(all_agents):
69+
agent_name = getattr(agent, 'agent_name', getattr(agent, 'name', type(agent).__name__))
70+
agent_type = type(agent).__name__
71+
has_close = hasattr(agent, 'close')
72+
self.logger.info(f"Agent {i + 1}: {agent_name} (Type: {agent_type}, Has close(): {has_close})")
73+
74+
# Clean up agents concurrently
75+
cleanup_tasks = []
76+
for agent in all_agents:
77+
if hasattr(agent, 'close'):
78+
cleanup_tasks.append(self._safe_close_agent(agent))
79+
else:
80+
agent_name = getattr(agent, 'agent_name', getattr(agent, 'name', type(agent).__name__))
81+
self.logger.warning(f"⚠️ Agent {agent_name} has no close() method - just unregistering from registry")
82+
self.unregister_agent(agent)
83+
84+
if cleanup_tasks:
85+
self.logger.info(f"🔄 Executing {len(cleanup_tasks)} cleanup tasks...")
86+
results = await asyncio.gather(*cleanup_tasks, return_exceptions=True)
87+
88+
# Log any exceptions that occurred during cleanup
89+
success_count = 0
90+
for i, result in enumerate(results):
91+
if isinstance(result, Exception):
92+
self.logger.error(f"❌ Error cleaning up agent {i}: {result}")
93+
else:
94+
success_count += 1
95+
96+
self.logger.info(f"✅ Successfully cleaned up {success_count}/{len(cleanup_tasks)} agents")
97+
98+
# Clear all tracking
99+
with self._lock:
100+
self._all_agents.clear()
101+
self._agent_metadata.clear()
102+
103+
self.logger.info("🎉 Completed cleanup of all agents")
104+
105+
async def _safe_close_agent(self, agent: Any) -> None:
106+
"""Safely close an agent with error handling."""
107+
try:
108+
agent_name = getattr(agent, 'agent_name', getattr(agent, 'name', type(agent).__name__))
109+
self.logger.info(f"Closing agent: {agent_name}")
110+
111+
# Call the agent's close method - it should handle Azure deletion and registry cleanup
112+
if asyncio.iscoroutinefunction(agent.close):
113+
await agent.close()
114+
else:
115+
agent.close()
116+
117+
self.logger.info(f"Successfully closed agent: {agent_name}")
118+
119+
except Exception as e:
120+
agent_name = getattr(agent, 'agent_name', getattr(agent, 'name', type(agent).__name__))
121+
self.logger.error(f"Failed to close agent {agent_name}: {e}")
122+
123+
def get_registry_status(self) -> Dict[str, Any]:
124+
"""Get current status of the agent registry for debugging and monitoring."""
125+
with self._lock:
126+
status = {
127+
'total_agents': len(self._all_agents),
128+
'agent_types': {}
129+
}
130+
131+
# Count agents by type
132+
for agent in self._all_agents:
133+
agent_type = type(agent).__name__
134+
status['agent_types'][agent_type] = status['agent_types'].get(agent_type, 0) + 1
135+
136+
return status
137+
138+
139+
# Global registry instance
140+
agent_registry = AgentRegistry()

src/backend/v3/magentic_agents/common/lifecycle.py

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
from semantic_kernel.agents.azure_ai.azure_ai_agent import AzureAIAgent
77
from semantic_kernel.connectors.mcp import MCPStreamableHttpPlugin
88
from v3.magentic_agents.models.agent_models import MCPConfig
9+
from v3.config.agent_registry import agent_registry
910

1011

1112
class MCPEnabledBase:
@@ -118,5 +119,32 @@ async def open(self) -> "AzureAgentBase":
118119
return self
119120

120121
async def close(self) -> None:
121-
await self.creds.close()
122+
"""
123+
Close the agent and clean up Azure AI Foundry resources.
124+
This method deletes the agent from Azure AI Foundry and closes credentials.
125+
"""
126+
127+
try:
128+
# Delete agent from Azure AI Foundry if we have the necessary information
129+
if hasattr(self, '_agent') and self._agent and hasattr(self._agent, 'definition'):
130+
agent_id = getattr(self._agent.definition, 'id', None)
131+
132+
if agent_id and self.client:
133+
try:
134+
await self.client.agents.delete_agent(agent_id)
135+
except Exception:
136+
pass
137+
# Unregister from agent registry
138+
try:
139+
agent_registry.unregister_agent(self)
140+
except Exception:
141+
pass
142+
except Exception:
143+
pass
144+
# Always close credentials and parent resources
145+
try:
146+
if hasattr(self, 'creds') and self.creds:
147+
await self.creds.close()
148+
except Exception:
149+
pass
122150
await super().close()

src/backend/v3/magentic_agents/foundry_agent.py

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88
from v3.magentic_agents.common.lifecycle import AzureAgentBase
99
from v3.magentic_agents.models.agent_models import MCPConfig, SearchConfig
1010

11+
from v3.config.agent_registry import agent_registry
12+
1113
# from v3.magentic_agents.models.agent_models import (BingConfig, MCPConfig,
1214
# SearchConfig)
1315

@@ -143,6 +145,15 @@ async def _after_open(self) -> None:
143145

144146
# Try to get existing agent definition from Foundry
145147
definition = await self._get_azure_ai_agent_definition(self.agent_name)
148+
149+
# Check if existing definition uses the same connection name
150+
if definition is not None:
151+
connection_compatible = await self._check_connection_compatibility(definition)
152+
if not connection_compatible:
153+
await self.client.agents.delete_agent(definition.id)
154+
self.logger.info(f"Existing agent '{self.agent_name}' uses different connection. Creating new agent definition.")
155+
definition = None
156+
146157
# If not found in Foundry, create a new one
147158
if definition is None:
148159
# Collect all tools
@@ -171,6 +182,13 @@ async def _after_open(self) -> None:
171182
self.logger.error("Failed to create AzureAIAgent: %s", ex)
172183
raise
173184

185+
# Register agent with global registry for tracking and cleanup
186+
try:
187+
agent_registry.register_agent(self)
188+
self.logger.info(f"📝 Registered agent '{self.agent_name}' with global registry")
189+
except Exception as registry_error:
190+
self.logger.warning(f"⚠️ Failed to register agent '{self.agent_name}' with registry: {registry_error}")
191+
174192
# # After self._agent creation in _after_open:
175193
# # Diagnostics
176194
# try:
@@ -205,9 +223,67 @@ async def fetch_run_details(self, thread_id: str, run_id: str):
205223
except Exception as ex:
206224
self.logger.error("Could not fetch run details: %s", ex)
207225

226+
async def _check_connection_compatibility(self, existing_definition) -> bool:
227+
"""
228+
Check if the existing agent definition uses the same connection name as the current configuration.
229+
230+
Args:
231+
existing_definition: The existing agent definition from Azure AI Foundry
232+
233+
Returns:
234+
bool: True if connections are compatible, False otherwise
235+
"""
236+
try:
237+
# Check if we have search configuration to compare
238+
if not self.search or not self.search.connection_name:
239+
self.logger.info("No search configuration to compare")
240+
return True
241+
242+
# Get tool resources from existing definition
243+
if not hasattr(existing_definition, 'tool_resources') or not existing_definition.tool_resources:
244+
self.logger.info("Existing definition has no tool resources")
245+
return not self.search.connection_name # Compatible if we also don't need search
246+
247+
# Check Azure AI Search tool resources
248+
azure_ai_search_resources = existing_definition.tool_resources.get('azure_ai_search', {})
249+
if not azure_ai_search_resources:
250+
self.logger.info("Existing definition has no Azure AI Search resources")
251+
return not self.search.connection_name # Compatible if we also don't need search
252+
253+
# Get connection ID from existing definition
254+
indexes = azure_ai_search_resources.get('indexes')[0]
255+
existing_connection_id = indexes.get('index_connection_id')
256+
if not existing_connection_id:
257+
self.logger.info("Existing definition has no connection ID")
258+
return False
259+
260+
# Get the current connection to compare
261+
try:
262+
current_connection = await self.client.connections.get(name=self.search.connection_name)
263+
current_connection_id = current_connection.id
264+
265+
# Compare connection IDs
266+
is_compatible = existing_connection_id == current_connection_id
267+
268+
if is_compatible:
269+
self.logger.info(f"Connection compatible: existing connection ID {existing_connection_id} matches current connection")
270+
else:
271+
self.logger.info(f"Connection mismatch: existing connection ID {existing_connection_id} != current connection ID {current_connection_id}")
272+
273+
return is_compatible
274+
275+
except Exception as conn_ex:
276+
self.logger.error(f"Failed to get current connection '{self.search.connection_name}': {conn_ex}")
277+
return False
278+
279+
except Exception as ex:
280+
self.logger.error(f"Error checking connection compatibility: {ex}")
281+
return False
282+
208283
async def _get_azure_ai_agent_definition(
209284
self, agent_name: str
210285
) -> Awaitable[Agent | None]:
286+
211287
"""
212288
Gets an Azure AI Agent with the specified name and instructions using AIProjectClient if it is already created.
213289
"""

src/backend/v3/magentic_agents/reasoning_agent.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
from v3.magentic_agents.common.lifecycle import MCPEnabledBase
88
from v3.magentic_agents.models.agent_models import MCPConfig, SearchConfig
99
from v3.magentic_agents.reasoning_search import ReasoningSearch
10+
from v3.config.agent_registry import agent_registry
1011

1112

1213
class ReasoningAgentTemplate(MCPEnabledBase):
@@ -71,6 +72,13 @@ async def _after_open(self) -> None:
7172
instructions=self.agent_instructions,
7273
)
7374

75+
# Register agent with global registry for tracking and cleanup
76+
try:
77+
agent_registry.register_agent(self)
78+
self.logger.info(f"📝 Registered agent '{self.agent_name}' with global registry")
79+
except Exception as registry_error:
80+
self.logger.warning(f"⚠️ Failed to register agent '{self.agent_name}' with registry: {registry_error}")
81+
7482
async def invoke(self, message: str):
7583
"""Invoke the agent with a message."""
7684
if not self._agent:

0 commit comments

Comments
 (0)