Skip to content

Commit 9883c8c

Browse files
committed
Enhance event handling in OrchestrationManager
Refactored the workflow event loop to handle additional event types, including orchestrator messages, agent streaming updates, agent final messages, and final result events. Improved error handling and logging for each event type. Commented out unused MagenticCallbackMode references.
1 parent 0613579 commit 9883c8c

File tree

1 file changed

+44
-11
lines changed

1 file changed

+44
-11
lines changed

src/backend/v4/orchestration/orchestration_manager.py

Lines changed: 44 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
ChatMessage,
1212
WorkflowOutputEvent,
1313
MagenticBuilder,
14-
MagenticCallbackMode,
14+
# MagenticCallbackMode,
1515
MagenticOrchestratorMessageEvent,
1616
MagenticAgentDeltaEvent,
1717
MagenticAgentMessageEvent,
@@ -166,7 +166,7 @@ async def on_event(event) -> None:
166166
builder = (
167167
MagenticBuilder()
168168
.participants(**participants)
169-
.on_event(on_event, mode=MagenticCallbackMode.STREAMING) # Enable streaming events
169+
#.on_event(on_event) # Enable streaming events
170170
.with_standard_manager(manager=manager)
171171
)
172172

@@ -255,15 +255,48 @@ async def run_orchestration(self, user_id: str, input_task) -> None:
255255

256256
self.logger.info("Starting workflow execution...")
257257
async for event in workflow.run_stream(task_text):
258-
# Check if this is the final output event
259-
if isinstance(event, WorkflowOutputEvent):
260-
# Extract text from ChatMessage object
261-
output_data = event.data
262-
if isinstance(output_data, ChatMessage):
263-
final_output = getattr(output_data, "text", None) or str(output_data)
264-
else:
265-
final_output = str(output_data)
266-
self.logger.debug("Received workflow output event")
258+
try:
259+
# Handle orchestrator messages (task assignments, coordination)
260+
if isinstance(event, MagenticOrchestratorMessageEvent):
261+
message_text = getattr(event.message, 'text', '')
262+
self.logger.info(f"[ORCHESTRATOR:{event.kind}] {message_text}")
263+
264+
# Handle streaming updates from agents
265+
elif isinstance(event, MagenticAgentDeltaEvent):
266+
try:
267+
await streaming_agent_response_callback(
268+
event.agent_id,
269+
event, # Pass the event itself as the update object
270+
False, # Not final yet (streaming in progress)
271+
user_id
272+
)
273+
except Exception as e:
274+
self.logger.error(f"Error in streaming callback for agent {event.agent_id}: {e}")
275+
276+
# Handle final agent messages (complete response)
277+
elif isinstance(event, MagenticAgentMessageEvent):
278+
if event.message:
279+
try:
280+
agent_response_callback(event.agent_id, event.message, user_id)
281+
except Exception as e:
282+
self.logger.error(f"Error in agent callback for agent {event.agent_id}: {e}")
283+
284+
# Handle final result from the entire workflow
285+
elif isinstance(event, MagenticFinalResultEvent):
286+
final_text = getattr(event.message, 'text', '')
287+
self.logger.info(f"[FINAL RESULT] Length: {len(final_text)} chars")
288+
289+
# Handle workflow output event (captures final result)
290+
elif isinstance(event, WorkflowOutputEvent):
291+
output_data = event.data
292+
if isinstance(output_data, ChatMessage):
293+
final_output = getattr(output_data, "text", None) or str(output_data)
294+
else:
295+
final_output = str(output_data)
296+
self.logger.debug("Received workflow output event")
297+
298+
except Exception as e:
299+
self.logger.error(f"Error processing event {type(event).__name__}: {e}", exc_info=True)
267300

268301
# Extract final result
269302
final_text = final_output if final_output else ""

0 commit comments

Comments
 (0)