diff --git a/src/backend/v3/api/router.py b/src/backend/v3/api/router.py
index 11e83c53..a86b35c5 100644
--- a/src/backend/v3/api/router.py
+++ b/src/backend/v3/api/router.py
@@ -396,8 +396,8 @@ async def plan_approval(
             orchestration_config
             and human_feedback.m_plan_id in orchestration_config.approvals
         ):
-            orchestration_config.approvals[human_feedback.m_plan_id] = (
-                human_feedback.approved
+            orchestration_config.set_approval_result(
+                human_feedback.m_plan_id, human_feedback.approved
             )
             # orchestration_config.plans[human_feedback.m_plan_id][
             #     "plan_id"
@@ -528,10 +528,10 @@ async def user_clarification(
             orchestration_config
             and human_feedback.request_id in orchestration_config.clarifications
         ):
-            orchestration_config.clarifications[human_feedback.request_id] = (
-                human_feedback.answer
+            # Use the new event-driven method to set clarification result
+            orchestration_config.set_clarification_result(
+                human_feedback.request_id, human_feedback.answer
             )
-
             try:
                 result = await PlanService.handle_human_clarification(
                     human_feedback, user_id
diff --git a/src/backend/v3/config/settings.py b/src/backend/v3/config/settings.py
index 1dcbfbc6..5958eb4f 100644
--- a/src/backend/v3/config/settings.py
+++ b/src/backend/v3/config/settings.py
@@ -6,7 +6,7 @@
 import asyncio
 import json
 import logging
-from typing import Dict
+from typing import Dict, Optional
 
 from common.config.app_config import config
 from common.models.messages_kernel import TeamConfiguration
@@ -86,10 +86,175 @@ def __init__(self):
             20  # Maximum number of replanning rounds 20 needed to accommodate complex tasks
         )
 
+        # Event-driven notification system for approvals and clarifications
+        self._approval_events: Dict[str, asyncio.Event] = {}
+        self._clarification_events: Dict[str, asyncio.Event] = {}
+
+        # Default timeout for waiting operations (5 minutes)
+        self.default_timeout: float = 300.0
+
     def get_current_orchestration(self, user_id: str) -> MagenticOrchestration:
         """get existing orchestration instance."""
         return self.orchestrations.get(user_id, None)
 
+    def set_approval_pending(self, plan_id: str) -> None:
+        """Mark an approval as pending and arm its wake-up event.
+
+        Any decision already stored for ``plan_id`` is reset to ``None``.
+        """
+        self.approvals[plan_id] = None
+        if plan_id not in self._approval_events:
+            self._approval_events[plan_id] = asyncio.Event()
+        else:
+            # Re-arm an existing event so a stale set() cannot wake the next waiter
+            self._approval_events[plan_id].clear()
+
+    def set_approval_result(self, plan_id: str, approved: bool) -> None:
+        """Store the approval decision and wake the waiter, if there is one."""
+        self.approvals[plan_id] = approved
+        if plan_id in self._approval_events:
+            self._approval_events[plan_id].set()
+
+    async def wait_for_approval(self, plan_id: str, timeout: Optional[float] = None) -> bool:
+        """
+        Wait for an approval decision with timeout.
+
+        The wake-up event is released when the wait ends, whatever the outcome.
+        A decision that arrived stays in ``approvals``; an entry that is still
+        pending (timeout, cancellation, error) is removed.
+
+        Args:
+            plan_id: The plan ID to wait for
+            timeout: Timeout in seconds (defaults to default_timeout)
+
+        Returns:
+            The approval decision (True/False)
+
+        Raises:
+            asyncio.TimeoutError: If timeout is exceeded
+            KeyError: If plan_id is not found in approvals
+        """
+        if timeout is None:
+            timeout = self.default_timeout
+
+        if plan_id not in self.approvals:
+            raise KeyError(f"Plan ID {plan_id} not found in approvals")
+
+        if self.approvals[plan_id] is not None:
+            # Already has a result: nothing to wait for, so release the event too
+            self._approval_events.pop(plan_id, None)
+            return self.approvals[plan_id]
+
+        if plan_id not in self._approval_events:
+            self._approval_events[plan_id] = asyncio.Event()
+
+        try:
+            await asyncio.wait_for(self._approval_events[plan_id].wait(), timeout=timeout)
+            return self.approvals[plan_id]
+        except asyncio.TimeoutError:
+            # Expected outcome: re-raise without logging; ``finally`` cleans up
+            raise
+        except asyncio.CancelledError:
+            # Cancellation must reach the caller; only record it
+            logger.debug(f"Approval request {plan_id} was cancelled")
+            raise
+        except Exception as e:
+            # Handle any other unexpected errors
+            logger.error(f"Unexpected error waiting for approval {plan_id}: {e}")
+            raise
+        finally:
+            # The event is single-use: drop it even after a successful wait so
+            # finished requests do not accumulate in _approval_events
+            self._approval_events.pop(plan_id, None)
+            # Remove the entry only while it is still pending (None); a decision
+            # that arrived stays readable in approvals
+            if plan_id in self.approvals and self.approvals[plan_id] is None:
+                self.approvals.pop(plan_id, None)
+
+    def set_clarification_pending(self, request_id: str) -> None:
+        """Mark a clarification as pending and arm its wake-up event.
+
+        Any answer already stored for ``request_id`` is reset to ``None``.
+        """
+        self.clarifications[request_id] = None
+        if request_id not in self._clarification_events:
+            self._clarification_events[request_id] = asyncio.Event()
+        else:
+            # Re-arm an existing event so a stale set() cannot wake the next waiter
+            self._clarification_events[request_id].clear()
+
+    def set_clarification_result(self, request_id: str, answer: str) -> None:
+        """Store the clarification answer and wake the waiter, if there is one."""
+        self.clarifications[request_id] = answer
+        if request_id in self._clarification_events:
+            self._clarification_events[request_id].set()
+
+    async def wait_for_clarification(self, request_id: str, timeout: Optional[float] = None) -> str:
+        """
+        Wait for a clarification response with timeout.
+
+        The wake-up event is released when the wait ends, whatever the outcome.
+        An answer that arrived stays in ``clarifications``; an entry that is
+        still pending (timeout, cancellation, error) is removed.
+
+        Args:
+            request_id: The request ID to wait for
+            timeout: Timeout in seconds (defaults to default_timeout)
+
+        Returns:
+            The clarification response
+
+        Raises:
+            asyncio.TimeoutError: If timeout is exceeded
+            KeyError: If request_id is not found in clarifications
+        """
+        if timeout is None:
+            timeout = self.default_timeout
+
+        if request_id not in self.clarifications:
+            raise KeyError(f"Request ID {request_id} not found in clarifications")
+
+        if self.clarifications[request_id] is not None:
+            # Already has a result: nothing to wait for, so release the event too
+            self._clarification_events.pop(request_id, None)
+            return self.clarifications[request_id]
+
+        if request_id not in self._clarification_events:
+            self._clarification_events[request_id] = asyncio.Event()
+
+        try:
+            await asyncio.wait_for(self._clarification_events[request_id].wait(), timeout=timeout)
+            return self.clarifications[request_id]
+        except asyncio.TimeoutError:
+            # Expected outcome: re-raise without logging; ``finally`` cleans up
+            raise
+        except asyncio.CancelledError:
+            # Cancellation must reach the caller; only record it
+            logger.debug(f"Clarification request {request_id} was cancelled")
+            raise
+        except Exception as e:
+            # Handle any other unexpected errors
+            logger.error(f"Unexpected error waiting for clarification {request_id}: {e}")
+            raise
+        finally:
+            # The event is single-use: drop it even after a successful wait so
+            # finished requests do not accumulate in _clarification_events
+            self._clarification_events.pop(request_id, None)
+            # Remove the entry only while it is still pending (None); an answer
+            # that arrived stays readable in clarifications
+            if request_id in self.clarifications and self.clarifications[request_id] is None:
+                self.clarifications.pop(request_id, None)
+
+    def cleanup_approval(self, plan_id: str) -> None:
+        """Forget the approval entry and its event; unknown IDs are ignored."""
+        self.approvals.pop(plan_id, None)
+        self._approval_events.pop(plan_id, None)
+
+    def cleanup_clarification(self, request_id: str) -> None:
+        """Forget the clarification entry and its event; unknown IDs are ignored."""
+        self.clarifications.pop(request_id, None)
+        self._clarification_events.pop(request_id, None)
+
 
 class ConnectionConfig:
     """Connection manager for WebSocket connections."""
diff --git a/src/backend/v3/magentic_agents/proxy_agent.py b/src/backend/v3/magentic_agents/proxy_agent.py
index db952cc5..02cd90b7 100644
--- a/src/backend/v3/magentic_agents/proxy_agent.py
+++ b/src/backend/v3/magentic_agents/proxy_agent.py
@@ -3,6 +3,7 @@
 
 import asyncio
 import logging
+import time
 import uuid
 from collections.abc import AsyncIterable
 from typing import AsyncIterator, Optional
@@ -30,6 +31,9 @@
 from v3.models.messages import (UserClarificationRequest, UserClarificationResponse,
                                 WebsocketMessageType)
 
+# Initialize logger for the module
+logger = logging.getLogger(__name__)
+
 
 class DummyAgentThread(AgentThread):
     """Dummy thread implementation for proxy agent."""
@@ -185,10 +189,16 @@ async def invoke(
             clarification_message.request_id
         )
 
-        if not human_response:
-            human_response = "No additional clarification provided."
+        # Handle silent termination (timeout, unknown request ID or error)
+        if human_response is None:
+            # Process was terminated silently - don't yield any response
+            logger.debug("Clarification process terminated silently - ending invoke")
+            return
+
+        # Extract the answer; an empty answer falls back to the default text
+        answer = human_response.answer or "No additional clarification provided."
 
-        response = f"Human clarification: {human_response}"
+        response = f"Human clarification: {answer}"
 
         chat_message = self._create_message_content(response, thread.id)
 
@@ -242,10 +252,16 @@ async def invoke_stream(
             clarification_message.request_id
         )
 
-        if not human_response:
-            human_response = "No additional clarification provided."
+        # Handle silent termination (timeout, unknown request ID or error)
+        if human_response is None:
+            # Process was terminated silently - don't yield any response
+            logger.debug("Clarification process terminated silently - ending invoke_stream")
+            return
+
+        # Extract the answer; an empty answer falls back to the default text
+        answer = human_response.answer or "No additional clarification provided."
 
-        response = f"Human clarification: {human_response}"
+        response = f"Human clarification: {answer}"
 
         chat_message = self._create_message_content(response, thread.id)
 
@@ -254,16 +270,86 @@ async def invoke_stream(
     async def _wait_for_user_clarification(
         self, request_id: str
     ) -> Optional[UserClarificationResponse]:
-        """Wait for user clarification response."""
-        # To do: implement timeout and error handling
-        if request_id not in orchestration_config.clarifications:
-            orchestration_config.clarifications[request_id] = None
-        while orchestration_config.clarifications[request_id] is None:
-            await asyncio.sleep(0.2)
-        return UserClarificationResponse(
-            request_id=request_id,
-            answer=orchestration_config.clarifications[request_id],
-        )
+        """
+        Wait for user clarification response using event-driven pattern with timeout handling.
+
+        Args:
+            request_id: The request ID to wait for clarification
+
+        Returns:
+            UserClarificationResponse with the answer, or None on timeout, unknown ID or error
+
+        Raises:
+            asyncio.CancelledError: If the waiting task is cancelled
+        """
+        # logger.info(f"Waiting for user clarification for request: {request_id}")
+
+        # Initialize clarification as pending using the new event-driven method
+        orchestration_config.set_clarification_pending(request_id)
+
+        try:
+            # Wait for clarification with timeout using the new event-driven method
+            answer = await orchestration_config.wait_for_clarification(request_id)
+
+            # logger.info(f"Clarification received for request {request_id}: {answer}")
+            return UserClarificationResponse(
+                request_id=request_id,
+                answer=answer,
+            )
+        except asyncio.TimeoutError:
+            # Enhanced timeout handling - notify user via WebSocket and cleanup
+            logger.debug(f"Clarification timeout for request {request_id} - notifying user and terminating process")
+
+            # Create timeout notification message
+            from v3.models.messages import TimeoutNotification, WebsocketMessageType
+            timeout_notification = TimeoutNotification(
+                timeout_type="clarification",
+                request_id=request_id,
+                message=f"User clarification request timed out after {orchestration_config.default_timeout} seconds. Please try again.",
+                timestamp=time.time(),
+                timeout_duration=orchestration_config.default_timeout
+            )
+
+            # Send timeout notification to user via WebSocket
+            try:
+                await connection_config.send_status_update_async(
+                    message=timeout_notification,
+                    user_id=self.user_id,
+                    message_type=WebsocketMessageType.TIMEOUT_NOTIFICATION,
+                )
+                logger.info(f"Timeout notification sent to user {self.user_id} for clarification {request_id}")
+            except Exception as e:
+                logger.error(f"Failed to send timeout notification: {e}")
+
+            # Clean up this specific request
+            orchestration_config.cleanup_clarification(request_id)
+
+            # Return None to indicate silent termination
+            # The timeout naturally stops this specific wait operation without affecting other tasks
+            return None
+
+        except KeyError as e:
+            # Silent error handling for invalid request IDs
+            logger.debug(f"Request ID not found: {e} - terminating process silently")
+            return None
+
+        except asyncio.CancelledError:
+            # Clean up, then re-raise: swallowing CancelledError would hide the cancellation
+            logger.debug(f"Clarification request {request_id} was cancelled")
+            orchestration_config.cleanup_clarification(request_id)
+            raise
+
+        except Exception as e:
+            # Unexpected errors end the wait without raising, but are logged with a traceback
+            logger.exception(f"Unexpected error waiting for clarification: {e} - terminating process silently")
+            orchestration_config.cleanup_clarification(request_id)
+            return None
+        finally:
+            # Ensure cleanup happens for any incomplete requests
+            # This provides an additional safety net for resource cleanup
+            if (request_id in orchestration_config.clarifications and orchestration_config.clarifications[request_id] is None):
+                logger.debug(f"Final cleanup for pending clarification request {request_id}")
+                orchestration_config.cleanup_clarification(request_id)
 
     async def get_response(self, chat_history, **kwargs):
         """Get response from the agent - required by Agent base class."""
diff --git a/src/backend/v3/models/messages.py b/src/backend/v3/models/messages.py
index 8eb4187c..4537820d 100644
--- a/src/backend/v3/models/messages.py
+++ b/src/backend/v3/models/messages.py
@@ -176,6 +176,27 @@ class AgentMessageResponse:
     streaming_message: str = None
 
 
+@dataclass(slots=True)
+class TimeoutNotification:
+    """Notification sent to user when session timeout occurs."""
+
+    timeout_type: str  # "approval" or "clarification"
+    request_id: str  # plan_id or request_id that timed out
+    message: str  # Human-readable timeout message
+    timestamp: float  # When the timeout occurred (time.time(), seconds since the epoch)
+    timeout_duration: float  # How long we waited before timing out
+
+    def to_dict(self) -> Dict[str, Any]:
+        """Convert to dictionary for JSON serialization."""
+        return {
+            "timeout_type": self.timeout_type,
+            "request_id": self.request_id,
+            "message": self.message,
+            "timestamp": self.timestamp,
+            "timeout_duration": self.timeout_duration
+        }
+
+
 class WebsocketMessageType(str, Enum):
     """Types of WebSocket messages."""
 
@@ -192,3 +213,4 @@ class WebsocketMessageType(str, Enum):
     USER_CLARIFICATION_REQUEST = "user_clarification_request"
     USER_CLARIFICATION_RESPONSE = "user_clarification_response"
     FINAL_RESULT_MESSAGE = "final_result_message"
+    TIMEOUT_NOTIFICATION = "timeout_notification"
diff --git a/src/backend/v3/orchestration/human_approval_manager.py b/src/backend/v3/orchestration/human_approval_manager.py
index 1efd6c44..bfba4bef 100644
--- a/src/backend/v3/orchestration/human_approval_manager.py
+++ b/src/backend/v3/orchestration/human_approval_manager.py
@@ -206,16 +206,92 @@ async def create_progress_ledger(
     async def _wait_for_user_approval(
         self, m_plan_id: Optional[str] = None
     ) -> Optional[messages.PlanApprovalResponse]:
-        """Wait for user approval response."""
-
-        # To do: implement timeout and error handling
-        if m_plan_id not in orchestration_config.approvals:
-            orchestration_config.approvals[m_plan_id] = None
-        while orchestration_config.approvals[m_plan_id] is None:
-            await asyncio.sleep(0.2)
-        return messages.PlanApprovalResponse(
-            approved=orchestration_config.approvals[m_plan_id], m_plan_id=m_plan_id
-        )
+        """
+        Wait for user approval response using event-driven pattern with timeout handling.
+
+        Args:
+            m_plan_id: The plan ID to wait for approval
+
+        Returns:
+            PlanApprovalResponse with the decision and plan ID (approved=False when
+            no plan ID was given), or None on timeout, unknown plan ID or error
+
+        Raises:
+            asyncio.CancelledError: If the waiting task is cancelled
+        """
+        logger.info(f"Waiting for user approval for plan: {m_plan_id}")
+
+        if not m_plan_id:
+            logger.error("No plan ID provided for approval")
+            return messages.PlanApprovalResponse(approved=False, m_plan_id=m_plan_id)
+
+        # Initialize approval as pending using the new event-driven method
+        orchestration_config.set_approval_pending(m_plan_id)
+
+        try:
+            # Wait for approval with timeout using the new event-driven method
+            approved = await orchestration_config.wait_for_approval(m_plan_id)
+
+            logger.info(f"Approval received for plan {m_plan_id}: {approved}")
+            return messages.PlanApprovalResponse(
+                approved=approved, m_plan_id=m_plan_id
+            )
+        except asyncio.TimeoutError:
+            # Enhanced timeout handling - notify user via WebSocket and cleanup
+            logger.debug(f"Approval timeout for plan {m_plan_id} - notifying user and terminating process")
+
+            # Create timeout notification message. The timestamp is wall-clock time,
+            # not the event loop's monotonic clock
+            import time
+
+            timeout_message = messages.TimeoutNotification(
+                timeout_type="approval",
+                request_id=m_plan_id,
+                message=f"Plan approval request timed out after {orchestration_config.default_timeout} seconds. Please try again.",
+                timestamp=time.time(),
+                timeout_duration=orchestration_config.default_timeout
+            )
+
+            # Send timeout notification to user via WebSocket
+            try:
+                await connection_config.send_status_update_async(
+                    message=timeout_message,
+                    user_id=self.current_user_id,
+                    message_type=messages.WebsocketMessageType.TIMEOUT_NOTIFICATION,
+                )
+                logger.info(f"Timeout notification sent to user {self.current_user_id} for plan {m_plan_id}")
+            except Exception as e:
+                logger.error(f"Failed to send timeout notification: {e}")
+
+            # Clean up this specific request
+            orchestration_config.cleanup_approval(m_plan_id)
+
+            # Return None to indicate silent termination
+            # The timeout naturally stops this specific wait operation without affecting other tasks
+            return None
+
+        except KeyError as e:
+            # Silent error handling for invalid plan IDs
+            logger.debug(f"Plan ID not found: {e} - terminating process silently")
+            return None
+
+        except asyncio.CancelledError:
+            # Clean up, then re-raise: swallowing CancelledError would hide the cancellation
+            logger.debug(f"Approval request {m_plan_id} was cancelled")
+            orchestration_config.cleanup_approval(m_plan_id)
+            raise
+
+        except Exception as e:
+            # Unexpected errors end the wait without raising, but are logged with a traceback
+            logger.exception(f"Unexpected error waiting for approval: {e} - terminating process silently")
+            orchestration_config.cleanup_approval(m_plan_id)
+            return None
+        finally:
+            # Ensure cleanup happens for any incomplete requests
+            # This provides an additional safety net for resource cleanup
+            if (m_plan_id in orchestration_config.approvals and orchestration_config.approvals[m_plan_id] is None):
+                logger.debug(f"Final cleanup for pending approval plan {m_plan_id}")
+                orchestration_config.cleanup_approval(m_plan_id)
 
     async def prepare_final_answer(
         self, magentic_context: MagenticContext
diff --git a/src/backend/v3/orchestration/orchestration_manager.py b/src/backend/v3/orchestration/orchestration_manager.py
index 81f49316..7db458fe 100644
--- a/src/backend/v3/orchestration/orchestration_manager.py
+++ b/src/backend/v3/orchestration/orchestration_manager.py
@@ -119,7 +119,9 @@ async def run_orchestration(self, user_id, input_task) -> None:
         """Run the orchestration with user input loop."""
 
         job_id = str(uuid.uuid4())
-        orchestration_config.approvals[job_id] = None
+
+        # Use the new event-driven method to set approval as pending
+        orchestration_config.set_approval_pending(job_id)
 
         magentic_orchestration = orchestration_config.get_current_orchestration(user_id)
 