From c97925310e851fa63903f7e81175af3263c45044 Mon Sep 17 00:00:00 2001 From: Francia Riesco Date: Wed, 3 Sep 2025 12:53:57 -0400 Subject: [PATCH] Implement plan retrieval and WebSocket improvements Uncommented and enabled plan retrieval logic in the backend, including session and plan ID resolution and message date formatting. Updated frontend PlanPage and WebSocketService to pass planId when connecting, improved plan data processing, and fixed minor formatting and callback issues for better reliability and maintainability. --- src/backend/app_kernel.py | 103 +++++---- src/frontend/src/api/config.tsx | 8 +- src/frontend/src/pages/PlanPage.tsx | 218 +++++++++--------- src/frontend/src/services/PlanDataService.tsx | 16 +- .../src/services/WebSocketService.tsx | 34 +-- 5 files changed, 192 insertions(+), 187 deletions(-) diff --git a/src/backend/app_kernel.py b/src/backend/app_kernel.py index dae304f01..711bb4198 100644 --- a/src/backend/app_kernel.py +++ b/src/backend/app_kernel.py @@ -28,6 +28,7 @@ from kernel_agents.agent_factory import AgentFactory # Local imports from middleware.health_check import HealthCheckMiddleware +from common.utils.utils_date import format_dates_in_messages from v3.api.router import app_v3 # Semantic Kernel imports @@ -671,57 +672,57 @@ async def get_plans( #### Replace the following with code to get plan run history from the database # # Initialize memory context - # memory_store = await DatabaseFactory.get_database(user_id=user_id) - # if session_id: - # plan = await memory_store.get_plan_by_session(session_id=session_id) - # if not plan: - # track_event_if_configured( - # "GetPlanBySessionNotFound", - # {"status_code": 400, "detail": "Plan not found"}, - # ) - # raise HTTPException(status_code=404, detail="Plan not found") - - # # Use get_steps_by_plan to match the original implementation - # steps = await memory_store.get_steps_by_plan(plan_id=plan.id) - # plan_with_steps = PlanWithSteps(**plan.model_dump(), steps=steps) - # plan_with_steps.update_step_counts() - # return [plan_with_steps] - # if plan_id: - # plan = await memory_store.get_plan_by_plan_id(plan_id=plan_id) - # if not plan: - # track_event_if_configured( - # "GetPlanBySessionNotFound", - # {"status_code": 400, "detail": "Plan not found"}, - # ) - # raise HTTPException(status_code=404, detail="Plan not found") - - # # Use get_steps_by_plan to match the original implementation - # steps = await memory_store.get_steps_by_plan(plan_id=plan.id) - # messages = await memory_store.get_data_by_type_and_session_id( - # "agent_message", session_id=plan.session_id - # ) - - # plan_with_steps = PlanWithSteps(**plan.model_dump(), steps=steps) - # plan_with_steps.update_step_counts() - - # # Format dates in messages according to locale - # formatted_messages = format_dates_in_messages( - # messages, config.get_user_local_browser_language() - # ) - - # return [plan_with_steps, formatted_messages] - - # all_plans = await memory_store.get_all_plans() - # # Fetch steps for all plans concurrently - # steps_for_all_plans = await asyncio.gather( - # *[memory_store.get_steps_by_plan(plan_id=plan.id) for plan in all_plans] - # ) - # # Create list of PlanWithSteps and update step counts - # list_of_plans_with_steps = [] - # for plan, steps in zip(all_plans, steps_for_all_plans): - # plan_with_steps = PlanWithSteps(**plan.model_dump(), steps=steps) - # plan_with_steps.update_step_counts() - # list_of_plans_with_steps.append(plan_with_steps) + memory_store = await DatabaseFactory.get_database(user_id=user_id) + if session_id: + plan = await memory_store.get_plan_by_session(session_id=session_id) + if not plan: + track_event_if_configured( + "GetPlanBySessionNotFound", + {"status_code": 400, "detail": "Plan not found"}, + ) + raise HTTPException(status_code=404, detail="Plan not found") + + # Use get_steps_by_plan to match the original implementation + steps = await memory_store.get_steps_by_plan(plan_id=plan.id) + plan_with_steps = PlanWithSteps(**plan.model_dump(), steps=steps) + plan_with_steps.update_step_counts() + return [plan_with_steps] + if plan_id: + plan = await memory_store.get_plan_by_plan_id(plan_id=plan_id) + if not plan: + track_event_if_configured( + "GetPlanBySessionNotFound", + {"status_code": 400, "detail": "Plan not found"}, + ) + raise HTTPException(status_code=404, detail="Plan not found") + + # Use get_steps_by_plan to match the original implementation + steps = await memory_store.get_steps_by_plan(plan_id=plan.id) + messages = await memory_store.get_data_by_type_and_session_id( + "agent_message", session_id=plan.session_id + ) + + plan_with_steps = PlanWithSteps(**plan.model_dump(), steps=steps) + plan_with_steps.update_step_counts() + + # Format dates in messages according to locale + formatted_messages = format_dates_in_messages( + messages, config.get_user_local_browser_language() + ) + + return [plan_with_steps, formatted_messages] + + all_plans = await memory_store.get_all_plans() + # Fetch steps for all plans concurrently + steps_for_all_plans = await asyncio.gather( + *[memory_store.get_steps_by_plan(plan_id=plan.id) for plan in all_plans] + ) + # Create list of PlanWithSteps and update step counts + list_of_plans_with_steps = [] + for plan, steps in zip(all_plans, steps_for_all_plans): + plan_with_steps = PlanWithSteps(**plan.model_dump(), steps=steps) + plan_with_steps.update_step_counts() + list_of_plans_with_steps.append(plan_with_steps) return [] diff --git a/src/frontend/src/api/config.tsx b/src/frontend/src/api/config.tsx index 586c16890..db0c2533e 100644 --- a/src/frontend/src/api/config.tsx +++ b/src/frontend/src/api/config.tsx @@ -97,7 +97,7 @@ export function getUserInfoGlobal() { } if (!USER_INFO) { - console.info('User info not yet configured'); + // console.info('User info not yet configured'); return null; } @@ -149,7 +149,7 @@ export function headerBuilder(headers?: Record): Record): Record { // Only connect if not already connected if (!webSocketService.isConnected()) { try { - await webSocketService.connect(); + await webSocketService.connect(planId); setWsConnected(true); } catch (error) { console.error('Failed to connect to WebSocket:', error); @@ -91,7 +91,7 @@ const PlanPage: React.FC = () => { if (message.data && message.data.plan_id === planId) { console.log('Plan update received:', message.data); setStreamingMessages(prev => [...prev, message.data as StreamingPlanUpdate]); - + // Refresh plan data for major updates if (message.data.status === 'completed' && loadPlanDataRef.current) { loadPlanDataRef.current(false); @@ -130,7 +130,7 @@ const PlanPage: React.FC = () => { }, [planId]); // Subscribe to plan updates when planId changes - useEffect(() => { + useEffect(() => { if (planId && wsConnected && !planId.startsWith('sid_')) { // Only subscribe if we have a real plan_id (not session_id) console.log('Subscribing to plan updates for:', planId); @@ -162,7 +162,7 @@ const PlanPage: React.FC = () => { console.log('Default team loaded from storage:', defaultTeam.name); return; } - + try { const teams = await TeamService.getUserTeams(); console.log('All teams loaded:', teams); @@ -188,7 +188,7 @@ const PlanPage: React.FC = () => { (plan) => plan.plan.id === planId ); setPlanData(currentPlan || null); - }, [allPlans,planId]); + }, [allPlans, planId]); const loadPlanData = useCallback( async (navigate: boolean = true) => { @@ -204,18 +204,18 @@ const PlanPage: React.FC = () => { } setError(null); - const data = await PlanDataService.fetchPlanData(planId,navigate); + const data = await PlanDataService.fetchPlanData(planId, navigate); - setAllPlans(currentPlans => { - const plans = [...currentPlans]; - const existingIndex = plans.findIndex(p => p.plan.id === data.plan.id); - if (existingIndex !== -1) { - plans[existingIndex] = data; - } else { - plans.push(data); - } - return plans; - }); + setAllPlans(currentPlans => { + const plans = [...currentPlans]; + const existingIndex = plans.findIndex(p => p.plan.id === data.plan.id); + if (existingIndex !== -1) { + plans[existingIndex] = data; + } else { + plans.push(data); + } + return plans; + }); } catch (err) { console.log("Failed to load plan data:", err); setError( @@ -257,7 +257,7 @@ const PlanPage: React.FC = () => { await loadPlanData(false); } catch (error: any) { dismissToast(id); - + // Check if this is an RAI validation error let errorDetail = null; try { @@ -315,73 +315,73 @@ const PlanPage: React.FC = () => { useEffect(() => { - const initializePlanLoading = async () => { - if (!planId) return; - - // Check if this looks like a session_id (starts with "sid_") - if (planId.startsWith('sid_')) { - console.log('Detected session_id, resolving to plan_id:', planId); - - try { - // Try to find the plan by session_id - const plans = await apiService.getPlans(); - const matchingPlan = plans.find(plan => plan.session_id === planId); - - if (matchingPlan) { - // Found the plan! Replace URL with correct plan_id - console.log('Resolved session_id to plan_id:', matchingPlan.id); - navigate(`/plan/${matchingPlan.id}`, { replace: true }); - return; // Navigation will trigger reload with correct ID - } else { - // Plan not created yet, start polling - console.log('Plan not found yet, starting polling for session:', planId); - let attempts = 0; - const maxAttempts = 20; // Poll for up to 20 seconds - - const pollForPlan = async () => { - attempts++; - if (attempts > maxAttempts) { - console.error('Plan creation timed out after polling'); - setError(new Error('Plan creation is taking longer than expected. Please check your task list or try creating a new plan.')); - setLoading(false); - return; - } - - try { - const plans = await apiService.getPlans(); - const plan = plans.find(p => p.session_id === planId); - - if (plan) { - console.log(`Found plan after ${attempts} attempts:`, plan.id); - navigate(`/plan/${plan.id}`, { replace: true }); - } else { - // Wait and try again - setTimeout(pollForPlan, 1000); // Poll every second + const initializePlanLoading = async () => { + if (!planId) return; + + // Check if this looks like a session_id (starts with "sid_") + if (planId.startsWith('sid_')) { + console.log('Detected session_id, resolving to plan_id:', planId); + + try { + // Try to find the plan by session_id + const plans = await apiService.getPlans(); + const matchingPlan = plans.find(plan => plan.session_id === planId); + + if (matchingPlan) { + // Found the plan! Replace URL with correct plan_id + console.log('Resolved session_id to plan_id:', matchingPlan.id); + navigate(`/plan/${matchingPlan.id}`, { replace: true }); + return; // Navigation will trigger reload with correct ID + } else { + // Plan not created yet, start polling + console.log('Plan not found yet, starting polling for session:', planId); + let attempts = 0; + const maxAttempts = 20; // Poll for up to 20 seconds + + const pollForPlan = async () => { + attempts++; + if (attempts > maxAttempts) { + console.error('Plan creation timed out after polling'); + setError(new Error('Plan creation is taking longer than expected. Please check your task list or try creating a new plan.')); + setLoading(false); + return; } - } catch (error) { - console.error('Polling error:', error); - if (attempts < maxAttempts) { - setTimeout(pollForPlan, 2000); // Wait longer on error + + try { + const plans = await apiService.getPlans(); + const plan = plans.find(p => p.session_id === planId); + + if (plan) { + console.log(`Found plan after ${attempts} attempts:`, plan.id); + navigate(`/plan/${plan.id}`, { replace: true }); + } else { + // Wait and try again + setTimeout(pollForPlan, 1000); // Poll every second + } + } catch (error) { + console.error('Polling error:', error); + if (attempts < maxAttempts) { + setTimeout(pollForPlan, 2000); // Wait longer on error + } } - } - }; - - pollForPlan(); + }; + + pollForPlan(); + } + } catch (error) { + console.error('Session resolution error:', error); + setError(error instanceof Error ? error : new Error('Failed to resolve plan from session')); + setLoading(false); } - } catch (error) { - console.error('Session resolution error:', error); - setError(error instanceof Error ? error : new Error('Failed to resolve plan from session')); - setLoading(false); + } else { + + console.log('Using plan_id directly:', planId); + loadPlanData(true); } - } else { - - console.log('Using plan_id directly:', planId); - loadPlanData(true); - } - }; + }; - initializePlanLoading (); -}, [planId, navigate, loadPlanData]); + initializePlanLoading(); + }, [planId, navigate, loadPlanData]); const handleNewTaskButton = () => { NewTaskService.handleNewTaskFromPlan(navigate); @@ -418,32 +418,32 @@ const PlanPage: React.FC = () => { /** * Handle team upload completion - refresh team list */ - const handleTeamUpload = useCallback(async () => { - try { - const teams = await TeamService.getUserTeams(); - console.log('Teams refreshed after upload:', teams.length); - - if (teams.length > 0) { - // Always keep "Human Resources Team" as default, even after new uploads - const hrTeam = teams.find(team => team.name === "Human Resources Team"); - const defaultTeam = hrTeam || teams[0]; - setSelectedTeam(defaultTeam); - console.log('Default team after upload:', defaultTeam.name); - - dispatchToast( - - Team Uploaded Successfully! - - Team uploaded. {defaultTeam.name} remains your default team. - - , - { intent: "success" } - ); + const handleTeamUpload = useCallback(async () => { + try { + const teams = await TeamService.getUserTeams(); + console.log('Teams refreshed after upload:', teams.length); + + if (teams.length > 0) { + // Always keep "Human Resources Team" as default, even after new uploads + const hrTeam = teams.find(team => team.name === "Human Resources Team"); + const defaultTeam = hrTeam || teams[0]; + setSelectedTeam(defaultTeam); + console.log('Default team after upload:', defaultTeam.name); + + dispatchToast( + + Team Uploaded Successfully! + + Team uploaded. {defaultTeam.name} remains your default team. + + , + { intent: "success" } + ); + } + } catch (error) { + console.error('Error refreshing teams after upload:', error); } - } catch (error) { - console.error('Error refreshing teams after upload:', error); - } -}, [dispatchToast]); + }, [dispatchToast]); if (!planId) { return ( @@ -456,9 +456,9 @@ const PlanPage: React.FC = () => { return ( - setReloadLeftList(false)} onTeamSelect={handleTeamSelect} onTeamUpload={handleTeamUpload} @@ -487,7 +487,7 @@ const PlanPage: React.FC = () => { /> - + {/* Show RAI error if present */} {raiError && (
@@ -500,7 +500,7 @@ const PlanPage: React.FC = () => { />
)} - + (); - plan.steps.forEach((step) => { - if (step.agent) { - uniqueAgents.add(step.agent); - } - }); + if (plan.steps && plan.steps.length > 0) { + plan.steps.forEach((step) => { + if (step.agent) { + uniqueAgents.add(step.agent); + } + }); + } + // Convert Set to Array for easier handling const agents = Array.from(uniqueAgents); // Get all steps - const steps = plan.steps; + const steps = plan.steps ?? []; // Check if human_clarification_request is not null const hasClarificationRequest = diff --git a/src/frontend/src/services/WebSocketService.tsx b/src/frontend/src/services/WebSocketService.tsx index 046a06ef7..e0e37985e 100644 --- a/src/frontend/src/services/WebSocketService.tsx +++ b/src/frontend/src/services/WebSocketService.tsx @@ -55,7 +55,7 @@ class WebSocketService { /** * Connect to WebSocket server */ - connect(): Promise { + connect(plan_id?: string): Promise { return new Promise((resolve, reject) => { if (this.isConnecting) { console.log('Connection attempt already in progress'); @@ -70,14 +70,14 @@ class WebSocketService { try { this.isConnecting = true; - + const wsProtocol = window.location.protocol === 'https:' ? 'wss:' : 'ws:'; const wsHost = process.env.REACT_APP_WS_HOST || '127.0.0.1:8000'; const processId = crypto.randomUUID(); const authHeaders = headerBuilder(); const userId = authHeaders['x-ms-client-principal-id']; - + if (!userId) { console.error('No user ID available for WebSocket connection'); this.isConnecting = false; @@ -86,12 +86,12 @@ class WebSocketService { } // Use query parameter for WebSocket authentication (as backend expects) - const wsUrl = `${wsProtocol}//${wsHost}/api/v3/socket/${processId}?user_id=${encodeURIComponent(userId)}`; + const wsUrl = `${wsProtocol}//${wsHost}/api/v3/socket/${processId}?user_id=${encodeURIComponent(userId)}${plan_id ? `&plan_id=${encodeURIComponent(plan_id)}` : ''}`; console.log('Connecting to WebSocket:', wsUrl); - + this.ws = new WebSocket(wsUrl); - + this.ws.onopen = () => { console.log('WebSocket connected successfully'); this.reconnectAttempts = 0; @@ -114,7 +114,7 @@ class WebSocketService { console.log('WebSocket disconnected', event.code, event.reason); this.isConnecting = false; this.emit('connection_status', { connected: false }); - + if (event.code !== 1000) { this.attemptReconnect(); } @@ -139,14 +139,14 @@ class WebSocketService { */ disconnect(): void { console.log('Manually disconnecting WebSocket'); - + if (this.reconnectTimer) { clearTimeout(this.reconnectTimer); this.reconnectTimer = null; } - + this.reconnectAttempts = this.maxReconnectAttempts; - + if (this.ws) { this.ws.close(1000, 'Manual disconnect'); this.ws = null; @@ -164,7 +164,7 @@ class WebSocketService { type: 'subscribe_plan', plan_id: planId }; - + this.ws.send(JSON.stringify(message)); this.planSubscriptions.add(planId); console.log(`Subscribed to plan updates: ${planId}`); @@ -180,7 +180,7 @@ class WebSocketService { type: 'unsubscribe_plan', plan_id: planId }; - + this.ws.send(JSON.stringify(message)); this.planSubscriptions.delete(planId); console.log(`Unsubscribed from plan updates: ${planId}`); @@ -194,7 +194,7 @@ class WebSocketService { if (!this.listeners.has(eventType)) { this.listeners.set(eventType, new Set()); } - + this.listeners.get(eventType)!.add(callback); return () => { @@ -277,13 +277,13 @@ class WebSocketService { this.reconnectAttempts++; const delay = this.reconnectDelay * Math.pow(2, this.reconnectAttempts - 1); - - console.log(`Scheduling reconnection attempt ${this.reconnectAttempts}/${this.maxReconnectAttempts} in ${delay/1000}s`); - + + console.log(`Scheduling reconnection attempt ${this.reconnectAttempts}/${this.maxReconnectAttempts} in ${delay / 1000}s`); + this.reconnectTimer = setTimeout(() => { this.reconnectTimer = null; console.log(`Attempting reconnection (attempt ${this.reconnectAttempts})`); - + this.connect() .then(() => { console.log('Reconnection successful - re-subscribing to plans');