Skip to content

Commit 373cea5

Browse files
committed
Refactor orchestration to use AzureAIAgentClient and improve logging
Replaces AzureOpenAIChatClient with AzureAIAgentClient for orchestration, updates callback signatures to match new agent framework, and enhances logging throughout the orchestration process. Improves error handling, updates workflow execution to use run_stream, and ensures compatibility with legacy WebSocket updates. Cleans up legacy shims and streamlines agent/participant management.
1 parent fd33e25 commit 373cea5

File tree

1 file changed

+110
-75
lines changed

1 file changed

+110
-75
lines changed

src/backend/af/orchestration/orchestration_manager.py

Lines changed: 110 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -6,18 +6,14 @@
66
from typing import List, Optional, Callable, Awaitable
77

88
# agent_framework imports
9-
from agent_framework.azure import AzureOpenAIChatClient
10-
11-
from agent_framework import ChatMessage, ChatOptions
12-
from agent_framework._workflows import MagenticBuilder
13-
from agent_framework._workflows._magentic import AgentRunResponseUpdate # type: ignore
9+
from agent_framework_azure_ai import AzureAIAgentClient
10+
from agent_framework import ChatMessage, ChatOptions, WorkflowOutputEvent, AgentRunResponseUpdate, MagenticBuilder
1411

1512

1613
from common.config.app_config import config
1714
from common.models.messages_af import TeamConfiguration
1815

19-
# Existing (legacy) callbacks expecting SK content; we'll adapt to them.
20-
# If you've created af-native callbacks (e.g. response_handlers_af) import those instead.
16+
# Existing (legacy) callbacks
2117
from af.callbacks.response_handlers import (
2218
agent_response_callback,
2319
streaming_agent_response_callback,
@@ -46,9 +42,8 @@ def _user_aware_agent_callback(
4642
"""Adapts agent_framework final agent ChatMessage to legacy agent_response_callback signature."""
4743

4844
async def _cb(agent_id: str, message: ChatMessage):
49-
# Reuse existing callback expecting (ChatMessageContent, user_id). We pass text directly.
5045
try:
51-
agent_response_callback(message, user_id) # existing callback is sync
46+
agent_response_callback(agent_id, message, user_id) # Fixed: added agent_id
5247
except Exception as e: # noqa: BLE001
5348
logging.getLogger(__name__).error(
5449
"agent_response_callback error: %s", e
@@ -63,18 +58,8 @@ def _user_aware_streaming_callback(
6358
"""Adapts streaming updates to existing streaming handler."""
6459

6560
async def _cb(agent_id: str, update: AgentRunResponseUpdate, is_final: bool):
66-
# Build a minimal shim object with text/content for legacy handler if needed.
67-
# Your converted streaming handlers (response_handlers_af) should replace this eventual shim.
68-
class _Shim: # noqa: D401
69-
def __init__(self, agent_id: str, update: AgentRunResponseUpdate):
70-
self.agent_id = agent_id
71-
self.text = getattr(update, "text", None)
72-
self.contents = getattr(update, "contents", None)
73-
self.role = getattr(update, "role", None)
74-
75-
shim = _Shim(agent_id, update)
7661
try:
77-
await streaming_agent_response_callback(shim, is_final, user_id)
62+
await streaming_agent_response_callback(agent_id, update, is_final, user_id) # Fixed: removed shim
7863
except Exception as e: # noqa: BLE001
7964
logging.getLogger(__name__).error(
8065
"streaming_agent_response_callback error: %s", e
@@ -91,49 +76,64 @@ async def init_orchestration(cls, agents: List, user_id: str | None = None):
9176
Initialize a Magentic workflow with:
9277
- Provided agents (participants)
9378
- HumanApprovalMagenticManager as orchestrator manager
94-
- AzureOpenAIChatClient as the underlying chat client
79+
- AzureAIAgentClient as the underlying chat client
80+
81+
This mirrors the old Semantic Kernel orchestration setup:
82+
- Uses same deployment, endpoint, and credentials
83+
- Applies same execution settings (temperature, max_tokens)
84+
- Maintains same human approval workflow
9585
"""
9686
if not user_id:
9787
raise ValueError("user_id is required to initialize orchestration")
9888

89+
# Get credential from config (same as old version)
9990
credential = config.get_azure_credential(client_id=config.AZURE_CLIENT_ID)
10091

101-
def get_token():
102-
token = credential.get_token("https://cognitiveservices.azure.com/.default")
103-
return token.token
104-
105-
# Create Azure chat client (agent_framework style) - relying on environment or explicit kwargs.
92+
# Create Azure AI Agent client for orchestration using config
93+
# This replaces AzureChatCompletion from SK
10694
try:
107-
chat_client = AzureOpenAIChatClient(
108-
endpoint=config.AZURE_OPENAI_ENDPOINT,
109-
deployment_name=config.AZURE_OPENAI_DEPLOYMENT_NAME,
110-
ad_token_provider=get_token,
95+
chat_client = AzureAIAgentClient(
96+
project_endpoint=config.AZURE_AI_PROJECT_ENDPOINT,
97+
model_deployment_name=config.AZURE_OPENAI_DEPLOYMENT_NAME,
98+
async_credential=credential,
11199
)
112-
except Exception as e: # noqa: BLE001
113-
logging.getLogger(__name__).error(
114-
"chat_client error: %s", e
115-
)
100+
101+
cls.logger.info(
102+
"Created AzureAIAgentClient for orchestration with model '%s' at endpoint '%s'",
103+
config.AZURE_OPENAI_DEPLOYMENT_NAME,
104+
config.AZURE_AI_PROJECT_ENDPOINT
105+
)
106+
except Exception as e:
107+
cls.logger.error("Failed to create AzureAIAgentClient: %s", e)
116108
raise
117-
# HumanApprovalMagenticManager needs the chat_client passed as 'chat_client' in its constructor signature (it subclasses StandardMagenticManager)
109+
110+
# Create HumanApprovalMagenticManager with the chat client
111+
# Execution settings (temperature=0.1, max_tokens=4000) are configured via
112+
# orchestration_config.create_execution_settings() which matches old SK version
118113
try:
119114
manager = HumanApprovalMagenticManager(
120115
user_id=user_id,
121116
chat_client=chat_client,
122-
instructions=None, # optionally supply orchestrator system instructions
117+
instructions=None, # Orchestrator system instructions (optional)
123118
max_round_count=orchestration_config.max_rounds,
124119
)
125-
except Exception as e: # noqa: BLE001
126-
logging.getLogger(__name__).error(
127-
"manager error: %s", e
128-
)
120+
cls.logger.info(
121+
"Created HumanApprovalMagenticManager for user '%s' with max_rounds=%d",
122+
user_id,
123+
orchestration_config.max_rounds
124+
)
125+
except Exception as e:
126+
cls.logger.error("Failed to create manager: %s", e)
129127
raise
128+
130129
# Build participant map: use each agent's name as key
131130
participants = {}
132131
for ag in agents:
133132
name = getattr(ag, "agent_name", None) or getattr(ag, "name", None)
134133
if not name:
135134
name = f"agent_{len(participants)+1}"
136135
participants[name] = ag
136+
cls.logger.debug("Added participant '%s'", name)
137137

138138
# Assemble workflow
139139
builder = (
@@ -142,18 +142,12 @@ def get_token():
142142
.with_standard_manager(manager=manager)
143143
)
144144

145-
# Register callbacks (non-streaming manager orchestration events). We'll enable streaming agent deltas via unified mode if desired later.
146-
# Provide direct agent + streaming callbacks (legacy adapter form).
147-
# The builder currently surfaces unified callback OR agent callbacks; we use agent callbacks here.
148-
# NOTE: If you want unified events instead, use builder.on_event(..., mode=MagenticCallbackMode.STREAMING).
149-
# We'll just store callbacks by augmenting manager after build via internal surfaces.
145+
# Build workflow
150146
workflow = builder.build()
147+
cls.logger.info("Built Magentic workflow with %d participants", len(participants))
151148

152-
# Wire agent response callbacks onto executor layer
153-
# The built workflow exposes internal orchestrator/executor attributes; we rely on exported API for adding callbacks if present.
149+
# Wire agent response callbacks onto orchestrator
154150
try:
155-
# Attributes available: workflow._orchestrator._agent_response_callback, etc.
156-
# Set them if not already configured (defensive).
157151
orchestrator = getattr(workflow, "_orchestrator", None)
158152
if orchestrator:
159153
if getattr(orchestrator, "_agent_response_callback", None) is None:
@@ -171,7 +165,8 @@ def get_token():
171165
"_streaming_agent_response_callback",
172166
cls._user_aware_streaming_callback(user_id),
173167
)
174-
except Exception as e: # noqa: BLE001
168+
cls.logger.debug("Attached callbacks to workflow orchestrator")
169+
except Exception as e:
175170
cls.logger.warning(
176171
"Could not attach callbacks to workflow orchestrator: %s", e
177172
)
@@ -193,23 +188,25 @@ async def get_current_or_new_orchestration(
193188
current = orchestration_config.get_current_orchestration(user_id)
194189
if current is None or team_switched:
195190
if current is not None and team_switched:
196-
# Close prior agents (skip ProxyAgent if desired)
191+
cls.logger.info("Team switched, closing previous agents for user '%s'", user_id)
192+
# Close prior agents (same logic as old version)
197193
for agent in getattr(current, "_participants", {}).values():
198-
if (
199-
getattr(agent, "agent_name", getattr(agent, "name", ""))
200-
!= "ProxyAgent"
201-
):
194+
agent_name = getattr(agent, "agent_name", getattr(agent, "name", ""))
195+
if agent_name != "ProxyAgent":
202196
close_coro = getattr(agent, "close", None)
203197
if callable(close_coro):
204198
try:
205199
await close_coro()
206-
except Exception as e: # noqa: BLE001
200+
cls.logger.debug("Closed agent '%s'", agent_name)
201+
except Exception as e:
207202
cls.logger.error("Error closing agent: %s", e)
208203

209204
factory = MagenticAgentFactory()
210205
agents = await factory.get_agents(
211206
user_id=user_id, team_config_input=team_config
212207
)
208+
cls.logger.info("Created %d agents for user '%s'", len(agents), user_id)
209+
213210
orchestration_config.orchestrations[user_id] = await cls.init_orchestration(
214211
agents, user_id
215212
)
@@ -221,41 +218,55 @@ async def get_current_or_new_orchestration(
221218
async def run_orchestration(self, user_id: str, input_task) -> None:
222219
"""
223220
Execute the Magentic workflow for the provided user and task description.
221+
222+
This mirrors the old SK orchestration:
223+
- Uses same execution settings (temperature=0.1, max_tokens=4000)
224+
- Maintains same approval workflow
225+
- Sends same WebSocket updates
224226
"""
225227
job_id = str(uuid.uuid4())
226228
orchestration_config.set_approval_pending(job_id)
229+
self.logger.info("Starting orchestration job '%s' for user '%s'", job_id, user_id)
227230

228231
workflow = orchestration_config.get_current_orchestration(user_id)
229232
if workflow is None:
230233
raise ValueError("Orchestration not initialized for user.")
231234

232-
# Ensure manager tracks user_id
235+
# Ensure manager tracks user_id (same as old version)
233236
try:
234237
manager = getattr(workflow, "_manager", None)
235238
if manager and hasattr(manager, "current_user_id"):
236239
manager.current_user_id = user_id
237-
except Exception as e: # noqa: BLE001
240+
self.logger.debug("Set user_id on manager = %s", user_id)
241+
except Exception as e:
238242
self.logger.error("Error setting user_id on manager: %s", e)
239243

240-
# Build a MagenticContext-like starting message; the workflow interface likely exposes invoke(task=...)
244+
# Build task from input (same as old version)
241245
task_text = getattr(input_task, "description", str(input_task))
242-
243-
# Provide chat options (temperature mapping from original execution_settings)
244-
chat_options = ChatOptions(
245-
temperature=0.1,
246-
max_output_tokens=4000,
247-
)
246+
self.logger.debug("Task: %s", task_text)
248247

249248
try:
250-
# Invoke orchestrator; API may be workflow.invoke(task=..., chat_options=...)
251-
result_msg: ChatMessage = await workflow.invoke(
252-
task=task_text, chat_options=chat_options
253-
)
254-
255-
final_text = result_msg.text if result_msg else ""
256-
self.logger.info("Final result:\n%s", final_text)
249+
# Execute workflow using run_stream with task as positional parameter
250+
# The execution settings are configured in the manager/client
251+
final_output: str | None = None
252+
253+
self.logger.info("Starting workflow execution...")
254+
async for event in workflow.run_stream(task_text):
255+
# Check if this is the final output event
256+
if isinstance(event, WorkflowOutputEvent):
257+
final_output = str(event.data)
258+
self.logger.debug("Received workflow output event")
259+
260+
# Extract final result
261+
final_text = final_output if final_output else ""
262+
263+
# Log results (same format as old version)
264+
self.logger.info("\nAgent responses:")
265+
self.logger.info("Orchestration completed. Final result length: %d chars", len(final_text))
266+
self.logger.info("\nFinal result:\n%s", final_text)
257267
self.logger.info("=" * 50)
258268

269+
# Send final result via WebSocket (same as old version)
259270
await connection_config.send_status_update_async(
260271
{
261272
"type": WebsocketMessageType.FINAL_RESULT_MESSAGE,
@@ -268,6 +279,30 @@ async def run_orchestration(self, user_id: str, input_task) -> None:
268279
user_id,
269280
message_type=WebsocketMessageType.FINAL_RESULT_MESSAGE,
270281
)
271-
self.logger.info("Final result sent via WebSocket to user %s", user_id)
272-
except Exception as e: # noqa: BLE001
273-
self.logger.error("Unexpected orchestration error: %s", e)
282+
self.logger.info("Final result sent via WebSocket to user '%s'", user_id)
283+
284+
except Exception as e:
285+
# Error handling (enhanced from old version)
286+
self.logger.error("Unexpected orchestration error: %s", e, exc_info=True)
287+
self.logger.error("Error type: %s", type(e).__name__)
288+
if hasattr(e, "__dict__"):
289+
self.logger.error("Error attributes: %s", e.__dict__)
290+
self.logger.info("=" * 50)
291+
292+
# Send error status to user
293+
try:
294+
await connection_config.send_status_update_async(
295+
{
296+
"type": WebsocketMessageType.FINAL_RESULT_MESSAGE,
297+
"data": {
298+
"content": f"Error during orchestration: {str(e)}",
299+
"status": "error",
300+
"timestamp": asyncio.get_event_loop().time(),
301+
},
302+
},
303+
user_id,
304+
message_type=WebsocketMessageType.FINAL_RESULT_MESSAGE,
305+
)
306+
except Exception as send_error:
307+
self.logger.error("Failed to send error status: %s", send_error)
308+
raise

0 commit comments

Comments
 (0)