diff --git a/src/backend/v3/callbacks/response_handlers.py b/src/backend/v3/callbacks/response_handlers.py index c6e8d7773..0b723c0c6 100644 --- a/src/backend/v3/callbacks/response_handlers.py +++ b/src/backend/v3/callbacks/response_handlers.py @@ -3,11 +3,16 @@ Provides detailed monitoring and response handling for different agent types. """ import asyncio +import json +import logging import sys +import time from semantic_kernel.contents import (ChatMessageContent, StreamingChatMessageContent) from v3.config.settings import connection_config, current_user_id +from v3.models.messages import (AgentMessage, AgentMessageStreaming, + AgentToolCall, AgentToolMessage) def agent_response_callback(message: ChatMessageContent, user_id: str = None) -> None: @@ -16,39 +21,39 @@ def agent_response_callback(message: ChatMessageContent, user_id: str = None) -> # Get agent name to determine handling agent_name = message.name or "Unknown Agent" - + # Get message type + content_type = getattr(message, 'content_type', 'text') + role = getattr(message, 'role', 'unknown') # Send to WebSocket if user_id: try: - asyncio.create_task(connection_config.send_status_update_async({ - "type": "agent_message", - "data": {"agent_name": agent_name, "content": message.content, "role": role} - }, user_id)) + if message.items and message.items[0].content_type == 'function_call': + final_message = AgentToolMessage(agent_name=agent_name) + for item in message.items: + if item.content_type == 'function_call': + tool_call = AgentToolCall(tool_name=item.name or "unknown_tool", arguments=item.arguments or {}) + final_message.tool_calls.append(tool_call) + asyncio.create_task(connection_config.send_status_update_async(final_message, user_id)) + logging.info(f"Function call: {final_message}") + elif message.items and message.items[0].content_type == 'function_result': + # skip returning these results for now - agent will return in a later message + pass + else: + final_message = AgentMessage(agent_name=agent_name, timestamp=time.time() or "", content=message.content or "") + asyncio.create_task(connection_config.send_status_update_async(final_message, user_id)) + logging.info(f"{role.capitalize()} message: {final_message}") except Exception as e: - print(f"Error sending WebSocket message: {e}") - - print(f"\n **{agent_name}** (role: {role})") - - if message.items[-1].content_type == 'function_call': - print(f"🔧 Function call: {message.items[-1].function_name}, Arguments: {message.items[-1].arguments}") - + logging.error(f"Response_callback: Error sending WebSocket message: {e}") -# Add this function after your agent_response_callback function async def streaming_agent_response_callback(streaming_message: StreamingChatMessageContent, is_final: bool, user_id: str = None) -> None: """Simple streaming callback to show real-time agent responses.""" - if streaming_message.name != "CoderAgent": - # Print streaming content as it arrives - if hasattr(streaming_message, 'content') and streaming_message.content: - print(streaming_message.content, end='', flush=False) - - # Send to WebSocket - if user_id: - try: - await connection_config.send_status_update_async({ - "type": "streaming_message", - "data": {"agent_name": streaming_message.name or "Unknown Agent", "content": streaming_message.content, "is_final": is_final} - }, user_id) - except Exception as e: - print(f"Error sending streaming WebSocket message: {e}") \ No newline at end of file + # process only content messages + if hasattr(streaming_message, 'content') and streaming_message.content: + if user_id: + try: + message = AgentMessageStreaming(agent_name=streaming_message.name or "Unknown Agent", content=streaming_message.content, is_final=is_final) + await connection_config.send_status_update_async(message, user_id) + except Exception as e: + logging.error(f"Response_callback: Error sending streaming WebSocket message: {e}") \ No newline at end of file diff --git a/src/backend/v3/magentic_agents/proxy_agent.py b/src/backend/v3/magentic_agents/proxy_agent.py index 26cebe0b5..5e8836955 100644 --- a/src/backend/v3/magentic_agents/proxy_agent.py +++ b/src/backend/v3/magentic_agents/proxy_agent.py @@ -194,8 +194,6 @@ async def invoke_stream(self, messages, thread=None, **kwargs) -> AsyncIterator[ # Send clarification request via streaming callbacks clarification_request = f"I need clarification about: {message}" - #self._create_message_content(clarification_request, thread.id) - # await self._trigger_streaming_callbacks(clarification_request) clarification_message = UserClarificationRequest( question=clarification_request, diff --git a/src/backend/v3/models/messages.py b/src/backend/v3/models/messages.py index 5bbfe6ce2..c464f1538 100644 --- a/src/backend/v3/models/messages.py +++ b/src/backend/v3/models/messages.py @@ -1,7 +1,7 @@ """Messages from the backend to the frontend via WebSocket.""" import uuid -from dataclasses import dataclass +from dataclasses import asdict, dataclass, field from typing import Any, Dict, List, Literal, Optional from semantic_kernel.kernel_pydantic import Field, KernelBaseModel @@ -15,6 +15,10 @@ class AgentMessage: timestamp: str content: str + def to_dict(self) -> Dict[str, Any]: + """Convert the AgentMessage to a dictionary for JSON serialization.""" + return asdict(self) + @dataclass(slots=True) class AgentStreamStart: """Start of a streaming message from the backend to the frontend via WebSocket.""" @@ -32,12 +36,29 @@ class AgentMessageStreaming: content: str is_final: bool = False + def to_dict(self) -> Dict[str, Any]: + """Convert the AgentMessageStreaming to a dictionary for JSON serialization.""" + return asdict(self) + @dataclass(slots=True) class AgentToolMessage: """Message from an agent using a tool.""" agent_name: str + tool_calls: List['AgentToolCall'] = field(default_factory=list) + + def to_dict(self) -> Dict[str, Any]: + """Convert the AgentToolMessage to a dictionary for JSON serialization.""" + return asdict(self) + +@dataclass(slots=True) +class AgentToolCall: + """Message representing a tool call from an agent.""" tool_name: str - input: str + arguments: Dict[str, Any] + + def to_dict(self) -> Dict[str, Any]: + """Convert the AgentToolCall to a dictionary for JSON serialization.""" + return asdict(self) @dataclass(slots=True) class PlanApprovalRequest: