Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions src/backend/v3/api/router.py
Original file line number Diff line number Diff line change
Expand Up @@ -396,8 +396,8 @@ async def plan_approval(
orchestration_config
and human_feedback.m_plan_id in orchestration_config.approvals
):
orchestration_config.approvals[human_feedback.m_plan_id] = (
human_feedback.approved
orchestration_config.set_approval_result(
human_feedback.m_plan_id, human_feedback.approved
)
# orchestration_config.plans[human_feedback.m_plan_id][
# "plan_id"
Expand Down Expand Up @@ -528,10 +528,10 @@ async def user_clarification(
orchestration_config
and human_feedback.request_id in orchestration_config.clarifications
):
orchestration_config.clarifications[human_feedback.request_id] = (
human_feedback.answer
# Use the new event-driven method to set clarification result
orchestration_config.set_clarification_result(
human_feedback.request_id, human_feedback.answer
)

try:
result = await PlanService.handle_human_clarification(
human_feedback, user_id
Expand Down
151 changes: 150 additions & 1 deletion src/backend/v3/config/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import asyncio
import json
import logging
from typing import Dict
from typing import Dict, Optional

from common.config.app_config import config
from common.models.messages_kernel import TeamConfiguration
Expand Down Expand Up @@ -86,10 +86,159 @@ def __init__(self):
20 # Maximum number of replanning rounds 20 needed to accommodate complex tasks
)

# Event-driven notification system for approvals and clarifications
self._approval_events: Dict[str, asyncio.Event] = {}
self._clarification_events: Dict[str, asyncio.Event] = {}

# Default timeout for waiting operations (5 minutes)
self.default_timeout: float = 300.0

def get_current_orchestration(self, user_id: str) -> MagenticOrchestration:
"""get existing orchestration instance."""
return self.orchestrations.get(user_id, None)

def set_approval_pending(self, plan_id: str) -> None:
"""Set an approval as pending and create an event for it."""
self.approvals[plan_id] = None
if plan_id not in self._approval_events:
self._approval_events[plan_id] = asyncio.Event()
else:
# Clear existing event to reset state
self._approval_events[plan_id].clear()

def set_approval_result(self, plan_id: str, approved: bool) -> None:
"""Set the approval result and trigger the event."""
self.approvals[plan_id] = approved
if plan_id in self._approval_events:
self._approval_events[plan_id].set()

async def wait_for_approval(self, plan_id: str, timeout: Optional[float] = None) -> bool:
"""
Wait for an approval decision with timeout.

Args:
plan_id: The plan ID to wait for
timeout: Timeout in seconds (defaults to default_timeout)

Returns:
The approval decision (True/False)

Raises:
asyncio.TimeoutError: If timeout is exceeded
KeyError: If plan_id is not found in approvals
"""
if timeout is None:
timeout = self.default_timeout

if plan_id not in self.approvals:
raise KeyError(f"Plan ID {plan_id} not found in approvals")

if self.approvals[plan_id] is not None:
# Already has a result
return self.approvals[plan_id]

if plan_id not in self._approval_events:
self._approval_events[plan_id] = asyncio.Event()

try:
await asyncio.wait_for(self._approval_events[plan_id].wait(), timeout=timeout)
return self.approvals[plan_id]
except asyncio.TimeoutError:
# Clean up on timeout
self.cleanup_approval(plan_id)
raise
except asyncio.CancelledError:
# Handle task cancellation gracefully
logger.debug(f"Approval request {plan_id} was cancelled")
raise
except Exception as e:
# Handle any other unexpected errors
logger.error(f"Unexpected error waiting for approval {plan_id}: {e}")
raise
finally:
# Ensure cleanup happens regardless of how the try block exits
# Only cleanup if the approval is still pending (None) to avoid
# cleaning up successful approvals
if plan_id in self.approvals and self.approvals[plan_id] is None:
self.cleanup_approval(plan_id)

def set_clarification_pending(self, request_id: str) -> None:
"""Set a clarification as pending and create an event for it."""
self.clarifications[request_id] = None
if request_id not in self._clarification_events:
self._clarification_events[request_id] = asyncio.Event()
else:
# Clear existing event to reset state
self._clarification_events[request_id].clear()

def set_clarification_result(self, request_id: str, answer: str) -> None:
"""Set the clarification response and trigger the event."""
self.clarifications[request_id] = answer
if request_id in self._clarification_events:
self._clarification_events[request_id].set()

async def wait_for_clarification(self, request_id: str, timeout: Optional[float] = None) -> str:
"""
Wait for a clarification response with timeout.

Args:
request_id: The request ID to wait for
timeout: Timeout in seconds (defaults to default_timeout)

Returns:
The clarification response

Raises:
asyncio.TimeoutError: If timeout is exceeded
KeyError: If request_id is not found in clarifications
"""
if timeout is None:
timeout = self.default_timeout

if request_id not in self.clarifications:
raise KeyError(f"Request ID {request_id} not found in clarifications")

if self.clarifications[request_id] is not None:
# Already has a result
return self.clarifications[request_id]

if request_id not in self._clarification_events:
self._clarification_events[request_id] = asyncio.Event()

try:
await asyncio.wait_for(self._clarification_events[request_id].wait(), timeout=timeout)
return self.clarifications[request_id]
except asyncio.TimeoutError:
# Clean up on timeout
self.cleanup_clarification(request_id)
raise
except asyncio.CancelledError:
# Handle task cancellation gracefully
logger.debug(f"Clarification request {request_id} was cancelled")
raise
except Exception as e:
# Handle any other unexpected errors
logger.error(f"Unexpected error waiting for clarification {request_id}: {e}")
raise
finally:
# Ensure cleanup happens regardless of how the try block exits
# Only cleanup if the clarification is still pending (None) to avoid
# cleaning up successful clarifications
if request_id in self.clarifications and self.clarifications[request_id] is None:
self.cleanup_clarification(request_id)

def cleanup_approval(self, plan_id: str) -> None:
"""Clean up approval resources."""
self.approvals.pop(plan_id, None)
if plan_id in self._approval_events:
del self._approval_events[plan_id]

def cleanup_clarification(self, request_id: str) -> None:
"""Clean up clarification resources."""
self.clarifications.pop(request_id, None)
if request_id in self._clarification_events:
del self._clarification_events[request_id]


class ConnectionConfig:
"""Connection manager for WebSocket connections."""
Expand Down
118 changes: 102 additions & 16 deletions src/backend/v3/magentic_agents/proxy_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

import asyncio
import logging
import time
import uuid
from collections.abc import AsyncIterable
from typing import AsyncIterator, Optional
Expand Down Expand Up @@ -30,6 +31,9 @@
from v3.models.messages import (UserClarificationRequest,
UserClarificationResponse, WebsocketMessageType)

# Initialize logger for the module
logger = logging.getLogger(__name__)


class DummyAgentThread(AgentThread):
"""Dummy thread implementation for proxy agent."""
Expand Down Expand Up @@ -185,10 +189,16 @@ async def invoke(
clarification_message.request_id
)

if not human_response:
human_response = "No additional clarification provided."
# Handle silent timeout/cancellation
if human_response is None:
# Process was terminated silently - don't yield any response
logger.debug("Clarification process terminated silently - ending invoke")
return

# Extract the answer from the response
answer = human_response.answer if human_response else "No additional clarification provided."

response = f"Human clarification: {human_response}"
response = f"Human clarification: {answer}"

chat_message = self._create_message_content(response, thread.id)

Expand Down Expand Up @@ -242,10 +252,16 @@ async def invoke_stream(
clarification_message.request_id
)

if not human_response:
human_response = "No additional clarification provided."
# Handle silent timeout/cancellation
if human_response is None:
# Process was terminated silently - don't yield any response
logger.debug("Clarification process terminated silently - ending invoke_stream")
return

# Extract the answer from the response
answer = human_response.answer if human_response else "No additional clarification provided."

response = f"Human clarification: {human_response}"
response = f"Human clarification: {answer}"

chat_message = self._create_message_content(response, thread.id)

Expand All @@ -254,16 +270,86 @@ async def invoke_stream(
async def _wait_for_user_clarification(
self, request_id: str
) -> Optional[UserClarificationResponse]:
"""Wait for user clarification response."""
# To do: implement timeout and error handling
if request_id not in orchestration_config.clarifications:
orchestration_config.clarifications[request_id] = None
while orchestration_config.clarifications[request_id] is None:
await asyncio.sleep(0.2)
return UserClarificationResponse(
request_id=request_id,
answer=orchestration_config.clarifications[request_id],
)
"""
Wait for user clarification response using event-driven pattern with timeout handling.

Args:
request_id: The request ID to wait for clarification

Returns:
UserClarificationResponse: Clarification result with request ID and answer

Raises:
asyncio.TimeoutError: If timeout is exceeded (300 seconds default)
"""
# logger.info(f"Waiting for user clarification for request: {request_id}")

# Initialize clarification as pending using the new event-driven method
orchestration_config.set_clarification_pending(request_id)

try:
# Wait for clarification with timeout using the new event-driven method
answer = await orchestration_config.wait_for_clarification(request_id)

# logger.info(f"Clarification received for request {request_id}: {answer}")
return UserClarificationResponse(
request_id=request_id,
answer=answer,
)
except asyncio.TimeoutError:
# Enhanced timeout handling - notify user via WebSocket and cleanup
logger.debug(f"Clarification timeout for request {request_id} - notifying user and terminating process")

# Create timeout notification message
from v3.models.messages import TimeoutNotification, WebsocketMessageType
timeout_notification = TimeoutNotification(
timeout_type="clarification",
request_id=request_id,
message=f"User clarification request timed out after {orchestration_config.default_timeout} seconds. Please try again.",
timestamp=time.time(),
timeout_duration=orchestration_config.default_timeout
)

# Send timeout notification to user via WebSocket
try:
await connection_config.send_status_update_async(
message=timeout_notification,
user_id=self.user_id,
message_type=WebsocketMessageType.TIMEOUT_NOTIFICATION,
)
logger.info(f"Timeout notification sent to user {self.user_id} for clarification {request_id}")
except Exception as e:
logger.error(f"Failed to send timeout notification: {e}")

# Clean up this specific request
orchestration_config.cleanup_clarification(request_id)

# Return None to indicate silent termination
# The timeout naturally stops this specific wait operation without affecting other tasks
return None

except KeyError as e:
# Silent error handling for invalid request IDs
logger.debug(f"Request ID not found: {e} - terminating process silently")
return None

except asyncio.CancelledError:
# Handle task cancellation gracefully
logger.debug(f"Clarification request {request_id} was cancelled")
orchestration_config.cleanup_clarification(request_id)
return None

except Exception as e:
# Silent error handling for unexpected errors
logger.debug(f"Unexpected error waiting for clarification: {e} - terminating process silently")
orchestration_config.cleanup_clarification(request_id)
return None
finally:
# Ensure cleanup happens for any incomplete requests
# This provides an additional safety net for resource cleanup
if (request_id in orchestration_config.clarifications and orchestration_config.clarifications[request_id] is None):
logger.debug(f"Final cleanup for pending clarification request {request_id}")
orchestration_config.cleanup_clarification(request_id)

async def get_response(self, chat_history, **kwargs):
"""Get response from the agent - required by Agent base class."""
Expand Down
22 changes: 22 additions & 0 deletions src/backend/v3/models/messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,27 @@ class AgentMessageResponse:
streaming_message: str = None


@dataclass(slots=True)
class TimeoutNotification:
"""Notification sent to user when session timeout occurs."""

timeout_type: str # "approval" or "clarification"
request_id: str # plan_id or request_id that timed out
message: str # Human-readable timeout message
timestamp: float # When the timeout occurred
timeout_duration: float # How long we waited before timing out

def to_dict(self) -> Dict[str, Any]:
"""Convert to dictionary for JSON serialization."""
return {
"timeout_type": self.timeout_type,
"request_id": self.request_id,
"message": self.message,
"timestamp": self.timestamp,
"timeout_duration": self.timeout_duration
}


class WebsocketMessageType(str, Enum):
"""Types of WebSocket messages."""

Expand All @@ -192,3 +213,4 @@ class WebsocketMessageType(str, Enum):
USER_CLARIFICATION_REQUEST = "user_clarification_request"
USER_CLARIFICATION_RESPONSE = "user_clarification_response"
FINAL_RESULT_MESSAGE = "final_result_message"
TIMEOUT_NOTIFICATION = "timeout_notification"
Loading
Loading