Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 32 additions & 27 deletions src/backend/v3/callbacks/response_handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,16 @@
Provides detailed monitoring and response handling for different agent types.
"""
import asyncio
import json
import logging
import sys
import time

from semantic_kernel.contents import (ChatMessageContent,
StreamingChatMessageContent)
from v3.config.settings import connection_config, current_user_id
from v3.models.messages import (AgentMessage, AgentMessageStreaming,
AgentToolCall, AgentToolMessage)


def agent_response_callback(message: ChatMessageContent, user_id: str = None) -> None:
Expand All @@ -16,39 +21,39 @@ def agent_response_callback(message: ChatMessageContent, user_id: str = None) ->

# Get agent name to determine handling
agent_name = message.name or "Unknown Agent"

# Get message type
content_type = getattr(message, 'content_type', 'text')

role = getattr(message, 'role', 'unknown')

# Send to WebSocket
if user_id:
try:
asyncio.create_task(connection_config.send_status_update_async({
"type": "agent_message",
"data": {"agent_name": agent_name, "content": message.content, "role": role}
}, user_id))
if message.items and message.items[0].content_type == 'function_call':
final_message = AgentToolMessage(agent_name=agent_name)
for item in message.items:
if item.content_type == 'function_call':
tool_call = AgentToolCall(tool_name=item.name or "unknown_tool", arguments=item.arguments or {})
final_message.tool_calls.append(tool_call)
asyncio.create_task(connection_config.send_status_update_async(final_message, user_id))
logging.info(f"Function call: {final_message}")
elif message.items and message.items[0].content_type == 'function_result':
# skip returning these results for now - agent will return in a later message
pass
else:
final_message = AgentMessage(agent_name=agent_name, timestamp=time.time() or "", content=message.content or "")
asyncio.create_task(connection_config.send_status_update_async(final_message, user_id))
logging.info(f"{role.capitalize()} message: {final_message}")
except Exception as e:
print(f"Error sending WebSocket message: {e}")

print(f"\n **{agent_name}** (role: {role})")

if message.items[-1].content_type == 'function_call':
print(f"🔧 Function call: {message.items[-1].function_name}, Arguments: {message.items[-1].arguments}")

logging.error(f"Response_callback: Error sending WebSocket message: {e}")

# Add this function after your agent_response_callback function
async def streaming_agent_response_callback(streaming_message: StreamingChatMessageContent, is_final: bool, user_id: str = None) -> None:
"""Simple streaming callback to show real-time agent responses."""
if streaming_message.name != "CoderAgent":
# Print streaming content as it arrives
if hasattr(streaming_message, 'content') and streaming_message.content:
print(streaming_message.content, end='', flush=False)

# Send to WebSocket
if user_id:
try:
await connection_config.send_status_update_async({
"type": "streaming_message",
"data": {"agent_name": streaming_message.name or "Unknown Agent", "content": streaming_message.content, "is_final": is_final}
}, user_id)
except Exception as e:
print(f"Error sending streaming WebSocket message: {e}")
# process only content messages
if hasattr(streaming_message, 'content') and streaming_message.content:
if user_id:
try:
message = AgentMessageStreaming(agent_name=streaming_message.name or "Unknown Agent", content=streaming_message.content, is_final=is_final)
await connection_config.send_status_update_async(message, user_id)
except Exception as e:
logging.error(f"Response_callback: Error sending streaming WebSocket message: {e}")
2 changes: 0 additions & 2 deletions src/backend/v3/magentic_agents/proxy_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -194,8 +194,6 @@ async def invoke_stream(self, messages, thread=None, **kwargs) -> AsyncIterator[

# Send clarification request via streaming callbacks
clarification_request = f"I need clarification about: {message}"
#self._create_message_content(clarification_request, thread.id)
# await self._trigger_streaming_callbacks(clarification_request)

clarification_message = UserClarificationRequest(
question=clarification_request,
Expand Down
25 changes: 23 additions & 2 deletions src/backend/v3/models/messages.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
"""Messages from the backend to the frontend via WebSocket."""

import uuid
from dataclasses import dataclass
from dataclasses import asdict, dataclass, field
from typing import Any, Dict, List, Literal, Optional

from semantic_kernel.kernel_pydantic import Field, KernelBaseModel
Expand All @@ -15,6 +15,10 @@ class AgentMessage:
timestamp: str
content: str

def to_dict(self) -> Dict[str, Any]:
"""Convert the AgentMessage to a dictionary for JSON serialization."""
return asdict(self)

@dataclass(slots=True)
class AgentStreamStart:
"""Start of a streaming message from the backend to the frontend via WebSocket."""
Expand All @@ -32,12 +36,29 @@ class AgentMessageStreaming:
content: str
is_final: bool = False

def to_dict(self) -> Dict[str, Any]:
"""Convert the AgentMessageStreaming to a dictionary for JSON serialization."""
return asdict(self)

@dataclass(slots=True)
class AgentToolMessage:
"""Message from an agent using a tool."""
agent_name: str
tool_calls: List['AgentToolCall'] = field(default_factory=list)

def to_dict(self) -> Dict[str, Any]:
"""Convert the AgentToolMessage to a dictionary for JSON serialization."""
return asdict(self)

@dataclass(slots=True)
class AgentToolCall:
"""Message representing a tool call from an agent."""
tool_name: str
input: str
arguments: Dict[str, Any]

def to_dict(self) -> Dict[str, Any]:
"""Convert the AgentToolCall to a dictionary for JSON serialization."""
return asdict(self)

@dataclass(slots=True)
class PlanApprovalRequest:
Expand Down
Loading