diff --git a/data/agent_teams/hr.json b/data/agent_teams/hr.json index 3e0086a7a..e2142ddd8 100644 --- a/data/agent_teams/hr.json +++ b/data/agent_teams/hr.json @@ -1,7 +1,7 @@ { "id": "3", "team_id": "team-3", - "name": "Retail Customer Success Team", + "name": "Human Resources Team", "status": "visible", "created": "", "created_by": "", @@ -43,7 +43,7 @@ "input_key": "", "type": "", "name": "ProxyAgent", - "deployment_name": "", + "deployment_name": "gpt-4.1", "icon": "", "system_message": "", "description": "", diff --git a/data/agent_teams/new 29.txt b/data/agent_teams/new 29.txt new file mode 100644 index 000000000..aa4da708d --- /dev/null +++ b/data/agent_teams/new 29.txt @@ -0,0 +1,48 @@ +Tasks: + +Done: Make the 3 team uploads to Cosmos work (HR, Marketing, Retail). We will load this Cosmos data on deploy as the default teams. +Done: - call "/socket/{process_id}" (start_comms) to set up the websocket +Done: Make sure that a team is always selected - start with the hr.json team + +call the init_team API for the currently loaded team on app start - +Spinner / team-loading should display until this call returns (user should not be able to input tasks) + - say something like "team loading" with spinner +FE: send the unload-current-team API and call init_team for a team switch - sending select_team(new team id) + spin while waiting for the API to return +BE: unload old team - load new team - return status + +BE: For Francia - implement get_plans to fill in history from cosmos + +BE: Create a teams container in Cosmos and move all loaded team definitions there + +Implement saving of plan to cosmos -> history in... + +================ Request submit flow ====================== +on request submission call "/create_plan" (process_request) +This will return immediately - move to the other page and display a spinner -> "creating plan" +Socket will start receiving messages -> +Stream plan output into main window + +Will receive the PlanApprovalRequest message + Enable accept / reject UI +Send PlanApprovalResponse message when user answers + +If not approved + BE: plan will cancel on backend + FE: - enable input again for fresh request + Call input_request API on backend again (just like inputting any request) + +If approved: + Display plan steps in right pane +============================================================= + +================== Message Streaming ======================== +Process socket message routing to display agent output (see the routing sketch below) + See message types in src\backend\v3\models\messages.py + for each message from agent - process the stream then roll it up + +On FinalResultMessage + display the final result with all agent output in rollups by agent above +============================================================== + + diff --git a/data/datasets/Competitor_Pricing_Analysis.csv b/data/datasets/competitor_pricing_analysis.csv similarity index 100% rename from data/datasets/Competitor_Pricing_Analysis.csv rename to data/datasets/competitor_pricing_analysis.csv diff --git a/data/datasets/Customer_Churn_Analysis.csv b/data/datasets/customer_churn_analysis.csv similarity index 100% rename from data/datasets/Customer_Churn_Analysis.csv rename to data/datasets/customer_churn_analysis.csv diff --git a/data/datasets/Email_Marketing_Engagement.csv b/data/datasets/email_marketing_engagement.csv similarity index 100% rename from data/datasets/Email_Marketing_Engagement.csv rename to data/datasets/email_marketing_engagement.csv diff --git a/data/datasets/Loyalty_Program_Overview.csv b/data/datasets/loyalty_program_overview.csv
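A minimal sketch of the socket-message routing the notes above describe. The `agent_message` and `streaming_message` payload shapes follow what `response_handlers.py` sends later in this change; the `plan_approval_request` and `final_result` type strings and their fields are assumptions standing in for the PlanApprovalRequest and FinalResultMessage models in src\backend\v3\models\messages.py, and the real frontend handler would live in the React/TypeScript client rather than Python.

```
import json


def route_socket_message(raw: str) -> None:
    """Dispatch one WebSocket payload to a display action (illustration only)."""
    msg = json.loads(raw)
    msg_type = msg.get("type")
    data = msg.get("data", {})

    if msg_type in ("agent_message", "streaming_message"):
        # Stream agent output into the main window as it arrives, then roll it up.
        print(f"[{data.get('agent_name')}] {data.get('content')}")
    elif msg_type == "plan_approval_request":
        # Enable the accept / reject UI; the answer goes back as a PlanApprovalResponse.
        print("approval requested:", data)
    elif msg_type == "final_result":
        # Show the final result, with per-agent output rolled up above it.
        print("final result:", data)
    else:
        print("unhandled message type:", msg_type)
```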
similarity index 100% rename from data/datasets/Loyalty_Program_Overview.csv rename to data/datasets/loyalty_program_overview.csv diff --git a/data/datasets/Subscription_benefits_utilization.csv b/data/datasets/subscription_benefits_utilization.csv similarity index 100% rename from data/datasets/Subscription_benefits_utilization.csv rename to data/datasets/subscription_benefits_utilization.csv diff --git a/data/datasets/Unauthorized_Access_Attempts.csv b/data/datasets/unauthorized_access_attempts.csv similarity index 100% rename from data/datasets/Unauthorized_Access_Attempts.csv rename to data/datasets/unauthorized_access_attempts.csv diff --git a/data/datasets/Warehouse_Incident_Reports.csv b/data/datasets/warehouse_incident_reports.csv similarity index 100% rename from data/datasets/Warehouse_Incident_Reports.csv rename to data/datasets/warehouse_incident_reports.csv diff --git a/docs/mcp_server.md b/docs/mcp_server.md new file mode 100644 index 000000000..16c6e7268 --- /dev/null +++ b/docs/mcp_server.md @@ -0,0 +1,34 @@ +Capturing the notes from the auth install for the docs before deleting them... + +### Auth section: +Requires an app registration as in azure_app_service_auth_setup.md, so this is not deployed by default. + +To set up basic bearer-token auth with FastMCP, you can integrate with Azure by using it as your token provider. + +``` from fastmcp.server.auth import JWTVerifier``` + +``` +auth = JWTVerifier( + jwks_uri="https://login.microsoftonline.com/52b39610-0746-4c25-a83d-d4f89fadedfe/discovery/v2.0/keys", + #issuer="https://login.microsoftonline.com/52b39610-0746-4c25-a83d-d4f89fadedfe/v2.0", + # The issuer given in the docs is not correct; this value was found by decoding the token. + issuer="https://sts.windows.net/52b39610-0746-4c25-a83d-d4f89fadedfe/", + algorithm="RS256", + audience="api://7a95e70b-062e-4cd3-a88c-603fc70e1c73" +) +``` + +Requires env vars: +``` +export MICROSOFT_CLIENT_ID="your-client-id" +export MICROSOFT_CLIENT_SECRET="your-client-secret" +export MICROSOFT_TENANT="common" # Or your tenant ID +``` + +```mcp = FastMCP("My MCP Server", auth=auth)``` + +For more complex and production scenarios, OAuth and PKCE are supported. + +Enabled through the MCP-enabled base class - see lifecycle.py + + diff --git a/src/backend/.env.sample b/src/backend/.env.sample index 33c1c4267..965d3768e 100644 --- a/src/backend/.env.sample +++ b/src/backend/.env.sample @@ -20,6 +20,16 @@ AZURE_AI_AGENT_ENDPOINT= AZURE_BING_CONNECTION_NAME= REASONING_MODEL_NAME=o3 APP_ENV=dev -MCP_SERVER_ENDPOINT=http://localhost:9000/mcp +MCP_SERVER_ENDPOINT=http://localhost:8080/mcp +MCP_SERVER_NAME=MyMC +MCP_SERVER_DESCRIPTION=My MCP Server +TENANT_ID= +CLIENT_ID= BACKEND_API_URL=http://localhost:8000 -FRONTEND_SITE_NAME=* \ No newline at end of file +FRONTEND_SITE_NAME= +SUPPORTED_MODELS='["o3","o4-mini","gpt-4.1","gpt-4.1-mini"]' +AZURE_AI_SEARCH_CONNECTION_NAME= +AZURE_AI_SEARCH_INDEX_NAME= +AZURE_AI_SEARCH_ENDPOINT= +AZURE_AI_SEARCH_API_KEY= +BING_CONNECTION_NAME= diff --git a/src/backend/app_kernel.py b/src/backend/app_kernel.py index 659ef93e4..dae304f01 100644 --- a/src/backend/app_kernel.py +++ b/src/backend/app_kernel.py @@ -17,18 +17,19 @@ InputTask, Plan, PlanStatus, PlanWithSteps, Step, UserLanguage) from common.utils.event_utils import track_event_if_configured -from common.utils.utils_date import format_dates_in_messages + # Updated import for KernelArguments from common.utils.utils_kernel import rai_success -from common.utils.websocket_streaming import (websocket_streaming_endpoint, - ws_manager) + # FastAPI imports -from fastapi import
FastAPI, HTTPException, Query, Request, WebSocket +from fastapi import (FastAPI, HTTPException, Query, Request, WebSocket, + WebSocketDisconnect) from fastapi.middleware.cors import CORSMiddleware from kernel_agents.agent_factory import AgentFactory # Local imports from middleware.health_check import HealthCheckMiddleware from v3.api.router import app_v3 + # Semantic Kernel imports from v3.orchestration.orchestration_manager import OrchestrationManager @@ -69,7 +70,10 @@ app.add_middleware( CORSMiddleware, allow_origins=[ - frontend_url + "http://localhost:3000", # Add this for local development + "https://localhost:3000", # Add this if using HTTPS locally + "http://127.0.0.1:3000", + "http://127.0.0.1:3001", ], # Allow all origins for development; restrict in production allow_credentials=True, allow_methods=["*"], @@ -82,14 +86,6 @@ app.include_router(app_v3) logging.info("Added health check middleware") - -# WebSocket streaming endpoint -@app.websocket("/ws/streaming") -async def websocket_endpoint(websocket: WebSocket): - """WebSocket endpoint for real-time plan execution streaming""" - await websocket_streaming_endpoint(websocket) - - @app.post("/api/user_browser_language") async def user_browser_language_endpoint(user_language: UserLanguage, request: Request): """ @@ -122,128 +118,128 @@ async def user_browser_language_endpoint(user_language: UserLanguage, request: R return {"status": "Language received successfully"} -@app.post("/api/input_task") -async def input_task_endpoint(input_task: InputTask, request: Request): - """ - Receive the initial input task from the user. - """ - # Fix 1: Properly await the async rai_success function - if not await rai_success(input_task.description, True): - print("RAI failed") - - track_event_if_configured( - "RAI failed", - { - "status": "Plan not created - RAI validation failed", - "description": input_task.description, - "session_id": input_task.session_id, - }, - ) - - return { - "status": "RAI_VALIDATION_FAILED", - "message": "Content Safety Check Failed", - "detail": "Your request contains content that doesn't meet our safety guidelines. 
Please modify your request to ensure it's appropriate and try again.", - "suggestions": [ - "Remove any potentially harmful, inappropriate, or unsafe content", - "Use more professional and constructive language", - "Focus on legitimate business or educational objectives", - "Ensure your request complies with content policies", - ], - } - authenticated_user = get_authenticated_user_details(request_headers=request.headers) - user_id = authenticated_user["user_principal_id"] - - if not user_id: - track_event_if_configured( - "UserIdNotFound", {"status_code": 400, "detail": "no user"} - ) - raise HTTPException(status_code=400, detail="no user") - - # Generate session ID if not provided - if not input_task.session_id: - input_task.session_id = str(uuid.uuid4()) - - try: - # Create all agents instead of just the planner agent - # This ensures other agents are created first and the planner has access to them - memory_store = await DatabaseFactory.get_database(user_id=user_id) - client = None - try: - client = config.get_ai_project_client() - except Exception as client_exc: - logging.error(f"Error creating AIProjectClient: {client_exc}") - - agents = await AgentFactory.create_all_agents( - session_id=input_task.session_id, - user_id=user_id, - memory_store=memory_store, - client=client, - ) - - group_chat_manager = agents[AgentType.GROUP_CHAT_MANAGER.value] - - # Convert input task to JSON for the kernel function, add user_id here - - # Use the planner to handle the task - await group_chat_manager.handle_input_task(input_task) - - # Get plan from memory store - plan = await memory_store.get_plan_by_session(input_task.session_id) - - if not plan: # If the plan is not found, raise an error - track_event_if_configured( - "PlanNotFound", - { - "status": "Plan not found", - "session_id": input_task.session_id, - "description": input_task.description, - }, - ) - raise HTTPException(status_code=404, detail="Plan not found") - # Log custom event for successful input task processing - track_event_if_configured( - "InputTaskProcessed", - { - "status": f"Plan created with ID: {plan.id}", - "session_id": input_task.session_id, - "plan_id": plan.id, - "description": input_task.description, - }, - ) - if client: - try: - client.close() - except Exception as e: - logging.error(f"Error sending to AIProjectClient: {e}") - return { - "status": f"Plan created with ID: {plan.id}", - "session_id": input_task.session_id, - "plan_id": plan.id, - "description": input_task.description, - } - - except Exception as e: - # Extract clean error message for rate limit errors - error_msg = str(e) - if "Rate limit is exceeded" in error_msg: - match = re.search( - r"Rate limit is exceeded\. Try again in (\d+) seconds?\.", error_msg - ) - if match: - error_msg = "Application temporarily unavailable due to quota limits. Please try again later." - - track_event_if_configured( - "InputTaskError", - { - "session_id": input_task.session_id, - "description": input_task.description, - "error": str(e), - }, - ) - raise HTTPException( - status_code=400, detail=f"Error creating plan: {error_msg}" - ) from e +# @app.post("/api/input_task") +# async def input_task_endpoint(input_task: InputTask, request: Request): +# """ +# Receive the initial input task from the user. 
+# """ +# # Fix 1: Properly await the async rai_success function +# if not await rai_success(input_task.description, True): +# print("RAI failed") + +# track_event_if_configured( +# "RAI failed", +# { +# "status": "Plan not created - RAI validation failed", +# "description": input_task.description, +# "session_id": input_task.session_id, +# }, +# ) + +# return { +# "status": "RAI_VALIDATION_FAILED", +# "message": "Content Safety Check Failed", +# "detail": "Your request contains content that doesn't meet our safety guidelines. Please modify your request to ensure it's appropriate and try again.", +# "suggestions": [ +# "Remove any potentially harmful, inappropriate, or unsafe content", +# "Use more professional and constructive language", +# "Focus on legitimate business or educational objectives", +# "Ensure your request complies with content policies", +# ], +# } +# authenticated_user = get_authenticated_user_details(request_headers=request.headers) +# user_id = authenticated_user["user_principal_id"] + +# if not user_id: +# track_event_if_configured( +# "UserIdNotFound", {"status_code": 400, "detail": "no user"} +# ) +# raise HTTPException(status_code=400, detail="no user") + +# # Generate session ID if not provided +# if not input_task.session_id: +# input_task.session_id = str(uuid.uuid4()) + +# try: +# # Create all agents instead of just the planner agent +# # This ensures other agents are created first and the planner has access to them +# memory_store = await DatabaseFactory.get_database(user_id=user_id) +# client = None +# try: +# client = config.get_ai_project_client() +# except Exception as client_exc: +# logging.error(f"Error creating AIProjectClient: {client_exc}") + +# agents = await AgentFactory.create_all_agents( +# session_id=input_task.session_id, +# user_id=user_id, +# memory_store=memory_store, +# client=client, +# ) + +# group_chat_manager = agents[AgentType.GROUP_CHAT_MANAGER.value] + +# # Convert input task to JSON for the kernel function, add user_id here + +# # Use the planner to handle the task +# await group_chat_manager.handle_input_task(input_task) + +# # Get plan from memory store +# plan = await memory_store.get_plan_by_session(input_task.session_id) + +# if not plan: # If the plan is not found, raise an error +# track_event_if_configured( +# "PlanNotFound", +# { +# "status": "Plan not found", +# "session_id": input_task.session_id, +# "description": input_task.description, +# }, +# ) +# raise HTTPException(status_code=404, detail="Plan not found") +# # Log custom event for successful input task processing +# track_event_if_configured( +# "InputTaskProcessed", +# { +# "status": f"Plan created with ID: {plan.id}", +# "session_id": input_task.session_id, +# "plan_id": plan.id, +# "description": input_task.description, +# }, +# ) +# if client: +# try: +# client.close() +# except Exception as e: +# logging.error(f"Error sending to AIProjectClient: {e}") +# return { +# "status": f"Plan created with ID: {plan.id}", +# "session_id": input_task.session_id, +# "plan_id": plan.id, +# "description": input_task.description, +# } + +# except Exception as e: +# # Extract clean error message for rate limit errors +# error_msg = str(e) +# if "Rate limit is exceeded" in error_msg: +# match = re.search( +# r"Rate limit is exceeded\. Try again in (\d+) seconds?\.", error_msg +# ) +# if match: +# error_msg = "Application temporarily unavailable due to quota limits. Please try again later." 
+ +# track_event_if_configured( +# "InputTaskError", +# { +# "session_id": input_task.session_id, +# "description": input_task.description, +# "error": str(e), +# }, +# ) +# raise HTTPException( +# status_code=400, detail=f"Error creating plan: {error_msg}" +# ) from e @app.post("/api/human_feedback") @@ -670,11 +666,9 @@ async def get_plans( "UserIdNotFound", {"status_code": 400, "detail": "no user"} ) raise HTTPException(status_code=400, detail="no user") + - # Initialize agent team for this user session - await OrchestrationManager.get_current_orchestration(user_id=user_id) - - # Replace the following with code to get plan run history from the database + #### Replace the following with code to get plan run history from the database # # Initialize memory context # memory_store = await DatabaseFactory.get_database(user_id=user_id) @@ -730,8 +724,7 @@ async def get_plans( # list_of_plans_with_steps.append(plan_with_steps) return [] - - + @app.get("/api/steps/{plan_id}", response_model=List[Step]) async def get_steps_by_plan(plan_id: str, request: Request) -> List[Step]: """ @@ -895,80 +888,80 @@ async def get_agent_tools(): return [] -@app.post("/api/test/streaming/{plan_id}") -async def test_streaming_updates(plan_id: str): - """ - Test endpoint to simulate streaming updates for a plan. - This is for testing the WebSocket streaming functionality. - """ - from common.utils.websocket_streaming import (send_agent_message, - send_plan_update, - send_step_update) - - try: - # Simulate a series of streaming updates - await send_agent_message( - plan_id=plan_id, - agent_name="Data Analyst", - content="Starting analysis of the data...", - message_type="thinking", - ) - - await asyncio.sleep(1) - - await send_plan_update( - plan_id=plan_id, - step_id="step_1", - agent_name="Data Analyst", - content="Analyzing customer data patterns...", - status="in_progress", - message_type="action", - ) - - await asyncio.sleep(2) - - await send_agent_message( - plan_id=plan_id, - agent_name="Data Analyst", - content="Found 3 key insights in the customer data. Processing recommendations...", - message_type="result", - ) - - await asyncio.sleep(1) - - await send_step_update( - plan_id=plan_id, - step_id="step_1", - status="completed", - content="Data analysis completed successfully!", - ) - - await send_agent_message( - plan_id=plan_id, - agent_name="Business Advisor", - content="Reviewing the analysis results and preparing strategic recommendations...", - message_type="thinking", - ) - - await asyncio.sleep(2) - - await send_plan_update( - plan_id=plan_id, - step_id="step_2", - agent_name="Business Advisor", - content="Based on the data analysis, I recommend focusing on customer retention strategies for the identified high-value segments.", - status="completed", - message_type="result", - ) - - return { - "status": "success", - "message": f"Test streaming updates sent for plan {plan_id}", - } - - except Exception as e: - logging.error(f"Error sending test streaming updates: {e}") - raise HTTPException(status_code=500, detail=str(e)) +# @app.post("/api/test/streaming/{plan_id}") +# async def test_streaming_updates(plan_id: str): +# """ +# Test endpoint to simulate streaming updates for a plan. +# This is for testing the WebSocket streaming functionality. 
+# """ +# from common.utils.websocket_streaming import (send_agent_message, +# send_plan_update, +# send_step_update) + +# try: +# # Simulate a series of streaming updates +# await send_agent_message( +# plan_id=plan_id, +# agent_name="Data Analyst", +# content="Starting analysis of the data...", +# message_type="thinking", +# ) + +# await asyncio.sleep(1) + +# await send_plan_update( +# plan_id=plan_id, +# step_id="step_1", +# agent_name="Data Analyst", +# content="Analyzing customer data patterns...", +# status="in_progress", +# message_type="action", +# ) + +# await asyncio.sleep(2) + +# await send_agent_message( +# plan_id=plan_id, +# agent_name="Data Analyst", +# content="Found 3 key insights in the customer data. Processing recommendations...", +# message_type="result", +# ) + +# await asyncio.sleep(1) + +# await send_step_update( +# plan_id=plan_id, +# step_id="step_1", +# status="completed", +# content="Data analysis completed successfully!", +# ) + +# await send_agent_message( +# plan_id=plan_id, +# agent_name="Business Advisor", +# content="Reviewing the analysis results and preparing strategic recommendations...", +# message_type="thinking", +# ) + +# await asyncio.sleep(2) + +# await send_plan_update( +# plan_id=plan_id, +# step_id="step_2", +# agent_name="Business Advisor", +# content="Based on the data analysis, I recommend focusing on customer retention strategies for the identified high-value segments.", +# status="completed", +# message_type="result", +# ) + +# return { +# "status": "success", +# "message": f"Test streaming updates sent for plan {plan_id}", +# } + +# except Exception as e: +# logging.error(f"Error sending test streaming updates: {e}") +# raise HTTPException(status_code=500, detail=str(e)) # Run the app diff --git a/src/backend/common/config/app_config.py b/src/backend/common/config/app_config.py index ff8306b9a..ba01909fa 100644 --- a/src/backend/common/config/app_config.py +++ b/src/backend/common/config/app_config.py @@ -55,7 +55,7 @@ def __init__(self): self.AZURE_BING_CONNECTION_NAME = self._get_optional( "AZURE_BING_CONNECTION_NAME" ) - + self.SUPPORTED_MODELS = self._get_optional("SUPPORTED_MODELS") # Frontend settings self.FRONTEND_SITE_NAME = self._get_optional( "FRONTEND_SITE_NAME", "http://127.0.0.1:3000" @@ -74,6 +74,15 @@ def __init__(self): # Optional MCP server endpoint (for local MCP server or remote) # Example: http://127.0.0.1:8000/mcp self.MCP_SERVER_ENDPOINT = self._get_optional("MCP_SERVER_ENDPOINT") + self.MCP_SERVER_NAME = self._get_optional("MCP_SERVER_NAME", "MCPGreetingServer") + self.MCP_SERVER_DESCRIPTION = self._get_optional("MCP_SERVER_DESCRIPTION", "MCP server with greeting and planning tools") + self.TENANT_ID = self._get_optional("TENANT_ID") + self.CLIENT_ID = self._get_optional("CLIENT_ID") + self.AZURE_AI_SEARCH_CONNECTION_NAME = self._get_optional("AZURE_AI_SEARCH_CONNECTION_NAME") + self.AZURE_AI_SEARCH_INDEX_NAME = self._get_optional("AZURE_AI_SEARCH_INDEX_NAME") + self.AZURE_AI_SEARCH_ENDPOINT = self._get_optional("AZURE_AI_SEARCH_ENDPOINT") + self.AZURE_AI_SEARCH_API_KEY = self._get_optional("AZURE_AI_SEARCH_API_KEY") + self.BING_CONNECTION_NAME = self._get_optional("BING_CONNECTION_NAME") test_team_json = self._get_optional("TEST_TEAM_JSON") diff --git a/src/backend/common/database/database_factory.py b/src/backend/common/database/database_factory.py index cc8586598..8c2f9fb0e 100644 --- a/src/backend/common/database/database_factory.py +++ b/src/backend/common/database/database_factory.py @@ -3,9 +3,10 @@ import 
logging from typing import Optional +from common.config.app_config import config + from .cosmosdb import CosmosDBClient from .database_base import DatabaseBase -from common.config.app_config import config class DatabaseFactory: diff --git a/src/backend/common/models/messages_kernel.py b/src/backend/common/models/messages_kernel.py index e0ec3f87a..e2f2910ef 100644 --- a/src/backend/common/models/messages_kernel.py +++ b/src/backend/common/models/messages_kernel.py @@ -98,18 +98,25 @@ class Session(BaseDataModel): current_status: str message_to_user: Optional[str] = None +class UserCurrentTeam(BaseDataModel): + """Represents the current team of a user.""" + + data_type: Literal["user_current_team"] = Field("user_current_team", Literal=True) + user_id: str + team_id: str class Plan(BaseDataModel): """Represents a plan containing multiple steps.""" data_type: Literal["plan"] = Field("plan", Literal=True) - team_id: str + plan_id: str session_id: str user_id: str initial_goal: str overall_status: PlanStatus = PlanStatus.in_progress source: str = AgentType.PLANNER.value summary: Optional[str] = None + team_id: Optional[str] = None human_clarification_request: Optional[str] = None human_clarification_response: Optional[str] = None @@ -156,12 +163,15 @@ class TeamAgent(KernelBaseModel): input_key: str type: str name: str + deployment_name: str system_message: str = "" description: str = "" icon: str index_name: str = "" use_rag: bool = False use_mcp: bool = False + use_bing: bool = False + use_reasoning: bool = False coding_tools: bool = False @@ -175,6 +185,10 @@ class StartingTask(KernelBaseModel): creator: str logo: str +class TeamSelectionRequest(KernelBaseModel): + """Request model for team selection.""" + team_id: str + session_id: Optional[str] = None class TeamConfiguration(BaseDataModel): """Represents a team configuration stored in the database.""" @@ -242,7 +256,7 @@ class InputTask(KernelBaseModel): session_id: str description: str # Initial goal - team_id: str + # team_id: str class UserLanguage(KernelBaseModel): diff --git a/src/backend/common/utils/utils_date.py b/src/backend/common/utils/utils_date.py index 0e2c0a513..7e3a6f39c 100644 --- a/src/backend/common/utils/utils_date.py +++ b/src/backend/common/utils/utils_date.py @@ -1,8 +1,10 @@ import json import locale -from datetime import datetime import logging +from datetime import datetime from typing import Optional + +import regex as re from dateutil import parser diff --git a/src/backend/pyproject.toml b/src/backend/pyproject.toml index ee307a854..9d0379dea 100644 --- a/src/backend/pyproject.toml +++ b/src/backend/pyproject.toml @@ -31,5 +31,5 @@ dependencies = [ "uvicorn>=0.34.2", "pylint-pydantic>=0.3.5", "pexpect>=4.9.0", - "fastmcp==2.11.3", + "mcp>=1.13.1" ] diff --git a/src/backend/uv.lock b/src/backend/uv.lock index 4cce9c072..7f47f3b75 100644 --- a/src/backend/uv.lock +++ b/src/backend/uv.lock @@ -448,6 +448,7 @@ dependencies = [ { name = "azure-monitor-opentelemetry" }, { name = "azure-search-documents" }, { name = "fastapi" }, + { name = "mcp" }, { name = "openai" }, { name = "opentelemetry-api" }, { name = "opentelemetry-exporter-otlp-proto-grpc" }, @@ -478,6 +479,7 @@ requires-dist = [ { name = "azure-monitor-opentelemetry", specifier = ">=1.6.8" }, { name = "azure-search-documents", specifier = ">=11.5.2" }, { name = "fastapi", specifier = ">=0.115.12" }, + { name = "mcp", specifier = ">=1.13.1" }, { name = "openai", specifier = ">=1.75.0" }, { name = "opentelemetry-api", specifier = ">=1.31.1" }, { name = 
"opentelemetry-exporter-otlp-proto-grpc", specifier = ">=1.31.1" }, @@ -1053,6 +1055,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/2a/39/e50c7c3a983047577ee07d2a9e53faf5a69493943ec3f6a384bdc792deb2/httpx-0.28.1-py3-none-any.whl", hash = "sha256:d909fcccc110f8c7faf814ca82a9a4d816bc5a6dbfea25d6591d6985b8ba59ad", size = 73517, upload-time = "2024-12-06T15:37:21.509Z" }, ] +[[package]] +name = "httpx-sse" +version = "0.4.1" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/6e/fa/66bd985dd0b7c109a3bcb89272ee0bfb7e2b4d06309ad7b38ff866734b2a/httpx_sse-0.4.1.tar.gz", hash = "sha256:8f44d34414bc7b21bf3602713005c5df4917884f76072479b21f68befa4ea26e", size = 12998, upload-time = "2025-06-24T13:21:05.71Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/25/0a/6269e3473b09aed2dab8aa1a600c70f31f00ae1349bee30658f7e358a159/httpx_sse-0.4.1-py3-none-any.whl", hash = "sha256:cba42174344c3a5b06f255ce65b350880f962d99ead85e776f23c6618a377a37", size = 8054, upload-time = "2025-06-24T13:21:04.772Z" }, +] + [[package]] name = "idna" version = "3.10" @@ -1329,6 +1340,28 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/27/1a/1f68f9ba0c207934b35b86a8ca3aad8395a3d6dd7921c0686e23853ff5a9/mccabe-0.7.0-py2.py3-none-any.whl", hash = "sha256:6c2d30ab6be0e4a46919781807b4f0d834ebdd6c6e3dca0bda5a15f863427b6e", size = 7350, upload-time = "2022-01-24T01:14:49.62Z" }, ] +[[package]] +name = "mcp" +version = "1.13.1" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "anyio" }, + { name = "httpx" }, + { name = "httpx-sse" }, + { name = "jsonschema" }, + { name = "pydantic" }, + { name = "pydantic-settings" }, + { name = "python-multipart" }, + { name = "pywin32", marker = "sys_platform == 'win32'" }, + { name = "sse-starlette" }, + { name = "starlette" }, + { name = "uvicorn", marker = "sys_platform != 'emscripten'" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/66/3c/82c400c2d50afdac4fbefb5b4031fd327e2ad1f23ccef8eee13c5909aa48/mcp-1.13.1.tar.gz", hash = "sha256:165306a8fd7991dc80334edd2de07798175a56461043b7ae907b279794a834c5", size = 438198, upload-time = "2025-08-22T09:22:16.061Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/19/3f/d085c7f49ade6d273b185d61ec9405e672b6433f710ea64a90135a8dd445/mcp-1.13.1-py3-none-any.whl", hash = "sha256:c314e7c8bd477a23ba3ef472ee5a32880316c42d03e06dcfa31a1cc7a73b65df", size = 161494, upload-time = "2025-08-22T09:22:14.705Z" }, +] + [[package]] name = "more-itertools" version = "10.7.0" @@ -2492,6 +2525,25 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/81/c4/34e93fe5f5429d7570ec1fa436f1986fb1f00c3e0f43a589fe2bbcd22c3f/pytz-2025.2-py2.py3-none-any.whl", hash = "sha256:5ddf76296dd8c44c26eb8f4b6f35488f3ccbf6fbbd7adee0b7262d43f0ec2f00", size = 509225, upload-time = "2025-03-25T02:24:58.468Z" }, ] +[[package]] +name = "pywin32" +version = "311" +source = { registry = "https://pypi.org/simple" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/7c/af/449a6a91e5d6db51420875c54f6aff7c97a86a3b13a0b4f1a5c13b988de3/pywin32-311-cp311-cp311-win32.whl", hash = "sha256:184eb5e436dea364dcd3d2316d577d625c0351bf237c4e9a5fabbcfa5a58b151", size = 8697031, upload-time = "2025-07-14T20:13:13.266Z" }, + { url = "https://files.pythonhosted.org/packages/51/8f/9bb81dd5bb77d22243d33c8397f09377056d5c687aa6d4042bea7fbf8364/pywin32-311-cp311-cp311-win_amd64.whl", hash = 
"sha256:3ce80b34b22b17ccbd937a6e78e7225d80c52f5ab9940fe0506a1a16f3dab503", size = 9508308, upload-time = "2025-07-14T20:13:15.147Z" }, + { url = "https://files.pythonhosted.org/packages/44/7b/9c2ab54f74a138c491aba1b1cd0795ba61f144c711daea84a88b63dc0f6c/pywin32-311-cp311-cp311-win_arm64.whl", hash = "sha256:a733f1388e1a842abb67ffa8e7aad0e70ac519e09b0f6a784e65a136ec7cefd2", size = 8703930, upload-time = "2025-07-14T20:13:16.945Z" }, + { url = "https://files.pythonhosted.org/packages/e7/ab/01ea1943d4eba0f850c3c61e78e8dd59757ff815ff3ccd0a84de5f541f42/pywin32-311-cp312-cp312-win32.whl", hash = "sha256:750ec6e621af2b948540032557b10a2d43b0cee2ae9758c54154d711cc852d31", size = 8706543, upload-time = "2025-07-14T20:13:20.765Z" }, + { url = "https://files.pythonhosted.org/packages/d1/a8/a0e8d07d4d051ec7502cd58b291ec98dcc0c3fff027caad0470b72cfcc2f/pywin32-311-cp312-cp312-win_amd64.whl", hash = "sha256:b8c095edad5c211ff31c05223658e71bf7116daa0ecf3ad85f3201ea3190d067", size = 9495040, upload-time = "2025-07-14T20:13:22.543Z" }, + { url = "https://files.pythonhosted.org/packages/ba/3a/2ae996277b4b50f17d61f0603efd8253cb2d79cc7ae159468007b586396d/pywin32-311-cp312-cp312-win_arm64.whl", hash = "sha256:e286f46a9a39c4a18b319c28f59b61de793654af2f395c102b4f819e584b5852", size = 8710102, upload-time = "2025-07-14T20:13:24.682Z" }, + { url = "https://files.pythonhosted.org/packages/a5/be/3fd5de0979fcb3994bfee0d65ed8ca9506a8a1260651b86174f6a86f52b3/pywin32-311-cp313-cp313-win32.whl", hash = "sha256:f95ba5a847cba10dd8c4d8fefa9f2a6cf283b8b88ed6178fa8a6c1ab16054d0d", size = 8705700, upload-time = "2025-07-14T20:13:26.471Z" }, + { url = "https://files.pythonhosted.org/packages/e3/28/e0a1909523c6890208295a29e05c2adb2126364e289826c0a8bc7297bd5c/pywin32-311-cp313-cp313-win_amd64.whl", hash = "sha256:718a38f7e5b058e76aee1c56ddd06908116d35147e133427e59a3983f703a20d", size = 9494700, upload-time = "2025-07-14T20:13:28.243Z" }, + { url = "https://files.pythonhosted.org/packages/04/bf/90339ac0f55726dce7d794e6d79a18a91265bdf3aa70b6b9ca52f35e022a/pywin32-311-cp313-cp313-win_arm64.whl", hash = "sha256:7b4075d959648406202d92a2310cb990fea19b535c7f4a78d3f5e10b926eeb8a", size = 8709318, upload-time = "2025-07-14T20:13:30.348Z" }, + { url = "https://files.pythonhosted.org/packages/c9/31/097f2e132c4f16d99a22bfb777e0fd88bd8e1c634304e102f313af69ace5/pywin32-311-cp314-cp314-win32.whl", hash = "sha256:b7a2c10b93f8986666d0c803ee19b5990885872a7de910fc460f9b0c2fbf92ee", size = 8840714, upload-time = "2025-07-14T20:13:32.449Z" }, + { url = "https://files.pythonhosted.org/packages/90/4b/07c77d8ba0e01349358082713400435347df8426208171ce297da32c313d/pywin32-311-cp314-cp314-win_amd64.whl", hash = "sha256:3aca44c046bd2ed8c90de9cb8427f581c479e594e99b5c0bb19b29c10fd6cb87", size = 9656800, upload-time = "2025-07-14T20:13:34.312Z" }, + { url = "https://files.pythonhosted.org/packages/c0/d2/21af5c535501a7233e734b8af901574572da66fcc254cb35d0609c9080dd/pywin32-311-cp314-cp314-win_arm64.whl", hash = "sha256:a508e2d9025764a8270f93111a970e1d0fbfc33f4153b388bb649b7eec4f9b42", size = 8932540, upload-time = "2025-07-14T20:13:36.379Z" }, +] + [[package]] name = "pyyaml" version = "6.0.2" @@ -2924,6 +2976,18 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/e9/44/75a9c9421471a6c4805dbf2356f7c181a29c1879239abab1ea2cc8f38b40/sniffio-1.3.1-py3-none-any.whl", hash = "sha256:2f6da418d1f1e0fddd844478f41680e794e6051915791a034ff65e5f100525a2", size = 10235, upload-time = "2024-02-25T23:20:01.196Z" }, ] +[[package]] +name = "sse-starlette" +version = "3.0.2" 
+source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "anyio" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/42/6f/22ed6e33f8a9e76ca0a412405f31abb844b779d52c5f96660766edcd737c/sse_starlette-3.0.2.tar.gz", hash = "sha256:ccd60b5765ebb3584d0de2d7a6e4f745672581de4f5005ab31c3a25d10b52b3a", size = 20985, upload-time = "2025-07-27T09:07:44.565Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/ef/10/c78f463b4ef22eef8491f218f692be838282cd65480f6e423d7730dfd1fb/sse_starlette-3.0.2-py3-none-any.whl", hash = "sha256:16b7cbfddbcd4eaca11f7b586f3b8a080f1afe952c15813455b162edea619e5a", size = 11297, upload-time = "2025-07-27T09:07:43.268Z" }, +] + [[package]] name = "starlette" version = "0.47.3" diff --git a/src/backend/v3/api/router.py b/src/backend/v3/api/router.py index 1e69bc3bd..229058a95 100644 --- a/src/backend/v3/api/router.py +++ b/src/backend/v3/api/router.py @@ -1,36 +1,134 @@ +import asyncio +import contextvars import json import logging import uuid from typing import Optional +import v3.models.messages as messages from auth.auth_utils import get_authenticated_user_details -from common.config.app_config import config from common.database.database_factory import DatabaseFactory -from common.models.messages_kernel import (GeneratePlanRequest, InputTask, - Plan, PlanStatus) +from common.models.messages_kernel import (GeneratePlanRequest, InputTask, PlanStatus, + TeamSelectionRequest, Plan) from common.utils.event_utils import track_event_if_configured from common.utils.utils_kernel import rai_success, rai_validate_team_config from fastapi import (APIRouter, BackgroundTasks, Depends, FastAPI, File, HTTPException, Request, UploadFile, WebSocket, WebSocketDisconnect) from kernel_agents.agent_factory import AgentFactory -from pydantic import BaseModel from semantic_kernel.agents.runtime import InProcessRuntime from v3.common.services.team_service import TeamService +from v3.config.settings import connection_config, current_user_id, team_config from v3.orchestration.orchestration_manager import OrchestrationManager - -class TeamSelectionRequest(BaseModel): - """Request model for team selection.""" - team_id: str - session_id: Optional[str] = None - +router = APIRouter() +logger = logging.getLogger(__name__) app_v3 = APIRouter( prefix="/api/v3", responses={404: {"description": "Not found"}}, ) +@app_v3.websocket("/socket/{process_id}") +async def start_comms(websocket: WebSocket, process_id: str): + """ Web-Socket endpoint for real-time process status updates. 
""" + + # Always accept the WebSocket connection first + await websocket.accept() + + user_id = None + try: + # WebSocket headers are different, try to get user info + headers = dict(websocket.headers) + authenticated_user = get_authenticated_user_details(request_headers=headers) + user_id = authenticated_user.get("user_principal_id") + if not user_id: + user_id = "00000000-0000-0000-0000-000000000000" + except Exception as e: + logging.warning(f"Could not extract user from WebSocket headers: {e}") + user_id = "00000000-0000-0000-0000-000000000000" + + current_user_id.set(user_id) + + # Add to the connection manager for backend updates + connection_config.add_connection(process_id=process_id, connection=websocket, user_id=user_id) + track_event_if_configured("WebSocketConnectionAccepted", {"process_id": process_id, "user_id": user_id}) + + # Keep the connection open - FastAPI will close the connection if this returns + try: + # Keep the connection open - FastAPI will close the connection if this returns + while True: + # no expectation that we will receive anything from the client but this keeps + # the connection open and does not take cpu cycle + try: + message = await websocket.receive_text() + logging.debug(f"Received WebSocket message from {user_id}: {message}") + except asyncio.TimeoutError: + pass + except WebSocketDisconnect: + track_event_if_configured("WebSocketDisconnect", {"process_id": process_id, "user_id": user_id}) + logging.info(f"Client disconnected from batch {process_id}") + break + except Exception as e: + # Fixed logging syntax - removed the error= parameter + logging.error(f"Error in WebSocket connection: {str(e)}") + finally: + # Always clean up the connection + await connection_config.close_connection(user_id) + +@app_v3.get("/init_team") +async def init_team( + request: Request, +): + """ Initialize the user's current team of agents """ + + # Need to store this user state in cosmos db, retrieve it here, and initialize the team + # current in-memory store is in team_config from settings.py + # For now I will set the initial install team ids as 00000000-0000-0000-0000-000000000001 (HR), + # 00000000-0000-0000-0000-000000000002 (Marketing), and 00000000-0000-0000-0000-000000000003 (Retail), + # and use this value to initialize to HR each time. 
+ init_team_id = "00000000-0000-0000-0000-000000000001" + + try: + authenticated_user = get_authenticated_user_details(request_headers=request.headers) + user_id = authenticated_user["user_principal_id"] + if not user_id: + track_event_if_configured( + "UserIdNotFound", {"status_code": 400, "detail": "no user"} + ) + raise HTTPException(status_code=400, detail="no user") + + # Initialize memory store and service + memory_store = await DatabaseFactory.get_database(user_id=user_id) + team_service = TeamService(memory_store) + + # Verify the team exists and user has access to it + team_configuration = await team_service.get_team_configuration(init_team_id, user_id) + if team_configuration is None: + raise HTTPException( + status_code=404, + detail=f"Team configuration '{init_team_id}' not found or access denied" + ) + + # Set as current team in memory + team_config.set_current_team(user_id=user_id, team_configuration=team_configuration) + + # Initialize agent team for this user session + await OrchestrationManager.get_current_or_new_orchestration(user_id=user_id, team_config=team_configuration) + + return { + "status": "Request started successfully", + "team_id": init_team_id + } + + except Exception as e: + track_event_if_configured( + "InitTeamFailed", + { + "error": str(e), + }, + ) + raise HTTPException(status_code=400, detail=f"Error starting request: {e}") from e @app_v3.post("/create_plan") async def process_request(background_tasks: BackgroundTasks, input_task: InputTask, request: Request): @@ -82,30 +180,32 @@ async def process_request(background_tasks: BackgroundTasks, input_task: InputTa type: string description: Error message """ - if not await rai_success(input_task.description, False): - track_event_if_configured( - "RAI failed", - { - "status": "Plan not created - RAI check failed", - "description": input_task.description, - "session_id": input_task.session_id, - }, - ) - raise HTTPException( - status_code=400, - detail={ - "error_type": "RAI_VALIDATION_FAILED", - "message": "Content Safety Check Failed", - "description": "Your request contains content that doesn't meet our safety guidelines. Please modify your request to ensure it's appropriate and try again.", - "suggestions": [ - "Remove any potentially harmful, inappropriate, or unsafe content", - "Use more professional and constructive language", - "Focus on legitimate business or educational objectives", - "Ensure your request complies with content policies", - ], - "user_action": "Please revise your request and try again", - }, - ) + + + # if not await rai_success(input_task.description, False): + # track_event_if_configured( + # "RAI failed", + # { + # "status": "Plan not created - RAI check failed", + # "description": input_task.description, + # "session_id": input_task.session_id, + # }, + # ) + # raise HTTPException( + # status_code=400, + # detail={ + # "error_type": "RAI_VALIDATION_FAILED", + # "message": "Content Safety Check Failed", + # "description": "Your request contains content that doesn't meet our safety guidelines. 
Please modify your request to ensure it's appropriate and try again.", + # "suggestions": [ + # "Remove any potentially harmful, inappropriate, or unsafe content", + # "Use more professional and constructive language", + # "Focus on legitimate business or educational objectives", + # "Ensure your request complies with content policies", + # ], + # "user_action": "Please revise your request and try again", + # }, + # ) authenticated_user = get_authenticated_user_details(request_headers=request.headers) user_id = authenticated_user["user_principal_id"] @@ -116,21 +216,71 @@ async def process_request(background_tasks: BackgroundTasks, input_task: InputTa ) raise HTTPException(status_code=400, detail="no user") - if not input_task.team_id: - track_event_if_configured( - "TeamIDNofound", {"status_code": 400, "detail": "no team id"} - ) - raise HTTPException(status_code=400, detail="no team id") + # if not input_task.team_id: + # track_event_if_configured( + # "TeamIDNofound", {"status_code": 400, "detail": "no team id"} + # ) + # raise HTTPException(status_code=400, detail="no team id") if not input_task.session_id: input_task.session_id = str(uuid.uuid4()) + try: + plan_id = str(uuid.uuid4()) + # Initialize memory store and service + memory_store = await DatabaseFactory.get_database(user_id=user_id) + plan = Plan( + id=plan_id, + plan_id=plan_id, + user_id=user_id, + session_id=input_task.session_id, + team_id=None, #TODO add current_team_id + initial_goal=input_task.description, + overall_status=PlanStatus.in_progress, + ) + await memory_store.add_plan(plan) + + + track_event_if_configured( + "PlanCreated", + { + "status": "success", + "plan_id": plan.plan_id, + "session_id": input_task.session_id, + "user_id": user_id, + "team_id": "", #TODO add current_team_id + "description": input_task.description, + }, + ) + except Exception as e: + print(f"Error creating plan: {e}") + track_event_if_configured( + "PlanCreationFailed", + { + "status": "error", + "description": input_task.description, + "session_id": input_task.session_id, + "user_id": user_id, + "error": str(e), + }, + ) + raise HTTPException(status_code=500, detail="Failed to create plan") try: - background_tasks.add_task(OrchestrationManager.run_orchestration, user_id, input_task) + current_user_id.set(user_id) # Set context + current_context = contextvars.copy_context() # Capture context + # background_tasks.add_task( + # lambda: current_context.run(lambda:OrchestrationManager().run_orchestration, user_id, input_task) + # ) + + async def run_with_context(): + return await current_context.run(OrchestrationManager().run_orchestration, user_id, input_task) + + background_tasks.add_task(run_with_context) return { "status": "Request started successfully", "session_id": input_task.session_id, + "plan_id": plan_id, } except Exception as e: @@ -144,6 +294,10 @@ async def process_request(background_tasks: BackgroundTasks, input_task: InputTa ) raise HTTPException(status_code=400, detail=f"Error starting request: {e}") from e +@app_v3.post("/api/human_feedback") +async def human_feedback_endpoint(human_feedback: messages.HumanFeedback, request: Request): + pass + @app_v3.post("/upload_team_config") async def upload_team_config_endpoint(request: Request, file: UploadFile = File(...)): @@ -515,6 +669,9 @@ async def delete_team_config_endpoint(team_id: str, request: Request): ) try: + # To do: Check if the team is the users current team, or if it is + # used in any active sessions/plans. Refuse request if so. 
+ # Initialize memory store and service memory_store = await DatabaseFactory.get_database(user_id=user_id) team_service = TeamService(memory_store) @@ -569,7 +726,7 @@ async def get_model_deployments_endpoint(request: Request): try: team_service = TeamService() - deployments = await team_service.list_model_deployments() + deployments = [] #await team_service.extract_models_from_agent() summary = await team_service.get_deployment_status_summary() return {"deployments": deployments, "summary": summary} except Exception as e: @@ -580,53 +737,7 @@ @app_v3.post("/select_team") async def select_team_endpoint(selection: TeamSelectionRequest, request: Request): """ - Update team selection for a plan or session. - - Used when users change teams on the plan page. - - --- - tags: - - Team Selection - parameters: - - name: user_principal_id - in: header - type: string - required: true - description: User ID extracted from the authentication header - - name: body - in: body - required: true - schema: - type: object - properties: - team_id: - type: string - description: The ID of the team to select - session_id: - type: string - description: Optional session ID to associate with the team selection - responses: - 200: - description: Team selection updated successfully - schema: - type: object - properties: - status: - type: string - message: - type: string - team_id: - type: string - team_name: - type: string - session_id: - type: string - 400: - description: Invalid request - 401: - description: Missing or invalid user information - 404: - description: Team configuration not found + Select the current team for the user session. """ # Validate user authentication authenticated_user = get_authenticated_user_details(request_headers=request.headers) @@ -645,7 +756,7 @@ team_service = TeamService(memory_store) # Verify the team exists and user has access to it - team_config = await team_service.get_team_configuration(selection.team_id, user_id) - if team_config is None: + team_configuration = await team_service.get_team_configuration(selection.team_id, user_id) + if team_configuration is None: raise HTTPException( status_code=404, @@ -655,8 +766,8 @@ # Generate session ID if not provided session_id = selection.session_id or str(uuid.uuid4()) - # Here you could store the team selection in user preferences, session data, etc.
- # For now, we'll just validate and return the selection + # save to in-memory config for current user + team_config.set_current_team(user_id=user_id, team_configuration=team_configuration) # Track the team selection event track_event_if_configured( @@ -664,7 +775,7 @@ async def select_team_endpoint(selection: TeamSelectionRequest, request: Request { "status": "success", "team_id": selection.team_id, - "team_name": team_config.name, + "team_name": team_configuration.name, "user_id": user_id, "session_id": session_id, }, @@ -672,12 +783,12 @@ async def select_team_endpoint(selection: TeamSelectionRequest, request: Request return { "status": "success", - "message": f"Team '{team_config.name}' selected successfully", + "message": f"Team '{team_configuration.name}' selected successfully", "team_id": selection.team_id, - "team_name": team_config.name, + "team_name": team_configuration.name, "session_id": session_id, - "agents_count": len(team_config.agents), - "team_description": team_config.description, + "agents_count": len(team_configuration.agents), + "team_description": team_configuration.description, } except HTTPException: @@ -725,4 +836,4 @@ async def get_search_indexes_endpoint(request: Request): return {"search_summary": summary} except Exception as e: logging.error(f"Error retrieving search indexes: {str(e)}") - raise HTTPException(status_code=500, detail="Internal server error occurred") + raise HTTPException(status_code=500, detail="Internal server error occurred") \ No newline at end of file diff --git a/src/backend/v3/callbacks/response_handlers.py b/src/backend/v3/callbacks/response_handlers.py index 800f4a6ae..c6e8d7773 100644 --- a/src/backend/v3/callbacks/response_handlers.py +++ b/src/backend/v3/callbacks/response_handlers.py @@ -2,48 +2,53 @@ Enhanced response callbacks for employee onboarding agent system. Provides detailed monitoring and response handling for different agent types. 
""" - +import asyncio import sys from semantic_kernel.contents import (ChatMessageContent, StreamingChatMessageContent) +from v3.config.settings import connection_config, current_user_id -coderagent = False -def agent_response_callback(message: ChatMessageContent) -> None: +def agent_response_callback(message: ChatMessageContent, user_id: str = None) -> None: """Observer function to print detailed information about streaming messages.""" - global coderagent # import sys # Get agent name to determine handling agent_name = message.name or "Unknown Agent" - - # Debug information about the message - message_type = type(message).__name__ - metadata = getattr(message, 'metadata', {}) - # when streaming code - list the coder info first once - - if 'code' in metadata and metadata['code'] is True: - if coderagent == False: - print(f"\n🧠 **{agent_name}** [{message_type}]") - print("-" * (len(agent_name) + len(message_type) + 10)) - coderagent = True - print(message.content, end='', flush=False) - return - elif coderagent == True: - coderagent = False + role = getattr(message, 'role', 'unknown') - print(f"\n🧠 **{agent_name}** [{message_type}] (role: {role})") - print("-" * (len(agent_name) + len(message_type) + 10)) + # Send to WebSocket + if user_id: + try: + asyncio.create_task(connection_config.send_status_update_async({ + "type": "agent_message", + "data": {"agent_name": agent_name, "content": message.content, "role": role} + }, user_id)) + except Exception as e: + print(f"Error sending WebSocket message: {e}") + + print(f"\n **{agent_name}** (role: {role})") + if message.items[-1].content_type == 'function_call': print(f"šŸ”§ Function call: {message.items[-1].function_name}, Arguments: {message.items[-1].arguments}") - if metadata: - print(f"šŸ“‹ Metadata: {metadata}") + # Add this function after your agent_response_callback function -async def streaming_agent_response_callback(streaming_message: StreamingChatMessageContent, is_final: bool) -> None: +async def streaming_agent_response_callback(streaming_message: StreamingChatMessageContent, is_final: bool, user_id: str = None) -> None: """Simple streaming callback to show real-time agent responses.""" if streaming_message.name != "CoderAgent": # Print streaming content as it arrives if hasattr(streaming_message, 'content') and streaming_message.content: print(streaming_message.content, end='', flush=False) + + # Send to WebSocket + if user_id: + try: + await connection_config.send_status_update_async({ + "type": "streaming_message", + "data": {"agent_name": streaming_message.name or "Unknown Agent", "content": streaming_message.content, "is_final": is_final} + }, user_id) + except Exception as e: + print(f"Error sending streaming WebSocket message: {e}") \ No newline at end of file diff --git a/src/backend/v3/common/services/foundry_service.py b/src/backend/v3/common/services/foundry_service.py index 9a09aadd9..84d554dc0 100644 --- a/src/backend/v3/common/services/foundry_service.py +++ b/src/backend/v3/common/services/foundry_service.py @@ -1,9 +1,10 @@ -from typing import Any, Dict, List import logging import re -from azure.ai.projects.aio import AIProjectClient +from typing import Any, Dict, List + #from git import List import aiohttp +from azure.ai.projects.aio import AIProjectClient from common.config.app_config import config @@ -54,8 +55,7 @@ async def list_model_deployments(self) -> List[Dict[str, Any]]: try: # Get Azure Management API token (not Cognitive Services token) - credential = config.get_azure_credentials() - token = 
credential.get_token(config.AZURE_MANAGEMENT_SCOPE) + token = await config.get_access_token() # Extract Azure OpenAI resource name from endpoint URL openai_endpoint = config.AZURE_OPENAI_ENDPOINT diff --git a/src/backend/v3/common/services/team_service.py b/src/backend/v3/common/services/team_service.py index d163634be..7974599f3 100644 --- a/src/backend/v3/common/services/team_service.py +++ b/src/backend/v3/common/services/team_service.py @@ -6,23 +6,14 @@ from typing import Any, Dict, List, Optional, Tuple from azure.core.credentials import AzureKeyCredential -from azure.core.exceptions import ( - ClientAuthenticationError, - HttpResponseError, - ResourceNotFoundError, -) +from azure.core.exceptions import (ClientAuthenticationError, + HttpResponseError, ResourceNotFoundError) from azure.identity import DefaultAzureCredential from azure.search.documents.indexes import SearchIndexClient - -from common.models.messages_kernel import ( - TeamConfiguration, - TeamAgent, - StartingTask, -) - - from common.config.app_config import config from common.database.database_base import DatabaseBase +from common.models.messages_kernel import (StartingTask, TeamAgent, + TeamConfiguration) from v3.common.services.foundry_service import FoundryService @@ -141,12 +132,15 @@ def _validate_and_parse_agent(self, agent_data: Dict[str, Any]) -> TeamAgent: input_key=agent_data["input_key"], type=agent_data["type"], name=agent_data["name"], + deployment_name=agent_data.get("deployment_name", ""), + icon=agent_data["icon"], system_message=agent_data.get("system_message", ""), description=agent_data.get("description", ""), - icon=agent_data["icon"], - index_name=agent_data.get("index_name", ""), use_rag=agent_data.get("use_rag", False), use_mcp=agent_data.get("use_mcp", False), + use_bing=agent_data.get("use_bing", False), + use_reasoning=agent_data.get("use_reasoning", False), + index_name=agent_data.get("index_name", ""), coding_tools=agent_data.get("coding_tools", False), ) @@ -204,19 +198,19 @@ async def get_team_configuration( """ try: # Get the specific configuration using the team-specific method - team_config = await self.memory_context.get_team_by_id(team_id) + team_config = await self.memory_context.get_team(team_id) if team_config is None: return None # Verify the configuration belongs to the user - if team_config.user_id != user_id: - self.logger.warning( - "Access denied: config %s does not belong to user %s", - team_id, - user_id, - ) - return None + # if team_config.user_id != user_id: + # self.logger.warning( + # "Access denied: config %s does not belong to user %s", + # team_id, + # user_id, + # ) + # return None return team_config diff --git a/src/backend/v3/config/settings.py b/src/backend/v3/config/settings.py index 475815645..945993ac9 100644 --- a/src/backend/v3/config/settings.py +++ b/src/backend/v3/config/settings.py @@ -3,11 +3,24 @@ Handles Azure OpenAI, MCP, and environment setup. 
""" +import asyncio +import contextvars +import json +import logging +import uuid +from typing import Dict, Optional + from common.config.app_config import config +from common.models.messages_kernel import TeamConfiguration +from fastapi import WebSocket from semantic_kernel.agents.orchestration.magentic import MagenticOrchestration from semantic_kernel.connectors.ai.open_ai import ( AzureChatCompletion, OpenAIChatPromptExecutionSettings) +logger = logging.getLogger(__name__) + +# Create a context variable to track current user +current_user_id: contextvars.ContextVar[Optional[str]] = contextvars.ContextVar('current_user_id', default=None) class AzureConfig: """Azure OpenAI and authentication configuration.""" @@ -21,7 +34,7 @@ def __init__(self): # Create credential self.credential = config.get_azure_credentials() - def create_chat_completion_service(self, use_reasoning_model=False): + async def create_chat_completion_service(self, use_reasoning_model: bool=False): """Create Azure Chat Completion service.""" model_name = ( self.reasoning_model if use_reasoning_model else self.standard_model @@ -30,7 +43,7 @@ def create_chat_completion_service(self, use_reasoning_model=False): return AzureChatCompletion( deployment_name=model_name, endpoint=self.endpoint, - ad_token_provider=config.get_access_token(), + ad_token_provider= await config.get_access_token(), ) def create_execution_settings(self): @@ -42,11 +55,11 @@ class MCPConfig: """MCP server configuration.""" def __init__(self): - self.url = "http://127.0.0.1:8000/mcp/" - self.name = "MCPGreetingServer" - self.description = "MCP server with greeting and planning tools" + self.url = config.MCP_SERVER_ENDPOINT + self.name = config.MCP_SERVER_NAME + self.description = config.MCP_SERVER_DESCRIPTION - def get_headers(self, token): + def get_headers(self, token: str): """Get MCP headers with authentication token.""" return ( {"Authorization": f"Bearer {token}", "Content-Type": "application/json"} @@ -67,9 +80,143 @@ def __init__(self): def get_current_orchestration(self, user_id: str) -> MagenticOrchestration: """get existing orchestration instance.""" return self.orchestrations.get(user_id, None) + +class ConnectionConfig: + """Connection manager for WebSocket connections.""" + + def __init__(self): + self.connections: Dict[str, WebSocket] = {} + # Map user_id to process_id for context-based messaging + self.user_to_process: Dict[str, str] = {} + + def add_connection(self, process_id: str, connection: WebSocket, user_id: str = None): + """Add a new connection.""" + # Close existing connection if it exists + if process_id in self.connections: + try: + asyncio.create_task(self.connections[process_id].close()) + except Exception as e: + logger.error(f"Error closing existing connection for user {process_id}: {e}") + + self.connections[process_id] = connection + # Map user to process for context-based messaging + if user_id: + user_id = str(user_id) + # If this user already has a different process mapped, close that old connection + old_process_id = self.user_to_process.get(user_id) + if old_process_id and old_process_id != process_id: + old_connection = self.connections.get(old_process_id) + if old_connection: + try: + asyncio.create_task(old_connection.close()) + del self.connections[old_process_id] + logger.info(f"Closed old connection {old_process_id} for user {user_id}") + except Exception as e: + logger.error(f"Error closing old connection for user {user_id}: {e}") + + self.user_to_process[user_id] = process_id + logger.info(f"WebSocket 
connection added for process: {process_id} (user: {user_id})") + else: + logger.info(f"WebSocket connection added for process: {process_id}") + + def remove_connection(self, process_id): + """Remove a connection.""" + process_id = str(process_id) + if process_id in self.connections: + del self.connections[process_id] + + # Remove from user mapping if exists + for user_id, mapped_process_id in list(self.user_to_process.items()): + if mapped_process_id == process_id: + del self.user_to_process[user_id] + logger.debug(f"Removed user mapping: {user_id} -> {process_id}") + break + + def get_connection(self, process_id): + """Get a connection.""" + return self.connections.get(process_id) + + async def close_connection(self, process_id): + """Remove a connection.""" + connection = self.get_connection(process_id) + if connection: + try: + await connection.close() + logger.info("Connection closed for batch ID: %s", process_id) + except Exception as e: + logger.error(f"Error closing connection for {process_id}: {e}") + else: + logger.warning("No connection found for batch ID: %s", process_id) + + # Always remove from connections dict + self.remove_connection(process_id) + logger.info("Connection removed for batch ID: %s", process_id) + + async def send_status_update_async(self, message: any, user_id: Optional[str] = None): + """Send a status update to a specific client.""" + # If no process_id provided, get from context + if user_id is None: + user_id = current_user_id.get() + + if not user_id: + logger.warning("No user_id available for WebSocket message") + return + + process_id = self.user_to_process.get(user_id) + if not process_id: + logger.warning("No active WebSocket process found for user ID: %s", user_id) + logger.debug(f"Available user mappings: {list(self.user_to_process.keys())}") + return + + connection = self.get_connection(process_id) + if connection: + try: + str_message = json.dumps(message, default=str) + await connection.send_text(str_message) + logger.debug(f"Message sent to user {user_id} via process {process_id}") + except Exception as e: + logger.error(f"Failed to send message to user {user_id}: {e}") + # Clean up stale connection + self.remove_connection(process_id) + else: + logger.warning("No connection found for process ID: %s (user: %s)", process_id, user_id) + # Clean up stale mapping + if user_id in self.user_to_process: + del self.user_to_process[user_id] + + def send_status_update(self, message: str, process_id: str): + """Send a status update to a specific client (sync wrapper).""" + process_id = str(process_id) + connection = self.get_connection(process_id) + if connection: + try: + # Use asyncio.create_task instead of run_coroutine_threadsafe + asyncio.create_task(connection.send_text(message)) + except Exception as e: + logger.error(f"Failed to send message to process {process_id}: {e}") + else: + logger.warning("No connection found for process ID: %s", process_id) + +class TeamConfig: + """Team configuration for agents.""" + + def __init__(self): + self.teams: Dict[str, TeamConfiguration] = {} + + def set_current_team(self, user_id: str, team_configuration: TeamConfiguration): + """Add a new team configuration.""" + + # To do: close current team of agents if any + + self.teams[user_id] = team_configuration + def get_current_team(self, user_id: str) -> TeamConfiguration: + """Get the current team configuration.""" + return self.teams.get(user_id, None) # Global config instances azure_config = AzureConfig() mcp_config = MCPConfig() orchestration_config = 
OrchestrationConfig() +connection_config = ConnectionConfig() +team_config = TeamConfig() diff --git a/src/backend/v3/magentic_agents/common/lifecycle.py b/src/backend/v3/magentic_agents/common/lifecycle.py index 1f7742797..36acada97 100644 --- a/src/backend/v3/magentic_agents/common/lifecycle.py +++ b/src/backend/v3/magentic_agents/common/lifecycle.py @@ -60,29 +60,29 @@ async def _after_open(self) -> None: """Subclasses must build self._agent here.""" raise NotImplementedError - # Internals - def _build_mcp_headers(self) -> dict: - if not self.mcp_cfg.client_id: - return {} - self.cred = InteractiveBrowserCredential( - tenant_id=self.mcp_cfg.tenant_id or None, - client_id=self.mcp_cfg.client_id, - ) - tok = self.cred.get_token(f"api://{self.mcp_cfg.client_id}/access_as_user") - return { - "Authorization": f"Bearer {tok.token}", - "Content-Type": "application/json", - } + # For use when implementing bearer token auth + # def _build_mcp_headers(self) -> dict: + # if not self.mcp_cfg.client_id: + # return {} + # self.cred = InteractiveBrowserCredential( + # tenant_id=self.mcp_cfg.tenant_id or None, + # client_id=self.mcp_cfg.client_id, + # ) + # tok = self.cred.get_token(f"api://{self.mcp_cfg.client_id}/access_as_user") + # return { + # "Authorization": f"Bearer {tok.token}", + # "Content-Type": "application/json", + # } async def _enter_mcp_if_configured(self) -> None: if not self.mcp_cfg: return - headers = self._build_mcp_headers() + #headers = self._build_mcp_headers() plugin = MCPStreamableHttpPlugin( name=self.mcp_cfg.name, description=self.mcp_cfg.description, url=self.mcp_cfg.url, - headers=headers, + #headers=headers, ) # Enter MCP async context via the stack to ensure correct LIFO cleanup if self._stack is None: diff --git a/src/backend/v3/magentic_agents/magentic_agent_factory.py b/src/backend/v3/magentic_agents/magentic_agent_factory.py index e32170aa6..e60b7d390 100644 --- a/src/backend/v3/magentic_agents/magentic_agent_factory.py +++ b/src/backend/v3/magentic_agents/magentic_agent_factory.py @@ -8,12 +8,14 @@ from types import SimpleNamespace from typing import List, Union +from common.models.messages_kernel import TeamConfiguration +from v3.config.settings import current_user_id from v3.magentic_agents.foundry_agent import FoundryAgentTemplate from v3.magentic_agents.models.agent_models import (BingConfig, MCPConfig, SearchConfig) from v3.magentic_agents.proxy_agent import ProxyAgent from v3.magentic_agents.reasoning_agent import ReasoningAgentTemplate - +from common.config.app_config import config class UnsupportedModelError(Exception): """Raised when an unsupported model is specified.""" @@ -32,12 +34,12 @@ def __init__(self): self.logger = logging.getLogger(__name__) self._agent_list: List = [] - @staticmethod - def parse_team_config(file_path: Union[str, Path]) -> SimpleNamespace: - """Parse JSON file into objects using SimpleNamespace.""" - with open(file_path, 'r') as f: - data = json.load(f) - return json.loads(json.dumps(data), object_hook=lambda d: SimpleNamespace(**d)) + # @staticmethod + # def parse_team_config(file_path: Union[str, Path]) -> SimpleNamespace: + # """Parse JSON file into objects using SimpleNamespace.""" + # with open(file_path, 'r') as f: + # data = json.load(f) + # return json.loads(json.dumps(data), object_hook=lambda d: SimpleNamespace(**d)) async def create_agent_from_config(self, agent_obj: SimpleNamespace) -> Union[FoundryAgentTemplate, ReasoningAgentTemplate, ProxyAgent]: """ @@ -59,10 +61,11 @@ async def create_agent_from_config(self, 
agent_obj: SimpleNamespace) -> Union[Fo if not deployment_name and agent_obj.name.lower() == "proxyagent": self.logger.info("Creating ProxyAgent") - return ProxyAgent() + user_id = current_user_id.get() + return ProxyAgent(user_id=user_id) # Validate supported models - supported_models = json.loads(os.getenv("SUPPORTED_MODELS")) + supported_models = json.loads(config.SUPPORTED_MODELS) if deployment_name not in supported_models: raise UnsupportedModelError(f"Model '{deployment_name}' not supported. Supported: {supported_models}") @@ -92,7 +95,7 @@ async def create_agent_from_config(self, agent_obj: SimpleNamespace) -> Union[Fo # Create appropriate agent if use_reasoning: # Get reasoning specific configuration - azure_openai_endpoint = os.getenv("AZURE_OPENAI_ENDPOINT") + azure_openai_endpoint = config.AZURE_OPENAI_ENDPOINT agent = ReasoningAgentTemplate( agent_name=agent_obj.name, @@ -119,34 +122,31 @@ async def create_agent_from_config(self, agent_obj: SimpleNamespace) -> Union[Fo self.logger.info(f"Successfully created and initialized agent '{agent_obj.name}'") return agent - async def get_agents(self, file_path: str) -> List: + async def get_agents(self, team_config_input: TeamConfiguration) -> List: """ Create and return a team of agents from JSON configuration. Args: - file_path: Path to the JSON team configuration file - team_model: Optional model override for all agents in the team + team_config_input: Team configuration object from Cosmos DB Returns: List of initialized agent instances """ - self.logger.info(f"Loading team configuration from: {file_path}") + # self.logger.info(f"Loading team configuration from: {file_path}") try: - team = self.parse_team_config(file_path) - self.logger.info(f"Parsed team '{team.name}' with {len(team.agents)} agents") - agents = [] + initialized_agents = [] - for i, agent_cfg in enumerate(team.agents, 1): + for i, agent_cfg in enumerate(team_config_input.agents, 1): try: - self.logger.info(f"Creating agent {i}/{len(team.agents)}: {agent_cfg.name}") + self.logger.info(f"Creating agent {i}/{len(team_config_input.agents)}: {agent_cfg.name}") agent = await self.create_agent_from_config(agent_cfg) - agents.append(agent) + initialized_agents.append(agent) self._agent_list.append(agent) # Keep track for cleanup - self.logger.info(f"✅ Agent {i}/{len(team.agents)} created: {agent_cfg.name}") + self.logger.info(f"✅ Agent {i}/{len(team_config_input.agents)} created: {agent_cfg.name}") except (UnsupportedModelError, InvalidConfigurationError) as e: self.logger.warning(f"Skipped agent {agent_cfg.name}: {e}") @@ -155,8 +155,8 @@ async def get_agents(self, file_path: str) -> List: self.logger.error(f"Failed to create agent {agent_cfg.name}: {e}") continue - self.logger.info(f"Successfully created {len(agents)}/{len(team.agents)} agents for team '{team.name}'") - return agents + self.logger.info(f"Successfully created {len(initialized_agents)}/{len(team_config_input.agents)} agents for team '{team_config_input.name}'") + return initialized_agents except Exception as e: self.logger.error(f"Failed to load team configuration: {e}") diff --git a/src/backend/v3/magentic_agents/models/agent_models.py b/src/backend/v3/magentic_agents/models/agent_models.py index 4e7769e27..9afdcf0fe 100644 --- a/src/backend/v3/magentic_agents/models/agent_models.py +++ b/src/backend/v3/magentic_agents/models/agent_models.py @@ -2,7 +2,7 @@ import os from dataclasses import dataclass - +from common.config.app_config import config @dataclass(slots=True) class MCPConfig: @@ -15,12 +15,12 @@ 
class MCPConfig: @classmethod def from_env(cls) -> "MCPConfig": - url = os.getenv("MCP_SERVER_ENDPOINT") - name = os.getenv("MCP_SERVER_NAME") - description = os.getenv("MCP_SERVER_DESCRIPTION") - tenant_id = os.getenv("TENANT_ID") - client_id = os.getenv("CLIENT_ID") - + url = config.MCP_SERVER_ENDPOINT + name = config.MCP_SERVER_NAME + description = config.MCP_SERVER_DESCRIPTION + tenant_id = config.TENANT_ID + client_id = config.CLIENT_ID + # Raise exception if any required environment variable is missing if not all([url, name, description, tenant_id, client_id]): raise ValueError(f"{cls.__name__} Missing required environment variables") @@ -40,7 +40,7 @@ class BingConfig: @classmethod def from_env(cls) -> "BingConfig": - connection_name = os.getenv("BING_CONNECTION_NAME") + connection_name = config.BING_CONNECTION_NAME # Raise exception if required environment variable is missing if not connection_name: @@ -60,11 +60,11 @@ class SearchConfig: @classmethod def from_env(cls) -> "SearchConfig": - connection_name = os.getenv("AZURE_AI_SEARCH_CONNECTION_NAME") - index_name = os.getenv("AZURE_AI_SEARCH_INDEX_NAME") - endpoint = os.getenv("AZURE_AI_SEARCH_ENDPOINT") - api_key = os.getenv("AZURE_AI_SEARCH_API_KEY") - + connection_name = config.AZURE_AI_SEARCH_CONNECTION_NAME + index_name = config.AZURE_AI_SEARCH_INDEX_NAME + endpoint = config.AZURE_AI_SEARCH_ENDPOINT + api_key = config.AZURE_AI_SEARCH_API_KEY + # Raise exception if any required environment variable is missing if not all([connection_name, index_name, endpoint]): raise ValueError(f"{cls.__name__} Missing required Azure Search environment variables") @@ -74,4 +74,4 @@ def from_env(cls) -> "SearchConfig": index_name=index_name, endpoint=endpoint, api_key=api_key, - ) \ No newline at end of file + ) diff --git a/src/backend/v3/magentic_agents/proxy_agent.py b/src/backend/v3/magentic_agents/proxy_agent.py index d2f7bf0a9..d6feab062 100644 --- a/src/backend/v3/magentic_agents/proxy_agent.py +++ b/src/backend/v3/magentic_agents/proxy_agent.py @@ -4,18 +4,24 @@ import logging import uuid from collections.abc import AsyncIterable -from typing import AsyncIterator +from typing import AsyncIterator, Optional +import v3.models.messages as agent_messages +from pydantic import Field from semantic_kernel.agents import ( # pylint: disable=no-name-in-module AgentResponseItem, AgentThread) from semantic_kernel.agents.agent import Agent -from semantic_kernel.contents import AuthorRole, ChatMessageContent +from semantic_kernel.contents import (AuthorRole, ChatMessageContent, + StreamingChatMessageContent) from semantic_kernel.contents.chat_history import ChatHistory from semantic_kernel.contents.history_reducer.chat_history_reducer import \ ChatHistoryReducer from semantic_kernel.exceptions.agent_exceptions import \ AgentThreadOperationException from typing_extensions import override +from v3.callbacks.response_handlers import (agent_response_callback, + streaming_agent_response_callback) +from v3.config.settings import current_user_id class DummyAgentThread(AgentThread): @@ -82,15 +88,51 @@ def __init__(self, message: ChatMessageContent, thread: AgentThread): class ProxyAgent(Agent): """Simple proxy agent that prompts for human clarification.""" + + # Declare as Pydantic field + user_id: Optional[str] = Field(default=None, description="User ID for WebSocket messaging") - def __init__(self): + def __init__(self, user_id: str = None, **kwargs): + # Get user_id from parameter or context, fallback to empty string + effective_user_id = user_id or 
current_user_id.get() or "" super().__init__( - name="ProxyAgent", + name="ProxyAgent", description="""Call this agent when you need to clarify requests by asking the human user for more information. Ask it for more details about any unclear requirements, missing information, - or if you need the user to elaborate on any aspect of the task.""" + or if you need the user to elaborate on any aspect of the task.""", + user_id=effective_user_id, + **kwargs ) self.instructions = "" + + def _create_message_content(self, content: str, thread_id: str = None) -> ChatMessageContent: + """Create a ChatMessageContent with proper metadata.""" + return ChatMessageContent( + role=AuthorRole.ASSISTANT, + content=content, + name=self.name, + metadata={"thread_id": thread_id} if thread_id else {} + ) + + async def _trigger_response_callbacks(self, message_content: ChatMessageContent): + """Manually trigger the same response callbacks used by other agents.""" + # Get current user_id dynamically instead of using stored value + current_user = current_user_id.get() or self.user_id or "" + + # Trigger the standard agent response callback + agent_response_callback(message_content, current_user) + + async def _trigger_streaming_callbacks(self, content: str, is_final: bool = False): + """Manually trigger streaming callbacks for real-time updates.""" + # Get current user_id dynamically instead of using stored value + current_user = current_user_id.get() or self.user_id or "" + streaming_message = StreamingChatMessageContent( + role=AuthorRole.ASSISTANT, + content=content, + name=self.name, + choice_index=0 + ) + await streaming_agent_response_callback(streaming_message, is_final, current_user) async def invoke(self, message: str,*, thread: AgentThread | None = None,**kwargs) -> AsyncIterator[ChatMessageContent]: """Ask human user for clarification about the message.""" @@ -101,12 +143,12 @@ async def invoke(self, message: str,*, thread: AgentThread | None = None,**kwarg construct_thread=lambda: DummyAgentThread(), expected_type=DummyAgentThread, ) - # Replace with websocket call when available - print(f"\nšŸ¤” ProxyAgent: Another agent is asking for clarification about:") - print(f" Request: {message}") - print("-" * 60) + # Send clarification request via response handlers + clarification_request = f"I need clarification about: {message}" + clarification_message = self._create_message_content(clarification_request, thread.id) + await self._trigger_response_callbacks(clarification_message) - # Get human input + # Get human input - replace this with awaiting a websocket call or API handler when available human_response = input("Please provide clarification: ").strip() if not human_response: @@ -114,12 +156,10 @@ async def invoke(self, message: str,*, thread: AgentThread | None = None,**kwarg response = f"Human clarification: {human_response}" - chat_message = ChatMessageContent( - role=AuthorRole.ASSISTANT, - content=response, - name=self.name, - metadata={"thread_id": thread.id} - ) + # Send response via response handlers + response_message = self._create_message_content(response, thread.id) + + chat_message = response_message yield AgentResponseItem( message=chat_message, @@ -144,10 +184,10 @@ async def invoke_stream(self, messages, thread=None, **kwargs) -> AsyncIterator[ else: message = str(messages) - # Replace with websocket call when available - print(f"\nProxyAgent: Another agent is asking for clarification about:") - print(f" Request: {message}") - print("-" * 60) + # Send clarification request via streaming 
callbacks + clarification_request = f"I need clarification about: {message}" + self._create_message_content(clarification_request, thread.id) + await self._trigger_streaming_callbacks(clarification_request) # Get human input - replace with websocket call when available human_response = input("Please provide clarification: ").strip() @@ -156,13 +196,8 @@ async def invoke_stream(self, messages, thread=None, **kwargs) -> AsyncIterator[ human_response = "No additional clarification provided." response = f"Human clarification: {human_response}" - - chat_message = ChatMessageContent( - role=AuthorRole.ASSISTANT, - content=response, - name=self.name, - metadata={"thread_id": thread.id} - ) + + chat_message = self._create_message_content(response, thread.id) yield AgentResponseItem( message=chat_message, @@ -184,6 +219,6 @@ async def get_response(self, chat_history, **kwargs): content="No clarification provided." ) -async def create_proxy_agent(): +async def create_proxy_agent(user_id: str = None): """Factory function for human proxy agent.""" - return ProxyAgent() \ No newline at end of file + return ProxyAgent(user_id=user_id) \ No newline at end of file diff --git a/src/backend/v3/magentic_agents/reasoning_agent.py b/src/backend/v3/magentic_agents/reasoning_agent.py index 8dcc41540..615b423fc 100644 --- a/src/backend/v3/magentic_agents/reasoning_agent.py +++ b/src/backend/v3/magentic_agents/reasoning_agent.py @@ -2,6 +2,7 @@ import os from azure.identity import DefaultAzureCredential as SyncDefaultAzureCredential +from common.config.app_config import config from semantic_kernel import Kernel from semantic_kernel.agents import ChatCompletionAgent # pylint: disable=E0611 from semantic_kernel.connectors.ai.open_ai import AzureChatCompletion @@ -37,17 +38,11 @@ def __init__(self, agent_name: str, async def _after_open(self) -> None: self.kernel = Kernel() - # Token provider for SK chat completion - sync_cred = SyncDefaultAzureCredential() - - def ad_token_provider() -> str: - token = sync_cred.get_token("https://cognitiveservices.azure.com/.default") - return token.token chat = AzureChatCompletion( deployment_name=self._model_deployment_name, endpoint=self._openai_endpoint, - ad_token_provider=ad_token_provider + ad_token_provider= await config.get_access_token() ) self.kernel.add_service(chat) diff --git a/src/backend/v3/models/messages.py b/src/backend/v3/models/messages.py new file mode 100644 index 000000000..8ad28aba4 --- /dev/null +++ b/src/backend/v3/models/messages.py @@ -0,0 +1,112 @@ +"""Messages from the backend to the frontend via WebSocket.""" + +from dataclasses import dataclass +from typing import Any, Dict, List, Literal, Optional + +from semantic_kernel.kernel_pydantic import Field, KernelBaseModel +from v3.models.models import MPlan, PlanStatus + + +@dataclass(slots=True) +class AgentMessage: + """Message from the backend to the frontend via WebSocket.""" + agent_name: str + timestamp: str + content: str + +@dataclass(slots=True) +class AgentStreamStart: + """Start of a streaming message from the backend to the frontend via WebSocket.""" + agent_name: str + +@dataclass(slots=True) +class AgentStreamEnd: + """End of a streaming message from the backend to the frontend via WebSocket.""" + agent_name: str + +@dataclass(slots=True) +class AgentMessageStreaming: + """Streaming message from the backend to the frontend via WebSocket.""" + agent_name: str + content: str + is_final: bool = False + +@dataclass(slots=True) +class AgentToolMessage: + """Message from an agent using a tool.""" 
+ agent_name: str + tool_name: str + input: str + +@dataclass(slots=True) +class PlanApprovalRequest: + """Request for plan approval from the frontend.""" + plan: MPlan + status: PlanStatus + + context: dict | None = None + +@dataclass(slots=True) +class PlanApprovalResponse: + """Response for plan approval from the frontend.""" + approved: bool + feedback: str | None = None + +@dataclass(slots=True) +class ReplanApprovalRequest: + """Request for replan approval from the frontend.""" + reason: str + context: dict | None = None + +@dataclass(slots=True) +class ReplanApprovalResponse: + """Response for replan approval from the frontend.""" + approved: bool + feedback: str | None = None + +@dataclass(slots=True) +class UserClarificationRequest: + """Request for user clarification from the frontend.""" + question: str + context: dict | None = None + +@dataclass(slots=True) +class UserClarificationResponse: + """Response for user clarification from the frontend.""" + def __init__(self, answer: str): + self.answer = answer + +@dataclass(slots=True) +class FinalResultMessage: + """Final result message from the backend to the frontend.""" + result: str + summary: str | None = None + context: dict | None = None + +class HumanFeedback(KernelBaseModel): + """Message containing human feedback on a step.""" + + step_id: Optional[str] = None + plan_id: str + session_id: str + approved: bool + human_feedback: Optional[str] = None + updated_action: Optional[str] = None + + +class HumanClarification(KernelBaseModel): + """Message containing human clarification on a plan.""" + + plan_id: str + session_id: str + human_clarification: str + +class ApprovalRequest(KernelBaseModel): + """Message sent to HumanAgent to request approval for a step.""" + + step_id: str + plan_id: str + session_id: str + user_id: str + action: str + agent_name: str diff --git a/src/backend/v3/orchestration/human_approval_manager.py b/src/backend/v3/orchestration/human_approval_manager.py index 0ebc3abaf..2e358fe51 100644 --- a/src/backend/v3/orchestration/human_approval_manager.py +++ b/src/backend/v3/orchestration/human_approval_manager.py @@ -6,10 +6,14 @@ import re from typing import Any, List, Optional +import v3.models.messages as messages from semantic_kernel.agents import Agent from semantic_kernel.agents.orchestration.magentic import ( MagenticContext, StandardMagenticManager) +from semantic_kernel.agents.orchestration.prompts._magentic_prompts import \ + ORCHESTRATOR_TASK_LEDGER_FACTS_PROMPT from semantic_kernel.contents import ChatMessageContent +from v3.config.settings import connection_config, current_user_id from v3.models.models import MPlan, MStep @@ -22,12 +26,26 @@ class HumanApprovalMagenticManager(StandardMagenticManager): # Define Pydantic fields to avoid validation errors approval_enabled: bool = True magentic_plan: Optional[MPlan] = None + current_user_id: Optional[str] = None + def __init__(self, *args, **kwargs): # Remove any custom kwargs before passing to parent - super().__init__(*args, **kwargs) + + # Use object.__setattr__ to bypass Pydantic validation + # object.__setattr__(self, 'current_user_id', None) + + custom_addition = """ +To address this request we have assembled the following team: + +{{$team}} + +Please check with the team members to list all relevant tools they have access to, and their required parameters.""" + + kwargs['task_ledger_facts_prompt'] = ORCHESTRATOR_TASK_LEDGER_FACTS_PROMPT + custom_addition + super().__init__(*args, **kwargs) - async def plan(self, magentic_context) -> Any: + 
async def plan(self, magentic_context: MagenticContext) -> Any: """ Override the plan method to create the plan first, then ask for approval before execution. """ @@ -38,70 +56,85 @@ async def plan(self, magentic_context) -> Any: elif not isinstance(task_text, str): task_text = str(task_text) - print(f"\nšŸŽÆ Human-in-the-Loop Magentic Manager Creating Plan:") + print(f"\n Human-in-the-Loop Magentic Manager Creating Plan:") print(f" Task: {task_text}") print("-" * 60) # First, let the parent create the actual plan - print("šŸ“‹ Creating execution plan...") + print(" Creating execution plan...") plan = await super().plan(magentic_context) - self.magentic_plan = self.plan_to_obj( magentic_context, self.task_ledger) + + # Request approval from the user before executing the plan + approval_message = messages.PlanApprovalRequest( + plan=self.magentic_plan, + status="PENDING_APPROVAL", + context={ + "task": task_text, + "participant_descriptions": magentic_context.participant_descriptions + } if hasattr(magentic_context, 'participant_descriptions') else {} + ) + + # Send the current plan to the frontend via WebSocket + #await connection_config.send_status_update_async(approval_message,) - # If planning failed or returned early, just return the result - if isinstance(plan, ChatMessageContent): - # Now show the actual plan and ask for approval - plan_approved = await self._get_plan_approval_with_details( - task_text, - magentic_context.participant_descriptions, - plan - ) - if not plan_approved: - print("āŒ Plan execution cancelled by user") - return ChatMessageContent( - role="assistant", - content="Plan execution was cancelled by the user." - ) - - # If we get here, plan is approved - return the plan for execution - print("āœ… Plan approved - proceeding with execution...") - return plan + # Send the approval request to the user's WebSocket + # The user_id will be automatically retrieved from context + await connection_config.send_status_update_async({ + "type": "plan_approval_request", + "data": approval_message + }) - # If plan is not a ChatMessageContent, still show it and ask for approval - if self._approval_settings['enabled']: - plan_approved = await self._get_plan_approval_with_details( - task_text, - magentic_context.participant_descriptions, - plan - ) - if not plan_approved: - print("āŒ Plan execution cancelled by user") - return ChatMessageContent( - role="assistant", - content="Plan execution was cancelled by the user." - ) + # Wait for user approval (you'll need to implement this) + approval_response = await self._wait_for_user_approval() - # If we get here, plan is approved - return the plan for execution - print("āœ… Plan approved - proceeding with execution...") - return plan + if approval_response and approval_response.approved: + print("Plan approved - proceeding with execution...") + return plan + else: + print("Plan execution cancelled by user") + await connection_config.send_status_update_async({ + "type": "plan_approval_response", + "data": approval_response + }) + raise Exception("Plan execution cancelled by user") + # return ChatMessageContent( + # role="assistant", + # content="Plan execution was cancelled by the user." + # ) + + + async def _wait_for_user_approval(self) -> Optional[messages.PlanApprovalResponse]: + """Wait for user approval response.""" + user_id = current_user_id.get() + # Temporarily use console input for approval - will switch to WebSocket or API in future + response = input("\nApprove this execution plan? 
[y/n]: ").strip().lower() + if response in ['y', 'yes']: + return messages.PlanApprovalResponse(approved=True) + elif response in ['n', 'no']: + return messages.PlanApprovalResponse(approved=False) + else: + print("Invalid input. Please enter 'y' for yes or 'n' for no.") + return await self._wait_for_user_approval() + async def prepare_final_answer(self, magentic_context: MagenticContext) -> ChatMessageContent: """ Override to ensure final answer is prepared after all steps are executed. """ - print("\nšŸ“ Magentic Manager - Preparing final answer...") + print("\n Magentic Manager - Preparing final answer...") return await super().prepare_final_answer(magentic_context) async def _get_plan_approval_with_details(self, task: str, participant_descriptions: dict, plan: Any) -> bool: while True: - approval = input("\nā“ Approve this execution plan? [y/n/details]: ").strip().lower() + approval = input("\ Approve this execution plan? [y/n/details]: ").strip().lower() if approval in ['y', 'yes']: - print("āœ… Plan approved by user") + print(" Plan approved by user") return True elif approval in ['n', 'no']: - print("āŒ Plan rejected by user") + print(" Plan rejected by user") return False # elif approval in ['d', 'details']: # self._show_detailed_plan_info(task, participant_descriptions, plan) diff --git a/src/backend/v3/orchestration/orchestration_manager.py b/src/backend/v3/orchestration/orchestration_manager.py index d27befed8..a2f31b25d 100644 --- a/src/backend/v3/orchestration/orchestration_manager.py +++ b/src/backend/v3/orchestration/orchestration_manager.py @@ -1,28 +1,41 @@ # Copyright (c) Microsoft. All rights reserved. """ Orchestration manager to handle the orchestration logic. """ +import contextvars import os import uuid -from typing import List +from contextvars import ContextVar +from typing import List, Optional from azure.identity import DefaultAzureCredential as SyncDefaultAzureCredential +from common.config.app_config import config +from common.models.messages_kernel import TeamConfiguration from semantic_kernel.agents.orchestration.magentic import MagenticOrchestration from semantic_kernel.agents.runtime import InProcessRuntime # Create custom execution settings to fix schema issues from semantic_kernel.connectors.ai.open_ai import ( AzureChatCompletion, OpenAIChatPromptExecutionSettings) +from semantic_kernel.contents import (ChatMessageContent, + StreamingChatMessageContent) from v3.callbacks.response_handlers import (agent_response_callback, streaming_agent_response_callback) -from v3.config.settings import config, orchestration_config +from v3.config.settings import (config, connection_config, current_user_id, + orchestration_config) from v3.magentic_agents.magentic_agent_factory import MagenticAgentFactory from v3.orchestration.human_approval_manager import \ HumanApprovalMagenticManager +# Context variable to hold the current user ID +current_user_id: ContextVar[Optional[str]] = contextvars.ContextVar("current_user_id", default=None) class OrchestrationManager: """Manager for handling orchestration logic.""" + + def __init__(self): + self.user_id: Optional[str] = None + @classmethod - async def init_orchestration(cls, agents: List)-> MagenticOrchestration: + async def init_orchestration(cls, agents: List, user_id: str = None)-> MagenticOrchestration: """Main function to run the agents.""" # Custom execution settings that should work with Azure OpenAI @@ -31,43 +44,55 @@ async def init_orchestration(cls, agents: List)-> MagenticOrchestration: temperature=0.1 ) - # 
Create a token provider function for Azure OpenAI credential = SyncDefaultAzureCredential() def get_token(): token = credential.get_token("https://cognitiveservices.azure.com/.default") return token.token - + # 1. Create a Magentic orchestration with Azure OpenAI magentic_orchestration = MagenticOrchestration( members=agents, manager=HumanApprovalMagenticManager( chat_completion_service=AzureChatCompletion( - deployment_name=os.getenv("AZURE_OPENAI_MODEL_NAME"), - endpoint=os.getenv("AZURE_OPENAI_ENDPOINT"), + deployment_name=config.AZURE_OPENAI_DEPLOYMENT_NAME, + endpoint=config.AZURE_OPENAI_ENDPOINT, ad_token_provider=get_token # Use token provider function ), execution_settings=execution_settings ), - agent_response_callback=agent_response_callback, - streaming_agent_response_callback=streaming_agent_response_callback, # Add streaming callback + agent_response_callback=cls._user_aware_agent_callback(user_id), + streaming_agent_response_callback=cls._user_aware_streaming_callback(user_id) ) return magentic_orchestration + @staticmethod + def _user_aware_agent_callback(user_id: str): + """Factory method that creates a callback with captured user_id""" + def callback(message: ChatMessageContent): + return agent_response_callback(message, user_id) + return callback + + @staticmethod + def _user_aware_streaming_callback(user_id: str): + """Factory method that creates a streaming callback with captured user_id""" + async def callback(streaming_message: StreamingChatMessageContent, is_final: bool): + return await streaming_agent_response_callback(streaming_message, is_final, user_id) + return callback + @classmethod - async def get_current_orchestration(cls, user_id: str) -> MagenticOrchestration: + async def get_current_or_new_orchestration(self, user_id: str, team_config: TeamConfiguration) -> MagenticOrchestration: """get existing orchestration instance.""" current_orchestration = orchestration_config.get_current_orchestration(user_id) if current_orchestration is None: factory = MagenticAgentFactory() - # to do: change to parsing teams from cosmos db - agents = await factory.get_agents(config.AGENT_TEAM_FILE) - orchestration_config.orchestrations[user_id] = await cls.init_orchestration(agents) + agents = await factory.get_agents(team_config_input=team_config) + orchestration_config.orchestrations[user_id] = await self.init_orchestration(agents, user_id) return orchestration_config.get_current_orchestration(user_id) - @classmethod - async def run_orchestration(cls, user_id, input_task) -> None: + async def run_orchestration(self, user_id, input_task) -> None: """ Run the orchestration with user input loop.""" + token = current_user_id.set(user_id) job_id = str(uuid.uuid4()) orchestration_config.approvals[job_id] = None @@ -77,6 +102,13 @@ async def run_orchestration(cls, user_id, input_task) -> None: if magentic_orchestration is None: raise ValueError("Orchestration not initialized for user.") + try: + if hasattr(magentic_orchestration, '_manager') and hasattr(magentic_orchestration._manager, 'current_user_id'): + magentic_orchestration._manager.current_user_id = user_id + print(f"šŸ” DEBUG: Set user_id on manager = {user_id}") + except Exception as e: + print(f"Error setting user_id on manager: {e}") + runtime = InProcessRuntime() runtime.start() @@ -92,6 +124,17 @@ async def run_orchestration(cls, user_id, input_task) -> None: value = await orchestration_result.get() print(f"\nFinal result:\n{value}") print("=" * 50) + + # Send final result via WebSocket + await 
connection_config.send_status_update_async({ + "type": "final_result", + "data": { + "content": str(value), + "status": "completed", + "timestamp": str(uuid.uuid4()) # or use actual timestamp + } + }, user_id) + print(f"Final result sent via WebSocket to user {user_id}") except Exception as e: print(f"Error: {e}") print(f"Error type: {type(e).__name__}") @@ -103,3 +146,5 @@ async def run_orchestration(cls, user_id, input_task) -> None: print(f"Unexpected error: {e}") finally: await runtime.stop_when_idle() + current_user_id.reset(token) + diff --git a/src/backend/v3/scenarios/__init__.py b/src/backend/v3/scenarios/__init__.py deleted file mode 100644 index 99ae28fd1..000000000 --- a/src/backend/v3/scenarios/__init__.py +++ /dev/null @@ -1 +0,0 @@ -# Scenarios package for employee onboarding test cases and examples diff --git a/src/backend/v3/scenarios/onboarding_cases.py b/src/backend/v3/scenarios/onboarding_cases.py deleted file mode 100644 index 435856e63..000000000 --- a/src/backend/v3/scenarios/onboarding_cases.py +++ /dev/null @@ -1,141 +0,0 @@ -""" -Employee onboarding and other scenarios and test cases for the Magentic orchestration system. -Provides realistic use cases to demonstrate multi-agent collaboration. -""" - -class MagenticScenarios: - """Collection of employee onboarding scenarios for testing and demonstration.""" - - # Basic onboarding scenarios - WELCOME_NEW_HIRE = """ - A new software engineer named Sarah is starting at our tech company next Monday. - Help create a comprehensive first-week onboarding plan that includes: - - Welcome activities and team introductions - - Required training modules and timeline - - Equipment setup and access provisioning - - Key company policies to review - - First-week goals and expectations - - Research current best practices for remote software engineer onboarding in 2025. - """ - - ONBOARDING_METRICS_ANALYSIS = """ - Analyze our employee onboarding effectiveness using this sample data: - - Average time to first meaningful contribution: 45 days - - 90-day retention rate: 85% - - Employee satisfaction score (1-10): 7.2 - - Training completion rate: 78% - - Manager feedback score: 8.1 - - Compare these metrics to industry benchmarks and recommend improvements. - Create visualizations showing our performance vs. industry standards. - """ - - COMPLIANCE_RESEARCH = """ - Research the latest compliance requirements for employee onboarding in the technology sector for 2025. - Focus on: - - Data privacy and security training requirements - - Remote work compliance considerations - - Diversity, equity, and inclusion training mandates - - Industry-specific certifications needed - - Provide a comprehensive compliance checklist with implementation timeline. - """ - - REMOTE_ONBOARDING_OPTIMIZATION = """ - Our company is transitioning to a fully remote workforce. Design an optimized remote onboarding experience that addresses: - - Virtual team integration strategies - - Digital tool training and setup - - Remote culture building activities - - Asynchronous learning paths - - Virtual mentorship programs - - Research the most effective remote onboarding tools and platforms available in 2025. 
- """ - - ONBOARDING_COST_ANALYSIS = """ - Calculate the total cost of our current onboarding program and identify optimization opportunities: - - HR staff time allocation (40 hours per new hire) - - Training material costs ($500 per hire) - - Technology setup and licensing ($1,200 per hire) - - Manager mentoring time (20 hours per hire) - - Lost productivity during ramp-up period - - Research cost-effective alternatives and calculate potential ROI improvements. - """ - - MANAGER_TRAINING_PROGRAM = """ - Design a comprehensive training program for managers who will be onboarding new team members: - - Essential management skills for onboarding - - Communication best practices for new hires - - Goal setting and expectation management - - Cultural integration techniques - - Performance tracking during probation period - - Include current research on effective management practices for Gen Z employees entering the workforce. - """ - - TECH_STACK_ONBOARDING = """ - Create a detailed technical onboarding plan for a new developer joining our team that uses: - - React/TypeScript frontend - - Python/Django backend - - PostgreSQL database - - AWS cloud infrastructure - - Docker containerization - - Git/GitHub workflow - - Include learning resources, hands-on exercises, and milestone checkpoints for each technology. - Research the latest learning resources and tutorials for 2025. - """ - - FEEDBACK_ANALYSIS = """ - Analyze this employee feedback from our recent onboarding survey and recommend improvements: - - Positive feedback: - - "Great team culture and welcoming environment" - - "Clear initial project assignments" - - "Excellent technical mentorship" - - Areas for improvement: - - "Administrative processes were confusing" - - "Too much information in first week" - - "Unclear long-term career path discussion" - - "Limited social interaction opportunities" - - Provide specific, actionable recommendations with implementation priorities. - """ - - OFFICIAL_DEMO = """ - I am preparing a report on the energy efficiency of different machine learning model architectures. - Compare the estimated training and inference energy consumption of ResNet-50, BERT-base, and GPT-2 - on standard datasets (e.g., ImageNet for ResNet, GLUE for BERT, WebText for GPT-2). - Then, estimate the CO2 emissions associated with each, assuming training on an Azure Standard_NC6s_v3 VM - for 24 hours. Provide tables for clarity, and recommend the most energy-efficient model - per task type (image classification, text classification, and text generation). 
- """ - - @classmethod - def get_all_scenarios(cls): - """Get all onboarding scenarios as a dictionary.""" - return { - "welcome_new_hire": cls.WELCOME_NEW_HIRE, - "metrics_analysis": cls.ONBOARDING_METRICS_ANALYSIS, - "compliance_research": cls.COMPLIANCE_RESEARCH, - "remote_optimization": cls.REMOTE_ONBOARDING_OPTIMIZATION, - "cost_analysis": cls.ONBOARDING_COST_ANALYSIS, - "manager_training": cls.MANAGER_TRAINING_PROGRAM, - "tech_stack": cls.TECH_STACK_ONBOARDING, - "feedback_analysis": cls.FEEDBACK_ANALYSIS, - "official_demo": cls.OFFICIAL_DEMO, - } - - @classmethod - def get_scenario_names(cls): - """Get list of available scenario names.""" - return list(cls.get_all_scenarios().keys()) - - @classmethod - def get_scenario(cls, name: str): - """Get a specific scenario by name.""" - scenarios = cls.get_all_scenarios() - return scenarios.get(name, None) \ No newline at end of file diff --git a/src/frontend/src/App.tsx b/src/frontend/src/App.tsx index e5f5ce4ff..79d54ede3 100644 --- a/src/frontend/src/App.tsx +++ b/src/frontend/src/App.tsx @@ -1,9 +1,12 @@ import React from 'react'; import './App.css'; import { BrowserRouter as Router, Routes, Route, Navigate } from 'react-router-dom'; -import { HomePage, PlanPage, PlanCreatePage } from './pages'; +import { HomePage, PlanPage } from './pages'; +import { useWebSocket } from './hooks/useWebSocket'; function App() { + const { isConnected, isConnecting, error } = useWebSocket(); + return ( diff --git a/src/frontend/src/api/apiService.tsx b/src/frontend/src/api/apiService.tsx index 6f1b79361..0914083cc 100644 --- a/src/frontend/src/api/apiService.tsx +++ b/src/frontend/src/api/apiService.tsx @@ -114,8 +114,12 @@ export class APIService { * @param inputTask The task description and optional session ID * @returns Promise with the response containing plan ID and status */ - async createPlan(inputTask: InputTask): Promise<{ plan_id: string; status: string; session_id: string }> { - return apiClient.post(API_ENDPOINTS.CREATE_PLAN, inputTask); + // async createPlan(inputTask: InputTask): Promise<{ plan_id: string; status: string; session_id: string }> { + // return apiClient.post(API_ENDPOINTS.CREATE_PLAN, inputTask); + // } + + async createPlan(inputTask: InputTask): Promise<{ status: string; session_id: string }> { + return apiClient.post(API_ENDPOINTS.CREATE_PLAN, inputTask); } /** diff --git a/src/frontend/src/api/config.tsx b/src/frontend/src/api/config.tsx index cd015245b..586c16890 100644 --- a/src/frontend/src/api/config.tsx +++ b/src/frontend/src/api/config.tsx @@ -120,16 +120,62 @@ export function getUserId(): string { */ export function headerBuilder(headers?: Record): Record { let userId = getUserId(); - console.log('headerBuilder: Using user ID:', userId); + //console.log('headerBuilder: Using user ID:', userId); let defaultHeaders = { "x-ms-client-principal-id": String(userId) || "", // Custom header }; - console.log('headerBuilder: Created headers:', defaultHeaders); + //console.log('headerBuilder: Created headers:', defaultHeaders); return { ...defaultHeaders, ...(headers ? 
headers : {}) }; } + +/** + * Initialize team on the backend - takes about 20 seconds + * @returns Promise with team initialization response + */ +// export async function initializeTeam(): Promise<{ +// status: string; +// team_id: string; +// }> { +// const apiUrl = getApiUrl(); +// if (!apiUrl) { +// throw new Error('API URL not configured'); +// } + +// const headers = headerBuilder({ +// 'Content-Type': 'application/json', +// }); + +// console.log('initializeTeam: Starting team initialization...'); + +// try { +// const response = await fetch(`${apiUrl}/init_team`, { +// method: 'GET', +// headers, +// }); + +// if (!response.ok) { +// const errorText = await response.text(); +// throw new Error(errorText || `HTTP error! status: ${response.status}`); +// } + +// const data = await response.json(); +// console.log('initializeTeam: Team initialization completed:', data); + +// // Validate the expected response format +// if (data.status !== 'Request started successfully' || !data.team_id) { +// throw new Error('Invalid response format from init_team endpoint'); +// } + +// return data; +// } catch (error) { +// console.error('initializeTeam: Error initializing team:', error); +// throw error; +// } +// } + export const toBoolean = (value: any): boolean => { if (typeof value !== 'string') { return false; @@ -145,5 +191,6 @@ export default { setEnvData, config, USER_ID, - API_URL + API_URL, + //initializeTeam }; \ No newline at end of file diff --git a/src/frontend/src/components/common/SettingsButton.tsx b/src/frontend/src/components/common/SettingsButton.tsx index 3b6eecfb4..d83ae67dc 100644 --- a/src/frontend/src/components/common/SettingsButton.tsx +++ b/src/frontend/src/components/common/SettingsButton.tsx @@ -41,6 +41,8 @@ import { import { TeamConfig } from '../../models/Team'; import { TeamService } from '../../services/TeamService'; +import '../../index.css' + // Icon mapping function to convert string icons to FluentUI icons const getIconFromString = (iconString: string): React.ReactNode => { const iconMap: Record = { @@ -514,7 +516,7 @@ const SettingsButton: React.FC = ({ appearance="subtle" size="small" onClick={(e) => handleDeleteTeam(team, e)} - style={{ color: '#d13438' }} + className="delete-team-button" /> @@ -751,16 +753,8 @@ const SettingsButton: React.FC = ({ {/* Delete Confirmation Dialog */} setDeleteConfirmOpen(data.open)}> - - + + āš ļø Delete Team Configuration
@@ -802,6 +796,7 @@ const SettingsButton: React.FC = ({ disabled={deleteLoading} style={{ backgroundColor: '#d13438', color: 'white' }} onClick={confirmDeleteTeam} + data-testid="delete-team-confirm" > {deleteLoading ? 'Deleting...' : 'Delete for Everyone'} diff --git a/src/frontend/src/components/content/HomeInput.tsx b/src/frontend/src/components/content/HomeInput.tsx index 8ce4742f7..367cfc00f 100644 --- a/src/frontend/src/components/content/HomeInput.tsx +++ b/src/frontend/src/components/content/HomeInput.tsx @@ -17,6 +17,7 @@ import { TeamConfig } from "../../models/Team"; import { TaskService } from "../../services/TaskService"; import { NewTaskService } from "../../services/NewTaskService"; import { RAIErrorCard, RAIErrorData } from "../errors"; +import { apiService } from "../../api/apiService"; import ChatInput from "@/coral/modules/ChatInput"; import InlineToaster, { useInlineToaster } from "../toast/InlineToaster"; @@ -83,13 +84,16 @@ const HomeInput: React.FC = ({ textareaRef.current.style.height = "auto"; } - if (response.plan_id && response.plan_id !== null) { + if (response.session_id && response.session_id !== null) { showToast("Plan created!", "success"); dismissToast(id); // Navigate to create page (no team ID in URL anymore) console.log('HomeInput: Navigating to plan creation with team:', selectedTeam?.name); - navigate(`/plan/${response.plan_id}`); + console.log('HomeInput: Navigating to plan creation with session:', response.session_id); + console.log('HomeInput: Plan created with session:', response.session_id); + + navigate(`/plan/${response.session_id}`); } else { showToast("Failed to create plan", "error"); dismissToast(id); diff --git a/src/frontend/src/components/content/PlanChat.tsx b/src/frontend/src/components/content/PlanChat.tsx index 56ef652d0..d612be720 100644 --- a/src/frontend/src/components/content/PlanChat.tsx +++ b/src/frontend/src/components/content/PlanChat.tsx @@ -1,24 +1,13 @@ +import { useEffect, useRef, useState, useCallback } from "react"; +import { DiamondRegular, CheckmarkCircleRegular, ClockRegular, ErrorCircleRegular, } from "@fluentui/react-icons"; +import { Body1, Button, Spinner, Tag, ToolbarDivider} from "@fluentui/react-components"; import HeaderTools from "@/coral/components/Header/HeaderTools"; import { Copy, Send } from "@/coral/imports/bundleicons"; import ChatInput from "@/coral/modules/ChatInput"; import remarkGfm from "remark-gfm"; import rehypePrism from "rehype-prism"; import { AgentType, ChatMessage, PlanChatProps, role } from "@/models"; -import { StreamingPlanUpdate } from "@/services/WebSocketService"; -import { - Body1, - Button, - Spinner, - Tag, - ToolbarDivider, -} from "@fluentui/react-components"; -import { DiamondRegular, HeartRegular } from "@fluentui/react-icons"; -import { useEffect, useRef, useState } from "react"; - -// Type guard to check if a message has streaming properties -const hasStreamingProperties = (msg: ChatMessage): msg is ChatMessage & { streaming?: boolean; status?: string; message_type?: string; } => { - return 'streaming' in msg || 'status' in msg || 'message_type' in msg; -}; +import { StreamingPlanUpdate, webSocketService } from "@/services/WebSocketService"; import ReactMarkdown from "react-markdown"; import "../../styles/PlanChat.css"; import "../../styles/Chat.css"; @@ -27,6 +16,26 @@ import { TaskService } from "@/services/TaskService"; import InlineToaster from "../toast/InlineToaster"; import ContentNotFound from "../NotFound/ContentNotFound"; + +// Type guard to check if a message has 
streaming properties +const hasStreamingProperties = (msg: ChatMessage): msg is ChatMessage & { + streaming?: boolean; + status?: string; + message_type?: string; + step_id?: string; +} => { + return 'streaming' in msg || 'status' in msg || 'message_type' in msg; +}; + +interface GroupedMessage { + id: string; + agent_name: string; + messages: StreamingPlanUpdate[]; + status: string; + latest_timestamp: string; + step_id?: string; +} + const PlanChat: React.FC = ({ planData, input, @@ -40,82 +49,281 @@ const PlanChat: React.FC = ({ const messages = planData?.messages || []; const [showScrollButton, setShowScrollButton] = useState(false); const [inputHeight, setInputHeight] = useState(0); + const [groupedStreamingMessages, setGroupedStreamingMessages] = useState([]); const messagesContainerRef = useRef(null); const inputContainerRef = useRef(null); - // Debug logging - console.log('PlanChat - planData:', planData); - console.log('PlanChat - messages:', messages); - console.log('PlanChat - messages.length:', messages.length); + // Helper function to normalize timestamp + const normalizeTimestamp = (timestamp?: string | number): string => { + if (!timestamp) return new Date().toISOString(); + if (typeof timestamp === 'number') { + // Backend sends float timestamp, convert to ISO string + return new Date(timestamp * 1000).toISOString(); + } + return timestamp; + }; - // Scroll to Bottom useEffect + // Group streaming messages by agent + const groupStreamingMessages = useCallback((messages: StreamingPlanUpdate[]): GroupedMessage[] => { + const groups: { [key: string]: GroupedMessage } = {}; - useEffect(() => { - scrollToBottom(); - }, [messages, streamingMessages]); + messages.forEach((msg) => { + // Create a unique key for grouping (agent + step) + const groupKey = `${msg.agent_name || 'system'}_${msg.step_id || 'general'}`; + + if (!groups[groupKey]) { + groups[groupKey] = { + id: groupKey, + agent_name: msg.agent_name || 'BOT', + messages: [], + status: msg.status || 'in_progress', + latest_timestamp: normalizeTimestamp(msg.timestamp), + step_id: msg.step_id, + }; + } + + groups[groupKey].messages.push(msg); + + // Update status to latest + const msgTimestamp = normalizeTimestamp(msg.timestamp); + const groupTimestamp = groups[groupKey].latest_timestamp; + if (msgTimestamp > groupTimestamp) { + groups[groupKey].status = msg.status || groups[groupKey].status; + groups[groupKey].latest_timestamp = msgTimestamp; + } + }); - //Scroll to Bottom Buttom + return Object.values(groups).sort((a, b) => + new Date(a.latest_timestamp).getTime() - new Date(b.latest_timestamp).getTime() + ); + }, []); + + // Update grouped messages when streaming messages change + useEffect(() => { + if (streamingMessages.length > 0) { + const grouped = groupStreamingMessages(streamingMessages); + setGroupedStreamingMessages(grouped); + } else { + // Clear grouped messages when no streaming messages + setGroupedStreamingMessages([]); + } + }, [streamingMessages, groupStreamingMessages]); + + // Auto-scroll behavior + useEffect(() => { + scrollToBottom(); + }, [messages, groupedStreamingMessages]); useEffect(() => { - const container = messagesContainerRef.current; - if (!container) return; + const container = messagesContainerRef.current; + if (!container) return; + + const handleScroll = () => { + const { scrollTop, scrollHeight, clientHeight } = container; + setShowScrollButton(scrollTop + clientHeight < scrollHeight - 100); + }; + + container.addEventListener("scroll", handleScroll); + return () => 
container.removeEventListener("scroll", handleScroll); + }, []); - const handleScroll = () => { - const { scrollTop, scrollHeight, clientHeight } = container; - setShowScrollButton(scrollTop + clientHeight < scrollHeight - 100); + useEffect(() => { + if (inputContainerRef.current) { + setInputHeight(inputContainerRef.current.offsetHeight); + } + }, [input]); + + const scrollToBottom = () => { + messagesContainerRef.current?.scrollTo({ + top: messagesContainerRef.current.scrollHeight, + behavior: "smooth", + }); + setShowScrollButton(false); }; - container.addEventListener("scroll", handleScroll); - return () => container.removeEventListener("scroll", handleScroll); - }, []); + // Get status icon for streaming messages + const getStatusIcon = (status: string) => { + switch (status) { + case 'completed': + return ; + case 'error': + case 'failed': + return ; + case 'in_progress': + return ; + default: + return ; + } + }; - useEffect(() => { - if (inputContainerRef.current) { - setInputHeight(inputContainerRef.current.offsetHeight); + // Get message type display text + const getMessageTypeText = (messageType?: string, status?: string) => { + if (status === 'completed') return 'Completed'; + if (status === 'error' || status === 'failed') return 'Failed'; + + switch (messageType) { + case 'thinking': + return 'Thinking...'; + case 'action': + return 'Working...'; + case 'result': + return 'Result'; + case 'clarification_needed': + return 'Needs Input'; + case 'plan_approval_request': + return 'Approval Required'; + default: + return status === 'in_progress' ? 'In Progress' : 'Live'; + } + }; + + if (!planData && !loading) { + return ( + + ); } - }, [input]); // or [inputValue, submittingChatDisableInput] - - const scrollToBottom = () => { - messagesContainerRef.current?.scrollTo({ - top: messagesContainerRef.current.scrollHeight, - behavior: "smooth", - }); - setShowScrollButton(false); - }; - if (!planData) + // Render a grouped streaming message + const renderGroupedStreamingMessage = (group: GroupedMessage) => { + const latestMessage = group.messages[group.messages.length - 1]; + const hasMultipleMessages = group.messages.length > 1; + return ( - +
+
+
+ + {TaskService.cleanTextToSpaces(group.agent_name)} + + + BOT + + + {getMessageTypeText(latestMessage.message_type, group.status)} + +
+
+ + +
+ {hasMultipleMessages ? ( + // Show combined content for multiple messages +
+ {group.messages.map((msg, idx) => ( +
+ {msg.content && ( + + {TaskService.cleanHRAgent(msg.content)} + + )} +
+ ))} +
+ ) : ( + // Single message content + + {TaskService.cleanHRAgent(latestMessage.content || "") || ""} + + )} + + +
+
+
+
+ + } + appearance="filled" + size="extra-small" + > + Live updates from agent + +
+
+
+
+
); + }; - // If no messages exist, show the initial task as the first message - const displayMessages = messages.length > 0 ? messages : [ - { - source: AgentType.HUMAN, - content: planData.plan?.initial_goal || "Task started", - timestamp: planData.plan?.timestamp || new Date().toISOString() - } - ]; + // Combine regular messages and streaming messages for display + // const allMessages = [ + // ...messages, + // // Add streaming messages as regular chat messages for display + // ...groupedStreamingMessages.map(group => ({ + // source: group.agent_name, + // content: group.messages.map(msg => msg.content).join('\n\n'), + // streaming: true, + // status: group.status, + // message_type: group.messages[group.messages.length - 1].message_type, + // step_id: group.step_id + // })) + // ]; - // Merge streaming messages with existing messages - const allMessages: ChatMessage[] = [...displayMessages]; +// Check if agents are actively working +const agentsWorking = streamingMessages.length > 0 && + groupedStreamingMessages.some(group => + group.status === 'in_progress' || + group.messages.some(msg => msg.status === 'in_progress') + ); + +// Get the name of the currently working agent +const getWorkingAgentName = (): string | null => { + if (!agentsWorking) return null; - // Add streaming messages as assistant messages - streamingMessages.forEach(streamMsg => { - if (streamMsg.content) { - allMessages.push({ - source: streamMsg.agent_name || 'AI Assistant', - content: streamMsg.content, - timestamp: new Date().toISOString(), - streaming: true, - status: streamMsg.status, - message_type: streamMsg.message_type - }); - } - }); + // Find the first agent that's currently in progress + const workingGroup = groupedStreamingMessages.find(group => + group.status === 'in_progress' || + group.messages.some(msg => msg.status === 'in_progress') + ); + + return workingGroup ? workingGroup.agent_name : null; +}; + +const workingAgentName = getWorkingAgentName(); - console.log('PlanChat - all messages including streaming:', allMessages); +// Disable input when agents are working +const shouldDisableInput = !planData?.enableChat || submittingChatDisableInput || agentsWorking; + +// Generate dynamic placeholder text +const getPlaceholderText = (): string => { + if (workingAgentName) { + return `${TaskService.cleanTextToSpaces(workingAgentName)} is working on your plan...`; + } + return "Add more info to this task..."; +}; + + console.log('PlanChat - streamingMessages:', streamingMessages); + console.log('PlanChat - submittingChatDisableInput:', submittingChatDisableInput); + console.log('PlanChat - shouldDisableInput:', shouldDisableInput); return (
@@ -135,12 +343,13 @@ const PlanChat: React.FC = ({ )}
- {allMessages.map((msg, index) => { + {/* Render regular messages */} + {messages.map((msg, index) => { const isHuman = msg.source === AgentType.HUMAN; return (
{!isHuman && ( @@ -213,6 +422,9 @@ const PlanChat: React.FC = ({
); })} + + {/* Render streaming messages */} + {groupedStreamingMessages.map(group => renderGroupedStreamingMessage(group))}
@@ -223,8 +435,8 @@ const PlanChat: React.FC = ({ shape="circular" style={{ bottom: inputHeight, - position: "absolute", // ensure this or your class handles it - right: 16, // optional, for right alignment + position: "absolute", + right: 16, zIndex: 5, }} > @@ -238,18 +450,14 @@ const PlanChat: React.FC = ({ value={input} onChange={setInput} onEnter={() => OnChatSubmit(input)} - disabledChat={ - planData?.enableChat ? submittingChatDisableInput : true - } - placeholder="Add more info to this task..." + disabledChat={shouldDisableInput} + placeholder={getPlaceholderText()} >
@@ -258,4 +466,4 @@ const PlanChat: React.FC = ({ ); }; -export default PlanChat; +export default PlanChat; \ No newline at end of file diff --git a/src/frontend/src/hooks/index.tsx b/src/frontend/src/hooks/index.tsx index 497005355..70bfbf9c7 100644 --- a/src/frontend/src/hooks/index.tsx +++ b/src/frontend/src/hooks/index.tsx @@ -1 +1,2 @@ export { default as useRAIErrorHandling } from './useRAIErrorHandling'; +export { useWebSocket } from './useWebSocket'; \ No newline at end of file diff --git a/src/frontend/src/hooks/useWebSocket.tsx b/src/frontend/src/hooks/useWebSocket.tsx new file mode 100644 index 000000000..0d1b9b52d --- /dev/null +++ b/src/frontend/src/hooks/useWebSocket.tsx @@ -0,0 +1,83 @@ +// Warning: Vibe coded as a simple websocket test + +import { useEffect, useRef, useState } from 'react'; +import { webSocketService, StreamMessage } from '../services/WebSocketService'; + +export interface WebSocketState { + isConnected: boolean; + isConnecting: boolean; + error: string | null; +} + +export const useWebSocket = () => { + const [state, setState] = useState({ + isConnected: false, + isConnecting: false, + error: null + }); + + const hasConnected = useRef(false); + + useEffect(() => { + // Prevent multiple connections + if (hasConnected.current) return; + hasConnected.current = true; + + const connectWebSocket = async () => { + setState(prev => ({ ...prev, isConnecting: true, error: null })); + + try { + await webSocketService.connect(); + setState(prev => ({ ...prev, isConnected: true, isConnecting: false })); + } catch (error) { + console.error('Failed to connect to WebSocket:', error); + setState(prev => ({ + ...prev, + isConnected: false, + isConnecting: false, + error: 'Failed to connect to server' + })); + } + }; + + // Set up connection status listener + const unsubscribeStatus = webSocketService.on('connection_status', (message: StreamMessage) => { + if (message.data?.connected !== undefined) { + setState(prev => ({ + ...prev, + isConnected: message.data.connected, + isConnecting: false, + error: message.data.connected ? 
null : prev.error + })); + } + }); + + // Set up error listener + const unsubscribeError = webSocketService.on('error', (message: StreamMessage) => { + setState(prev => ({ + ...prev, + error: message.data?.error || 'WebSocket error occurred' + })); + }); + + // Connect + connectWebSocket(); + + // Cleanup on unmount + return () => { + unsubscribeStatus(); + unsubscribeError(); + webSocketService.disconnect(); + hasConnected.current = false; + }; + }, []); + + return { + ...state, + webSocketService, + reconnect: () => { + setState(prev => ({ ...prev, isConnecting: true, error: null })); + return webSocketService.connect(); + } + }; +}; \ No newline at end of file diff --git a/src/frontend/src/index.css b/src/frontend/src/index.css index d9a069000..47b8fea14 100644 --- a/src/frontend/src/index.css +++ b/src/frontend/src/index.css @@ -44,3 +44,49 @@ body, html, :root { --chartPointColor: var(--colorBrandBackground); --chartPointBorderColor: var(--colorBrandForeground1); } + + +/* Delete dialog layout override */ +.fui-Dialog__content[data-testid="delete-dialog"] { + display: flex !important; + flex-direction: column !important; + grid-template-columns: none !important; + grid-template-rows: none !important; + grid-template: none !important; +} + +.fui-Dialog__content[data-testid="delete-dialog"] .fui-DialogBody { + display: flex !important; + flex-direction: column !important; + grid-template-columns: none !important; + grid-template-rows: none !important; + grid-template: none !important; +} + +/* Alternative approach - target all dialog content that contains delete buttons */ +/* .fui-Dialog__content:has(button[data-testid="delete-team-confirm"]) { + display: flex !important; + flex-direction: column !important; + grid-template-columns: none !important; + grid-template-rows: none !important; +} + +.fui-Dialog__content:has(button[data-testid="delete-team-confirm"]) .fui-DialogBody { + display: flex !important; + flex-direction: column !important; + grid-template-columns: none !important; + grid-template-rows: none !important; +} */ + +/* Delete button red color override */ +.delete-team-button, +.delete-team-button .fui-Button__icon { + color: #d13438 !important; + background-color: transparent !important; +} + +.delete-team-button:hover, +.delete-team-button:hover .fui-Button__icon { + background-color: #fdf2f2 !important; + color: #a4262c !important; +} \ No newline at end of file diff --git a/src/frontend/src/pages/HomePage.tsx b/src/frontend/src/pages/HomePage.tsx index 45fdef560..fcd1f0c34 100644 --- a/src/frontend/src/pages/HomePage.tsx +++ b/src/frontend/src/pages/HomePage.tsx @@ -35,43 +35,129 @@ const HomePage: React.FC = () => { /** * Load teams and set default team on component mount */ - useEffect(() => { - const loadDefaultTeam = async () => { - let defaultTeam = TeamService.getStoredTeam(); - if (defaultTeam) { - setSelectedTeam(defaultTeam); - console.log('Default team loaded from storage:', defaultTeam.name); + // useEffect(() => { + // const loadDefaultTeam = async () => { + // let defaultTeam = TeamService.getStoredTeam(); + // if (defaultTeam) { + // setSelectedTeam(defaultTeam); + // console.log('Default team loaded from storage:', defaultTeam.name); + + // setIsLoadingTeam(false); + // return true; + // } + // setIsLoadingTeam(true); + // try { + // const teams = await TeamService.getUserTeams(); + // console.log('All teams loaded:', teams); + // if (teams.length > 0) { + // // Always prioritize "Business Operations Team" as default + // const hrTeam = teams.find(team 
=> team.name === "Human Resources Team"); + // defaultTeam = hrTeam || teams[0]; + + // TeamService.storageTeam(defaultTeam); + // setSelectedTeam(defaultTeam); + // console.log('Default team loaded:', defaultTeam.name, 'with', defaultTeam.starting_tasks?.length || 0, 'starting tasks'); + // console.log('Team logo:', defaultTeam.logo); + // console.log('Team description:', defaultTeam.description); + + // } else { + // console.log('No teams found - user needs to upload a team configuration'); + // // Even if no teams are found, we clear the loading state to show the "no team" message + // } + // } catch (error) { + // console.error('Error loading default team:', error); + // } finally { + // setIsLoadingTeam(false); + // } + // }; + + // loadDefaultTeam(); + // }, []); - setIsLoadingTeam(false); - return true; +useEffect(() => { + const initTeam = async () => { + setIsLoadingTeam(true); + + try { + console.log('Initializing team from backend...'); + + // Call the backend init_team endpoint (takes ~20 seconds) + const initResponse = await TeamService.initializeTeam(); + + if (initResponse.data?.status === 'Request started successfully' && initResponse.data?.team_id) { + console.log('Team initialization completed:', initResponse.data?.team_id); + + // Now fetch the actual team details using the team_id + const teams = await TeamService.getUserTeams(); + const initializedTeam = teams.find(team => team.team_id === initResponse.data?.team_id); + + if (initializedTeam) { + setSelectedTeam(initializedTeam); + TeamService.storageTeam(initializedTeam); + + console.log('Team loaded successfully:', initializedTeam.name); + console.log('Team agents:', initializedTeam.agents?.length || 0); + + showToast( + `${initializedTeam.name} team initialized successfully with ${initializedTeam.agents?.length || 0} agents`, + "success" + ); + } else { + // Fallback: if we can't find the specific team, use HR team or first available + console.log('Specific team not found, using default selection logic'); + const hrTeam = teams.find(team => team.name === "Human Resources Team"); + const defaultTeam = hrTeam || teams[0]; + + if (defaultTeam) { + setSelectedTeam(defaultTeam); + TeamService.storageTeam(defaultTeam); + showToast( + `${defaultTeam.name} team loaded as default`, + "success" + ); + } + } + + } else { + throw new Error('Invalid response from init_team endpoint'); } - setIsLoadingTeam(true); + + } catch (error) { + console.error('Error initializing team from backend:', error); + showToast("Team initialization failed, using fallback", "warning"); + + // Fallback to the old client-side method try { + console.log('Using fallback: client-side team loading...'); const teams = await TeamService.getUserTeams(); - console.log('All teams loaded:', teams); if (teams.length > 0) { - // Always prioritize "Business Operations Team" as default - const businessOpsTeam = teams.find(team => team.name === "Business Operations Team"); - defaultTeam = businessOpsTeam || teams[0]; - TeamService.storageTeam(defaultTeam); + const hrTeam = teams.find(team => team.name === "Human Resources Team"); + const defaultTeam = hrTeam || teams[0]; setSelectedTeam(defaultTeam); - console.log('Default team loaded:', defaultTeam.name, 'with', defaultTeam.starting_tasks?.length || 0, 'starting tasks'); - console.log('Team logo:', defaultTeam.logo); - console.log('Team description:', defaultTeam.description); - console.log('Is Business Operations Team:', defaultTeam.name === "Business Operations Team"); + TeamService.storageTeam(defaultTeam); + + 
showToast( + `${defaultTeam.name} team loaded (fallback mode)`, + "info" + ); } else { console.log('No teams found - user needs to upload a team configuration'); - // Even if no teams are found, we clear the loading state to show the "no team" message + showToast( + "No teams found. Please upload a team configuration.", + "warning" + ); } - } catch (error) { - console.error('Error loading default team:', error); - } finally { - setIsLoadingTeam(false); + } catch (fallbackError) { + console.error('Fallback team loading also failed:', fallbackError); + showToast("Failed to load team configuration", "error"); } - }; + } finally { + setIsLoadingTeam(false); + } + }; - loadDefaultTeam(); - }, []); + initTeam(); +}, []); /** * Handle new task creation from the "New task" button @@ -109,12 +195,12 @@ const HomePage: React.FC = () => { console.log('Teams refreshed after upload:', teams.length); if (teams.length > 0) { - // Always keep "Business Operations Team" as default, even after new uploads - const businessOpsTeam = teams.find(team => team.name === "Business Operations Team"); - const defaultTeam = businessOpsTeam || teams[0]; + // Always keep "Human Resources Team" as default, even after new uploads + const hrTeam = teams.find(team => team.name === "Human Resources Team"); + const defaultTeam = hrTeam || teams[0]; setSelectedTeam(defaultTeam); console.log('Default team after upload:', defaultTeam.name); - console.log('Business Operations Team remains default'); + console.log('Human Resources Team remains default'); showToast( `Team uploaded successfully! ${defaultTeam.name} remains your default team.`, "success" diff --git a/src/frontend/src/pages/PlanCreatePage.tsx b/src/frontend/src/pages/PlanCreatePage.tsx deleted file mode 100644 index d672c2895..000000000 --- a/src/frontend/src/pages/PlanCreatePage.tsx +++ /dev/null @@ -1,340 +0,0 @@ -import React, { useCallback, useEffect, useState } from "react"; -import { useParams, useNavigate } from "react-router-dom"; -import { - Text, - ToggleButton, -} from "@fluentui/react-components"; -import "../styles/PlanPage.css"; -import CoralShellColumn from "../coral/components/Layout/CoralShellColumn"; -import CoralShellRow from "../coral/components/Layout/CoralShellRow"; -import Content from "../coral/components/Content/Content"; -import { NewTaskService } from "../services/NewTaskService"; -import { PlanDataService } from "../services/PlanDataService"; -import { Step, ProcessedPlanData } from "@/models"; -import PlanPanelLeft from "@/components/content/PlanPanelLeft"; -import ContentToolbar from "@/coral/components/Content/ContentToolbar"; -import PlanChat from "@/components/content/PlanChat"; -import PlanPanelRight from "@/components/content/PlanPanelRight"; -import InlineToaster, { - useInlineToaster, -} from "../components/toast/InlineToaster"; -import Octo from "../coral/imports/Octopus.png"; // šŸ™ Animated PNG loader -import PanelRightToggles from "@/coral/components/Header/PanelRightToggles"; -import { TaskListSquareLtr } from "@/coral/imports/bundleicons"; -import LoadingMessage, { loadingMessages } from "@/coral/components/LoadingMessage"; -import { RAIErrorCard, RAIErrorData } from "../components/errors"; -import { apiClient } from "../api/apiClient"; -import { TeamConfig } from "../models/Team"; -import { TeamService } from "../services/TeamService"; - -/** - * Page component for creating and viewing a plan being generated - */ -const PlanCreatePage: React.FC = () => { - const { planId, teamId } = useParams<{ planId: string; teamId?: 
string }>(); - const navigate = useNavigate(); - const { showToast, dismissToast } = useInlineToaster(); - - const [input, setInput] = useState(""); - const [planData, setPlanData] = useState(null); - const [allPlans, setAllPlans] = useState([]); - const [loading, setLoading] = useState(true); - const [submittingChatDisableInput, setSubmitting] = useState(false); - const [error, setError] = useState(null); - const [processingSubtaskId, setProcessingSubtaskId] = useState( - null - ); - const [reloadLeftList, setReloadLeftList] = useState(true); - const [raiError, setRAIError] = useState(null); - const [planGenerated, setPlanGenerated] = useState(false); - const [selectedTeam, setSelectedTeam] = useState(null); - - const [loadingMessage, setLoadingMessage] = useState(loadingMessages[0]); - - // šŸŒ€ Cycle loading messages while loading - useEffect(() => { - if (!loading) return; - let index = 0; - const interval = setInterval(() => { - index = (index + 1) % loadingMessages.length; - setLoadingMessage(loadingMessages[index]); - }, 2000); - return () => clearInterval(interval); - }, [loading]); - - // Load team data if teamId is provided - useEffect(() => { - const loadTeamData = async () => { - if (teamId) { - console.log('Loading team data for ID:', teamId); - try { - const team = await TeamService.getTeamById(teamId); - if (team) { - setSelectedTeam(team); - console.log('Team loaded for plan creation:', team.name); - } else { - console.warn('Team not found for ID:', teamId); - } - } catch (error) { - console.error('Error loading team data:', error); - } - } - }; - - loadTeamData(); - }, [teamId]); - - useEffect(() => { - const currentPlan = allPlans.find( - (plan) => plan.plan.id === planId - ); - setPlanData(currentPlan || null); - }, [allPlans, planId]); - - const generatePlan = useCallback(async () => { - if (!planId) return; - - try { - setLoading(true); - setError(null); - - let toastId = showToast("Generating plan steps...", "progress"); - - // Call the generate_plan endpoint using apiClient for proper authentication - const result = await apiClient.post('/generate_plan', { - plan_id: planId - }); - - dismissToast(toastId); - showToast("Plan generated successfully!", "success"); - setPlanGenerated(true); - - // Now load the plan data to display it - await loadPlanData(false); - - } catch (err) { - console.error("Failed to generate plan:", err); - setError( - err instanceof Error ? err : new Error("Failed to generate plan") - ); - setLoading(false); - } - }, [planId, showToast, dismissToast]); - - const loadPlanData = useCallback( - async (navigate: boolean = true) => { - if (!planId) return; - - try { - setInput(""); // Clear input on new load - if (navigate) { - setPlanData(null); - setLoading(true); - setError(null); - setProcessingSubtaskId(null); - } - - setError(null); - const data = await PlanDataService.fetchPlanData(planId, navigate); - let plans = [...allPlans]; - const existingIndex = plans.findIndex(p => p.plan.id === data.plan.id); - if (existingIndex !== -1) { - plans[existingIndex] = data; - } else { - plans.push(data); - } - setAllPlans(plans); - - // If plan has steps and we haven't generated yet, mark as generated - if (data.plan.steps && data.plan.steps.length > 0 && !planGenerated) { - setPlanGenerated(true); - } - - } catch (err) { - console.log("Failed to load plan data:", err); - setError( - err instanceof Error ? 
err : new Error("Failed to load plan data") - ); - } finally { - setLoading(false); - } - }, - [planId, allPlans, planGenerated] - ); - - const handleOnchatSubmit = useCallback( - async (chatInput: string) => { - if (!chatInput.trim()) { - showToast("Please enter a clarification", "error"); - return; - } - setInput(""); - setRAIError(null); // Clear any previous RAI errors - if (!planData?.plan) return; - setSubmitting(true); - let id = showToast("Submitting clarification", "progress"); - try { - await PlanDataService.submitClarification( - planData.plan.id, - planData.plan.session_id, - chatInput - ); - setInput(""); - dismissToast(id); - showToast("Clarification submitted successfully", "success"); - await loadPlanData(false); - } catch (error: any) { - dismissToast(id); - - // Check if this is an RAI validation error - let errorDetail = null; - try { - // Try to parse the error detail if it's a string - if (typeof error?.response?.data?.detail === 'string') { - errorDetail = JSON.parse(error.response.data.detail); - } else { - errorDetail = error?.response?.data?.detail; - } - } catch (parseError) { - // If parsing fails, use the original error - errorDetail = error?.response?.data?.detail; - } - - // Handle RAI validation errors with better UX - if (errorDetail?.error_type === 'RAI_VALIDATION_FAILED') { - setRAIError(errorDetail); - } else { - // Handle other errors with toast messages - showToast("Failed to submit clarification", "error"); - } - } finally { - setInput(""); - setSubmitting(false); - } - }, - [planData, loadPlanData] - ); - - const handleApproveStep = useCallback( - async (step: Step, total: number, completed: number, approve: boolean) => { - setProcessingSubtaskId(step.id); - const toastMessage = approve ? "Approving step" : "Rejecting step"; - let id = showToast(toastMessage, "progress"); - setSubmitting(true); - try { - let approveRejectDetails = await PlanDataService.stepStatus(step, approve); - dismissToast(id); - showToast(`Step ${approve ? "approved" : "rejected"} successfully`, "success"); - if (approveRejectDetails && Object.keys(approveRejectDetails).length > 0) { - await loadPlanData(false); - } - setReloadLeftList(true); - } catch (error) { - dismissToast(id); - showToast(`Failed to ${approve ? "approve" : "reject"} step`, "error"); - } finally { - setProcessingSubtaskId(null); - setSubmitting(false); - } - }, - [loadPlanData] - ); - - useEffect(() => { - const initializePage = async () => { - // Load the basic plan data first - await loadPlanData(true); - }; - - initializePage(); - }, []); - - // Separate effect for plan generation when plan data is loaded - useEffect(() => { - if (planData && (!planData.plan.steps || planData.plan.steps.length === 0) && !planGenerated && !loading) { - generatePlan(); - } - }, [planData, planGenerated, loading]); - - const handleNewTaskButton = () => { - NewTaskService.handleNewTaskFromPlan(navigate); - }; - - if (!planId) { - return ( -
- Error: No plan ID provided -
- ); - } - - return ( - - - setReloadLeftList(false)} - selectedTeam={selectedTeam} - /> - - - {/* 🐙 Only replaces content body, not page shell */} - {loading ? ( - <> - - - ) : ( - <> - - - } - /> - - - - {/* Show RAI error if present */} - {raiError && (
- { - setRAIError(null); - }} - onDismiss={() => setRAIError(null)} - /> -
- )} - - - - )} -
- - -
-
- ); -}; - -export default PlanCreatePage; diff --git a/src/frontend/src/pages/PlanPage.tsx b/src/frontend/src/pages/PlanPage.tsx index d4d0ea577..2a87687ea 100644 --- a/src/frontend/src/pages/PlanPage.tsx +++ b/src/frontend/src/pages/PlanPage.tsx @@ -8,6 +8,7 @@ import { ToastBody, useToastController, } from "@fluentui/react-components"; +import { apiService } from "../api/apiService"; import "../styles/PlanPage.css"; import CoralShellColumn from "../coral/components/Layout/CoralShellColumn"; import CoralShellRow from "../coral/components/Layout/CoralShellRow"; @@ -31,6 +32,7 @@ import { TeamConfig } from "../models/Team"; import { TeamService } from "../services/TeamService"; import { webSocketService, StreamMessage, StreamingPlanUpdate } from "../services/WebSocketService"; + /** * Page component for displaying a specific plan * Accessible via the route /plan/{plan_id} @@ -63,12 +65,18 @@ const PlanPage: React.FC = () => { // WebSocket connection and streaming setup useEffect(() => { const initializeWebSocket = async () => { - try { - await webSocketService.connect(); + // Only connect if not already connected + if (!webSocketService.isConnected()) { + try { + await webSocketService.connect(); + setWsConnected(true); + } catch (error) { + console.error('Failed to connect to WebSocket:', error); + setWsConnected(false); + } + } else { + // Already connected setWsConnected(true); - } catch (error) { - console.error('Failed to connect to WebSocket:', error); - setWsConnected(false); } }; @@ -119,11 +127,12 @@ const PlanPage: React.FC = () => { unsubscribeError(); webSocketService.disconnect(); }; - }, [planId, showToast]); + }, [planId]); // Subscribe to plan updates when planId changes - useEffect(() => { - if (planId && wsConnected) { + useEffect(() => { + if (planId && wsConnected && !planId.startsWith('sid_')) { + // Only subscribe if we have a real plan_id (not session_id) console.log('Subscribing to plan updates for:', planId); webSocketService.subscribeToPlan(planId); @@ -159,8 +168,8 @@ const PlanPage: React.FC = () => { console.log('All teams loaded:', teams); if (teams.length > 0) { // Always prioritize "Business Operations Team" as default - const businessOpsTeam = teams.find(team => team.name === "Business Operations Team"); - defaultTeam = businessOpsTeam || teams[0]; + const hrTeam = teams.find(team => team.name === "Human Resources Team"); + defaultTeam = hrTeam || teams[0]; TeamService.storageTeam(defaultTeam); setSelectedTeam(defaultTeam); console.log('Default team loaded:', defaultTeam.name); @@ -196,15 +205,17 @@ const PlanPage: React.FC = () => { setError(null); const data = await PlanDataService.fetchPlanData(planId,navigate); - let plans = [...allPlans]; + + setAllPlans(currentPlans => { + const plans = [...currentPlans]; const existingIndex = plans.findIndex(p => p.plan.id === data.plan.id); if (existingIndex !== -1) { plans[existingIndex] = data; } else { plans.push(data); } - setAllPlans(plans); - //setPlanData(data); + return plans; + }); } catch (err) { console.log("Failed to load plan data:", err); setError( @@ -303,8 +314,74 @@ const PlanPage: React.FC = () => { useEffect(() => { - loadPlanData(true); - }, [loadPlanData]); + + const initializePlanLoading = async () => { + if (!planId) return; + + // Check if this looks like a session_id (starts with "sid_") + if (planId.startsWith('sid_')) { + console.log('Detected session_id, resolving to plan_id:', planId); + + try { + // Try to find the plan by session_id + const plans = await apiService.getPlans(); + const 
matchingPlan = plans.find(plan => plan.session_id === planId); + + if (matchingPlan) { + // Found the plan! Replace URL with correct plan_id + console.log('Resolved session_id to plan_id:', matchingPlan.id); + navigate(`/plan/${matchingPlan.id}`, { replace: true }); + return; // Navigation will trigger reload with correct ID + } else { + // Plan not created yet, start polling + console.log('Plan not found yet, starting polling for session:', planId); + let attempts = 0; + const maxAttempts = 20; // Poll for up to 20 seconds + + const pollForPlan = async () => { + attempts++; + if (attempts > maxAttempts) { + console.error('Plan creation timed out after polling'); + setError(new Error('Plan creation is taking longer than expected. Please check your task list or try creating a new plan.')); + setLoading(false); + return; + } + + try { + const plans = await apiService.getPlans(); + const plan = plans.find(p => p.session_id === planId); + + if (plan) { + console.log(`Found plan after ${attempts} attempts:`, plan.id); + navigate(`/plan/${plan.id}`, { replace: true }); + } else { + // Wait and try again + setTimeout(pollForPlan, 1000); // Poll every second + } + } catch (error) { + console.error('Polling error:', error); + if (attempts < maxAttempts) { + setTimeout(pollForPlan, 2000); // Wait longer on error + } + } + }; + + pollForPlan(); + } + } catch (error) { + console.error('Session resolution error:', error); + setError(error instanceof Error ? error : new Error('Failed to resolve plan from session')); + setLoading(false); + } + } else { + + console.log('Using plan_id directly:', planId); + loadPlanData(true); + } + }; + + initializePlanLoading (); +}, [planId, navigate, loadPlanData]); const handleNewTaskButton = () => { NewTaskService.handleNewTaskFromPlan(navigate); @@ -341,32 +418,32 @@ const PlanPage: React.FC = () => { /** * Handle team upload completion - refresh team list */ - const handleTeamUpload = useCallback(async () => { - try { - const teams = await TeamService.getUserTeams(); - console.log('Teams refreshed after upload:', teams.length); - - if (teams.length > 0) { - // Always keep "Business Operations Team" as default, even after new uploads - const businessOpsTeam = teams.find(team => team.name === "Business Operations Team"); - const defaultTeam = businessOpsTeam || teams[0]; - setSelectedTeam(defaultTeam); - console.log('Default team after upload:', defaultTeam.name); - - dispatchToast( - - Team Uploaded Successfully! - - Team uploaded. {defaultTeam.name} remains your default team. - - , - { intent: "success" } - ); - } - } catch (error) { - console.error('Error refreshing teams after upload:', error); + const handleTeamUpload = useCallback(async () => { + try { + const teams = await TeamService.getUserTeams(); + console.log('Teams refreshed after upload:', teams.length); + + if (teams.length > 0) { + // Always keep "Human Resources Team" as default, even after new uploads + const hrTeam = teams.find(team => team.name === "Human Resources Team"); + const defaultTeam = hrTeam || teams[0]; + setSelectedTeam(defaultTeam); + console.log('Default team after upload:', defaultTeam.name); + + dispatchToast( + + Team Uploaded Successfully! + + Team uploaded. {defaultTeam.name} remains your default team. 
+ + , + { intent: "success" } + ); } - }, [dispatchToast]); + } catch (error) { + console.error('Error refreshing teams after upload:', error); + } +}, [dispatchToast]); if (!planId) { return ( diff --git a/src/frontend/src/pages/index.tsx b/src/frontend/src/pages/index.tsx index 3c3853af6..2d4732818 100644 --- a/src/frontend/src/pages/index.tsx +++ b/src/frontend/src/pages/index.tsx @@ -1,3 +1,2 @@ export { default as HomePage } from './HomePage'; export { default as PlanPage } from './PlanPage'; -export { default as PlanCreatePage } from './PlanCreatePage'; \ No newline at end of file diff --git a/src/frontend/src/services/TaskService.tsx b/src/frontend/src/services/TaskService.tsx index 5cdb04249..7e164e0fd 100644 --- a/src/frontend/src/services/TaskService.tsx +++ b/src/frontend/src/services/TaskService.tsx @@ -206,7 +206,7 @@ export class TaskService { static async createPlan( description: string, teamId?: string - ): Promise<{ plan_id: string; status: string; session_id: string }> { + ): Promise<{ status: string; session_id: string }> { const sessionId = this.generateSessionId(); const inputTask: InputTask = { diff --git a/src/frontend/src/services/TeamService.tsx b/src/frontend/src/services/TeamService.tsx index 065f6ec35..3791530b1 100644 --- a/src/frontend/src/services/TeamService.tsx +++ b/src/frontend/src/services/TeamService.tsx @@ -19,6 +19,46 @@ export class TeamService { } } + /** + * Initialize user's team with default HR team configuration + * This calls the backend /init_team endpoint which sets up the default team + */ + static async initializeTeam(): Promise<{ + success: boolean; + data?: { + status: string; + team_id: string; + }; + error?: string; + }> { + try { + console.log('Calling /v3/init_team endpoint...'); + const response = await apiClient.get('/v3/init_team'); + + console.log('Team initialization response:', response); + + return { + success: true, + data: response + }; + } catch (error: any) { + console.error('Team initialization failed:', error); + + let errorMessage = 'Failed to initialize team'; + + if (error.response?.data?.detail) { + errorMessage = error.response.data.detail; + } else if (error.message) { + errorMessage = error.message; + } + + return { + success: false, + error: errorMessage + }; + } + } + static getStoredTeam(): TeamConfig | null { if (typeof window === 'undefined' || !window.localStorage) return null; try { diff --git a/src/mcp_server/,env b/src/mcp_server/,env new file mode 100644 index 000000000..be5cf0e39 --- /dev/null +++ b/src/mcp_server/,env @@ -0,0 +1,16 @@ +# MCP Server Configuration + +# Server Settings +MCP_HOST=0.0.0.0 +MCP_PORT=9000 +MCP_DEBUG=false +MCP_SERVER_NAME=MACAE MCP Server + +# Authentication Settings +MCP_ENABLE_AUTH=true +AZURE_TENANT_ID=your-tenant-id-here +AZURE_CLIENT_ID=your-client-id-here +AZURE_JWKS_URI=https://login.microsoftonline.com/your-tenant-id/discovery/v2.0/keys +AZURE_ISSUER=https://sts.windows.net/your-tenant-id/ +AZURE_AUDIENCE=api://your-client-id +DATASET_PATH=./datasets \ No newline at end of file diff --git a/src/mcp_server/mcp_server.py b/src/mcp_server/mcp_server.py index dab1e22e2..72be4cb43 100644 --- a/src/mcp_server/mcp_server.py +++ b/src/mcp_server/mcp_server.py @@ -31,7 +31,7 @@ factory.register_service(GeneralService()) # Register DataToolService with the dataset path -factory.register_service(DataToolService(dataset_path="data/datasets")) +factory.register_service(DataToolService(dataset_path="datasets")) def create_fastmcp_server(): diff --git 
a/src/mcp_server/my_mcp_server/my_mcp_server.py b/src/mcp_server/my_mcp_server/my_mcp_server.py deleted file mode 100644 index ca45518fc..000000000 --- a/src/mcp_server/my_mcp_server/my_mcp_server.py +++ /dev/null @@ -1,150 +0,0 @@ -from enum import Enum - -from fastmcp import FastMCP -from fastmcp.server.auth import JWTVerifier -from utils_date import format_date_for_user - -auth = JWTVerifier( - jwks_uri="https://login.microsoftonline.com/52b39610-0746-4c25-a83d-d4f89fadedfe/discovery/v2.0/keys", - #issuer="https://login.microsoftonline.com/52b39610-0746-4c25-a83d-d4f89fadedfe/v2.0", - # This issuer is not correct in the docs. Found by decoding the token. - issuer="https://sts.windows.net/52b39610-0746-4c25-a83d-d4f89fadedfe/", - algorithm="RS256", - audience="api://7a95e70b-062e-4cd3-a88c-603fc70e1c73" -) - -class Domain(Enum): - HR = "hr" - MARKETING = "marketing" - PROCUREMENT = "procurement" - PRODUCT = "product" - TECH_SUPPORT = "tech_support" - RETAIL = "Retail" - -mcp = FastMCP("My MCP Server", auth=auth) - -formatting_instructions = "Instructions: returning the output of this function call verbatim to the user in markdown. Then write AGENT SUMMARY: and then include a summary of what you did." - -@mcp.tool -def greet(name: str) -> str: - """ Greets the user with the provided name.""" - return f"Hello from MCP, {name}!" - -@mcp.tool(tags={Domain.HR.value}) -async def schedule_orientation_session(employee_name: str, date: str) -> str: - """Schedule an orientation session for a new employee.""" - formatted_date = format_date_for_user(date) - - return ( - f"##### Orientation Session Scheduled\n" - f"**Employee Name:** {employee_name}\n" - f"**Date:** {formatted_date}\n\n" - f"Your orientation session has been successfully scheduled. " - f"Please mark your calendar and be prepared for an informative session.\n" - f"AGENT SUMMARY: I scheduled the orientation session for {employee_name} on {formatted_date}, as part of her onboarding process.\n" - f"{formatting_instructions}" - ) - -@mcp.tool(tags={Domain.HR.value}) -async def assign_mentor(employee_name: str) -> str: - """Assign a mentor to a new employee.""" - return ( - f"##### Mentor Assigned\n" - f"**Employee Name:** {employee_name}\n\n" - f"A mentor has been assigned to you. They will guide you through your onboarding process and help you settle into your new role.\n" - f"{formatting_instructions}" - ) - -@mcp.tool(tags={Domain.HR.value}) -async def register_for_benefits(employee_name: str) -> str: - """Register a new employee for benefits.""" - return ( - f"##### Benefits Registration\n" - f"**Employee Name:** {employee_name}\n\n" - f"You have been successfully registered for benefits. " - f"Please review your benefits package and reach out if you have any questions.\n" - f"{formatting_instructions}" - ) - -@mcp.tool(tags={Domain.HR.value}) -async def provide_employee_handbook(employee_name: str) -> str: - """Provide the employee handbook to a new employee.""" - return ( - f"##### Employee Handbook Provided\n" - f"**Employee Name:** {employee_name}\n\n" - f"The employee handbook has been provided to you. 
" - f"Please review it to familiarize yourself with company policies and procedures.\n" - f"{formatting_instructions}" - ) - -@mcp.tool(tags={Domain.HR.value}) -async def initiate_background_check(employee_name: str) -> str: - """Initiate a background check for a new employee.""" - return ( - f"##### Background Check Initiated\n" - f"**Employee Name:** {employee_name}\n\n" - f"A background check has been initiated for {employee_name}. " - f"You will be notified once the check is complete.\n" - f"{formatting_instructions}" - ) - -@mcp.tool(tags={Domain.HR.value}) -async def request_id_card(employee_name: str) -> str: - """Request an ID card for a new employee.""" - return ( - f"##### ID Card Request\n" - f"**Employee Name:** {employee_name}\n\n" - f"Your request for an ID card has been successfully submitted. " - f"Please allow 3-5 business days for processing. You will be notified once your ID card is ready for pickup.\n" - f"{formatting_instructions}" - ) - -@mcp.tool(tags={Domain.HR.value}) -async def set_up_payroll(employee_name: str) -> str: - """Set up payroll for a new employee.""" - return ( - f"##### Payroll Setup\n" - f"**Employee Name:** {employee_name}\n\n" - f"Your payroll has been successfully set up. " - f"Please review your payroll details and ensure everything is correct.\n" - f"{formatting_instructions}" - ) - -@mcp.tool(tags={Domain.TECH_SUPPORT.value}) -async def send_welcome_email(employee_name: str, email_address: str) -> str: - """Send a welcome email to a new employee as part of onboarding.""" - return ( - f"##### Welcome Email Sent\n" - f"**Employee Name:** {employee_name}\n" - f"**Email Address:** {email_address}\n\n" - f"A welcome email has been successfully sent to {employee_name} at {email_address}.\n" - f"{formatting_instructions}" - ) - -@mcp.tool(tags={Domain.TECH_SUPPORT.value}) -async def set_up_office_365_account(employee_name: str, email_address: str) -> str: - """Set up an Office 365 account for an employee.""" - return ( - f"##### Office 365 Account Setup\n" - f"**Employee Name:** {employee_name}\n" - f"**Email Address:** {email_address}\n\n" - f"An Office 365 account has been successfully set up for {employee_name} at {email_address}.\n" - f"{formatting_instructions}" - ) - -@mcp.tool(tags={Domain.TECH_SUPPORT.value}) -async def configure_laptop(employee_name: str, laptop_model: str) -> str: - """Configure a laptop for a new employee.""" - return ( - f"##### Laptop Configuration\n" - f"**Employee Name:** {employee_name}\n" - f"**Laptop Model:** {laptop_model}\n\n" - f"The laptop {laptop_model} has been successfully configured for {employee_name}.\n" - f"{formatting_instructions}" - ) - -if __name__ == "__main__": - mcp.run() - -# Start as http server: -# fastmcp run my_mcp_server.py -t streamable-http -l DEBUG -p 8080 \ No newline at end of file diff --git a/src/mcp_server/my_mcp_server/utils_date.py b/src/mcp_server/my_mcp_server/utils_date.py deleted file mode 100644 index d346e3cd0..000000000 --- a/src/mcp_server/my_mcp_server/utils_date.py +++ /dev/null @@ -1,24 +0,0 @@ -import locale -from datetime import datetime -import logging -from typing import Optional - - -def format_date_for_user(date_str: str, user_locale: Optional[str] = None) -> str: - """ - Format date based on user's desktop locale preference. - - Args: - date_str (str): Date in ISO format (YYYY-MM-DD). - user_locale (str, optional): User's locale string, e.g., 'en_US', 'en_GB'. - - Returns: - str: Formatted date respecting locale or raw date if formatting fails. 
- """ - try: - date_obj = datetime.strptime(date_str, "%Y-%m-%d") - locale.setlocale(locale.LC_TIME, user_locale or '') - return date_obj.strftime("%B %d, %Y") - except Exception as e: - logging.warning(f"Date formatting failed for '{date_str}': {e}") - return date_str diff --git a/src/mcp_server/services/data_tool_service.py b/src/mcp_server/services/data_tool_service.py index e22650a1f..d323efb4b 100644 --- a/src/mcp_server/services/data_tool_service.py +++ b/src/mcp_server/services/data_tool_service.py @@ -29,6 +29,11 @@ def __init__(self, dataset_path: str): self.dataset_path = dataset_path self.allowed_files = set(ALLOWED_FILES) + @property + def tool_count(self) -> int: + """Return the number of tools provided by this service.""" + return 2 + def _find_file(self, filename: str) -> str: """ Searches recursively within the dataset_path for an exact filename match (case-sensitive).