33import asyncio
44import logging
55import uuid
6- from typing import List , Optional , Callable , Awaitable
6+ from typing import List , Optional
77
88# agent_framework imports
99from agent_framework_azure_ai import AzureAIAgentClient
10- from agent_framework import ChatMessage , ChatOptions , WorkflowOutputEvent , AgentRunResponseUpdate , MagenticBuilder
11-
10+ from agent_framework import (
11+ ChatMessage ,
12+ WorkflowOutputEvent ,
13+ MagenticBuilder ,
14+ MagenticCallbackMode ,
15+ MagenticOrchestratorMessageEvent ,
16+ MagenticAgentDeltaEvent ,
17+ MagenticAgentMessageEvent ,
18+ MagenticFinalResultEvent ,
19+ )
1220
1321from common .config .app_config import config
1422from common .models .messages_af import TeamConfiguration
2331from af .orchestration .human_approval_manager import HumanApprovalMagenticManager
2432from af .magentic_agents .magentic_agent_factory import MagenticAgentFactory
2533
34+
2635class OrchestrationManager :
2736 """Manager for handling orchestration logic using agent_framework Magentic workflow."""
2837
@@ -32,41 +41,6 @@ def __init__(self):
3241 self .user_id : Optional [str ] = None
3342 self .logger = self .__class__ .logger
3443
35- # ---------------------------
36- # Internal callback adapters
37- # ---------------------------
38- @staticmethod
39- def _user_aware_agent_callback (
40- user_id : str ,
41- ) -> Callable [[str , ChatMessage ], Awaitable [None ]]:
42- """Adapts agent_framework final agent ChatMessage to legacy agent_response_callback signature."""
43-
44- async def _cb (agent_id : str , message : ChatMessage ):
45- try :
46- agent_response_callback (agent_id , message , user_id ) # Fixed: added agent_id
47- except Exception as e : # noqa: BLE001
48- logging .getLogger (__name__ ).error (
49- "agent_response_callback error: %s" , e
50- )
51-
52- return _cb
53-
54- @staticmethod
55- def _user_aware_streaming_callback (
56- user_id : str ,
57- ) -> Callable [[str , AgentRunResponseUpdate , bool ], Awaitable [None ]]:
58- """Adapts streaming updates to existing streaming handler."""
59-
60- async def _cb (agent_id : str , update : AgentRunResponseUpdate , is_final : bool ):
61- try :
62- await streaming_agent_response_callback (agent_id , update , is_final , user_id ) # Fixed: removed shim
63- except Exception as e : # noqa: BLE001
64- logging .getLogger (__name__ ).error (
65- "streaming_agent_response_callback error: %s" , e
66- )
67-
68- return _cb
69-
7044 # ---------------------------
7145 # Orchestration construction
7246 # ---------------------------
@@ -77,6 +51,7 @@ async def init_orchestration(cls, agents: List, user_id: str | None = None):
7751 - Provided agents (participants)
7852 - HumanApprovalMagenticManager as orchestrator manager
7953 - AzureAIAgentClient as the underlying chat client
54+ - Event-based callbacks for streaming and final responses
8055
8156 This mirrors the old Semantic Kernel orchestration setup:
8257 - Uses same deployment, endpoint, and credentials
@@ -135,41 +110,58 @@ async def init_orchestration(cls, agents: List, user_id: str | None = None):
135110 participants [name ] = ag
136111 cls .logger .debug ("Added participant '%s'" , name )
137112
138- # Assemble workflow
113+ # Create unified event callback for Magentic workflow
114+ async def on_event (event ) -> None :
115+ """
116+ Handle all Magentic workflow events and route them to appropriate handlers.
117+ This replaces the old callback attachment approach with the proper event-based system.
118+ """
119+ try :
120+ if isinstance (event , MagenticOrchestratorMessageEvent ):
121+ # Orchestrator messages (task assignments, coordination)
122+ message_text = getattr (event .message , 'text' , '' )
123+ cls .logger .info (f"[ORCHESTRATOR:{ event .kind } ] { message_text } " )
124+
125+ elif isinstance (event , MagenticAgentDeltaEvent ):
126+ # Streaming update from agent - convert to our format
127+ # MagenticAgentDeltaEvent has: agent_id, text, and other properties
128+ try :
129+ await streaming_agent_response_callback (
130+ event .agent_id ,
131+ event , # Pass the event itself as the update object
132+ False , # Not final yet (streaming in progress)
133+ user_id
134+ )
135+ except Exception as e :
136+ cls .logger .error (f"Error in streaming callback for agent { event .agent_id } : { e } " )
137+
138+ elif isinstance (event , MagenticAgentMessageEvent ):
139+ # Final agent message - complete response
140+ if event .message :
141+ try :
142+ agent_response_callback (event .agent_id , event .message , user_id )
143+ except Exception as e :
144+ cls .logger .error (f"Error in agent callback for agent { event .agent_id } : { e } " )
145+
146+ elif isinstance (event , MagenticFinalResultEvent ):
147+ # Final result from the entire workflow
148+ final_text = getattr (event .message , 'text' , '' )
149+ cls .logger .info (f"[FINAL RESULT] Length: { len (final_text )} chars" )
150+
151+ except Exception as e :
152+ cls .logger .error (f"Error in on_event callback: { e } " , exc_info = True )
153+
154+ # Assemble workflow with .on_event() callback (proper way for agent_framework)
139155 builder = (
140156 MagenticBuilder ()
141157 .participants (** participants )
158+ .on_event (on_event , mode = MagenticCallbackMode .STREAMING ) # Enable streaming events
142159 .with_standard_manager (manager = manager )
143160 )
144161
145162 # Build workflow
146163 workflow = builder .build ()
147- cls .logger .info ("Built Magentic workflow with %d participants" , len (participants ))
148-
149- # Wire agent response callbacks onto orchestrator
150- try :
151- orchestrator = getattr (workflow , "_orchestrator" , None )
152- if orchestrator :
153- if getattr (orchestrator , "_agent_response_callback" , None ) is None :
154- setattr (
155- orchestrator ,
156- "_agent_response_callback" ,
157- cls ._user_aware_agent_callback (user_id ),
158- )
159- if (
160- getattr (orchestrator , "_streaming_agent_response_callback" , None )
161- is None
162- ):
163- setattr (
164- orchestrator ,
165- "_streaming_agent_response_callback" ,
166- cls ._user_aware_streaming_callback (user_id ),
167- )
168- cls .logger .debug ("Attached callbacks to workflow orchestrator" )
169- except Exception as e :
170- cls .logger .warning (
171- "Could not attach callbacks to workflow orchestrator: %s" , e
172- )
164+ cls .logger .info ("Built Magentic workflow with %d participants and event callbacks" , len (participants ))
173165
174166 return workflow
175167
@@ -254,7 +246,12 @@ async def run_orchestration(self, user_id: str, input_task) -> None:
254246 async for event in workflow .run_stream (task_text ):
255247 # Check if this is the final output event
256248 if isinstance (event , WorkflowOutputEvent ):
257- final_output = str (event .data )
249+ # Extract text from ChatMessage object
250+ output_data = event .data
251+ if isinstance (output_data , ChatMessage ):
252+ final_output = getattr (output_data , "text" , None ) or str (output_data )
253+ else :
254+ final_output = str (output_data )
258255 self .logger .debug ("Received workflow output event" )
259256
260257 # Extract final result
0 commit comments