diff --git a/src/backend/common/database/cosmosdb.py b/src/backend/common/database/cosmosdb.py index ecb42237a..956d5025e 100644 --- a/src/backend/common/database/cosmosdb.py +++ b/src/backend/common/database/cosmosdb.py @@ -11,7 +11,7 @@ from azure.cosmos.aio import CosmosClient from azure.cosmos.aio._database import DatabaseProxy from azure.cosmos.exceptions import CosmosResourceExistsError -from pytest import Session +import v3.models.messages as messages from common.models.messages_kernel import ( AgentMessage, @@ -23,8 +23,8 @@ from .database_base import DatabaseBase from ..models.messages_kernel import ( + AgentMessageData, BaseDataModel, - Session, Plan, Step, AgentMessage, @@ -38,7 +38,6 @@ class CosmosDBClient(DatabaseBase): """CosmosDB implementation of the database interface.""" MODEL_CLASS_MAPPING = { - DataType.session: Session, DataType.plan: Plan, DataType.step: Step, DataType.agent_message: AgentMessage, @@ -190,29 +189,6 @@ async def delete_item(self, item_id: str, partition_key: str) -> None: self.logger.error("Failed to delete item from CosmosDB: %s", str(e)) raise - # Session Operations - async def add_session(self, session: Session) -> None: - """Add a session to CosmosDB.""" - await self.add_item(session) - - async def get_session(self, session_id: str) -> Optional[Session]: - """Retrieve a session by session_id.""" - query = "SELECT * FROM c WHERE c.id=@id AND c.data_type=@data_type" - parameters = [ - {"name": "@id", "value": session_id}, - {"name": "@data_type", "value": DataType.session}, - ] - results = await self.query_items(query, parameters, Session) - return results[0] if results else None - - async def get_all_sessions(self) -> List[Session]: - """Retrieve all sessions for the user.""" - query = "SELECT * FROM c WHERE c.user_id=@user_id AND c.data_type=@data_type" - parameters = [ - {"name": "@user_id", "value": self.user_id}, - {"name": "@data_type", "value": DataType.session}, - ] - return await self.query_items(query, parameters, Session) # Plan Operations async def add_plan(self, plan: Plan) -> None: @@ -223,17 +199,6 @@ async def update_plan(self, plan: Plan) -> None: """Update a plan in CosmosDB.""" await self.update_item(plan) - async def get_plan_by_session(self, session_id: str) -> Optional[Plan]: - """Retrieve a plan by session_id.""" - query = ( - "SELECT * FROM c WHERE c.session_id=@session_id AND c.data_type=@data_type" - ) - parameters = [ - {"name": "@session_id", "value": session_id}, - {"name": "@data_type", "value": DataType.plan}, - ] - results = await self.query_items(query, parameters, Plan) - return results[0] if results else None async def get_plan_by_plan_id(self, plan_id: str) -> Optional[Plan]: """Retrieve a plan by plan_id.""" @@ -272,7 +237,7 @@ async def get_all_plans_by_team_id(self, team_id: str) -> List[Plan]: async def get_all_plans_by_team_id_status(self, team_id: str, status: str) -> List[Plan]: """Retrieve all plans for a specific team.""" - query = "SELECT * FROM c WHERE c.team_id=@team_id AND c.data_type=@data_type and c.user_id=@user_id and c.overall_status=@status" + query = "SELECT * FROM c WHERE c.team_id=@team_id AND c.data_type=@data_type and c.user_id=@user_id and c.overall_status=@status ORDER BY c._ts DESC" parameters = [ {"name": "@user_id", "value": self.user_id}, {"name": "@team_id", "value": team_id}, @@ -328,7 +293,7 @@ async def get_team(self, team_id: str) -> Optional[TeamConfiguration]: teams = await self.query_items(query, parameters, TeamConfiguration) return teams[0] if teams else None - async def get_team_by_id(self, id: str) -> Optional[TeamConfiguration]: + async def get_team_by_id(self, team_id: str) -> Optional[TeamConfiguration]: """Retrieve a specific team configuration by its document id. Args: @@ -337,9 +302,9 @@ async def get_team_by_id(self, id: str) -> Optional[TeamConfiguration]: Returns: TeamConfiguration object or None if not found """ - query = "SELECT * FROM c WHERE c.id=@id AND c.data_type=@data_type" + query = "SELECT * FROM c WHERE c.team_id=@team_id AND c.data_type=@data_type" parameters = [ - {"name": "@id", "value": id}, + {"name": "@team_id", "value": team_id}, {"name": "@data_type", "value": DataType.team_config}, ] teams = await self.query_items(query, parameters, TeamConfiguration) @@ -383,27 +348,6 @@ async def delete_team(self, team_id: str) -> bool: logging.exception(f"Failed to delete team from Cosmos DB: {e}") return False - async def get_data_by_type_and_session_id( - self, data_type: str, session_id: str - ) -> List[BaseDataModel]: - """Query the Cosmos DB for documents with the matching data_type, session_id and user_id.""" - await self._ensure_initialized() - if self.container is None: - return [] - - model_class = self.MODEL_CLASS_MAPPING.get(data_type, BaseDataModel) - try: - query = "SELECT * FROM c WHERE c.session_id=@session_id AND c.user_id=@user_id AND c.data_type=@data_type ORDER BY c._ts ASC" - parameters = [ - {"name": "@session_id", "value": session_id}, - {"name": "@data_type", "value": data_type}, - {"name": "@user_id", "value": self.user_id}, - ] - return await self.query_items(query, parameters, model_class) - except Exception as e: - logging.exception(f"Failed to query data by type from Cosmos DB: {e}") - return [] - # Data Management Operations async def get_data_by_type(self, data_type: str) -> List[BaseDataModel]: """Retrieve all data of a specific type.""" @@ -470,13 +414,89 @@ async def get_current_team(self, user_id: str) -> Optional[UserCurrentTeam]: teams = await self.query_items(query, parameters, UserCurrentTeam) return teams[0] if teams else None + + + async def delete_current_team(self, user_id: str) -> bool: + """Delete the current team for a user.""" + query = "SELECT c.id, c.session_id FROM c WHERE c.user_id=@user_id AND c.data_type=@data_type" + + params = [ + {"name": "@user_id", "value": user_id}, + {"name": "@data_type", "value": DataType.user_current_team}, + ] + items = self.container.query_items(query=query, parameters=params) + print("Items to delete:", items) + if items: + async for doc in items: + try: + await self.container.delete_item(doc["id"], partition_key=doc["session_id"]) + except Exception as e: + self.logger.warning("Failed deleting current team doc %s: %s", doc.get("id"), e) + + return True + async def set_current_team(self, current_team: UserCurrentTeam) -> None: """Set the current team for a user.""" await self._ensure_initialized() await self.add_item(current_team) - async def update_current_team(self, current_team: UserCurrentTeam) -> None: """Update the current team for a user.""" await self._ensure_initialized() await self.update_item(current_team) + + async def delete_plan_by_plan_id(self, plan_id: str) -> bool: + """Delete a plan by its ID.""" + query = "SELECT c.id, c.session_id FROM c WHERE c.id=@plan_id " + + params = [ + {"name": "@plan_id", "value": plan_id}, + ] + items = self.container.query_items(query=query, parameters=params) + print("Items to delete planid:", items) + if items: + async for doc in items: + try: + await self.container.delete_item(doc["id"], partition_key=doc["session_id"]) + except Exception as e: + self.logger.warning("Failed deleting current team doc %s: %s", doc.get("id"), e) + + return True + + async def add_mplan(self, mplan: messages.MPlan) -> None: + """Add a team configuration to the database.""" + await self.add_item(mplan) + + async def update_mplan(self, mplan: messages.MPlan) -> None: + """Update a team configuration in the database.""" + await self.update_item(mplan) + + + async def get_mplan(self, plan_id: str) -> Optional[messages.MPlan]: + """Retrieve a mplan configuration by mplan_id.""" + query = "SELECT * FROM c WHERE c.plan_id=@plan_id AND c.data_type=@data_type" + parameters = [ + {"name": "@plan_id", "value": plan_id}, + {"name": "@data_type", "value": DataType.m_plan}, + ] + results = await self.query_items(query, parameters, messages.MPlan) + return results[0] if results else None + + + async def add_agent_message(self, message: AgentMessageData) -> None: + """Add an agent message to the database.""" + await self.add_item(message) + + async def update_agent_message(self, message: AgentMessageData) -> None: + """Update an agent message in the database.""" + await self.update_item(message) + + async def get_agent_messages(self, plan_id: str) -> List[AgentMessageData]: + """Retrieve an agent message by message_id.""" + query = "SELECT * FROM c WHERE c.plan_id=@plan_id AND c.data_type=@data_type ORDER BY c._ts ASC" + parameters = [ + {"name": "@plan_id", "value": plan_id}, + {"name": "@data_type", "value": DataType.m_plan_message}, + ] + + return await self.query_items(query, parameters, AgentMessageData) \ No newline at end of file diff --git a/src/backend/common/database/database_base.py b/src/backend/common/database/database_base.py index 6b4572133..30f173004 100644 --- a/src/backend/common/database/database_base.py +++ b/src/backend/common/database/database_base.py @@ -2,10 +2,10 @@ from abc import ABC, abstractmethod from typing import Any, Dict, List, Optional, Type - +import v3.models.messages as messages from ..models.messages_kernel import ( + AgentMessageData, BaseDataModel, - Session, Plan, Step, TeamConfiguration, @@ -59,21 +59,6 @@ async def delete_item(self, item_id: str, partition_key: str) -> None: """Delete an item from the database.""" pass - # Session Operations - @abstractmethod - async def add_session(self, session: Session) -> None: - """Add a session to the database.""" - pass - - @abstractmethod - async def get_session(self, session_id: str) -> Optional[Session]: - """Retrieve a session by session_id.""" - pass - - @abstractmethod - async def get_all_sessions(self) -> List[Session]: - """Retrieve all sessions for the user.""" - pass # Plan Operations @abstractmethod @@ -86,11 +71,6 @@ async def update_plan(self, plan: Plan) -> None: """Update a plan in the database.""" pass - @abstractmethod - async def get_plan_by_session(self, session_id: str) -> Optional[Plan]: - """Retrieve a plan by session_id.""" - pass - @abstractmethod async def get_plan_by_plan_id(self, plan_id: str) -> Optional[Plan]: """Retrieve a plan by plan_id.""" @@ -118,11 +98,7 @@ async def get_all_plans_by_team_id_status( """Retrieve all plans for a specific team.""" pass - @abstractmethod - async def get_data_by_type_and_session_id( - self, data_type: str, session_id: str - ) -> List[BaseDataModel]: - pass + # Step Operations @abstractmethod @@ -162,7 +138,7 @@ async def get_team(self, team_id: str) -> Optional[TeamConfiguration]: pass @abstractmethod - async def get_team_by_id(self, id: str) -> Optional[TeamConfiguration]: + async def get_team_by_id(self, team_id: str) -> Optional[TeamConfiguration]: """Retrieve a team configuration by internal id.""" pass @@ -207,6 +183,11 @@ async def get_current_team(self, user_id: str) -> Optional[UserCurrentTeam]: """Retrieve the current team for a user.""" pass + @abstractmethod + async def delete_current_team(self, user_id: str) -> Optional[UserCurrentTeam]: + """Retrieve the current team for a user.""" + pass + @abstractmethod async def set_current_team(self, current_team: UserCurrentTeam) -> None: pass @@ -215,3 +196,37 @@ async def set_current_team(self, current_team: UserCurrentTeam) -> None: async def update_current_team(self, current_team: UserCurrentTeam) -> None: """Update the current team for a user.""" pass + + @abstractmethod + async def delete_plan_by_plan_id(self, plan_id: str) -> bool: + """Retrieve the current team for a user.""" + pass + + @abstractmethod + async def add_mplan(self, mplan: messages.MPlan) -> None: + """Add a team configuration to the database.""" + pass + + @abstractmethod + async def update_mplan(self, mplan: messages.MPlan) -> None: + """Update a team configuration in the database.""" + pass + + @abstractmethod + async def get_mplan(self, plan_id: str) -> Optional[messages.MPlan]: + """Retrieve a mplan configuration by plan_id.""" + pass + + @abstractmethod + async def add_agent_message(self, message: AgentMessageData) -> None: + pass + + @abstractmethod + async def update_agent_message(self, message: AgentMessageData) -> None: + """Update an agent message in the database.""" + pass + + @abstractmethod + async def get_agent_messages(self, plan_id: str) -> Optional[AgentMessageData]: + """Retrieve an agent message by message_id.""" + pass \ No newline at end of file diff --git a/src/backend/common/models/messages_kernel.py b/src/backend/common/models/messages_kernel.py index e6b98beff..d87902417 100644 --- a/src/backend/common/models/messages_kernel.py +++ b/src/backend/common/models/messages_kernel.py @@ -2,10 +2,8 @@ from datetime import datetime, timezone from enum import Enum from typing import Any, Dict, List, Literal, Optional - - +from pydantic import BaseModel from semantic_kernel.kernel_pydantic import Field, KernelBaseModel -from dataclasses import dataclass class DataType(str, Enum): """Enumeration of possible data types for documents in the database.""" @@ -55,6 +53,8 @@ class PlanStatus(str, Enum): completed = "completed" failed = "failed" canceled = "canceled" + approved = "approved" + created = "created" class HumanFeedbackStatus(str, Enum): @@ -78,6 +78,7 @@ class BaseDataModel(KernelBaseModel): """Base data model with common fields.""" id: str = Field(default_factory=lambda: str(uuid.uuid4())) + session_id: str = Field(default_factory=lambda: str(uuid.uuid4())) timestamp: Optional[datetime] = Field( default_factory=lambda: datetime.now(timezone.utc) ) @@ -87,8 +88,6 @@ class AgentMessage(BaseDataModel): """Base class for messages sent between agents.""" data_type: Literal[DataType.agent_message] = Field(DataType.agent_message, Literal=True) - session_id: str - user_id: str plan_id: str content: str source: str @@ -112,18 +111,32 @@ class UserCurrentTeam(BaseDataModel): user_id: str team_id: str - +class MStep(BaseModel): + """model of a step in a plan""" + agent: str = "" + action: str = "" +class MPlan(BaseModel): + """model of a plan""" + id: str = Field(default_factory=lambda: str(uuid.uuid4())) + user_id: str = "" + team_id: str = "" + plan_id: str = "" + overall_status: PlanStatus = PlanStatus.created + user_request: str = "" + team: List[str] = [] + facts: str = "" + steps: List[MStep] = [] class Plan(BaseDataModel): """Represents a plan containing multiple steps.""" data_type: Literal[DataType.plan] = Field(DataType.plan, Literal=True) plan_id: str - session_id: str user_id: str initial_goal: str overall_status: PlanStatus = PlanStatus.in_progress approved: bool = False source: str = AgentType.PLANNER.value + m_plan: Optional[MPlan] = None summary: Optional[str] = None team_id: Optional[str] = None human_clarification_request: Optional[str] = None @@ -135,7 +148,6 @@ class Step(BaseDataModel): data_type: Literal[DataType.step] = Field(DataType.step, Literal=True) plan_id: str - session_id: str # Partition key user_id: str action: str agent: AgentType @@ -146,7 +158,7 @@ class Step(BaseDataModel): updated_action: Optional[str] = None -class TeamSelectionRequest(KernelBaseModel): +class TeamSelectionRequest(BaseDataModel): """Request model for team selection.""" team_id: str @@ -262,6 +274,15 @@ class AgentMessageType(str, Enum): class AgentMessageData (BaseDataModel): + + data_type: Literal[DataType.m_plan_message] = Field(DataType.m_plan_message, Literal=True) + plan_id: str + user_id: str agent: str + m_plan_id: Optional[str] = None agent_type: AgentMessageType = AgentMessageType.AI_AGENT - content: str \ No newline at end of file + content: str + raw_data: str + steps: List[Any] = Field(default_factory=list) + next_steps: List[Any] = Field(default_factory=list) + \ No newline at end of file diff --git a/src/backend/v3/api/router.py b/src/backend/v3/api/router.py index 0d99ca29c..1eb79a4f5 100644 --- a/src/backend/v3/api/router.py +++ b/src/backend/v3/api/router.py @@ -9,18 +9,36 @@ from auth.auth_utils import get_authenticated_user_details from common.config.app_config import config from common.database.database_factory import DatabaseFactory -from common.models.messages_kernel import (InputTask, Plan, PlanStatus, - PlanWithSteps, TeamSelectionRequest) +from common.models.messages_kernel import ( + InputTask, + Plan, + PlanStatus, + PlanWithSteps, + TeamSelectionRequest, +) from common.utils.event_utils import track_event_if_configured from common.utils.utils_date import format_dates_in_messages from common.utils.utils_kernel import rai_success, rai_validate_team_config -from fastapi import (APIRouter, BackgroundTasks, File, HTTPException, Query, - Request, UploadFile, WebSocket, WebSocketDisconnect) +from fastapi import ( + APIRouter, + BackgroundTasks, + File, + HTTPException, + Query, + Request, + UploadFile, + WebSocket, + WebSocketDisconnect, +) from semantic_kernel.agents.runtime import InProcessRuntime from v3.common.services.plan_service import PlanService from v3.common.services.team_service import TeamService -from v3.config.settings import (connection_config, current_user_id, - orchestration_config, team_config) +from v3.config.settings import ( + connection_config, + current_user_id, + orchestration_config, + team_config, +) from v3.orchestration.orchestration_manager import OrchestrationManager router = APIRouter() @@ -117,9 +135,12 @@ async def init_team( team_service = TeamService(memory_store) user_current_team = await memory_store.get_current_team(user_id=user_id) if not user_current_team: - await team_service.handle_team_selection( + print("User has no current team, setting to default:", init_team_id) + user_current_team = await team_service.handle_team_selection( user_id=user_id, team_id=init_team_id ) + if user_current_team: + init_team_id = user_current_team.team_id else: init_team_id = user_current_team.team_id # Verify the team exists and user has access to it @@ -337,7 +358,56 @@ async def run_with_context(): async def plan_approval( human_feedback: messages.PlanApprovalResponse, request: Request ): - """Endpoint to receive plan approval or rejection from the user.""" + """ + Endpoint to receive plan approval or rejection from the user. + + --- + tags: + - Plans + parameters: + - name: user_principal_id + in: header + type: string + required: true + description: User ID extracted from the authentication header + requestBody: + description: Plan approval payload + required: true + content: + application/json: + schema: + type: object + properties: + m_plan_id: + type: string + description: The internal m_plan id for the plan (required) + approved: + type: boolean + description: Whether the plan is approved (true) or rejected (false) + feedback: + type: string + description: Optional feedback or comment from the user + plan_id: + type: string + description: Optional user-facing plan_id + responses: + 200: + description: Approval recorded successfully + content: + application/json: + schema: + type: object + properties: + status: + type: string + 401: + description: Missing or invalid user information + 404: + description: No active plan found for approval + 500: + description: Internal server error + """ + authenticated_user = get_authenticated_user_details(request_headers=request.headers) user_id = authenticated_user["user_principal_id"] if not user_id: @@ -399,7 +469,51 @@ async def plan_approval( async def user_clarification( human_feedback: messages.UserClarificationResponse, request: Request ): - """Endpoint to receive plan approval or rejection from the user.""" + """ + Endpoint to receive user clarification responses for clarification requests sent by the system. + + --- + tags: + - Plans + parameters: + - name: user_principal_id + in: header + type: string + required: true + description: User ID extracted from the authentication header + requestBody: + description: User clarification payload + required: true + content: + application/json: + schema: + type: object + properties: + request_id: + type: string + description: The clarification request id sent by the system (required) + answer: + type: string + description: The user's answer or clarification text + plan_id: + type: string + description: (Optional) Associated plan_id + m_plan_id: + type: string + description: (Optional) Internal m_plan id + responses: + 200: + description: Clarification recorded successfully + 400: + description: RAI check failed or invalid input + 401: + description: Missing or invalid user information + 404: + description: No active plan found for clarification + 500: + description: Internal server error + """ + authenticated_user = get_authenticated_user_details(request_headers=request.headers) user_id = authenticated_user["user_principal_id"] if not user_id: @@ -442,15 +556,27 @@ async def user_clarification( orchestration_config.clarifications[human_feedback.request_id] = ( human_feedback.answer ) + + try: + result = await PlanService.handle_human_clarification( + human_feedback, user_id + ) + print("Human clarification processed:", result) + except ValueError as ve: + print(f"ValueError processing human clarification: {ve}") + except Exception as e: + print(f"Error processing human clarification: {e}") track_event_if_configured( - "PlanApprovalReceived", + "HumanClarificationReceived", { "request_id": human_feedback.request_id, "answer": human_feedback.answer, "user_id": user_id, }, ) - return {"status": "clarification recorded"} + return { + "status": "clarification recorded", + } else: logging.warning( f"No orchestration or plan found for request_id: {human_feedback.request_id}" @@ -460,6 +586,87 @@ async def user_clarification( ) +@app_v3.post("/agent_message") +async def agent_message_user( + agent_message: messages.AgentMessageResponse, request: Request +): + """ + Endpoint to receive messages from agents (agent -> user communication). + + --- + tags: + - Agents + parameters: + - name: user_principal_id + in: header + type: string + required: true + description: User ID extracted from the authentication header + requestBody: + description: Agent message payload + required: true + content: + application/json: + schema: + type: object + properties: + plan_id: + type: string + description: ID of the plan this message relates to + agent: + type: string + description: Name or identifier of the agent sending the message + content: + type: string + description: The message content + agent_type: + type: string + description: Type of agent (AI/Human) + m_plan_id: + type: string + description: Optional internal m_plan id + responses: + 200: + description: Message recorded successfully + schema: + type: object + properties: + status: + type: string + 401: + description: Missing or invalid user information + """ + + authenticated_user = get_authenticated_user_details(request_headers=request.headers) + user_id = authenticated_user["user_principal_id"] + if not user_id: + raise HTTPException( + status_code=401, detail="Missing or invalid user information" + ) + # Set the approval in the orchestration config + + try: + + result = await PlanService.handle_agent_messages(agent_message, user_id) + print("Agent message processed:", result) + except ValueError as ve: + print(f"ValueError processing agent message: {ve}") + except Exception as e: + print(f"Error processing agent message: {e}") + + track_event_if_configured( + "AgentMessageReceived", + { + "agent": agent_message.agent, + "content": agent_message.content, + "user_id": user_id, + }, + ) + return { + "status": "message recorded", + } + + @app_v3.post("/upload_team_config") async def upload_team_config( request: Request, @@ -519,24 +726,24 @@ async def upload_team_config( ) # Validate content with RAI before processing - rai_valid, rai_error = await rai_validate_team_config(json_data) - if not rai_valid: - track_event_if_configured( - "Team configuration RAI validation failed", - { - "status": "failed", - "user_id": user_id, - "filename": file.filename, - "reason": rai_error, - }, - ) - raise HTTPException(status_code=400, detail=rai_error) + if not team_id: + rai_valid, rai_error = await rai_validate_team_config(json_data) + if not rai_valid: + track_event_if_configured( + "Team configuration RAI validation failed", + { + "status": "failed", + "user_id": user_id, + "filename": file.filename, + "reason": rai_error, + }, + ) + raise HTTPException(status_code=400, detail=rai_error) track_event_if_configured( "Team configuration RAI validation passed", {"status": "passed", "user_id": user_id, "filename": file.filename}, ) - # Initialize memory store and service memory_store = await DatabaseFactory.get_database(user_id=user_id) team_service = TeamService(memory_store) @@ -896,7 +1103,7 @@ async def select_team(selection: TeamSelectionRequest, request: Request): team_configuration = await team_service.get_team_configuration( selection.team_id, user_id ) - if team_config is None: + if team_configuration is None: # ensure that id is valid raise HTTPException( status_code=404, detail=f"Team configuration '{selection.team_id}' not found or access denied", @@ -920,9 +1127,9 @@ async def select_team(selection: TeamSelectionRequest, request: Request): ) # save to in-memory config for current user - team_config.set_current_team( - user_id=user_id, team_configuration=team_configuration - ) + # team_config.set_current_team( + # user_id=user_id, team_configuration=team_configuration + # ) # Track the team selection event track_event_if_configured( @@ -1043,20 +1250,12 @@ async def get_plans(request: Request): team_id=current_team.team_id, status=PlanStatus.completed ) - # Create list of PlanWithSteps and update step counts - list_of_plans_with_steps = [] - for plan in all_plans: - plan_with_steps = PlanWithSteps(**plan.model_dump(), steps=[]) - plan_with_steps.overall_status - plan_with_steps.update_step_counts() - list_of_plans_with_steps.append(plan_with_steps) - - return list_of_plans_with_steps + return all_plans # Get plans is called in the initial side rendering of the frontend @app_v3.get("/plan") -async def get_plan_by_id(request: Request, plan_id: str): +async def get_plan_by_id(request: Request, plan_id: Optional[str] = Query(None),): """ Retrieve plans for the current user. @@ -1127,33 +1326,32 @@ async def get_plan_by_id(request: Request, plan_id: str): # # Initialize memory context memory_store = await DatabaseFactory.get_database(user_id=user_id) + try: + if plan_id: + plan = await memory_store.get_plan_by_plan_id(plan_id=plan_id) + if not plan: + track_event_if_configured( + "GetPlanBySessionNotFound", + {"status_code": 400, "detail": "Plan not found"}, + ) + raise HTTPException(status_code=404, detail="Plan not found") - if plan_id: - plan = await memory_store.get_plan_by_plan_id(plan_id=plan_id) - if not plan: - track_event_if_configured( - "GetPlanBySessionNotFound", - {"status_code": 400, "detail": "Plan not found"}, - ) - raise HTTPException(status_code=404, detail="Plan not found") - - # Use get_steps_by_plan to match the original implementation - steps = await memory_store.get_steps_by_plan(plan_id=plan.id) - messages = await memory_store.get_data_by_type_and_session_id( - "agent_message", session_id=plan.session_id - ) + # Use get_steps_by_plan to match the original implementation - plan_with_steps = PlanWithSteps(**plan.model_dump(), steps=steps) - plan_with_steps.update_step_counts() + team = await memory_store.get_team_by_id(team_id=plan.team_id) + agent_messages = await memory_store.get_agent_messages(plan_id=plan.plan_id) - # Format dates in messages according to locale - formatted_messages = format_dates_in_messages( - messages, config.get_user_local_browser_language() - ) - - return [plan_with_steps, formatted_messages] - else: - track_event_if_configured( - "GetPlanId", {"status_code": 400, "detail": "no plan id"} - ) - raise HTTPException(status_code=400, detail="no plan id") + return { + "plan": plan, + "team": team if team else None, + "messages": agent_messages, + "m_plan": plan.m_plan, + } + else: + track_event_if_configured( + "GetPlanId", {"status_code": 400, "detail": "no plan id"} + ) + raise HTTPException(status_code=400, detail="no plan id") + except Exception as e: + logging.error(f"Error retrieving plan: {str(e)}") + raise HTTPException(status_code=500, detail="Internal server error occurred") diff --git a/src/backend/v3/common/services/plan_service.py b/src/backend/v3/common/services/plan_service.py index 62def1035..ab8ee1f2f 100644 --- a/src/backend/v3/common/services/plan_service.py +++ b/src/backend/v3/common/services/plan_service.py @@ -1,18 +1,133 @@ +from dataclasses import Field, asdict +import json import logging +import time from typing import Dict, Any, Optional from common.database.database_factory import DatabaseFactory -from common.database.database_base import DatabaseBase + +from v3.models.models import MPlan import v3.models.messages as messages +from common.models.messages_kernel import ( + AgentMessageData, + AgentMessageType, + AgentType, + PlanStatus, +) from v3.config.settings import orchestration_config from common.utils.event_utils import track_event_if_configured +import uuid +from semantic_kernel.kernel_pydantic import Field + logger = logging.getLogger(__name__) -class PlanService: + +def build_agent_message_from_user_clarification( + human_feedback: messages.UserClarificationResponse, user_id: str +) -> AgentMessageData: + """ + Convert a UserClarificationResponse (human feedback) into an AgentMessageData. + """ + # NOTE: AgentMessageType enum currently defines values with trailing commas in messages_kernel.py. + # e.g. HUMAN_AGENT = "Human_Agent", -> value becomes ('Human_Agent',) + # Consider fixing that enum (remove trailing commas) so .value is a string. + return AgentMessageData( + plan_id=human_feedback.plan_id or "", + user_id=user_id, + m_plan_id=human_feedback.m_plan_id or None, + agent=AgentType.HUMAN.value, # or simply "Human_Agent" + agent_type=AgentMessageType.HUMAN_AGENT, # will serialize per current enum definition + content=human_feedback.answer or "", + raw_data=json.dumps(asdict(human_feedback)), + steps=[], # intentionally empty + next_steps=[], # intentionally empty + ) + + +def build_agent_message_from_agent_message_response( + agent_response: messages.AgentMessageResponse, + user_id: str, +) -> AgentMessageData: + """ + Convert a messages.AgentMessageResponse into common.models.messages_kernel.AgentMessageData. + This is defensive: it tolerates missing fields and different timestamp formats. + """ + # Robust timestamp parsing (accepts seconds or ms or missing) + + # Raw data serialization + raw = getattr(agent_response, "raw_data", None) + try: + if raw is None: + # try asdict if it's a dataclass-like + try: + raw_str = json.dumps(asdict(agent_response)) + except Exception: + raw_str = json.dumps( + { + k: getattr(agent_response, k) + for k in dir(agent_response) + if not k.startswith("_") + } + ) + elif isinstance(raw, (dict, list)): + raw_str = json.dumps(raw) + else: + raw_str = str(raw) + except Exception: + raw_str = json.dumps({"raw": str(raw)}) + + # Steps / next_steps defaulting + steps = getattr(agent_response, "steps", []) or [] + next_steps = getattr(agent_response, "next_steps", []) or [] + + # Agent name and type + agent_name = ( + getattr(agent_response, "agent", "") + or getattr(agent_response, "agent_name", "") + or getattr(agent_response, "source", "") + ) + # Try to infer agent_type, fallback to AI_AGENT + agent_type_raw = getattr(agent_response, "agent_type", None) + if isinstance(agent_type_raw, AgentMessageType): + agent_type = agent_type_raw + else: + # Normalize common strings + agent_type_str = str(agent_type_raw or "").lower() + if "human" in agent_type_str: + agent_type = AgentMessageType.HUMAN_AGENT + else: + agent_type = AgentMessageType.AI_AGENT + + # Content + content = ( + getattr(agent_response, "content", "") + or getattr(agent_response, "text", "") + or "" + ) + + # plan_id / user_id fallback + plan_id_val = getattr(agent_response, "plan_id", "") or "" + user_id_val = getattr(agent_response, "user_id", "") or user_id + + return AgentMessageData( + plan_id=plan_id_val, + user_id=user_id_val, + m_plan_id=getattr(agent_response, "m_plan_id", ""), + agent=agent_name, + agent_type=agent_type, + content=content, + raw_data=raw_str, + steps=list(steps), + next_steps=list(next_steps), + ) +class PlanService: + @staticmethod - async def handle_plan_approval(human_feedback: messages.PlanApprovalResponse, user_id: str) -> bool: + async def handle_plan_approval( + human_feedback: messages.PlanApprovalResponse, user_id: str + ) -> bool: """ Process a PlanApprovalResponse coming from the client. @@ -30,24 +145,115 @@ async def handle_plan_approval(human_feedback: messages.PlanApprovalResponse, us return False try: mplan = orchestration_config.plans[human_feedback.m_plan_id] + memory_store = await DatabaseFactory.get_database(user_id=user_id) if hasattr(mplan, "plan_id"): print( "Updated orchestration config:", orchestration_config.plans[human_feedback.m_plan_id], ) - mplan.plan_id = human_feedback.plan_id - orchestration_config.plans[human_feedback.m_plan_id] = mplan - memory_store = await DatabaseFactory.get_database(user_id=user_id) - plan = await memory_store.get_plan(human_feedback.plan_id) - if plan: - print("Retrieved plan from memory store:", plan) - - - else: - print("Plan not found in memory store.") - return False + if human_feedback.approved: + plan = await memory_store.get_plan(human_feedback.plan_id) + mplan.plan_id = human_feedback.plan_id + mplan.team_id = plan.team_id # just to keep consistency + orchestration_config.plans[human_feedback.m_plan_id] = mplan + if plan: + plan.overall_status = PlanStatus.approved + plan.m_plan = mplan + await memory_store.update_plan(plan) + track_event_if_configured( + "PlanApproved", + { + "m_plan_id": human_feedback.m_plan_id, + "plan_id": human_feedback.plan_id, + "user_id": user_id, + }, + ) + else: + print("Plan not found in memory store.") + return False + else: # reject plan + track_event_if_configured( + "PlanRejected", + { + "m_plan_id": human_feedback.m_plan_id, + "plan_id": human_feedback.plan_id, + "user_id": user_id, + }, + ) + await memory_store.delete_plan_by_plan_id(human_feedback.plan_id) except Exception as e: print(f"Error processing plan approval: {e}") return False return True + + @staticmethod + async def handle_agent_messages( + agent_message: messages.AgentMessageResponse, user_id: str + ) -> bool: + """ + Process an AgentMessage coming from the client. + + Args: + standard_message: messages.AgentMessage (contains relevant message data) + user_id: authenticated user id + + Returns: + dict with status and metadata + + Raises: + ValueError on invalid state + """ + try: + agent_msg = build_agent_message_from_agent_message_response( + agent_message, user_id + ) + + # Persist if your database layer supports it. + # Look for or implement something like: memory_store.add_agent_message(agent_msg) + memory_store = await DatabaseFactory.get_database(user_id=user_id) + await memory_store.add_agent_message(agent_msg) + if agent_message.is_final: + plan = await memory_store.get_plan(agent_msg.plan_id) + plan.overall_status = PlanStatus.completed + await memory_store.update_plan(plan) + return True + except Exception as e: + logger.exception( + "Failed to handle human clarification -> agent message: %s", e + ) + return False + + @staticmethod + async def handle_human_clarification( + human_feedback: messages.UserClarificationResponse, user_id: str + ) -> bool: + """ + Process a UserClarificationResponse coming from the client. + + Args: + human_feedback: messages.UserClarificationResponse (contains relevant message data) + user_id: authenticated user id + + Returns: + dict with status and metadata + + Raises: + ValueError on invalid state + """ + try: + agent_msg = build_agent_message_from_user_clarification( + human_feedback, user_id + ) + + # Persist if your database layer supports it. + # Look for or implement something like: memory_store.add_agent_message(agent_msg) + memory_store = await DatabaseFactory.get_database(user_id=user_id) + await memory_store.add_agent_message(agent_msg) + + return True + except Exception as e: + logger.exception( + "Failed to handle human clarification -> agent message: %s", e + ) + return False diff --git a/src/backend/v3/common/services/team_service.py b/src/backend/v3/common/services/team_service.py index 72fb4d0c6..1e7251921 100644 --- a/src/backend/v3/common/services/team_service.py +++ b/src/backend/v3/common/services/team_service.py @@ -235,7 +235,7 @@ async def delete_user_current_team(self, user_id: str) -> bool: True if successful, False otherwise """ try: - await self.memory_context.delete_user_current_team(user_id) + await self.memory_context.delete_current_team(user_id) self.logger.info("Successfully deleted current team for user %s", user_id) return True @@ -243,7 +243,7 @@ async def delete_user_current_team(self, user_id: str) -> bool: self.logger.error("Error deleting current team: %s", str(e)) return False - async def handle_team_selection(self, user_id: str, team_id: str) -> bool: + async def handle_team_selection(self, user_id: str, team_id: str) -> UserCurrentTeam: """ Set a default team for a user. @@ -254,25 +254,21 @@ async def handle_team_selection(self, user_id: str, team_id: str) -> bool: Returns: True if successful, False otherwise """ + print("Handling team selection for user:", user_id, "team:", team_id) try: - current_team = await self.memory_context.get_current_team(user_id) - - if current_team is None: - current_team = UserCurrentTeam(user_id=user_id, team_id=team_id) - await self.memory_context.set_current_team(current_team) - return True - else: - current_team.team_id = team_id - await self.memory_context.update_current_team(current_team) - return True + await self.memory_context.delete_current_team(user_id) + current_team = UserCurrentTeam( + user_id=user_id, + team_id=team_id, + ) + await self.memory_context.set_current_team(current_team) + return current_team except Exception as e: self.logger.error("Error setting default team: %s", str(e)) - return False + return None - async def get_all_team_configurations( - self - ) -> List[TeamConfiguration]: + async def get_all_team_configurations(self) -> List[TeamConfiguration]: """ Retrieve all team configurations for a user. diff --git a/src/backend/v3/config/settings.py b/src/backend/v3/config/settings.py index 518290e8c..6de4c7cc6 100644 --- a/src/backend/v3/config/settings.py +++ b/src/backend/v3/config/settings.py @@ -15,7 +15,7 @@ from semantic_kernel.agents.orchestration.magentic import MagenticOrchestration from semantic_kernel.connectors.ai.open_ai import ( AzureChatCompletion, OpenAIChatPromptExecutionSettings) -from v3.models.messages import WebsocketMessageType +from v3.models.messages import WebsocketMessageType, MPlan logger = logging.getLogger(__name__) @@ -82,7 +82,7 @@ def __init__(self): self.orchestrations: Dict[str, MagenticOrchestration] = ( {} ) # user_id -> orchestration instance - self.plans: Dict[str, any] = {} # plan_id -> plan details + self.plans: Dict[str, MPlan] = {} # plan_id -> plan details self.approvals: Dict[str, bool] = {} # m_plan_id -> approval status self.sockets: Dict[str, WebSocket] = {} # user_id -> WebSocket self.clarifications: Dict[str, str] = {} # m_plan_id -> clarification response diff --git a/src/backend/v3/models/messages.py b/src/backend/v3/models/messages.py index c7bb7d052..c8ca79e92 100644 --- a/src/backend/v3/models/messages.py +++ b/src/backend/v3/models/messages.py @@ -6,6 +6,7 @@ from typing import Any, Dict, List, Literal, Optional import time from semantic_kernel.kernel_pydantic import Field, KernelBaseModel +from src.backend.common.models.messages_kernel import AgentMessageType from v3.models.models import MPlan, PlanStatus @@ -136,6 +137,16 @@ class ApprovalRequest(KernelBaseModel): action: str agent_name: str +@dataclass(slots=True) +class AgentMessageResponse: + """Response message representing an agent's message.""" + plan_id: str + agent: str + content: str + agent_type: AgentMessageType + is_final: bool = False + raw_data: str = None + class WebsocketMessageType(str, Enum): """Types of WebSocket messages.""" diff --git a/src/backend/v3/models/models.py b/src/backend/v3/models/models.py index 19a2b9f33..32b7fc68e 100644 --- a/src/backend/v3/models/models.py +++ b/src/backend/v3/models/models.py @@ -24,7 +24,6 @@ class MStep(BaseModel): class MPlan(BaseModel): """model of a plan""" id: str = Field(default_factory=lambda: str(uuid.uuid4())) - data_type: Literal[DataType.m_plan] = Field(DataType.m_plan, Literal=True) user_id: str = "" team_id: str = "" plan_id: str = "" diff --git a/src/frontend/src/api/apiService.tsx b/src/frontend/src/api/apiService.tsx index 91eeb124e..e28f11840 100644 --- a/src/frontend/src/api/apiService.tsx +++ b/src/frontend/src/api/apiService.tsx @@ -4,14 +4,18 @@ import { HumanClarification, InputTask, InputTaskResponse, - PlanWithSteps, Plan, - Step, StepStatus, AgentType, - PlanMessage, PlanApprovalRequest, - PlanApprovalResponse + PlanApprovalResponse, + AgentMessageData, + MPlanData, + AgentMessageBE, + MPlanBE, + TeamConfigurationBE, + PlanFromAPI, + AgentMessageResponse } from '../models'; // Constants for endpoints @@ -21,8 +25,8 @@ const API_ENDPOINTS = { PLAN: '/v3/plan', PLAN_APPROVAL: '/v3/plan_approval', HUMAN_CLARIFICATION: '/v3/user_clarification', - USER_BROWSER_LANGUAGE: '/user_browser_language' - + USER_BROWSER_LANGUAGE: '/user_browser_language', + AGENT_MESSAGE: '/v3/agent_message', }; // Simple cache implementation @@ -121,7 +125,7 @@ export class APIService { * @param useCache Whether to use cached data or force fresh fetch * @returns Promise with array of plans with their steps */ - async getPlans(sessionId?: string, useCache = true): Promise { + async getPlans(sessionId?: string, useCache = true): Promise { const cacheKey = `plans_${sessionId || 'all'}`; const params = sessionId ? { session_id: sessionId } : {}; // TODO replace session for team_id @@ -146,7 +150,7 @@ export class APIService { * @param useCache Whether to use cached data or force fresh fetch * @returns Promise with the plan and its steps */ - async getPlanById(planId: string, useCache = true): Promise<{ plan_with_steps: PlanWithSteps; messages: PlanMessage[] }> { + async getPlanById(planId: string, useCache = true): Promise { const cacheKey = `plan_by_id_${planId}`; const params = { plan_id: planId }; @@ -157,17 +161,21 @@ export class APIService { if (!data) { throw new Error(`Plan with ID ${planId} not found`); } - - const plan = data[0] as PlanWithSteps; - const messages = data[1] || []; + console.log('Fetched plan by ID:', data); + const results = { + plan: data.plan as Plan, + messages: data.messages as AgentMessageBE[], + m_plan: data.m_plan as MPlanBE | null, + team: data.team as TeamConfigurationBE | null + } as PlanFromAPI; if (useCache) { - this._cache.set(cacheKey, { plan_with_steps: plan, messages }, 30000); // Cache for 30 seconds + this._cache.set(cacheKey, results, 30000); // Cache for 30 seconds } - return { plan_with_steps: plan, messages }; + return results; }; if (useCache) { - const cachedPlan = this._cache.get<{ plan_with_steps: PlanWithSteps; messages: PlanMessage[] }>(cacheKey); + const cachedPlan = this._cache.get(cacheKey); if (cachedPlan) return cachedPlan; return this._requestTracker.trackRequest(cacheKey, fetcher); @@ -176,43 +184,6 @@ export class APIService { return fetcher(); } - /** - * Get a specific plan with its steps - * @param sessionId Session ID - * @param planId Plan ID - * @param useCache Whether to use cached data or force fresh fetch - * @returns Promise with the plan and its steps - */ - async getPlanWithSteps(sessionId: string, planId: string, useCache = true): Promise { - const cacheKey = `plan_${sessionId}_${planId}`; - - if (useCache) { - const cachedPlan = this._cache.get(cacheKey); - if (cachedPlan) return cachedPlan; - } - - const fetcher = async () => { - const plans = await this.getPlans(sessionId, useCache); - const plan = plans.find(p => p.id === planId); - - if (!plan) { - throw new Error(`Plan with ID ${planId} not found`); - } - - if (useCache) { - this._cache.set(cacheKey, plan, 30000); // Cache for 30 seconds - } - - return plan; - }; - - if (useCache) { - return this._requestTracker.trackRequest(cacheKey, fetcher); - } - - return fetcher(); - } - /** * Approve a plan for execution @@ -238,49 +209,6 @@ export class APIService { }); } - /** - * Get final plan execution results after approval - * @param planId - * @param useCache - * @returns Promise with final plan execution data - */ - async getFinalPlanResults(planId: string, useCache = true): Promise<{ plan_with_steps: PlanWithSteps; messages: PlanMessage[] }> { - const cacheKey = `final_plan_results_${planId}`; - - const fetcher = async () => { - console.log('📤 Fetching final plan results for plan_id:', planId); - - // ✅ Call /api/plans?plan_id={plan_id} to get executed plan - const data = await apiClient.get(API_ENDPOINTS.PLANS, { - params: { plan_id: planId } - }); - - if (!data || !Array.isArray(data) || data.length === 0) { - throw new Error(`No plan results found for plan_id: ${planId}`); - } - - const plan = data[0] as PlanWithSteps; - const messages = data[1] || []; - - if (useCache) { - this._cache.set(cacheKey, { plan_with_steps: plan, messages }, 30000); - } - - console.log('✅ Final plan results received:', { plan, messages }); - return { plan_with_steps: plan, messages }; - }; - - if (useCache) { - const cachedData = this._cache.get<{ plan_with_steps: PlanWithSteps; messages: PlanMessage[] }>(cacheKey); - if (cachedData) return cachedData; - - return this._requestTracker.trackRequest(cacheKey, fetcher); - } - - return fetcher(); - } - - /** * Submit clarification for a plan @@ -335,6 +263,16 @@ export class APIService { }); return response; } + async sendAgentMessage(data: AgentMessageResponse): Promise { + const t0 = performance.now(); + const result = await apiClient.post(API_ENDPOINTS.AGENT_MESSAGE, data); + console.log('[agent_message] sent', { + ms: +(performance.now() - t0).toFixed(1), + agent: data.agent, + type: data.agent_type + }); + return result; + } } // Export a singleton instance diff --git a/src/frontend/src/components/content/PlanPanelLeft.tsx b/src/frontend/src/components/content/PlanPanelLeft.tsx index 3cb75676f..e7447b7eb 100644 --- a/src/frontend/src/components/content/PlanPanelLeft.tsx +++ b/src/frontend/src/components/content/PlanPanelLeft.tsx @@ -19,7 +19,7 @@ import { import TaskList from "./TaskList"; import { useCallback, useEffect, useState } from "react"; import { useNavigate, useParams } from "react-router-dom"; -import { PlanPanelLefProps, PlanWithSteps, Task, UserInfo } from "@/models"; +import { Plan, PlanPanelLefProps, Task, UserInfo } from "@/models"; import { apiService } from "@/api"; import { TaskService } from "@/services"; import MsftColor from "@/coral/imports/MsftColor"; @@ -47,7 +47,7 @@ const PlanPanelLeft: React.FC = ({ const [inProgressTasks, setInProgressTasks] = useState([]); const [completedTasks, setCompletedTasks] = useState([]); - const [plans, setPlans] = useState(null); + const [plans, setPlans] = useState(null); const [plansLoading, setPlansLoading] = useState(false); const [plansError, setPlansError] = useState(null); const [userInfo, setUserInfo] = useState( @@ -85,7 +85,8 @@ const PlanPanelLeft: React.FC = ({ useEffect(() => { loadPlansData(); - }, [loadPlansData]); + setUserInfo(getUserInfoGlobal()); + }, [loadPlansData, setUserInfo]); useEffect(() => { if (plans) { @@ -118,7 +119,7 @@ const PlanPanelLeft: React.FC = ({ const handleTaskSelect = useCallback( (taskId: string) => { const selectedPlan = plans?.find( - (plan: PlanWithSteps) => plan.session_id === taskId + (plan: Plan) => plan.session_id === taskId ); if (selectedPlan) { navigate(`/plan/${selectedPlan.id}`); diff --git a/src/frontend/src/components/content/streaming/StreamingBufferMessage.tsx b/src/frontend/src/components/content/streaming/StreamingBufferMessage.tsx index 8e888d277..13a66215f 100644 --- a/src/frontend/src/components/content/streaming/StreamingBufferMessage.tsx +++ b/src/frontend/src/components/content/streaming/StreamingBufferMessage.tsx @@ -8,7 +8,7 @@ import remarkGfm from "remark-gfm"; import rehypePrism from "rehype-prism"; const renderBufferMessage = (streamingMessageBuffer: string) => { - const [isExpanded, setIsExpanded] = useState(false); + const [isExpanded, setIsExpanded] = useState(false); if (!streamingMessageBuffer || streamingMessageBuffer.trim() === "") return null; @@ -98,23 +98,23 @@ const renderBufferMessage = (streamingMessageBuffer: string) => { ( - { - e.currentTarget.style.textDecoration = 'underline'; - }} - onMouseLeave={(e) => { - e.currentTarget.style.textDecoration = 'none'; - }} - /> - ) - }} + components={{ + a: ({ node, ...props }) => ( + { + e.currentTarget.style.textDecoration = 'underline'; + }} + onMouseLeave={(e) => { + e.currentTarget.style.textDecoration = 'none'; + }} + /> + ) + }} > {previewText} diff --git a/src/frontend/src/models/agentMessage.tsx b/src/frontend/src/models/agentMessage.tsx index ec933db11..777082c27 100644 --- a/src/frontend/src/models/agentMessage.tsx +++ b/src/frontend/src/models/agentMessage.tsx @@ -1,6 +1,6 @@ import { Agent } from 'http'; import { BaseModel } from './plan'; -import { AgentMessageType, AgentType } from './enums'; +import { AgentMessageType, AgentType, WebsocketMessageType } from './enums'; /** * Represents a message from an agent @@ -24,8 +24,44 @@ export interface AgentMessageData { agent: string; agent_type: AgentMessageType; timestamp: number; - steps: any[]; // intentionally always empty - next_steps: []; // intentionally always empty + steps: any[]; + next_steps: any[]; content: string; raw_data: string; } + +/** + * Message sent to HumanAgent to request approval for a step. + * Corresponds to the Python AgentMessageResponse class. + */ +export interface AgentMessageResponse { + + /** Plan identifier */ + plan_id: string; + /** Agent name or identifier */ + agent: string; + /** Message content */ + content: string; + /** Type of agent (Human or AI) */ + agent_type: AgentMessageType; + is_final: boolean; + /** Raw data associated with the message */ + raw_data: string; + +} + +export interface FinalMessage { + type: WebsocketMessageType; + content: string; + status: string; + timestamp: number | null; + raw_data: any; +} + +export interface StreamingMessage { + type: WebsocketMessageType; + agent: string; + content: string; + is_final: boolean; + raw_data: any; +} \ No newline at end of file diff --git a/src/frontend/src/models/enums.tsx b/src/frontend/src/models/enums.tsx index cc5c2cfd5..9c5be061f 100644 --- a/src/frontend/src/models/enums.tsx +++ b/src/frontend/src/models/enums.tsx @@ -222,9 +222,12 @@ export enum StepStatus { * Enumeration of possible statuses for a plan. */ export enum PlanStatus { + CREATED = "created", IN_PROGRESS = "in_progress", COMPLETED = "completed", - FAILED = "failed" + FAILED = "failed", + CANCELED = "canceled", + APPROVED = "approved" } /** diff --git a/src/frontend/src/models/messages.tsx b/src/frontend/src/models/messages.tsx index 3c0a2c7e1..3dc7859de 100644 --- a/src/frontend/src/models/messages.tsx +++ b/src/frontend/src/models/messages.tsx @@ -1,29 +1,7 @@ import { AgentType, StepStatus, PlanStatus, WebsocketMessageType } from './enums'; import { MPlanData } from './plan'; -/** - * Message roles compatible with Semantic Kernel - * Currently unused but kept for potential future use - */ -// export enum MessageRole { -// SYSTEM = "system", -// USER = "user", -// ASSISTANT = "assistant", -// FUNCTION = "function" -// } -/** - * Base class for generic chat messages with roles - * Currently unused but kept for potential future use with Semantic Kernel integration - */ -// export interface GenericChatMessage { -// /** Role of the message sender */ -// role: MessageRole; -// /** Content of the message */ -// content: string; -// /** Additional metadata */ -// metadata: Record; -// } /** * Message sent to request approval for a step diff --git a/src/frontend/src/models/plan.tsx b/src/frontend/src/models/plan.tsx index a14598c23..fe87df163 100644 --- a/src/frontend/src/models/plan.tsx +++ b/src/frontend/src/models/plan.tsx @@ -1,5 +1,7 @@ -import { AgentType, PlanStatus, StepStatus, HumanFeedbackStatus } from './enums'; +import { AgentMessageData } from './agentMessage'; +import { PlanStatus, AgentMessageType } from './enums'; import { StreamingPlanUpdate } from './messages'; +import { TeamConfig } from './Team'; /** * Base interface with common fields @@ -8,126 +10,184 @@ export interface BaseModel { /** Unique identifier */ id: string; /** Timestamp when created */ - + session_id: string; /** Timestamp when last updated */ timestamp: string; } +// these entries as they are comming from db +export interface TeamAgentBE { + /** Input key for the agent */ + input_key: string; + /** Type of the agent */ + type: string; + /** Name of the agent */ + name: string; + /** Deployment name for the agent */ + deployment_name: string; + /** System message for the agent */ + system_message?: string; + /** Description of the agent */ + description?: string; + /** Icon for the agent */ + icon?: string; + /** Index name for RAG capabilities */ + index_name?: string; + /** Whether the agent uses RAG */ + use_rag?: boolean; + /** Whether the agent uses MCP (Model Context Protocol) */ + use_mcp?: boolean; + /** Whether the agent uses Bing search */ + use_bing?: boolean; + /** Whether the agent uses reasoning */ + use_reasoning?: boolean; + /** Whether the agent has coding tools */ + coding_tools?: boolean; +} + +/** + * Represents a starting task for a team. + */ +export interface StartingTaskBE { + /** Unique identifier for the task */ + id: string; + /** Name of the task */ + name: string; + /** Prompt for the task */ + prompt: string; + /** Creation timestamp */ + created: string; + /** Creator of the task */ + creator: string; + /** Logo for the task */ + logo: string; +} + +/** + * Represents a team configuration stored in the database. + */ +export interface TeamConfigurationBE extends BaseModel { + /** The type of data model */ + data_type: "team_config"; + /** Team identifier */ + team_id: string; + /** Name of the team */ + name: string; + /** Status of the team */ + status: string; + /** Creation timestamp */ + created: string; + /** Creator of the team */ + created_by: string; + /** List of agents in the team */ + agents: TeamAgentBE[]; + /** Description of the team */ + description?: string; + /** Logo for the team */ + logo?: string; + /** Plan for the team */ + plan?: string; + /** Starting tasks for the team */ + starting_tasks: StartingTaskBE[]; + /** User who uploaded this configuration */ + user_id: string; +} + /** * Represents a plan containing multiple steps. */ export interface Plan extends BaseModel { /** The type of data model */ data_type: "plan"; - /** Session identifier */ - session_id: string; + /** Plan identifier */ + plan_id: string; /** User identifier */ user_id: string; - /** Plan title */ + /** Initial goal/title of the plan */ initial_goal: string; - /** Current status of the plan */ overall_status: PlanStatus; + /** Whether the plan is approved */ + approved?: boolean; + /** Source of the plan (typically the planner agent) */ + source?: string; + /** Summary of the plan */ + summary?: string; + /** Team identifier associated with the plan */ + team_id?: string; /** Human clarification request text */ human_clarification_request?: string; /** Human clarification response text */ human_clarification_response?: string; } -/** - * Represents an individual step (task) within a plan. - */ -export interface Step extends BaseModel { +export interface MStepBE { + /** Agent responsible for the step */ + agent: string; + /** Action to be performed */ + action: string; +} + +export interface MPlanBE extends BaseModel { /** The type of data model */ - data_type: "step"; - /** Session identifier */ - session_id: string; + data_type: "m_plan"; /** User identifier */ user_id: string; - /** Plan identifier this step belongs to */ + /** Team identifier */ + team_id: string; + /** Associated plan identifier */ plan_id: string; - /** Step title */ - title: string; - /** Step description */ - description: string; - /** Agent responsible for this step */ - agent: AgentType; - /** Current status of the step */ - status: StepStatus; - /** Human feedback status */ - human_feedback_status: HumanFeedbackStatus; - /** Human feedback text */ - human_feedback?: string; - /** Step order/position in the plan */ - step_order: number; + /** Overall status of the plan */ + overall_status: PlanStatus; + /** User's original request */ + user_request: string; + /** List of team member names */ + team: string[]; + /** Facts or context for the plan */ + facts: string; + /** List of steps in the plan */ + steps: MStepBE[]; } - -export interface PlanMessage extends BaseModel { +export interface AgentMessageBE extends BaseModel { /** The type of data model */ - data_type: "agent_message"; - /** Session identifier */ - session_id: string; - /** User identifier */ - user_id: string; + data_type: "m_plan_message"; /** Plan identifier */ plan_id: string; + /** User identifier */ + user_id: string; + /** Agent name or identifier */ + agent: string; + /** Associated m_plan identifier */ + m_plan_id?: string; + /** Type of agent (Human or AI) */ + agent_type: AgentMessageType; /** Message content */ content: string; - /** Source of the message */ - source: string; - /** Step identifier */ - step_id: string; - /** Whether this is a streaming message */ - streaming?: boolean; - /** Status of the streaming message */ - status?: string; - /** Type of message (thinking, action, etc.) */ - message_type?: string; + /** Raw data associated with the message */ + raw_data: string; + /** Steps associated with the message */ + steps: any[]; + /** Next steps associated with the message */ + next_steps: any[]; } -/** - * Union type for chat messages - can be either a regular plan message or a temporary streaming message - */ -export type ChatMessage = PlanMessage | { source: string; content: string; timestamp: string; streaming?: boolean; status?: string; message_type?: string; }; - -/** - * Represents a plan that includes its associated steps. - */ -export interface PlanWithSteps extends Plan { - /** Steps associated with this plan */ - steps: Step[]; - /** Total number of steps */ - total_steps: number; - /** Count of steps in planned status */ - planned: number; - /** Count of steps awaiting feedback */ - awaiting_feedback: number; - /** Count of steps approved */ - approved: number; - /** Count of steps rejected */ - rejected: number; - /** Count of steps with action requested */ - action_requested: number; - /** Count of steps completed */ - completed: number; - /** Count of steps failed */ - failed: number; +export interface PlanFromAPI { + plan: Plan; + messages: AgentMessageBE[]; + m_plan: MPlanBE | null; + team: TeamConfigurationBE | null; } - /** * Interface for processed plan data */ export interface ProcessedPlanData { - plan: PlanWithSteps; - agents: AgentType[]; - steps: Step[]; - hasClarificationRequest: boolean; - hasClarificationResponse: boolean; - enableChat: boolean; - enableStepButtons: boolean; - messages: PlanMessage[]; + plan: Plan; + team: TeamConfig | null; + messages: AgentMessageData[]; + mplan: MPlanData | null; } + export interface PlanChatProps { planData: ProcessedPlanData; input: string; diff --git a/src/frontend/src/models/taskDetails.tsx b/src/frontend/src/models/taskDetails.tsx index b47dc41d1..e68ef4ca5 100644 --- a/src/frontend/src/models/taskDetails.tsx +++ b/src/frontend/src/models/taskDetails.tsx @@ -1,4 +1,4 @@ -import { MPlanData, ProcessedPlanData, Step } from "./plan"; +import { MPlanData, ProcessedPlanData } from "./plan"; export interface SubTask { id: string; diff --git a/src/frontend/src/models/taskList.tsx b/src/frontend/src/models/taskList.tsx index ad66181b0..f9ff310f4 100644 --- a/src/frontend/src/models/taskList.tsx +++ b/src/frontend/src/models/taskList.tsx @@ -1,10 +1,8 @@ export interface Task { id: string; name: string; - status: 'inprogress' | 'completed'; + status: string; date?: string; - completed_steps?: number; - total_steps?: number; } export interface TaskListProps { diff --git a/src/frontend/src/pages/PlanPage.tsx b/src/frontend/src/pages/PlanPage.tsx index 5d88755c9..59b6a3302 100644 --- a/src/frontend/src/pages/PlanPage.tsx +++ b/src/frontend/src/pages/PlanPage.tsx @@ -2,7 +2,7 @@ import React, { useCallback, useEffect, useRef, useState, useMemo } from "react" import { useParams, useNavigate } from "react-router-dom"; import { Spinner, Text } from "@fluentui/react-components"; import { PlanDataService } from "../services/PlanDataService"; -import { ProcessedPlanData, PlanWithSteps, WebsocketMessageType, MPlanData, AgentMessageData, AgentMessageType, ParsedUserClarification, AgentType } from "../models"; +import { ProcessedPlanData, WebsocketMessageType, MPlanData, AgentMessageData, AgentMessageType, ParsedUserClarification, AgentType, PlanStatus, FinalMessage } from "../models"; import PlanChat from "../components/content/PlanChat"; import PlanPanelRight from "../components/content/PlanPanelRight"; import PlanPanelLeft from "../components/content/PlanPanelLeft"; @@ -17,9 +17,6 @@ import Octo from "../coral/imports/Octopus.png"; import PanelRightToggles from "../coral/components/Header/PanelRightToggles"; import { TaskListSquareLtr } from "../coral/imports/bundleicons"; import LoadingMessage, { loadingMessages } from "../coral/components/LoadingMessage"; -import { RAIErrorCard, RAIErrorData } from "../components/errors"; -import { TeamConfig } from "../models/Team"; -import { TeamService } from "../services/TeamService"; import webSocketService from "../services/WebSocketService"; import { APIService } from "../api/apiService"; import { StreamMessage, StreamingPlanUpdate } from "../models"; @@ -38,37 +35,60 @@ const PlanPage: React.FC = () => { const navigate = useNavigate(); const { showToast, dismissToast } = useInlineToaster(); const messagesContainerRef = useRef(null); - const [input, setInput] = useState(""); + const [input, setInput] = useState(""); const [planData, setPlanData] = useState(null); - const [allPlans, setAllPlans] = useState([]); const [loading, setLoading] = useState(true); const [submittingChatDisableInput, setSubmittingChatDisableInput] = useState(true); - const [error, setError] = useState(null); + const [errorLoading, setErrorLoading] = useState(false); const [clarificationMessage, setClarificationMessage] = useState(null); - const [processingApproval, setProcessingApproval] = useState(false); + const [processingApproval, setProcessingApproval] = useState(false); const [planApprovalRequest, setPlanApprovalRequest] = useState(null); - const [reloadLeftList, setReloadLeftList] = useState(true); - const [waitingForPlan, setWaitingForPlan] = useState(true); + const [reloadLeftList, setReloadLeftList] = useState(true); + const [waitingForPlan, setWaitingForPlan] = useState(true); const [showProcessingPlanSpinner, setShowProcessingPlanSpinner] = useState(false); const [showApprovalButtons, setShowApprovalButtons] = useState(true); + const [continueWithWebsocketFlow, setContinueWithWebsocketFlow] = useState(true); // WebSocket connection state - const [wsConnected, setWsConnected] = useState(false); + const [wsConnected, setWsConnected] = useState(false); const [streamingMessages, setStreamingMessages] = useState([]); const [streamingMessageBuffer, setStreamingMessageBuffer] = useState(""); - const [agentMessages, setAgentMessages] = useState([]); - // Team config state - const [teamConfig, setTeamConfig] = useState(null); - const [loadingTeamConfig, setLoadingTeamConfig] = useState(true); // Plan approval state - track when plan is approved - const [planApproved, setPlanApproved] = useState(false); + const [planApproved, setPlanApproved] = useState(false); const [loadingMessage, setLoadingMessage] = useState(loadingMessages[0]); - // Use ref to store the function to avoid stale closure issues - const loadPlanDataRef = useRef<() => Promise>(); + + + const processAgentMessage = useCallback((agentMessageData: AgentMessageData, planData: ProcessedPlanData, is_final: boolean = false) => { + + // Persist / forward to backend (fire-and-forget with logging) + console.log(planData) + console.log(is_final) + console.log(agentMessageData) + const agentMessageResponse = PlanDataService.createAgentMessageResponse(agentMessageData, planData, is_final); + console.log('📤 Persisting agent message:', agentMessageResponse); + void apiService.sendAgentMessage(agentMessageResponse) + .then(saved => { + console.log('[agent_message][persisted]', { + agent: agentMessageData.agent, + type: agentMessageData.agent_type, + ts: agentMessageData.timestamp + }); + }) + .catch(err => { + console.warn('[agent_message][persist-failed]', err); + }); + + }, []); + + const resetPlanVariables = useCallback(() => { + + + }, []); + // Auto-scroll helper const scrollToBottom = useCallback(() => { setTimeout(() => { @@ -114,7 +134,6 @@ const PlanPage: React.FC = () => { setPlanApprovalRequest(mPlanData); setWaitingForPlan(false); setShowProcessingPlanSpinner(false); - // onPlanReceived?.(mPlanData); scrollToBottom(); } else { console.error('❌ Failed to parse plan data', approvalRequest); @@ -122,25 +141,31 @@ const PlanPage: React.FC = () => { }); return () => unsubscribe(); - }, [scrollToBottom]); //onPlanReceived, scrollToBottom + }, [scrollToBottom]); //(WebsocketMessageType.AGENT_MESSAGE_STREAMING useEffect(() => { const unsubscribe = webSocketService.on(WebsocketMessageType.AGENT_MESSAGE_STREAMING, (streamingMessage: any) => { - // console.log('📋 Streaming Message', streamingMessage); + //console.log('📋 Streaming Message', streamingMessage); // if is final true clear buffer and add final message to agent messages - setStreamingMessageBuffer(prev => prev + streamingMessage.data.content); - scrollToBottom(); + const line = PlanDataService.simplifyHumanClarification(streamingMessage.data.content); + setStreamingMessageBuffer(prev => prev + line); + //scrollToBottom(); }); return () => unsubscribe(); - }, [scrollToBottom]); //onPlanReceived, scrollToBottom + }, [scrollToBottom]); //WebsocketMessageType.USER_CLARIFICATION_REQUEST useEffect(() => { const unsubscribe = webSocketService.on(WebsocketMessageType.USER_CLARIFICATION_REQUEST, (clarificationMessage: any) => { console.log('📋 Clarification Message', clarificationMessage); + console.log('📋 Current plan data User clarification', planData); + if (!clarificationMessage) { + console.warn('⚠️ clarification message missing data:', clarificationMessage); + return; + } const agentMessageData = { agent: AgentType.GROUP_CHAT_MANAGER, agent_type: AgentMessageType.AI_AGENT, @@ -157,16 +182,18 @@ const PlanPage: React.FC = () => { setShowProcessingPlanSpinner(false); setSubmittingChatDisableInput(false); scrollToBottom(); + // Persist the agent message + processAgentMessage(agentMessageData, planData); }); return () => unsubscribe(); - }, [scrollToBottom]); + }, [scrollToBottom, planData, processAgentMessage]); //WebsocketMessageType.AGENT_TOOL_MESSAGE useEffect(() => { const unsubscribe = webSocketService.on(WebsocketMessageType.AGENT_TOOL_MESSAGE, (toolMessage: any) => { console.log('📋 Tool Message', toolMessage); - scrollToBottom(); + // scrollToBottom() }); @@ -178,39 +205,63 @@ const PlanPage: React.FC = () => { useEffect(() => { const unsubscribe = webSocketService.on(WebsocketMessageType.FINAL_RESULT_MESSAGE, (finalMessage: any) => { console.log('📋 Final Result Message', finalMessage); + if (!finalMessage) { + + console.warn('⚠️ Final result message missing data:', finalMessage); + return; + } const agentMessageData = { agent: AgentType.GROUP_CHAT_MANAGER, agent_type: AgentMessageType.AI_AGENT, timestamp: Date.now(), steps: [], // intentionally always empty next_steps: [], // intentionally always empty - content: finalMessage.data.content || '', - raw_data: finalMessage.data || '', + content: "🎉🎉 " + (finalMessage.data?.content || ''), + raw_data: finalMessage || '', } as AgentMessageData; + + console.log('✅ Parsed final result message:', agentMessageData); - setStreamingMessageBuffer(""); - setShowProcessingPlanSpinner(false); - setAgentMessages(prev => [...prev, agentMessageData]); - scrollToBottom(); + // we ignore the terminated message + if (finalMessage?.data?.status === PlanStatus.COMPLETED) { + setStreamingMessageBuffer(""); + setShowProcessingPlanSpinner(false); + setAgentMessages(prev => [...prev, agentMessageData]); + scrollToBottom(); + // Persist the agent message + const is_final = true; + if (planData?.plan) { + planData.plan.overall_status = PlanStatus.COMPLETED; + setPlanData({ ...planData }); + } + + processAgentMessage(agentMessageData, planData, is_final); + } + }); return () => unsubscribe(); - }, [scrollToBottom]); - + }, [scrollToBottom, planData, processAgentMessage]); //WebsocketMessageType.AGENT_MESSAGE useEffect(() => { const unsubscribe = webSocketService.on(WebsocketMessageType.AGENT_MESSAGE, (agentMessage: any) => { - console.log('📋 Agent Message', agentMessage); + console.log('📋 Agent Message', agentMessage) + console.log('📋 Current plan data', planData); const agentMessageData = agentMessage.data as AgentMessageData; - setAgentMessages(prev => [...prev, agentMessageData]); - setShowProcessingPlanSpinner(true); - scrollToBottom(); + if (agentMessageData) { + agentMessageData.content = PlanDataService.simplifyHumanClarification(agentMessageData?.content); + setAgentMessages(prev => [...prev, agentMessageData]); + setShowProcessingPlanSpinner(true); + scrollToBottom(); + processAgentMessage(agentMessageData, planData); + } + }); return () => unsubscribe(); - }, [scrollToBottom]); //onPlanReceived, scrollToBottom + }, [scrollToBottom, planData, processAgentMessage]); //onPlanReceived, scrollToBottom // Loading message rotation effect useEffect(() => { @@ -227,7 +278,7 @@ const PlanPage: React.FC = () => { // WebSocket connection with proper error handling and v3 backend compatibility useEffect(() => { - if (planId && !loading) { + if (planId && continueWithWebsocketFlow) { console.log('🔌 Connecting WebSocket:', { planId }); const connectWebSocket = async () => { @@ -288,69 +339,38 @@ const PlanPage: React.FC = () => { } }, [planId, loading]); - useEffect(() => { - - const loadTeamConfig = async () => { - try { - setLoadingTeamConfig(true); - const teams = await TeamService.getUserTeams(); - // Get the first team as default config, or you can implement logic to get current team - const config = teams.length > 0 ? teams[0] : null; - setTeamConfig(config); - } catch (error) { - console.error('Failed to load team config:', error); - // Don't show error for team config loading - it's optional - } finally { - setLoadingTeamConfig(false); - } - }; - - loadTeamConfig(); - }, []); - - // Helper function to convert PlanWithSteps to ProcessedPlanData - const convertToProcessedPlanData = (planWithSteps: PlanWithSteps): ProcessedPlanData => { - return PlanDataService.processPlanData(planWithSteps, []); - }; - // Create loadPlanData function with useCallback to memoize it const loadPlanData = useCallback( - async (useCache = true): Promise => { - if (!planId) return []; + async (useCache = true): Promise => { + if (!planId) return null; setLoading(true); - setError(null); - try { - let actualPlanId = planId; + let planResult: ProcessedPlanData | null = null; + console.log("Fetching plan with ID:", planId); + planResult = await PlanDataService.fetchPlanData(planId, useCache); + console.log("Plan data fetched:", planResult); + if (planResult?.plan?.overall_status === PlanStatus.IN_PROGRESS) { + setShowApprovalButtons(true); - if (actualPlanId && !planResult) { - console.log("Fetching plan with ID:", actualPlanId); - planResult = await PlanDataService.fetchPlanData(actualPlanId, useCache); - console.log("Plan data loaded successfully"); + } else { + setWaitingForPlan(false); } - - const allPlansWithSteps = await apiService.getPlans(); - const allPlansData = allPlansWithSteps.map(convertToProcessedPlanData); - setAllPlans(allPlansData); - - if (planResult?.plan?.id && planResult.plan.id !== actualPlanId) { - console.log('Plan ID mismatch detected, redirecting...', { - requested: actualPlanId, - actual: planResult.plan.id - }); - navigate(`/plan/${planResult.plan.id}`, { replace: true }); + if (planResult?.plan?.overall_status !== PlanStatus.COMPLETED) { + setContinueWithWebsocketFlow(true); + } + if (planResult?.messages) { + setAgentMessages(planResult.messages); + } + if (planResult?.mplan) { + setPlanApprovalRequest(planResult.mplan); } - setPlanData(planResult); - return allPlansData; + return planResult; } catch (err) { console.log("Failed to load plan data:", err); - setError( - err instanceof Error ? err : new Error("Failed to load plan data") - ); - return []; + return null; } finally { setLoading(false); } @@ -358,10 +378,6 @@ const PlanPage: React.FC = () => { [planId, navigate] ); - // Update the ref whenever loadPlanData changes - useEffect(() => { - loadPlanDataRef.current = loadPlanData; - }, [loadPlanData]); // Handle plan approval const handleApprovePlan = useCallback(async () => { @@ -418,6 +434,7 @@ const PlanPage: React.FC = () => { } }, [planApprovalRequest, planData, navigate, setProcessingApproval]); // Chat submission handler - updated for v3 backend compatibility + const handleOnchatSubmit = useCallback( async (chatInput: string) => { if (!chatInput.trim()) { @@ -445,7 +462,7 @@ const PlanPage: React.FC = () => { showToast("Clarification submitted successfully", "success"); const agentMessageData = { - agent: 'You', + agent: 'human', agent_type: AgentMessageType.HUMAN_AGENT, timestamp: Date.now(), steps: [], // intentionally always empty @@ -482,26 +499,29 @@ const PlanPage: React.FC = () => { }, [navigate]); - const resetReload = useCallback(() => { setReloadLeftList(false); }, []); useEffect(() => { const initializePlanLoading = async () => { - if (!planId) return; + if (!planId) { + resetPlanVariables(); + setErrorLoading(true); + return; + } try { - await loadPlanData(true); + await loadPlanData(false); } catch (err) { console.error("Failed to initialize plan loading:", err); } }; initializePlanLoading(); - }, [planId, loadPlanData]); + }, [planId, loadPlanData, resetPlanVariables, setErrorLoading]); - if (error) { + if (errorLoading) { return ( @@ -512,7 +532,7 @@ const PlanPage: React.FC = () => { color: 'var(--colorNeutralForeground2)' }}> - {error.message || "An error occurred while loading the plan"} + {"An error occurred while loading the plan"} @@ -532,7 +552,7 @@ const PlanPage: React.FC = () => { onTeamSelect={() => { }} onTeamUpload={async () => { }} isHomePage={false} - selectedTeam={teamConfig} + selectedTeam={null} /> diff --git a/src/frontend/src/services/PlanDataService.tsx b/src/frontend/src/services/PlanDataService.tsx index dff5c2ba9..021ce2842 100644 --- a/src/frontend/src/services/PlanDataService.tsx +++ b/src/frontend/src/services/PlanDataService.tsx @@ -1,14 +1,26 @@ import { - PlanWithSteps, - Step, + AgentType, ProcessedPlanData, - PlanMessage, MPlanData, StepStatus, WebsocketMessageType, ParsedUserClarification, - AgentMessageType + AgentMessageType, + PlanFromAPI, + AgentMessageData, + AgentMessageBE, + StartingTaskBE, + StartingTask, + TeamAgentBE, + Agent, + TeamConfig, + TeamConfigurationBE, + MPlanBE, + MStepBE, + AgentMessageResponse, + FinalMessage, + StreamingMessage } from "@/models"; import { apiService } from "@/api"; @@ -28,10 +40,7 @@ export class PlanDataService { try { // Use optimized getPlanById method for better performance const planBody = await apiService.getPlanById(planId, useCache); - return this.processPlanData( - planBody.plan_with_steps, - planBody.messages || [] - ); + return this.processPlanData(planBody); } catch (error) { console.log("Failed to fetch plan data:", error); throw error; @@ -43,97 +52,162 @@ export class PlanDataService { * @param plan PlanWithSteps object to process * @returns Processed plan data */ - static processPlanData( - plan: PlanWithSteps, - messages: PlanMessage[] - ): ProcessedPlanData { - // Extract unique agents from steps - - const uniqueAgents = new Set(); - if (plan.steps && plan.steps.length > 0) { - plan.steps.forEach((step) => { - if (step.agent) { - uniqueAgents.add(step.agent); - } - }); + /** + * Converts AgentMessageBE array to AgentMessageData array + * @param messages - Array of AgentMessageBE from backend + * @returns Array of AgentMessageData or empty array if input is null/empty + */ + static convertAgentMessages(messages: AgentMessageBE[]): AgentMessageData[] { + if (!messages || messages.length === 0) { + return []; } - // Convert Set to Array for easier handling - const agents = Array.from(uniqueAgents); - - // Get all steps - const steps = plan.steps ?? []; - - // Check if human_clarification_request is not null - const hasClarificationRequest = - plan.human_clarification_request != null && - plan.human_clarification_request.trim().length > 0; - const hasClarificationResponse = - plan.human_clarification_response != null && - plan.human_clarification_response.trim().length > 0; - const enableChat = hasClarificationRequest && !hasClarificationResponse; - const enableStepButtons = - (hasClarificationRequest && hasClarificationResponse) || - (!hasClarificationRequest && !hasClarificationResponse); - return { - plan, - agents, - steps, - hasClarificationRequest, - hasClarificationResponse, - enableChat, - enableStepButtons, - messages, - }; + return messages.map((message: AgentMessageBE): AgentMessageData => ({ + agent: message.agent, + agent_type: message.agent_type, + timestamp: message.timestamp ? new Date(message.timestamp).getTime() : Date.now(), + steps: message.steps || [], + next_steps: message.next_steps ?? [], + content: message.content, + raw_data: message.raw_data + })); } /** - * Get steps for a specific agent type - * @param plan Plan with steps - * @param agentType Agent type to filter by - * @returns Array of steps for the specified agent + * Converts TeamConfigurationBE to TeamConfig + * @param teamConfigBE - TeamConfigurationBE from backend + * @returns TeamConfig or null if input is null/undefined */ - static getStepsForAgent(plan: PlanWithSteps, agentType: AgentType): Step[] { - return plan.steps.filter(step => step.agent === agentType); - } + static convertTeamConfiguration(teamConfigBE: TeamConfigurationBE | null): TeamConfig | null { + if (!teamConfigBE) { + return null; + } + return { + id: teamConfigBE.id, + team_id: teamConfigBE.team_id, + name: teamConfigBE.name, + description: teamConfigBE.description || '', + status: teamConfigBE.status as 'visible' | 'hidden', + protected: false, // Default value since it's not in TeamConfigurationBE + created: teamConfigBE.created, + created_by: teamConfigBE.created_by, + logo: teamConfigBE.logo || '', + plan: teamConfigBE.plan || '', + agents: teamConfigBE.agents.map((agentBE: TeamAgentBE): Agent => ({ + input_key: agentBE.input_key, + type: agentBE.type, + name: agentBE.name, + deployment_name: agentBE.deployment_name, + system_message: agentBE.system_message, + description: agentBE.description, + icon: agentBE.icon, + index_name: agentBE.index_name, + use_rag: agentBE.use_rag, + use_mcp: agentBE.use_mcp, + coding_tools: agentBE.coding_tools, + // Additional fields that exist in Agent but not in TeamAgentBE + index_endpoint: undefined, + id: undefined, + capabilities: undefined, + role: undefined + })), + starting_tasks: teamConfigBE.starting_tasks.map((taskBE: StartingTaskBE): StartingTask => ({ + id: taskBE.id, + name: taskBE.name, + prompt: taskBE.prompt, + created: taskBE.created, + creator: taskBE.creator, + logo: taskBE.logo + })) + }; + } /** - * Get steps that are awaiting human feedback - * @param plan Plan with steps - * @returns Array of steps awaiting feedback + * Converts MPlanBE to MPlanData + * @param mplanBE - MPlanBE from backend + * @returns MPlanData or null if input is null/undefined */ - static getStepsAwaitingFeedback(plan: PlanWithSteps): Step[] { - return plan.steps.filter(step => step.status === StepStatus.AWAITING_FEEDBACK); + static convertMPlan(mplanBE: MPlanBE | null): MPlanData | null { + if (!mplanBE) { + return null; + } + + // Convert MStepBE[] to the MPlanData steps format + const steps = mplanBE.steps.map((stepBE: MStepBE, index: number) => ({ + id: index + 1, // MPlanData expects numeric id starting from 1 + action: stepBE.action, + cleanAction: stepBE.action + .replace(/\*\*/g, '') // Remove markdown bold + .replace(/^Certainly!\s*/i, '') + .replace(/^Given the team composition and the available facts,?\s*/i, '') + .replace(/^here is a (?:concise )?plan to[^.]*\.\s*/i, '') + .replace(/^\*\*([^*]+)\*\*:?\s*/g, '$1: ') + .replace(/^[-•]\s*/, '') + .replace(/\s+/g, ' ') + .trim(), + agent: stepBE.agent + })); + + return { + id: mplanBE.id, + status: mplanBE.overall_status.toString().toUpperCase(), + user_request: mplanBE.user_request, + team: mplanBE.team, + facts: mplanBE.facts, + steps: steps, + context: { + task: mplanBE.user_request, + participant_descriptions: {} // Default empty object since it's not in MPlanBE + }, + // Additional fields from m_plan + user_id: mplanBE.user_id, + team_id: mplanBE.team_id, + plan_id: mplanBE.plan_id, + overall_status: mplanBE.overall_status.toString(), + raw_data: mplanBE // Store the original object as raw_data + }; } + static processPlanData(planFromAPI: PlanFromAPI): ProcessedPlanData { + // Extract unique agents from steps - /** - * Check if plan is complete - * @param plan Plan with steps - * @returns Boolean indicating if plan is complete - */ - static isPlanComplete(plan: PlanWithSteps): boolean { - return plan.steps.every(step => - [StepStatus.COMPLETED, StepStatus.FAILED].includes(step.status) - ); + const plan = planFromAPI.plan; + const team = this.convertTeamConfiguration(planFromAPI.team); + const mplan = this.convertMPlan(planFromAPI.m_plan); + const messages: AgentMessageData[] = this.convertAgentMessages(planFromAPI.messages || []); + return { + plan, + team, + mplan, + messages + }; } /** - * Get plan completion percentage - * @param plan Plan with steps - * @returns Completion percentage (0-100) - */ - static getPlanCompletionPercentage(plan: PlanWithSteps): number { - if (!plan.steps.length) return 0; + * Converts AgentMessageData to AgentMessageResponse using ProcessedPlanData context + * @param agentMessage - AgentMessageData to convert + * @param planData - ProcessedPlanData for context (plan_id, user_id, etc.) + * @returns AgentMessageResponse + */ + static createAgentMessageResponse( + agentMessage: AgentMessageData, + planData: ProcessedPlanData, + is_final: boolean = false + ): AgentMessageResponse { + if (!planData || !planData.plan) { + console.log("Invalid plan data provided to createAgentMessageResponse"); + } + return { - const completedSteps = plan.steps.filter( - step => [StepStatus.COMPLETED, StepStatus.FAILED].includes(step.status) - ).length; + plan_id: planData.plan.plan_id, + agent: agentMessage.agent, + content: agentMessage.content, + agent_type: agentMessage.agent_type, + is_final: is_final, + raw_data: JSON.stringify(agentMessage.raw_data), - return Math.round((completedSteps / plan.steps.length) * 100); + }; } - - /** * Submit human clarification for a plan * @param planId Plan ID @@ -398,10 +472,10 @@ export class PlanDataService { const data = rawData.data; const content = data.content || ''; const timestamp = typeof data.timestamp === 'number' ? data.timestamp : null; - + // Parse the content for steps and next_steps (reuse existing logic) const { steps, next_steps } = this.parseContentForStepsAndNextSteps(content); - + return { agent: data.agent_name || 'UnknownAgent', agent_type: AgentMessageType.AI_AGENT, @@ -421,10 +495,10 @@ export class PlanDataService { if (rawData && typeof rawData === 'object' && rawData.agent_name) { const content = rawData.content || ''; const timestamp = typeof rawData.timestamp === 'number' ? rawData.timestamp : null; - + // Parse the content for steps and next_steps const { steps, next_steps } = this.parseContentForStepsAndNextSteps(content); - + return { agent: rawData.agent_name || 'UnknownAgent', agent_type: AgentMessageType.AI_AGENT, @@ -560,12 +634,7 @@ export class PlanDataService { * - { type: 'agent_message_streaming', data: { agent_name: 'X', content: 'partial', is_final: true } } * - "AgentMessageStreaming(agent_name='X', content='partial', is_final=False)" */ - static parseAgentMessageStreaming(rawData: any): { - agent: string; - content: string; - is_final: boolean; - raw_data: any; - } | null { + static parseAgentMessageStreaming(rawData: any): StreamingMessage | null { try { // Handle JSON string input - parse it first if (typeof rawData === 'string' && rawData.startsWith('{')) { @@ -583,6 +652,7 @@ export class PlanDataService { // New format: { type: 'agent_message_streaming', data: { agent_name: '...', content: '...', is_final: true } } const data = rawData.data; return { + type: WebsocketMessageType.AGENT_MESSAGE_STREAMING, agent: data.agent_name || 'UnknownAgent', content: data.content || '', is_final: Boolean(data.is_final), @@ -597,6 +667,7 @@ export class PlanDataService { // Handle direct object format if (rawData && typeof rawData === 'object' && rawData.agent_name) { return { + type: WebsocketMessageType.AGENT_MESSAGE_STREAMING, agent: rawData.agent_name || 'UnknownAgent', content: rawData.content || '', is_final: Boolean(rawData.is_final), @@ -629,7 +700,10 @@ export class PlanDataService { is_final = /True/i.test(finalMatch[1]); } - return { agent, content, is_final, raw_data: rawData }; + return { + type: WebsocketMessageType.AGENT_MESSAGE_STREAMING, + agent, content, is_final, raw_data: rawData + }; } catch (e) { console.error('Failed to parse streaming agent message:', e); return null; @@ -718,13 +792,7 @@ export class PlanDataService { * } * Returns null if not parsable. */ - static parseFinalResultMessage(rawData: any): { - type: WebsocketMessageType; - content: string; - status: string; - timestamp: number | null; - raw_data: any; - } | null { + static parseFinalResultMessage(rawData: any): FinalMessage | null { try { const extractPayload = (val: any, depth = 0): any => { if (depth > 10) return null; @@ -777,5 +845,34 @@ export class PlanDataService { } } + static simplifyHumanClarification(line: string): string { + if ( + typeof line !== 'string' || + !line.includes('Human clarification:') || + !line.includes('UserClarificationResponse(') + ) { + return line; + } + + // Capture the inside of UserClarificationResponse(...) + const outerMatch = line.match(/Human clarification:\s*UserClarificationResponse\((.*?)\)/s); + if (!outerMatch) return line; + + const inner = outerMatch[1]; + // Find answer= '...' | "..." + const answerMatch = inner.match(/answer=(?:"((?:\\.|[^"])*)"|'((?:\\.|[^'])*)')/); + if (!answerMatch) return line; + + let answer = answerMatch[1] ?? answerMatch[2] ?? ''; + // Unescape common sequences + answer = answer + .replace(/\\n/g, '\n') + .replace(/\\'/g, "'") + .replace(/\\"/g, '"') + .replace(/\\\\/g, '\\') + .trim(); + + return `Human clarification: ${answer}`; + } } \ No newline at end of file diff --git a/src/frontend/src/services/TaskService.tsx b/src/frontend/src/services/TaskService.tsx index 5630a8438..bb8651a3b 100644 --- a/src/frontend/src/services/TaskService.tsx +++ b/src/frontend/src/services/TaskService.tsx @@ -1,8 +1,7 @@ -import { PlanWithSteps, PlanStatus } from "../models"; +import { Plan, PlanStatus } from "../models"; import { Task } from "../models/taskList"; import { apiService } from "../api/apiService"; import { InputTask, InputTaskResponse } from "../models/inputTask"; -import { PlanDataService } from "./PlanDataService"; /** * TaskService - Service for handling task-related operations and transformations @@ -13,7 +12,7 @@ export class TaskService { * @param plansData Array of PlanWithSteps to transform * @returns Object containing inProgress and completed task arrays */ - static transformPlansToTasks(plansData: PlanWithSteps[]): { + static transformPlansToTasks(plansData: Plan[]): { inProgress: Task[]; completed: Task[]; } { @@ -28,8 +27,6 @@ export class TaskService { const task: Task = { id: plan.session_id, name: plan.initial_goal, - completed_steps: plan.completed, - total_steps: plan.total_steps, status: plan.overall_status === PlanStatus.COMPLETED ? "completed" : "inprogress", date: new Intl.DateTimeFormat(undefined, { dateStyle: "long", @@ -153,13 +150,13 @@ export class TaskService { // Convert camelCase and PascalCase to spaces // This regex finds lowercase letter followed by uppercase letter cleanedText = cleanedText.replace(/([a-z])([A-Z])/g, '$1 $2'); - + // Replace any remaining non-alphanumeric characters with spaces cleanedText = cleanedText.replace(/[^a-zA-Z0-9]/g, ' '); - + // Clean up multiple spaces and trim cleanedText = cleanedText.replace(/\s+/g, ' ').trim(); - + // Capitalize each word for better readability cleanedText = cleanedText.replace(/\b\w/g, (char) => char.toUpperCase()); diff --git a/src/frontend/src/services/WebSocketService.tsx b/src/frontend/src/services/WebSocketService.tsx index e52734a17..e17af7911 100644 --- a/src/frontend/src/services/WebSocketService.tsx +++ b/src/frontend/src/services/WebSocketService.tsx @@ -14,7 +14,7 @@ class WebSocketService { private isConnecting = false; - private buildSocketUrl(processId?: string, sessionId?: string): string { + private buildSocketUrl(processId?: string, planId?: string): string { const baseWsUrl = getApiUrl() || 'ws://localhost:8000'; // Trim and remove trailing slashes let base = (baseWsUrl || '').trim().replace(/\/+$/, ''); @@ -27,11 +27,11 @@ class WebSocketService { // Decide path addition const hasApiSegment = /\/api(\/|$)/i.test(base); const socketPath = hasApiSegment ? '/v3/socket' : '/api/v3/socket'; - const url = `${base}${socketPath}${processId ? `/${processId}` : `/${sessionId}`}`; + const url = `${base}${socketPath}${processId ? `/${processId}` : `/${planId}`}`; console.log("Constructed WebSocket URL:", url); return url; } - connect(sessionId: string, processId?: string): Promise { + connect(planId: string, processId?: string): Promise { return new Promise((resolve, reject) => { if (this.isConnecting) { reject(new Error('Connection already in progress')); @@ -43,7 +43,7 @@ class WebSocketService { } try { this.isConnecting = true; - const wsUrl = this.buildSocketUrl(processId, sessionId); + const wsUrl = this.buildSocketUrl(processId, planId); this.ws = new WebSocket(wsUrl); this.ws.onopen = () => { @@ -181,12 +181,6 @@ class WebSocketService { private handleMessage(message: StreamMessage): void { - //console.log('WebSocket message received:', message); - const hasClarification = /\bclarifications?\b/i.test(message.data || ''); - - if (hasClarification) { - console.log("Message contains 'clarification':", message.data); - } switch (message.type) { case WebsocketMessageType.PLAN_APPROVAL_REQUEST: { console.log("Message Plan Approval Request':", message); @@ -210,6 +204,7 @@ class WebSocketService { if (message.data) { console.log('WebSocket message received:', message); const transformed = PlanDataService.parseAgentMessage(message); + console.log('Transformed AGENT_MESSAGE:', transformed); this.emit(WebsocketMessageType.AGENT_MESSAGE, transformed); } @@ -217,8 +212,10 @@ class WebSocketService { } case WebsocketMessageType.AGENT_MESSAGE_STREAMING: { + console.log("Message streamming agent buffer:", message); if (message.data) { const streamedMessage = PlanDataService.parseAgentMessageStreaming(message); + console.log('WebSocket AGENT_MESSAGE_STREAMING message received:', streamedMessage); this.emit(WebsocketMessageType.AGENT_MESSAGE_STREAMING, streamedMessage); } break;