Skip to content

Commit 9174f5a

Browse files
authored
Merge pull request #592 from Fr4nc3/macae-rfp-af-101725
Refactor orchestration to use event-based callbacks
2 parents f8150f0 + 8afed71 commit 9174f5a

File tree

1 file changed

+63
-66
lines changed

1 file changed

+63
-66
lines changed

src/backend/af/orchestration/orchestration_manager.py

Lines changed: 63 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,20 @@
33
import asyncio
44
import logging
55
import uuid
6-
from typing import List, Optional, Callable, Awaitable
6+
from typing import List, Optional
77

88
# agent_framework imports
99
from agent_framework_azure_ai import AzureAIAgentClient
10-
from agent_framework import ChatMessage, ChatOptions, WorkflowOutputEvent, AgentRunResponseUpdate, MagenticBuilder
11-
10+
from agent_framework import (
11+
ChatMessage,
12+
WorkflowOutputEvent,
13+
MagenticBuilder,
14+
MagenticCallbackMode,
15+
MagenticOrchestratorMessageEvent,
16+
MagenticAgentDeltaEvent,
17+
MagenticAgentMessageEvent,
18+
MagenticFinalResultEvent,
19+
)
1220

1321
from common.config.app_config import config
1422
from common.models.messages_af import TeamConfiguration
@@ -23,6 +31,7 @@
2331
from af.orchestration.human_approval_manager import HumanApprovalMagenticManager
2432
from af.magentic_agents.magentic_agent_factory import MagenticAgentFactory
2533

34+
2635
class OrchestrationManager:
2736
"""Manager for handling orchestration logic using agent_framework Magentic workflow."""
2837

@@ -32,41 +41,6 @@ def __init__(self):
3241
self.user_id: Optional[str] = None
3342
self.logger = self.__class__.logger
3443

35-
# ---------------------------
36-
# Internal callback adapters
37-
# ---------------------------
38-
@staticmethod
39-
def _user_aware_agent_callback(
40-
user_id: str,
41-
) -> Callable[[str, ChatMessage], Awaitable[None]]:
42-
"""Adapts agent_framework final agent ChatMessage to legacy agent_response_callback signature."""
43-
44-
async def _cb(agent_id: str, message: ChatMessage):
45-
try:
46-
agent_response_callback(agent_id, message, user_id) # Fixed: added agent_id
47-
except Exception as e: # noqa: BLE001
48-
logging.getLogger(__name__).error(
49-
"agent_response_callback error: %s", e
50-
)
51-
52-
return _cb
53-
54-
@staticmethod
55-
def _user_aware_streaming_callback(
56-
user_id: str,
57-
) -> Callable[[str, AgentRunResponseUpdate, bool], Awaitable[None]]:
58-
"""Adapts streaming updates to existing streaming handler."""
59-
60-
async def _cb(agent_id: str, update: AgentRunResponseUpdate, is_final: bool):
61-
try:
62-
await streaming_agent_response_callback(agent_id, update, is_final, user_id) # Fixed: removed shim
63-
except Exception as e: # noqa: BLE001
64-
logging.getLogger(__name__).error(
65-
"streaming_agent_response_callback error: %s", e
66-
)
67-
68-
return _cb
69-
7044
# ---------------------------
7145
# Orchestration construction
7246
# ---------------------------
@@ -77,6 +51,7 @@ async def init_orchestration(cls, agents: List, user_id: str | None = None):
7751
- Provided agents (participants)
7852
- HumanApprovalMagenticManager as orchestrator manager
7953
- AzureAIAgentClient as the underlying chat client
54+
- Event-based callbacks for streaming and final responses
8055
8156
This mirrors the old Semantic Kernel orchestration setup:
8257
- Uses same deployment, endpoint, and credentials
@@ -135,41 +110,58 @@ async def init_orchestration(cls, agents: List, user_id: str | None = None):
135110
participants[name] = ag
136111
cls.logger.debug("Added participant '%s'", name)
137112

138-
# Assemble workflow
113+
# Create unified event callback for Magentic workflow
114+
async def on_event(event) -> None:
115+
"""
116+
Handle all Magentic workflow events and route them to appropriate handlers.
117+
This replaces the old callback attachment approach with the proper event-based system.
118+
"""
119+
try:
120+
if isinstance(event, MagenticOrchestratorMessageEvent):
121+
# Orchestrator messages (task assignments, coordination)
122+
message_text = getattr(event.message, 'text', '')
123+
cls.logger.info(f"[ORCHESTRATOR:{event.kind}] {message_text}")
124+
125+
elif isinstance(event, MagenticAgentDeltaEvent):
126+
# Streaming update from agent - convert to our format
127+
# MagenticAgentDeltaEvent has: agent_id, text, and other properties
128+
try:
129+
await streaming_agent_response_callback(
130+
event.agent_id,
131+
event, # Pass the event itself as the update object
132+
False, # Not final yet (streaming in progress)
133+
user_id
134+
)
135+
except Exception as e:
136+
cls.logger.error(f"Error in streaming callback for agent {event.agent_id}: {e}")
137+
138+
elif isinstance(event, MagenticAgentMessageEvent):
139+
# Final agent message - complete response
140+
if event.message:
141+
try:
142+
agent_response_callback(event.agent_id, event.message, user_id)
143+
except Exception as e:
144+
cls.logger.error(f"Error in agent callback for agent {event.agent_id}: {e}")
145+
146+
elif isinstance(event, MagenticFinalResultEvent):
147+
# Final result from the entire workflow
148+
final_text = getattr(event.message, 'text', '')
149+
cls.logger.info(f"[FINAL RESULT] Length: {len(final_text)} chars")
150+
151+
except Exception as e:
152+
cls.logger.error(f"Error in on_event callback: {e}", exc_info=True)
153+
154+
# Assemble workflow with .on_event() callback (proper way for agent_framework)
139155
builder = (
140156
MagenticBuilder()
141157
.participants(**participants)
158+
.on_event(on_event, mode=MagenticCallbackMode.STREAMING) # Enable streaming events
142159
.with_standard_manager(manager=manager)
143160
)
144161

145162
# Build workflow
146163
workflow = builder.build()
147-
cls.logger.info("Built Magentic workflow with %d participants", len(participants))
148-
149-
# Wire agent response callbacks onto orchestrator
150-
try:
151-
orchestrator = getattr(workflow, "_orchestrator", None)
152-
if orchestrator:
153-
if getattr(orchestrator, "_agent_response_callback", None) is None:
154-
setattr(
155-
orchestrator,
156-
"_agent_response_callback",
157-
cls._user_aware_agent_callback(user_id),
158-
)
159-
if (
160-
getattr(orchestrator, "_streaming_agent_response_callback", None)
161-
is None
162-
):
163-
setattr(
164-
orchestrator,
165-
"_streaming_agent_response_callback",
166-
cls._user_aware_streaming_callback(user_id),
167-
)
168-
cls.logger.debug("Attached callbacks to workflow orchestrator")
169-
except Exception as e:
170-
cls.logger.warning(
171-
"Could not attach callbacks to workflow orchestrator: %s", e
172-
)
164+
cls.logger.info("Built Magentic workflow with %d participants and event callbacks", len(participants))
173165

174166
return workflow
175167

@@ -254,7 +246,12 @@ async def run_orchestration(self, user_id: str, input_task) -> None:
254246
async for event in workflow.run_stream(task_text):
255247
# Check if this is the final output event
256248
if isinstance(event, WorkflowOutputEvent):
257-
final_output = str(event.data)
249+
# Extract text from ChatMessage object
250+
output_data = event.data
251+
if isinstance(output_data, ChatMessage):
252+
final_output = getattr(output_data, "text", None) or str(output_data)
253+
else:
254+
final_output = str(output_data)
258255
self.logger.debug("Received workflow output event")
259256

260257
# Extract final result

0 commit comments

Comments
 (0)