diff --git a/web/backend-fastapi/app/__init__.py b/web/backend-fastapi/app/__init__.py new file mode 100644 index 000000000..52d7dc265 --- /dev/null +++ b/web/backend-fastapi/app/__init__.py @@ -0,0 +1,4 @@ +# FastMCP Backend - Main App Package +""" +HMVC-based FastAPI backend for FastMCP +""" diff --git a/web/backend-fastapi/app/controllers/agent.py b/web/backend-fastapi/app/controllers/agent.py deleted file mode 100644 index d405a8445..000000000 --- a/web/backend-fastapi/app/controllers/agent.py +++ /dev/null @@ -1,198 +0,0 @@ -from fastapi import APIRouter, Response -from pydantic import BaseModel -from fastmcp import Client -from fastmcp.exceptions import ToolError -from ..mcp_agents import agents_mcp -from ..models import neo4j -from ..models.azure import AzureAI -import os -import json -from fastapi.logger import logger - -router = APIRouter() - - -# Currently unused -class AgentHistory(BaseModel): - prompt: str - response: str - - -class AgentRequest(BaseModel): - prompt: str - # chatHistory: List[AgentHistory] commented out until needed - - -def get_database_schema(labels: list[str] = []): - """Get the database schema dynamically from Neo4j.""" - neo4j_worker = neo4j.neo4j() - - # Get comprehensive schema information - schema_query = """ - CALL apoc.meta.schema() - YIELD value - RETURN value - """ - - schema = neo4j_worker.query(schema_query) - - # Filter the schema to only include desired node labels and relationship types - if len(labels) > 0: - schema_obj = schema[0].get("value", {}) - # Only keep nodes with specified labels - filtered_nodes = { - label: properties - for label, properties in schema_obj.items() - if label in labels - } - schema = filtered_nodes - # Format the schema information - schema_info = f""" - COMPREHENSIVE DATABASE SCHEMA: - {schema} - - This schema shows: - - Node labels with their properties and types - - Relationship types and their directions - - Property constraints and indexes - - Cardinality information - - Please note that the field document_title actually contains the title of the document. - Therefore, if I wanted information about a specific document, such as the Motor Vehicle Act, I would search in the document_title field. - Use this information to construct accurate Cypher queries. - """ - return schema_info - - -def get_initial_context(schema_info): - """Set database schema information as a system message.""" - schema_message = f""" - You are an AI assistant that helps users answer questions about Laws in British Columbia. - You must utilize the provided list of tools to build enough context to answer the user's question. - Keep your responses concise and relevant to the user's question. - - For explicit searches with cypher queries, this is the database schema information you need to know: - {schema_info} - Utilize this schema to construct accurate Cypher queries when needed. - Always specify the node label that you want to search on, as this schema may not contain all labels in the database. - - Tools may be used more than once within a single conversation. - You can use the tools to search for information, but you cannot modify the database. - """ - return schema_message - - -@router.post("/agent/") -async def agentic_chat(request: AgentRequest = None): - if request is None: - return Response( - content="No request body provided", - status_code=400, - ) - - if not isinstance(request, AgentRequest): - return Response( - content="Input should be a valid AgentRequest object", - status_code=400, - ) - - initial_question = request.prompt - - # Azure Configuration - endpoint = os.getenv("AZURE_AI_ENDPOINT", "") - key = os.getenv("AZURE_AI_KEY", "") - azure = AzureAI(endpoint, key) - - # Max iterations for model to loop if insufficient context is provided - max_iterations = 10 - - try: - # Establish MCP client connection - client = Client(agents_mcp) - async with client: - raw_tools = await client.list_tools() - # Convert tools to a format compatible with Azure - tools = [ - { - "type": "function", - "function": { - "name": tool.name, - "description": tool.description, - "parameters": tool.inputSchema, - }, - } - for tool in raw_tools - ] - # Supply with database schema first - schema = get_database_schema(["v3"]) - azure.add_system_message(get_initial_context(schema)) - - # Continue with the conversation - response = azure.call_agent_with_history(initial_question, tools=tools) - - finish_reason = response.get("finish_reason") - current_iteration = 0 - # Process the response until we reach a stopping condition - while finish_reason != "stop" and current_iteration < max_iterations: - if finish_reason == "tool_calls": - tool_calls = response.get("message").get("tool_calls") - for tool_call in tool_calls: - tool_call_id = tool_call.get("id") # Get the tool call ID - tool_name = tool_call.get("function").get("name") - arguments_str = tool_call.get("function").get("arguments") - - # Parse the JSON string to get a Python object - try: - arguments = json.loads(arguments_str) - logger.info( - f"Calling tool: {tool_name} with arguments: {arguments}", - ) - except json.JSONDecodeError as e: - logger.error( - f"Error parsing arguments: {e}", - ) - continue - # Handle tool execution with error handling - try: - result = await client.call_tool(tool_name, arguments) - logger.info(f"Tool {tool_name} returned: {result}") - # Add the successful tool response - azure.add_tool_response(tool_call_id, result) - except ToolError as tool_error: - error_message = ( - f"Tool error in {tool_name}: {str(tool_error)}" - ) - logger.error(error_message) - # Pass the error back to the agent so it can adjust - azure.add_tool_response( - tool_call_id, {"error": error_message} - ) - except Exception as e: - error_message = f"Unexpected error in {tool_name}: {str(e)}" - logger.error(error_message) - # Pass the error back to the agent - azure.add_tool_response( - tool_call_id, {"error": error_message} - ) - - # Continue the conversation without adding a new user message - response = azure.call_agent_with_history( - "", tools=tools, role="user" - ) - finish_reason = response.get("finish_reason") - current_iteration += 1 - elif finish_reason == "length": - logger.warning( - "Input length exceeded the limit. Stopping further processing." - ) - break - else: - logger.warning("Unexpected finish reason:", finish_reason) - break - response_text = response.get("message").get("content", "").strip() - # TODO: Filter out tool calls before returning? - return {"response": response_text, "history": azure.history} - - except Exception as e: - logger.error("An error occurred during agent processing:", exc_info=True) - raise e diff --git a/web/backend-fastapi/app/controllers/analytics.py b/web/backend-fastapi/app/controllers/analytics.py deleted file mode 100644 index 90e8520ae..000000000 --- a/web/backend-fastapi/app/controllers/analytics.py +++ /dev/null @@ -1,30 +0,0 @@ -from fastapi import APIRouter, Request, HTTPException -from app.services.analytics_service import AnalyticsService - -router = APIRouter() -analytics_service = AnalyticsService() - -# post request to save initial analytics -@router.post("/saveAnalytics") -async def save_analytics(request: Request): - try: - analytics_data = await request.json() - session_id = analytics_data.get('sessionId') - - if not session_id: - raise HTTPException(status_code=400, detail="sessionId is required in the analytics data") - - message = await analytics_service.save_analytics(analytics_data) - return {"message": message} - except Exception as e: - raise HTTPException(status_code=500, detail=str(e)) - -# patch request to update analytics -@router.patch("/updateAnalytics") -async def update_analytics(request: Request): - try: - updates = await request.json() - message = await analytics_service.update_analytics(updates) - return {"message": message} - except Exception as e: - raise HTTPException(status_code=500, detail=str(e)) diff --git a/web/backend-fastapi/app/controllers/chat_RAG.py b/web/backend-fastapi/app/controllers/chat_RAG.py deleted file mode 100644 index dc2b4dc03..000000000 --- a/web/backend-fastapi/app/controllers/chat_RAG.py +++ /dev/null @@ -1,58 +0,0 @@ -from collections import defaultdict -from fastapi import APIRouter, Body, HTTPException -from app.models import neo4j, trulens, rag -from app.models.rag_states import ( - get_state_map, - StateType, - default_state, - get_states_for_frontend, -) -from ..common.chat_objects import ChatRequest - -router = APIRouter() -kg = None -tru = None - - -@router.post("/chat/") -async def chat(chat_request: ChatRequest = Body(ChatRequest)): - chat_history = [] - # Ensure the input is a valid dictionary or object - if not isinstance(chat_request, ChatRequest): - raise ValueError("Input should be a valid ChatRequest object") - # Global variables initialization - global kg, tru - rag_fn = rag.get_full_rag() - if kg is None: - kg = neo4j.neo4j() - if tru is None: - tru = trulens.connect_trulens() - - # Determine which state to use - state_map = get_state_map() - state_entry = state_map.get(chat_request.key, default_state) - if state_entry is None: - raise HTTPException(status_code=404, detail="RAG state not found") - state = state_entry.get("state") - tru_rag = trulens.tru_rag(rag_fn, state.trulens_id) - - with tru_rag as recording: - # Key used to determine class called for query - if state.type == StateType.INTERNAL: - # For internal operations with Neo4j - responses = rag_fn.query( - chat_request.prompt, - chat_history, - kg, - state, - ) - else: - # For external sources, like Azure - responses = [] - record = recording.get() - return {"responses": responses, "recording": record.record_id} - - -@router.get("/chat/states/") -async def get_states(): - return get_states_for_frontend() diff --git a/web/backend-fastapi/app/controllers/feedback.py b/web/backend-fastapi/app/controllers/feedback.py deleted file mode 100644 index 4e406cc22..000000000 --- a/web/backend-fastapi/app/controllers/feedback.py +++ /dev/null @@ -1,98 +0,0 @@ -from fastapi import APIRouter, Form -from app.models import trulens, rag, neo4j -from langchain_community.embeddings import HuggingFaceEmbeddings -import logging - -router = APIRouter() - -kg = None -tru = None -embeddings = HuggingFaceEmbeddings(model_name="all-MiniLM-L6-v2") - - -@router.post("/submit/") -async def submit_question(prompt: str = Form(...)): - # Add appropriate imports - - # Global variables initialization - global kg, tru, APP_ID, embeddings - rag_fn = rag.get_top_k() - if kg is None: - kg = neo4j.neo4j() - if tru is None: - tru = trulens.connect_trulens() - tru_rag = trulens.tru_rag(rag_fn) - with tru_rag as recording: - responses = rag_fn.query(prompt, None, embeddings, kg) - record = recording.get() - # Process question prompt - return {"responses": responses, "recording": record.record_id} - - -@router.post("/feedback/") -async def feedback( - feedback: str = Form(...), - index: str = Form(...), - recording_id: str = Form(...), - bulk: bool = Form(False), - trulens_id: str = Form("unknown"), -): - # Global variables initialization - global tru - if tru is None: - tru = trulens.connect_trulens() - - # Process feedback - rows = trulens.process_feedback( - tru, trulens_id, index, feedback, recording_id, bulk - ) - if rows: - return {"status": True, "rows": rows} - else: - return {"status": False} - - -@router.post("/feedbackrag/") -async def feedbackrag( - feedback: str = Form(...), - recording_id: str = Form(...), - comment: str = Form(None), - trulens_id: str = Form("unknown"), -): - global tru - if tru is None: - tru = trulens.connect_trulens() - try: - rows = trulens.process_rag_feedback( - feedback, trulens_id, recording_id, tru, comment - ) - if rows: - return { - "status": True, - "message": "Feedback submitted successfully", - "rows": rows, - } - return {"status": False, "message": "No feedback data found"} - except Exception as e: - logging.error(f"Error processing feedback: {str(e)}") - return { - "status": False, - "message": "An internal error has occurred. Please try again later.", - } - - -@router.get("/fetch_feedback/") -async def fetch_all_feedback(trulens_id: str = "unknown"): - # Add appropriate imports - - # Global variables initialization - global tru - if tru is None: - tru = trulens.connect_trulens() - - # Fetch all feedback - rows = trulens.fetch_all_feedback(trulens_id) - if rows: - return {"status": True, "rows": rows} - else: - return {"status": False} diff --git a/web/backend-fastapi/app/controllers/login.py b/web/backend-fastapi/app/controllers/login.py deleted file mode 100644 index b4660e271..000000000 --- a/web/backend-fastapi/app/controllers/login.py +++ /dev/null @@ -1,44 +0,0 @@ -from fastapi import APIRouter, Form -from app.models import neo4j, trulens, rag -from fastapi.responses import JSONResponse -import requests -import json -import os - -router = APIRouter() - -@router.get("/") -async def read_main(): - return {"msg": "Hello World"} - -@router.get("/login") -async def login(): - return JSONResponse(content={"valid": True}) - -@router.get("/validate") -async def validate(): - return JSONResponse(content={"status": "ok"}) - - -@router.post("/refreshtoken/") -async def refresh_token(refresh_token: str = Form(...)): - '''Refresh the token for the user''' - endpoint = 'https://dev.loginproxy.gov.bc.ca/auth/realms/standard/protocol/openid-connect/token' - headers = { - 'Content-Type': 'application/x-www-form-urlencoded' - } - print(refresh_token) - data = { - 'client_id': 'a-i-pathfinding-project-5449', - 'grant_type': 'refresh_token', - 'refresh_token': refresh_token - } - response = requests.post(endpoint, headers=headers, data=data) - if response.status_code != 200: - raise HTTPException(status_code=response.status_code, detail="Token refresh failed") - return response.json() - -@router.get("/health") -def health_check(): - return {"status": "healthy"} - diff --git a/web/backend-fastapi/app/dependencies.py b/web/backend-fastapi/app/dependencies.py index 727a706a9..e4582c03d 100644 --- a/web/backend-fastapi/app/dependencies.py +++ b/web/backend-fastapi/app/dependencies.py @@ -2,4 +2,4 @@ from app.middleware.authentication import AuthenticationMiddleware async def get_user_info(user_info: dict = Depends(AuthenticationMiddleware)): - return user_info \ No newline at end of file + return user_info diff --git a/web/backend-fastapi/app/keycloack_config.py b/web/backend-fastapi/app/keycloack_config.py index 9a7b30894..ddea79d38 100644 --- a/web/backend-fastapi/app/keycloack_config.py +++ b/web/backend-fastapi/app/keycloack_config.py @@ -2,4 +2,4 @@ "server_url": "https://dev.loginproxy.gov.bc.ca/auth", "realm_name": "standard", "client_id": "a-i-pathfinding-project-5449", -} \ No newline at end of file +} diff --git a/web/backend-fastapi/app/mcp_agents/__init__.py b/web/backend-fastapi/app/mcp_agents/__init__.py deleted file mode 100644 index cee9d5660..000000000 --- a/web/backend-fastapi/app/mcp_agents/__init__.py +++ /dev/null @@ -1,15 +0,0 @@ -from fastmcp import FastMCP - -# 1. Import the individual MCP server instances from agent files -from app.mcp_agents.semantic_search import semantic_search_mcp -from app.mcp_agents.community_detection import community_detection_mcp -from app.mcp_agents.explicit_search import explicit_search_mcp - -# 2. Create the main, aggregated MCP server that clients will connect to -agents_mcp = FastMCP(name="MainAgentServer") - -# 3. Mount each individual agent server onto the main one with a prefix. -# This makes their tools available under names like 'semanticsearch_search' -agents_mcp.mount(semantic_search_mcp, prefix="semanticsearch") -# agents_mcp.mount(community_detection_mcp, prefix="communitydetection") # Disabled for now -agents_mcp.mount(explicit_search_mcp, prefix="explicitsearch") diff --git a/web/backend-fastapi/app/modules/__init__.py b/web/backend-fastapi/app/modules/__init__.py new file mode 100644 index 000000000..946a6ab25 --- /dev/null +++ b/web/backend-fastapi/app/modules/__init__.py @@ -0,0 +1 @@ +# HMVC Modules Package diff --git a/web/backend-fastapi/app/modules/agent/__init__.py b/web/backend-fastapi/app/modules/agent/__init__.py new file mode 100644 index 000000000..c24331100 --- /dev/null +++ b/web/backend-fastapi/app/modules/agent/__init__.py @@ -0,0 +1,3 @@ +from .controllers.agent_controller import router + +__all__ = ["router"] diff --git a/web/backend-fastapi/app/modules/agent/agents/__init__.py b/web/backend-fastapi/app/modules/agent/agents/__init__.py new file mode 100644 index 000000000..0335f10ac --- /dev/null +++ b/web/backend-fastapi/app/modules/agent/agents/__init__.py @@ -0,0 +1,71 @@ +from fastmcp import FastMCP +from typing import Dict, List, Optional +import os +import importlib.util + + +class AgentRegistry: + """Central registry for all MCP agents with auto-discovery capabilities""" + + def __init__(self): + self._agents: Dict[str, FastMCP] = {} + self._orchestrator: Optional[FastMCP] = None + self._capabilities: Dict[str, List[str]] = {} + self._discover_agents() + + def _discover_agents(self): + """Auto-discover all agents in subdirectories""" + # Import search agents + from .search.semantic_search import semantic_search_mcp + from .search.explicit_search import explicit_search_mcp + from .search.community_detection import community_detection_mcp + + # Register search agents + self._agents["semantic_search"] = semantic_search_mcp + self._agents["explicit_search"] = explicit_search_mcp + self._agents["community_detection"] = community_detection_mcp + + # Define capabilities + self._capabilities["search"] = ["semantic_search", "explicit_search", "community_detection"] + + def get_agent(self, name: str) -> Optional[FastMCP]: + """Get a specific agent by name""" + return self._agents.get(name) + + def get_agents_by_capability(self, capability: str) -> List[FastMCP]: + """Get all agents that have a specific capability""" + agent_names = self._capabilities.get(capability, []) + return [self._agents[name] for name in agent_names if name in self._agents] + + def list_agents(self) -> List[str]: + """List all available agent names""" + return list(self._agents.keys()) + + def list_capabilities(self) -> List[str]: + """List all available capabilities""" + return list(self._capabilities.keys()) + + def create_main_server(self) -> FastMCP: + """Create the main aggregated MCP server""" + main_server = FastMCP(name="MainAgentServer") + + # Mount search agents + for agent_name, agent in self._agents.items(): + if agent_name in self._capabilities.get("search", []): + prefix = agent_name.replace("_", "") + main_server.mount(agent, prefix=prefix) + + return main_server + + def get_combined_mcp(self) -> FastMCP: + """Get the combined MCP server with all agents""" + return self.create_main_server() + + +# Create global registry instance +agent_registry = AgentRegistry() + +# Create the main server for backward compatibility +agents_mcp = agent_registry.create_main_server() + +__all__ = ["agent_registry", "agents_mcp"] diff --git a/web/backend-fastapi/app/modules/agent/agents/orchestrator/__init__.py b/web/backend-fastapi/app/modules/agent/agents/orchestrator/__init__.py new file mode 100644 index 000000000..53c814ccb --- /dev/null +++ b/web/backend-fastapi/app/modules/agent/agents/orchestrator/__init__.py @@ -0,0 +1,4 @@ +# Orchestrator Agent Package +from .orchestrator_agent import orchestrator_mcp + +__all__ = ["orchestrator_mcp"] diff --git a/web/backend-fastapi/app/modules/agent/agents/orchestrator/orchestrator_agent.py b/web/backend-fastapi/app/modules/agent/agents/orchestrator/orchestrator_agent.py new file mode 100644 index 000000000..482e24243 --- /dev/null +++ b/web/backend-fastapi/app/modules/agent/agents/orchestrator/orchestrator_agent.py @@ -0,0 +1,185 @@ +from fastmcp import FastMCP +from typing import Dict, List, Any, Optional +import json +from fastapi.logger import logger + +# Import registry to access sub-agents +from .. import agent_registry + +orchestrator_mcp = FastMCP(name="OrchestratorAgent") + + +@orchestrator_mcp.tool( + name="orchestrate_task", + description="""High-level orchestrator that can deploy and coordinate multiple specialized agents to complete complex tasks. + + This agent can: + - Analyze task requirements and determine which specialized agents are needed + - Coordinate multiple agents to work together on complex queries + - Combine results from different agents into cohesive responses + - Route tasks to the most appropriate specialized agents + """ +) +def orchestrate_task( + task: str, + required_capabilities: Optional[List[str]] = None, + execution_strategy: str = "parallel" +) -> Dict[str, Any]: + """ + Orchestrate a complex task using multiple specialized agents + + Args: + task: The high-level task to complete + required_capabilities: List of capabilities needed (e.g., ["search", "analysis"]) + execution_strategy: How to execute subtasks ("parallel", "sequential", "adaptive") + + Returns: + Dict containing orchestrated response and execution details + """ + logger.info(f"Orchestrator received task: {task}") + + # If no capabilities specified, analyze the task to determine what's needed + if not required_capabilities: + required_capabilities = _analyze_task_requirements(task) + + logger.info(f"Required capabilities: {required_capabilities}") + + # Get available agents for each capability + available_agents = {} + for capability in required_capabilities: + agents = agent_registry.get_agents_by_capability(capability) + available_agents[capability] = [agent.name for agent in agents] + + # Create execution plan + execution_plan = _create_execution_plan(task, required_capabilities, execution_strategy) + + # For now, return a structured response indicating orchestration capability + # Future implementation will actually execute the plan + response = { + "orchestrator_response": f"Task '{task}' analyzed and ready for execution", + "required_capabilities": required_capabilities, + "available_agents": available_agents, + "execution_plan": execution_plan, + "execution_strategy": execution_strategy, + "status": "planned", + "next_steps": [ + "Execute sub-tasks using specialized agents", + "Combine and synthesize results", + "Return unified response" + ] + } + + logger.info(f"Orchestration plan created: {response}") + return response + + +@orchestrator_mcp.tool( + name="list_agent_capabilities", + description="List all available agent capabilities and the agents that provide them" +) +def list_agent_capabilities() -> Dict[str, Any]: + """ + List all available capabilities and agents + + Returns: + Dict containing capabilities and agent mappings + """ + capabilities = {} + for capability in agent_registry.list_capabilities(): + agents = agent_registry.get_agents_by_capability(capability) + capabilities[capability] = [agent.name for agent in agents] + + return { + "available_capabilities": capabilities, + "total_agents": len(agent_registry.list_agents()), + "agent_list": agent_registry.list_agents() + } + + +@orchestrator_mcp.tool( + name="get_agent_status", + description="Get the status and capabilities of a specific agent" +) +def get_agent_status(agent_name: str) -> Dict[str, Any]: + """ + Get detailed information about a specific agent + + Args: + agent_name: Name of the agent to query + + Returns: + Dict containing agent status and capabilities + """ + agent = agent_registry.get_agent(agent_name) + + if not agent: + return { + "status": "not_found", + "message": f"Agent '{agent_name}' not found", + "available_agents": agent_registry.list_agents() + } + + return { + "status": "active", + "agent_name": agent_name, + "agent_type": agent.name, + "capabilities": _get_agent_capabilities(agent_name), + "tools_available": len(getattr(agent, 'tools', [])) if hasattr(agent, 'tools') else 0 + } + + +def _analyze_task_requirements(task: str) -> List[str]: + """ + Analyze a task to determine what capabilities are needed + + This is a simple implementation - can be enhanced with AI/ML + """ + task_lower = task.lower() + capabilities = [] + + # Check for search-related keywords + search_keywords = ["search", "find", "look", "query", "retrieve", "get information"] + if any(keyword in task_lower for keyword in search_keywords): + capabilities.append("search") + + # Add more capability detection logic here + # if any(keyword in task_lower for keyword in analysis_keywords): + # capabilities.append("analysis") + + # Default to search if no specific capability detected + if not capabilities: + capabilities.append("search") + + return capabilities + + +def _create_execution_plan(task: str, capabilities: List[str], strategy: str) -> List[str]: + """ + Create a detailed execution plan for the task + """ + plan = [] + + for capability in capabilities: + agents = agent_registry.get_agents_by_capability(capability) + if agents: + if strategy == "parallel": + plan.append(f"Execute {capability} tasks in parallel using: {[a.name for a in agents]}") + elif strategy == "sequential": + plan.append(f"Execute {capability} tasks sequentially using: {[a.name for a in agents]}") + else: # adaptive + plan.append(f"Adaptively execute {capability} tasks using best available agent") + + plan.append("Synthesize results from all agents") + plan.append("Return unified response to user") + + return plan + + +def _get_agent_capabilities(agent_name: str) -> List[str]: + """ + Get the capabilities provided by a specific agent + """ + for capability, agent_names in agent_registry._capabilities.items(): + if agent_name in agent_names: + return [capability] + return [] diff --git a/web/backend-fastapi/app/modules/agent/agents/search/__init__.py b/web/backend-fastapi/app/modules/agent/agents/search/__init__.py new file mode 100644 index 000000000..54b50fe31 --- /dev/null +++ b/web/backend-fastapi/app/modules/agent/agents/search/__init__.py @@ -0,0 +1,6 @@ +# Search Agents Package +from .semantic_search import semantic_search_mcp +from .explicit_search import explicit_search_mcp +from .community_detection import community_detection_mcp + +__all__ = ["semantic_search_mcp", "explicit_search_mcp", "community_detection_mcp"] diff --git a/web/backend-fastapi/app/mcp_agents/community_detection.py b/web/backend-fastapi/app/modules/agent/agents/search/community_detection.py similarity index 74% rename from web/backend-fastapi/app/mcp_agents/community_detection.py rename to web/backend-fastapi/app/modules/agent/agents/search/community_detection.py index aa67ded7f..bbbd619e4 100644 --- a/web/backend-fastapi/app/mcp_agents/community_detection.py +++ b/web/backend-fastapi/app/modules/agent/agents/search/community_detection.py @@ -3,7 +3,10 @@ community_detection_mcp = FastMCP(name="CommunityDetectionAgent") -@community_detection_mcp.tool +@community_detection_mcp.tool( + name="community_detection", + description="Performs community detection analysis on the graph database to find related document clusters" +) def detect(query: str) -> dict: """ Performs a community search and returns relevant results. diff --git a/web/backend-fastapi/app/mcp_agents/explicit_search.py b/web/backend-fastapi/app/modules/agent/agents/search/explicit_search.py similarity index 98% rename from web/backend-fastapi/app/mcp_agents/explicit_search.py rename to web/backend-fastapi/app/modules/agent/agents/search/explicit_search.py index 3014fbf3f..43d2aa225 100644 --- a/web/backend-fastapi/app/mcp_agents/explicit_search.py +++ b/web/backend-fastapi/app/modules/agent/agents/search/explicit_search.py @@ -1,6 +1,6 @@ from fastmcp import FastMCP from fastmcp.exceptions import ToolError -from ..models.neo4j import neo4j +from app.shared.models.neo4j import neo4j explicit_search_mcp = FastMCP(name="ExplicitSearchAgent") diff --git a/web/backend-fastapi/app/mcp_agents/semantic_search.py b/web/backend-fastapi/app/modules/agent/agents/search/semantic_search.py similarity index 90% rename from web/backend-fastapi/app/mcp_agents/semantic_search.py rename to web/backend-fastapi/app/modules/agent/agents/search/semantic_search.py index 99814693f..16e3fcaaa 100644 --- a/web/backend-fastapi/app/mcp_agents/semantic_search.py +++ b/web/backend-fastapi/app/modules/agent/agents/search/semantic_search.py @@ -1,7 +1,7 @@ from fastmcp import FastMCP -from ..models.rag import get_full_rag -from ..models.neo4j import neo4j -from ..models.rag_states import get_state_map +from app.shared.models.rag import get_full_rag +from app.shared.models.neo4j import neo4j +from app.shared.models.rag_states import get_state_map semantic_search_mcp = FastMCP(name="SemanticSearchAgent") states = get_state_map() diff --git a/web/backend-fastapi/app/modules/agent/controllers/agent_controller.py b/web/backend-fastapi/app/modules/agent/controllers/agent_controller.py new file mode 100644 index 000000000..ebbbf0ba4 --- /dev/null +++ b/web/backend-fastapi/app/modules/agent/controllers/agent_controller.py @@ -0,0 +1,50 @@ +from fastapi import APIRouter +from ..services.agent_service import AgentService +from ..services.orchestrator_service import OrchestratorService +from ..views.agent_views import AgentViews +from ..models.agent_model import AgentRequest, OrchestrationRequest + + +class AgentController: + def __init__(self): + self.router = APIRouter() + self.agent_service = AgentService() + self.orchestrator_service = OrchestratorService() + self.agent_views = AgentViews() + self._setup_routes() + + def _setup_routes(self): + """Setup all agent routes""" + + @self.router.post("/agent/") + async def agentic_chat(request: AgentRequest = None): + result = await self.agent_service.process_agent_chat(request) + return self.agent_views.agent_response(result) + + @self.router.post("/agent/orchestrate/") + async def orchestrate_task(request: OrchestrationRequest): + """Orchestrate complex multi-agent tasks""" + agent_request = AgentRequest(prompt=request.task) + result = await self.orchestrator_service.orchestrate_complex_task( + agent_request, + request.required_capabilities, + request.execution_strategy + ) + return self.agent_views.agent_response(result) + + @self.router.get("/agent/capabilities/") + async def get_orchestration_capabilities(): + """Get available orchestration capabilities and agents""" + result = await self.orchestrator_service.get_orchestration_capabilities() + return result + + @self.router.get("/agent/status/{agent_name}") + async def get_agent_status(agent_name: str): + """Get the status of a specific agent""" + result = await self.orchestrator_service.get_agent_status(agent_name) + return result + + +# Create router instance +agent_controller = AgentController() +router = agent_controller.router diff --git a/web/backend-fastapi/app/modules/agent/models/agent_model.py b/web/backend-fastapi/app/modules/agent/models/agent_model.py new file mode 100644 index 000000000..469903cb7 --- /dev/null +++ b/web/backend-fastapi/app/modules/agent/models/agent_model.py @@ -0,0 +1,54 @@ +from pydantic import BaseModel +from typing import List, Optional, Dict, Any + + +class AgentHistory(BaseModel): + prompt: str + response: str + + +class AgentRequest(BaseModel): + prompt: str + # chatHistory: List[AgentHistory] commented out until needed + + +class AgentResponse(BaseModel): + response: str + history: List[Dict[str, Any]] + + +class DatabaseSchema(BaseModel): + schema_info: str + labels: List[str] + + +# New orchestration models +class AgentCapability(BaseModel): + name: str + description: str + agents: List[str] + + +class OrchestrationRequest(BaseModel): + task: str + required_capabilities: Optional[List[str]] = None + execution_strategy: str = "adaptive" + priority: Optional[str] = "normal" + + +class OrchestrationResponse(BaseModel): + orchestrator_response: str + required_capabilities: List[str] + available_agents: Dict[str, List[str]] + execution_plan: List[str] + execution_strategy: str + status: str + + +class AgentStatusResponse(BaseModel): + status: str + agent_name: Optional[str] = None + agent_type: Optional[str] = None + capabilities: Optional[List[str]] = None + tools_available: Optional[int] = None + message: Optional[str] = None diff --git a/web/backend-fastapi/app/modules/agent/services/agent_service.py b/web/backend-fastapi/app/modules/agent/services/agent_service.py new file mode 100644 index 000000000..b5cf22465 --- /dev/null +++ b/web/backend-fastapi/app/modules/agent/services/agent_service.py @@ -0,0 +1,190 @@ +import os +import json +from fastapi import HTTPException, Response +from fastapi.logger import logger +from fastmcp import Client +from fastmcp.exceptions import ToolError + +from ..agents import agent_registry +from app.shared.models import neo4j +from app.shared.models.azure import AzureAI +from ..models.agent_model import AgentRequest, AgentResponse, DatabaseSchema + + +class AgentService: + def __init__(self): + self.keycloak_endpoint = os.getenv("AZURE_AI_ENDPOINT", "") + self.azure_key = os.getenv("AZURE_AI_KEY", "") + self.max_iterations = 10 + + def get_database_schema(self, labels: list[str] = []) -> DatabaseSchema: + """Get the database schema dynamically from Neo4j.""" + neo4j_worker = neo4j.neo4j() + + # Get comprehensive schema information + schema_query = """ + CALL apoc.meta.schema() + YIELD value + RETURN value + """ + + schema = neo4j_worker.query(schema_query) + + # Filter the schema to only include desired node labels and relationship types + if len(labels) > 0: + schema_obj = schema[0].get("value", {}) + # Only keep nodes with specified labels + filtered_nodes = { + label: properties + for label, properties in schema_obj.items() + if label in labels + } + schema = filtered_nodes + + # Format the schema information + schema_info = f""" + COMPREHENSIVE DATABASE SCHEMA: + {schema} + + This schema shows: + - Node labels with their properties and types + - Relationship types and their directions + - Property constraints and indexes + - Cardinality information + + Please note that the field document_title actually contains the title of the document. + Therefore, if I wanted information about a specific document, such as the Motor Vehicle Act, I would search in the document_title field. + Use this information to construct accurate Cypher queries. + """ + + return DatabaseSchema(schema_info=schema_info, labels=labels) + + def get_initial_context(self, schema_info: str) -> str: + """Set database schema information as a system message.""" + schema_message = f""" + You are an AI assistant that helps users answer questions about Laws in British Columbia. + You must utilize the provided list of tools to build enough context to answer the user's question. + Keep your responses concise and relevant to the user's question. + + For explicit searches with cypher queries, this is the database schema information you need to know: + {schema_info} + Utilize this schema to construct accurate Cypher queries when needed. + Always specify the node label that you want to search on, as this schema may not contain all labels in the database. + + Tools may be used more than once within a single conversation. + You can use the tools to search for information, but you cannot modify the database. + """ + return schema_message + + async def process_agent_chat(self, request: AgentRequest) -> AgentResponse: + """Process agent chat request""" + if request is None: + raise HTTPException(status_code=400, detail="No request body provided") + + if not isinstance(request, AgentRequest): + raise HTTPException( + status_code=400, + detail="Input should be a valid AgentRequest object" + ) + + initial_question = request.prompt + + # Azure Configuration + azure = AzureAI(self.keycloak_endpoint, self.azure_key) + + try: + # Get combined MCP from agent registry + combined_mcp = agent_registry.get_combined_mcp() + # Establish MCP client connection + client = Client(combined_mcp) + async with client: + raw_tools = await client.list_tools() + # Convert tools to a format compatible with Azure + tools = [ + { + "type": "function", + "function": { + "name": tool.name, + "description": tool.description, + "parameters": tool.inputSchema, + }, + } + for tool in raw_tools + ] + + # Supply with database schema first + schema = self.get_database_schema(["v3"]) + azure.add_system_message(self.get_initial_context(schema.schema_info)) + + # Continue with the conversation + response = azure.call_agent_with_history(initial_question, tools=tools) + + finish_reason = response.get("finish_reason") + current_iteration = 0 + + # Process the response until we reach a stopping condition + while finish_reason != "stop" and current_iteration < self.max_iterations: + if finish_reason == "tool_calls": + tool_calls = response.get("message").get("tool_calls") + for tool_call in tool_calls: + tool_call_id = tool_call.get("id") # Get the tool call ID + tool_name = tool_call.get("function").get("name") + arguments_str = tool_call.get("function").get("arguments") + + # Parse the JSON string to get a Python object + try: + arguments = json.loads(arguments_str) + logger.info( + f"Calling tool: {tool_name} with arguments: {arguments}", + ) + except json.JSONDecodeError as e: + logger.error( + f"Error parsing arguments: {e}", + ) + continue + + # Handle tool execution with error handling + try: + result = await client.call_tool(tool_name, arguments) + logger.info(f"Tool {tool_name} returned: {result}") + # Add the successful tool response + azure.add_tool_response(tool_call_id, result) + except ToolError as tool_error: + error_message = ( + f"Tool error in {tool_name}: {str(tool_error)}" + ) + logger.error(error_message) + # Pass the error back to the agent so it can adjust + azure.add_tool_response( + tool_call_id, {"error": error_message} + ) + except Exception as e: + error_message = f"Unexpected error in {tool_name}: {str(e)}" + logger.error(error_message) + # Pass the error back to the agent + azure.add_tool_response( + tool_call_id, {"error": error_message} + ) + + # Continue the conversation without adding a new user message + response = azure.call_agent_with_history( + "", tools=tools, role="user" + ) + finish_reason = response.get("finish_reason") + current_iteration += 1 + elif finish_reason == "length": + logger.warning( + "Input length exceeded the limit. Stopping further processing." + ) + break + else: + logger.warning("Unexpected finish reason:", finish_reason) + break + + response_text = response.get("message").get("content", "").strip() + # TODO: Filter out tool calls before returning? + return AgentResponse(response=response_text, history=azure.history) + + except Exception as e: + logger.error("An error occurred during agent processing:", exc_info=True) + raise e diff --git a/web/backend-fastapi/app/modules/agent/services/orchestrator_service.py b/web/backend-fastapi/app/modules/agent/services/orchestrator_service.py new file mode 100644 index 000000000..c72ce15db --- /dev/null +++ b/web/backend-fastapi/app/modules/agent/services/orchestrator_service.py @@ -0,0 +1,170 @@ +import os +import json +from typing import Dict, List, Any, Optional +from fastapi import HTTPException +from fastapi.logger import logger +from fastmcp import Client +from fastmcp.exceptions import ToolError + +from ..agents import agent_registry +from ..agents.orchestrator.orchestrator_agent import orchestrator_mcp +from app.shared.models.azure import AzureAI +from ..models.agent_model import AgentRequest, AgentResponse + + +class OrchestratorService: + """High-level orchestration service for managing complex multi-agent tasks""" + + def __init__(self): + self.azure_endpoint = os.getenv("AZURE_AI_ENDPOINT", "") + self.azure_key = os.getenv("AZURE_AI_KEY", "") + self.max_iterations = 10 + + async def orchestrate_complex_task( + self, + request: AgentRequest, + capabilities: Optional[List[str]] = None, + strategy: str = "adaptive" + ) -> AgentResponse: + """ + Orchestrate a complex task using multiple agents + + Args: + request: The agent request containing the task + capabilities: Optional list of required capabilities + strategy: Execution strategy ("parallel", "sequential", "adaptive") + """ + logger.info(f"Orchestrating complex task: {request.prompt}") + + # Azure Configuration for high-level orchestration + azure = AzureAI(self.azure_endpoint, self.azure_key) + + try: + # Establish connection to orchestrator + client = Client(orchestrator_mcp) + async with client: + # Get orchestrator tools + raw_tools = await client.list_tools() + tools = [ + { + "type": "function", + "function": { + "name": tool.name, + "description": tool.description, + "parameters": tool.inputSchema, + }, + } + for tool in raw_tools + ] + + # Add orchestration context + orchestration_context = self._get_orchestration_context(capabilities, strategy) + azure.add_system_message(orchestration_context) + + # Process with orchestrator + response = azure.call_agent_with_history(request.prompt, tools=tools) + + # Handle tool calls for orchestration + finish_reason = response.get("finish_reason") + current_iteration = 0 + + while finish_reason != "stop" and current_iteration < self.max_iterations: + if finish_reason == "tool_calls": + tool_calls = response.get("message").get("tool_calls") + for tool_call in tool_calls: + tool_call_id = tool_call.get("id") + tool_name = tool_call.get("function").get("name") + arguments_str = tool_call.get("function").get("arguments") + + try: + arguments = json.loads(arguments_str) + logger.info(f"Orchestrator calling tool: {tool_name} with {arguments}") + + result = await client.call_tool(tool_name, arguments) + logger.info(f"Orchestrator tool {tool_name} returned: {result}") + azure.add_tool_response(tool_call_id, result) + + except json.JSONDecodeError as e: + logger.error(f"Error parsing orchestrator arguments: {e}") + continue + except ToolError as tool_error: + error_message = f"Orchestrator tool error in {tool_name}: {str(tool_error)}" + logger.error(error_message) + azure.add_tool_response(tool_call_id, {"error": error_message}) + except Exception as e: + error_message = f"Unexpected orchestrator error in {tool_name}: {str(e)}" + logger.error(error_message) + azure.add_tool_response(tool_call_id, {"error": error_message}) + + # Continue orchestration conversation + response = azure.call_agent_with_history("", tools=tools, role="user") + finish_reason = response.get("finish_reason") + current_iteration += 1 + + elif finish_reason == "length": + logger.warning("Orchestration length exceeded limit. Stopping.") + break + else: + logger.warning(f"Unexpected orchestration finish reason: {finish_reason}") + break + + response_text = response.get("message").get("content", "").strip() + return AgentResponse(response=response_text, history=azure.history) + + except Exception as e: + logger.error("Error during orchestration:", exc_info=True) + raise e + + async def get_orchestration_capabilities(self) -> Dict[str, Any]: + """Get available orchestration capabilities and agents""" + try: + client = Client(orchestrator_mcp) + async with client: + result = await client.call_tool("list_agent_capabilities", {}) + return result + except Exception as e: + logger.error(f"Error getting orchestration capabilities: {e}") + return {"error": str(e)} + + async def get_agent_status(self, agent_name: str) -> Dict[str, Any]: + """Get the status of a specific agent""" + try: + client = Client(orchestrator_mcp) + async with client: + result = await client.call_tool("get_agent_status", {"agent_name": agent_name}) + return result + except Exception as e: + logger.error(f"Error getting agent status: {e}") + return {"error": str(e)} + + def _get_orchestration_context(self, capabilities: Optional[List[str]], strategy: str) -> str: + """Create orchestration context for the Azure AI""" + context = f""" + You are a high-level AI orchestrator that coordinates multiple specialized agents to complete complex tasks. + + Available agent capabilities: {agent_registry.list_capabilities()} + Available agents: {agent_registry.list_agents()} + + Your role is to: + 1. Analyze complex user requests + 2. Determine which specialized agents are needed + 3. Coordinate multiple agents to work together + 4. Synthesize results into cohesive responses + + Execution strategy: {strategy} + """ + + if capabilities: + context += f"\nRequired capabilities for this task: {capabilities}" + + context += """ + + Use the orchestration tools available to: + - orchestrate_task: Plan and coordinate complex multi-agent tasks + - list_agent_capabilities: See what agents and capabilities are available + - get_agent_status: Check the status of specific agents + + Always provide clear, actionable orchestration plans and coordinate agents effectively. + """ + + return context diff --git a/web/backend-fastapi/app/modules/agent/views/agent_views.py b/web/backend-fastapi/app/modules/agent/views/agent_views.py new file mode 100644 index 000000000..a55559bc3 --- /dev/null +++ b/web/backend-fastapi/app/modules/agent/views/agent_views.py @@ -0,0 +1,8 @@ +from ..models.agent_model import AgentResponse + + +class AgentViews: + @staticmethod + def agent_response(data: AgentResponse) -> dict: + """Format agent response""" + return data.dict() diff --git a/web/backend-fastapi/app/modules/analytics/__init__.py b/web/backend-fastapi/app/modules/analytics/__init__.py new file mode 100644 index 000000000..4e997983e --- /dev/null +++ b/web/backend-fastapi/app/modules/analytics/__init__.py @@ -0,0 +1,3 @@ +from .controllers.analytics_controller import router + +__all__ = ["router"] diff --git a/web/backend-fastapi/app/modules/analytics/controllers/analytics_controller.py b/web/backend-fastapi/app/modules/analytics/controllers/analytics_controller.py new file mode 100644 index 000000000..613608d0b --- /dev/null +++ b/web/backend-fastapi/app/modules/analytics/controllers/analytics_controller.py @@ -0,0 +1,45 @@ +from fastapi import APIRouter, Request, HTTPException +from ..services.analytics_service import AnalyticsService +from ..views.analytics_views import AnalyticsViews + + +class AnalyticsController: + def __init__(self): + self.router = APIRouter() + self.analytics_service = AnalyticsService() + self.analytics_views = AnalyticsViews() + self._setup_routes() + + def _setup_routes(self): + """Setup all analytics routes""" + + @self.router.post("/saveAnalytics") + async def save_analytics(request: Request): + try: + analytics_data = await request.json() + session_id = analytics_data.get('sessionId') + + if not session_id: + raise HTTPException( + status_code=400, + detail="sessionId is required in the analytics data" + ) + + message = await self.analytics_service.save_analytics(analytics_data) + return self.analytics_views.analytics_response(message) + except Exception as e: + raise HTTPException(status_code=500, detail=str(e)) + + @self.router.patch("/updateAnalytics") + async def update_analytics(request: Request): + try: + updates = await request.json() + message = await self.analytics_service.update_analytics(updates) + return self.analytics_views.analytics_response(message) + except Exception as e: + raise HTTPException(status_code=500, detail=str(e)) + + +# Create router instance +analytics_controller = AnalyticsController() +router = analytics_controller.router diff --git a/web/backend-fastapi/app/modules/analytics/models/analytics_model.py b/web/backend-fastapi/app/modules/analytics/models/analytics_model.py new file mode 100644 index 000000000..4270b25d5 --- /dev/null +++ b/web/backend-fastapi/app/modules/analytics/models/analytics_model.py @@ -0,0 +1,20 @@ +from pydantic import BaseModel +from typing import List, Dict, Any, Optional + + +class AnalyticsData(BaseModel): + sessionId: str + chats: Optional[List[Dict[str, Any]]] = [] + metadata: Optional[Dict[str, Any]] = {} + + +class AnalyticsUpdate(BaseModel): + sessionId: str + newChat: Optional[Dict[str, Any]] = None + sourceUpdate: Optional[Dict[str, Any]] = None + llmResponseUpdate: Optional[Dict[str, Any]] = None + + +class AnalyticsResponse(BaseModel): + message: str + status: str = "success" diff --git a/web/backend-fastapi/app/services/analytics_service.py b/web/backend-fastapi/app/modules/analytics/services/analytics_service.py similarity index 62% rename from web/backend-fastapi/app/services/analytics_service.py rename to web/backend-fastapi/app/modules/analytics/services/analytics_service.py index bb0c00822..e02e6cd7a 100644 --- a/web/backend-fastapi/app/services/analytics_service.py +++ b/web/backend-fastapi/app/modules/analytics/services/analytics_service.py @@ -22,7 +22,14 @@ def __init__(self): async def read_data(self) -> List[Dict[str, Any]]: async with aiofiles.open(self.file_path, mode='r') as file: content = await file.read() - return json.loads(content) if content else [] + if not content: + return [] + + data = json.loads(content) + # Handle case where file contains a dict instead of list + if isinstance(data, dict): + return [] + return data if isinstance(data, list) else [] # write data to disk async def write_data(self, data: List[Dict[str, Any]]): @@ -62,13 +69,32 @@ async def update_analytics(self, updates: List[Dict[str, Any]]) -> str: if 'sourceUpdate' in update: source_update = update['sourceUpdate'] - chat = session_data['chats'][source_update['chatIndex']] - source = next(s for s in chat['sources'] if s['sourceKey'] == source_update['sourceKey']) - source.update(source_update) + chat_index = source_update['chatIndex'] + + # Find chat by matching chatIndex field in llmResponseInteraction + target_chat = None + for chat in session_data['chats']: + if chat.get('llmResponseInteraction', {}).get('chatIndex') == chat_index: + target_chat = chat + break + + if target_chat and 'sources' in target_chat: + source = next((s for s in target_chat['sources'] if s['sourceKey'] == source_update['sourceKey']), None) + if source: + source.update(source_update) if 'llmResponseUpdate' in update: llm_update = update['llmResponseUpdate'] - chat = session_data['chats'][llm_update['chatIndex']] - chat['llmResponseInteraction'].update(llm_update) + chat_index = llm_update['chatIndex'] + + # Find chat by matching chatIndex field in llmResponseInteraction + target_chat = None + for chat in session_data['chats']: + if chat.get('llmResponseInteraction', {}).get('chatIndex') == chat_index: + target_chat = chat + break + + if target_chat and 'llmResponseInteraction' in target_chat: + target_chat['llmResponseInteraction'].update(llm_update) await self.write_data(all_data) return "Analytics data updated successfully" diff --git a/web/backend-fastapi/app/modules/analytics/views/analytics_views.py b/web/backend-fastapi/app/modules/analytics/views/analytics_views.py new file mode 100644 index 000000000..3cfaeab58 --- /dev/null +++ b/web/backend-fastapi/app/modules/analytics/views/analytics_views.py @@ -0,0 +1,13 @@ +from ..models.analytics_model import AnalyticsResponse + + +class AnalyticsViews: + @staticmethod + def analytics_response(message: str) -> dict: + """Format analytics response""" + return AnalyticsResponse(message=message).dict() + + @staticmethod + def error_response(error: str) -> dict: + """Format error response""" + return AnalyticsResponse(message=error, status="error").dict() diff --git a/web/backend-fastapi/app/modules/auth/__init__.py b/web/backend-fastapi/app/modules/auth/__init__.py new file mode 100644 index 000000000..e50bdf4c9 --- /dev/null +++ b/web/backend-fastapi/app/modules/auth/__init__.py @@ -0,0 +1,3 @@ +from .controllers.auth_controller import router + +__all__ = ["router"] diff --git a/web/backend-fastapi/app/modules/auth/controllers/auth_controller.py b/web/backend-fastapi/app/modules/auth/controllers/auth_controller.py new file mode 100644 index 000000000..8f8b2fbe5 --- /dev/null +++ b/web/backend-fastapi/app/modules/auth/controllers/auth_controller.py @@ -0,0 +1,44 @@ +from fastapi import APIRouter, Form, Depends +from ..services.auth_service import AuthService +from ..views.auth_views import AuthViews +from ..models.auth_model import TokenRequest + + +class AuthController: + def __init__(self): + self.router = APIRouter() + self.auth_service = AuthService() + self.auth_views = AuthViews() + self._setup_routes() + + def _setup_routes(self): + """Setup all authentication routes""" + + @self.router.get("/") + async def read_main(): + return self.auth_views.main_response() + + @self.router.get("/login") + async def login(): + result = await self.auth_service.login() + return self.auth_views.login_response(result) + + @self.router.get("/validate") + async def validate(): + result = await self.auth_service.validate_user() + return self.auth_views.validation_response(result) + + @self.router.post("/refreshtoken/") + async def refresh_token(refresh_token: str = Form(...)): + result = await self.auth_service.refresh_token(refresh_token) + return self.auth_views.token_response(result) + + @self.router.get("/health") + async def health_check(): + result = await self.auth_service.health_check() + return self.auth_views.health_response(result) + + +# Create router instance +auth_controller = AuthController() +router = auth_controller.router diff --git a/web/backend-fastapi/app/modules/auth/models/auth_model.py b/web/backend-fastapi/app/modules/auth/models/auth_model.py new file mode 100644 index 000000000..bde94b97c --- /dev/null +++ b/web/backend-fastapi/app/modules/auth/models/auth_model.py @@ -0,0 +1,29 @@ +from pydantic import BaseModel +from typing import Optional + + +class TokenRequest(BaseModel): + refresh_token: str + + +class TokenResponse(BaseModel): + access_token: str + refresh_token: str + expires_in: int + token_type: str = "Bearer" + + +class LoginResponse(BaseModel): + valid: bool + message: Optional[str] = None + + +class ValidationResponse(BaseModel): + status: str + user_id: Optional[str] = None + roles: Optional[list] = None + + +class HealthResponse(BaseModel): + status: str + timestamp: Optional[str] = None diff --git a/web/backend-fastapi/app/modules/auth/services/auth_service.py b/web/backend-fastapi/app/modules/auth/services/auth_service.py new file mode 100644 index 000000000..f9a54f76c --- /dev/null +++ b/web/backend-fastapi/app/modules/auth/services/auth_service.py @@ -0,0 +1,52 @@ +import requests +import os +from fastapi import HTTPException +from ..models.auth_model import TokenResponse, LoginResponse, ValidationResponse + + +class AuthService: + def __init__(self): + self.keycloak_endpoint = 'https://dev.loginproxy.gov.bc.ca/auth/realms/standard/protocol/openid-connect/token' + self.client_id = 'a-i-pathfinding-project-5449' + + async def login(self) -> LoginResponse: + """Handle user login - returns basic validation""" + return LoginResponse(valid=True) + + async def validate_user(self) -> ValidationResponse: + """Validate user session""" + return ValidationResponse(status="ok") + + async def refresh_token(self, refresh_token: str) -> TokenResponse: + """Refresh user token using Keycloak""" + headers = { + 'Content-Type': 'application/x-www-form-urlencoded' + } + + data = { + 'client_id': self.client_id, + 'grant_type': 'refresh_token', + 'refresh_token': refresh_token + } + + try: + response = requests.post(self.keycloak_endpoint, headers=headers, data=data) + if response.status_code != 200: + raise HTTPException( + status_code=response.status_code, + detail="Token refresh failed" + ) + + token_data = response.json() + return TokenResponse( + access_token=token_data.get('access_token'), + refresh_token=token_data.get('refresh_token'), + expires_in=token_data.get('expires_in'), + token_type=token_data.get('token_type', 'Bearer') + ) + except requests.RequestException as e: + raise HTTPException(status_code=500, detail=f"Token refresh error: {str(e)}") + + async def health_check(self): + """Check service health""" + return {"status": "healthy"} diff --git a/web/backend-fastapi/app/modules/auth/views/auth_views.py b/web/backend-fastapi/app/modules/auth/views/auth_views.py new file mode 100644 index 000000000..49ec65eba --- /dev/null +++ b/web/backend-fastapi/app/modules/auth/views/auth_views.py @@ -0,0 +1,29 @@ +from fastapi.responses import JSONResponse +from ..models.auth_model import LoginResponse, ValidationResponse, TokenResponse, HealthResponse + + +class AuthViews: + @staticmethod + def login_response(data: LoginResponse) -> JSONResponse: + """Format login response""" + return JSONResponse(content=data.dict()) + + @staticmethod + def validation_response(data: ValidationResponse) -> JSONResponse: + """Format validation response""" + return JSONResponse(content=data.dict()) + + @staticmethod + def token_response(data: TokenResponse) -> dict: + """Format token refresh response""" + return data.dict() + + @staticmethod + def health_response(data: dict) -> dict: + """Format health check response""" + return data + + @staticmethod + def main_response() -> dict: + """Format main endpoint response""" + return {"msg": "Hello World"} diff --git a/web/backend-fastapi/app/modules/chat/__init__.py b/web/backend-fastapi/app/modules/chat/__init__.py new file mode 100644 index 000000000..3f70e213c --- /dev/null +++ b/web/backend-fastapi/app/modules/chat/__init__.py @@ -0,0 +1,3 @@ +from .controllers.chat_controller import router + +__all__ = ["router"] diff --git a/web/backend-fastapi/app/modules/chat/controllers/chat_controller.py b/web/backend-fastapi/app/modules/chat/controllers/chat_controller.py new file mode 100644 index 000000000..00400f478 --- /dev/null +++ b/web/backend-fastapi/app/modules/chat/controllers/chat_controller.py @@ -0,0 +1,30 @@ +from fastapi import APIRouter, Body +from ..services.chat_service import ChatService +from ..views.chat_views import ChatViews +from ..models.chat_model import ChatRequest + + +class ChatController: + def __init__(self): + self.router = APIRouter() + self.chat_service = ChatService() + self.chat_views = ChatViews() + self._setup_routes() + + def _setup_routes(self): + """Setup all chat routes""" + + @self.router.post("/chat/") + async def chat(chat_request: ChatRequest = Body(ChatRequest)): + result = await self.chat_service.process_chat(chat_request) + return self.chat_views.chat_response(result) + + @self.router.get("/chat/states/") + async def get_states(): + result = await self.chat_service.get_available_states() + return self.chat_views.states_response(result) + + +# Create router instance +chat_controller = ChatController() +router = chat_controller.router diff --git a/web/backend-fastapi/app/common/chat_objects.py b/web/backend-fastapi/app/modules/chat/models/chat_model.py similarity index 61% rename from web/backend-fastapi/app/common/chat_objects.py rename to web/backend-fastapi/app/modules/chat/models/chat_model.py index ebe36f14e..60f162f1a 100644 --- a/web/backend-fastapi/app/common/chat_objects.py +++ b/web/backend-fastapi/app/modules/chat/models/chat_model.py @@ -11,3 +11,12 @@ class ChatRequest(BaseModel): prompt: str chatHistory: List[ChatHistory] key: Optional[str] = None + + +class ChatResponse(BaseModel): + responses: List[str] + recording: Optional[str] = None + + +class StateResponse(BaseModel): + states: List[dict] diff --git a/web/backend-fastapi/app/modules/chat/services/chat_service.py b/web/backend-fastapi/app/modules/chat/services/chat_service.py new file mode 100644 index 000000000..7c46a001f --- /dev/null +++ b/web/backend-fastapi/app/modules/chat/services/chat_service.py @@ -0,0 +1,67 @@ +from collections import defaultdict +from fastapi import HTTPException +from app.shared.models import neo4j, trulens, rag +from app.shared.models.rag_states import ( + get_state_map, + StateType, + default_state, + get_states_for_frontend, +) +from ..models.chat_model import ChatRequest, ChatResponse, StateResponse + + +class ChatService: + def __init__(self): + self.kg = None + self.tru = None + + def _initialize_components(self): + """Initialize knowledge graph and trulens if not already done""" + if self.kg is None: + self.kg = neo4j.neo4j() + if self.tru is None: + self.tru = trulens.connect_trulens() + + async def process_chat(self, chat_request: ChatRequest) -> ChatResponse: + """Process chat request and return response""" + chat_history = [] + + # Ensure the input is a valid ChatRequest object + if not isinstance(chat_request, ChatRequest): + raise ValueError("Input should be a valid ChatRequest object") + + # Initialize components + self._initialize_components() + rag_fn = rag.get_full_rag() + + # Determine which state to use + state_map = get_state_map() + state_entry = state_map.get(chat_request.key, default_state) + if state_entry is None: + raise HTTPException(status_code=404, detail="RAG state not found") + + state = state_entry.get("state") + tru_rag = trulens.tru_rag(rag_fn, state.trulens_id) + + with tru_rag as recording: + # Key used to determine class called for query + if state.type == StateType.INTERNAL: + # For internal operations with Neo4j + response = rag_fn.query( + chat_request.prompt, + chat_history, + self.kg, + state, + ) + responses = [response] + else: + # For external sources, like Azure + responses = [] + + record = recording.get() + return ChatResponse(responses=responses, recording=record.record_id) + + async def get_available_states(self) -> StateResponse: + """Get available RAG states for frontend""" + states = get_states_for_frontend() + return StateResponse(states=states) diff --git a/web/backend-fastapi/app/modules/chat/views/chat_views.py b/web/backend-fastapi/app/modules/chat/views/chat_views.py new file mode 100644 index 000000000..e34e27b71 --- /dev/null +++ b/web/backend-fastapi/app/modules/chat/views/chat_views.py @@ -0,0 +1,14 @@ +from typing import List +from ..models.chat_model import ChatResponse, StateResponse + + +class ChatViews: + @staticmethod + def chat_response(data: ChatResponse) -> dict: + """Format chat response""" + return data.dict() + + @staticmethod + def states_response(data: StateResponse) -> List[dict]: + """Format states response""" + return data.states diff --git a/web/backend-fastapi/app/modules/feedback/__init__.py b/web/backend-fastapi/app/modules/feedback/__init__.py new file mode 100644 index 000000000..9c5bbf1bd --- /dev/null +++ b/web/backend-fastapi/app/modules/feedback/__init__.py @@ -0,0 +1,3 @@ +from .controllers.feedback_controller import router + +__all__ = ["router"] diff --git a/web/backend-fastapi/app/modules/feedback/controllers/feedback_controller.py b/web/backend-fastapi/app/modules/feedback/controllers/feedback_controller.py new file mode 100644 index 000000000..5cfde420c --- /dev/null +++ b/web/backend-fastapi/app/modules/feedback/controllers/feedback_controller.py @@ -0,0 +1,71 @@ +from fastapi import APIRouter, Form +from ..services.feedback_service import FeedbackService +from ..views.feedback_views import FeedbackViews +from ..models.feedback_model import ( + QuestionSubmissionRequest, + FeedbackRequest, + RAGFeedbackRequest, + FetchFeedbackRequest +) + + +class FeedbackController: + def __init__(self): + self.router = APIRouter() + self.feedback_service = FeedbackService() + self.feedback_views = FeedbackViews() + self._setup_routes() + + def _setup_routes(self): + """Setup all feedback routes""" + + @self.router.post("/submit/") + async def submit_question(prompt: str = Form(...)): + request = QuestionSubmissionRequest(prompt=prompt) + result = await self.feedback_service.submit_question(request) + return self.feedback_views.question_response(result) + + @self.router.post("/feedback/") + async def feedback( + feedback: str = Form(...), + index: str = Form(...), + recording_id: str = Form(...), + bulk: bool = Form(False), + trulens_id: str = Form("unknown"), + ): + request = FeedbackRequest( + feedback=feedback, + index=index, + recording_id=recording_id, + bulk=bulk, + trulens_id=trulens_id + ) + result = await self.feedback_service.process_feedback(request) + return self.feedback_views.feedback_response(result) + + @self.router.post("/feedbackrag/") + async def feedbackrag( + feedback: str = Form(...), + recording_id: str = Form(...), + comment: str = Form(None), + trulens_id: str = Form("unknown"), + ): + request = RAGFeedbackRequest( + feedback=feedback, + recording_id=recording_id, + comment=comment, + trulens_id=trulens_id + ) + result = await self.feedback_service.process_rag_feedback(request) + return self.feedback_views.feedback_response(result) + + @self.router.get("/fetch_feedback/") + async def fetch_all_feedback(trulens_id: str = "unknown"): + request = FetchFeedbackRequest(trulens_id=trulens_id) + result = await self.feedback_service.fetch_all_feedback(request) + return self.feedback_views.feedback_response(result) + + +# Create router instance +feedback_controller = FeedbackController() +router = feedback_controller.router diff --git a/web/backend-fastapi/app/modules/feedback/models/feedback_model.py b/web/backend-fastapi/app/modules/feedback/models/feedback_model.py new file mode 100644 index 000000000..673033eb0 --- /dev/null +++ b/web/backend-fastapi/app/modules/feedback/models/feedback_model.py @@ -0,0 +1,36 @@ +from pydantic import BaseModel +from typing import List, Optional, Any + + +class QuestionSubmissionRequest(BaseModel): + prompt: str + + +class FeedbackRequest(BaseModel): + feedback: str + index: str + recording_id: str + bulk: bool = False + trulens_id: str = "unknown" + + +class RAGFeedbackRequest(BaseModel): + feedback: str + recording_id: str + comment: Optional[str] = None + trulens_id: str = "unknown" + + +class QuestionResponse(BaseModel): + responses: List[Any] + recording: str + + +class FeedbackResponse(BaseModel): + status: bool + rows: Optional[List[Any]] = None + message: Optional[str] = None + + +class FetchFeedbackRequest(BaseModel): + trulens_id: str = "unknown" diff --git a/web/backend-fastapi/app/modules/feedback/services/feedback_service.py b/web/backend-fastapi/app/modules/feedback/services/feedback_service.py new file mode 100644 index 000000000..61b628b07 --- /dev/null +++ b/web/backend-fastapi/app/modules/feedback/services/feedback_service.py @@ -0,0 +1,99 @@ +import logging +from langchain_community.embeddings import HuggingFaceEmbeddings +from app.shared.models import trulens, rag, neo4j +from ..models.feedback_model import ( + QuestionSubmissionRequest, + FeedbackRequest, + RAGFeedbackRequest, + QuestionResponse, + FeedbackResponse, + FetchFeedbackRequest +) + + +class FeedbackService: + def __init__(self): + self.kg = None + self.tru = None + self.embeddings = HuggingFaceEmbeddings(model_name="all-MiniLM-L6-v2") + + def _initialize_components(self): + """Initialize knowledge graph and trulens if not already done""" + if self.kg is None: + self.kg = neo4j.neo4j() + if self.tru is None: + self.tru = trulens.connect_trulens() + + async def submit_question(self, request: QuestionSubmissionRequest) -> QuestionResponse: + """Submit a question and get response""" + self._initialize_components() + + rag_fn = rag.get_top_k() + tru_rag = trulens.tru_rag(rag_fn) + + with tru_rag as recording: + responses = rag_fn.query(request.prompt, None, self.embeddings, self.kg) + + record = recording.get() + return QuestionResponse(responses=responses, recording=record.record_id) + + async def process_feedback(self, request: FeedbackRequest) -> FeedbackResponse: + """Process user feedback""" + self._initialize_components() + + rows = trulens.process_feedback( + self.tru, + request.trulens_id, + request.index, + request.feedback, + request.recording_id, + request.bulk + ) + + if rows: + return FeedbackResponse(status=True, rows=rows) + else: + return FeedbackResponse(status=False) + + async def process_rag_feedback(self, request: RAGFeedbackRequest) -> FeedbackResponse: + """Process RAG-specific feedback""" + self._initialize_components() + + try: + rows = trulens.process_rag_feedback( + request.feedback, + request.trulens_id, + request.recording_id, + self.tru, + request.comment + ) + + if rows: + return FeedbackResponse( + status=True, + message="Feedback submitted successfully", + rows=rows + ) + + return FeedbackResponse( + status=False, + message="No feedback data found" + ) + + except Exception as e: + logging.error(f"Error processing feedback: {str(e)}") + return FeedbackResponse( + status=False, + message="An internal error has occurred. Please try again later." + ) + + async def fetch_all_feedback(self, request: FetchFeedbackRequest) -> FeedbackResponse: + """Fetch all feedback for a given trulens_id""" + self._initialize_components() + + rows = trulens.fetch_all_feedback(request.trulens_id) + + if rows: + return FeedbackResponse(status=True, rows=rows) + else: + return FeedbackResponse(status=False) diff --git a/web/backend-fastapi/app/modules/feedback/views/feedback_views.py b/web/backend-fastapi/app/modules/feedback/views/feedback_views.py new file mode 100644 index 000000000..ef71085f2 --- /dev/null +++ b/web/backend-fastapi/app/modules/feedback/views/feedback_views.py @@ -0,0 +1,13 @@ +from ..models.feedback_model import QuestionResponse, FeedbackResponse + + +class FeedbackViews: + @staticmethod + def question_response(data: QuestionResponse) -> dict: + """Format question submission response""" + return data.dict() + + @staticmethod + def feedback_response(data: FeedbackResponse) -> dict: + """Format feedback response""" + return data.dict() diff --git a/web/backend-fastapi/app/modules/module_registry.py b/web/backend-fastapi/app/modules/module_registry.py new file mode 100644 index 000000000..0afb9da76 --- /dev/null +++ b/web/backend-fastapi/app/modules/module_registry.py @@ -0,0 +1,33 @@ +from fastapi import FastAPI +from . import auth, chat, agent, analytics, feedback + + +class ModuleRegistry: + """Central registry for managing HMVC modules""" + + def __init__(self): + self.modules = { + 'auth': auth, + 'chat': chat, + 'agent': agent, + 'analytics': analytics, + 'feedback': feedback + } + + def register_all_modules(self, app: FastAPI): + """Register all modules with the FastAPI app""" + for module_name, module in self.modules.items(): + app.include_router(module.router, tags=[module_name]) + print(f"Registered {module_name} module") + + def get_module(self, name: str): + """Get a specific module by name""" + return self.modules.get(name) + + def list_modules(self): + """List all registered modules""" + return list(self.modules.keys()) + + +# Create singleton instance +module_registry = ModuleRegistry() diff --git a/web/backend-fastapi/app/roles.json b/web/backend-fastapi/app/roles.json index 01aff6e1a..372d517c2 100644 --- a/web/backend-fastapi/app/roles.json +++ b/web/backend-fastapi/app/roles.json @@ -1,3 +1,3 @@ { "roles": ["IMB_DEV", "kings_printer", "IMB_SLT"] -} \ No newline at end of file +} diff --git a/web/backend-fastapi/app/shared/__init__.py b/web/backend-fastapi/app/shared/__init__.py new file mode 100644 index 000000000..37ee54a5e --- /dev/null +++ b/web/backend-fastapi/app/shared/__init__.py @@ -0,0 +1 @@ +# Shared Components Package diff --git a/web/backend-fastapi/app/shared/exceptions/custom_exceptions.py b/web/backend-fastapi/app/shared/exceptions/custom_exceptions.py new file mode 100644 index 000000000..1bc2534b3 --- /dev/null +++ b/web/backend-fastapi/app/shared/exceptions/custom_exceptions.py @@ -0,0 +1,51 @@ +from fastapi import HTTPException +from typing import Optional, Any + + +class ModuleException(HTTPException): + """Base exception for module-specific errors""" + + def __init__( + self, + status_code: int, + detail: str, + module: str = "Unknown", + headers: Optional[dict] = None + ): + super().__init__(status_code=status_code, detail=detail, headers=headers) + self.module = module + + +class AuthenticationError(ModuleException): + """Authentication module specific error""" + + def __init__(self, detail: str = "Authentication failed"): + super().__init__(status_code=401, detail=detail, module="auth") + + +class ChatError(ModuleException): + """Chat module specific error""" + + def __init__(self, detail: str = "Chat processing error", status_code: int = 400): + super().__init__(status_code=status_code, detail=detail, module="chat") + + +class AgentError(ModuleException): + """Agent module specific error""" + + def __init__(self, detail: str = "Agent processing error", status_code: int = 400): + super().__init__(status_code=status_code, detail=detail, module="agent") + + +class AnalyticsError(ModuleException): + """Analytics module specific error""" + + def __init__(self, detail: str = "Analytics processing error", status_code: int = 400): + super().__init__(status_code=status_code, detail=detail, module="analytics") + + +class FeedbackError(ModuleException): + """Feedback module specific error""" + + def __init__(self, detail: str = "Feedback processing error", status_code: int = 400): + super().__init__(status_code=status_code, detail=detail, module="feedback") diff --git a/web/backend-fastapi/app/models/azure.py b/web/backend-fastapi/app/shared/models/azure.py similarity index 100% rename from web/backend-fastapi/app/models/azure.py rename to web/backend-fastapi/app/shared/models/azure.py diff --git a/web/backend-fastapi/app/models/bedrock.py b/web/backend-fastapi/app/shared/models/bedrock.py similarity index 100% rename from web/backend-fastapi/app/models/bedrock.py rename to web/backend-fastapi/app/shared/models/bedrock.py diff --git a/web/backend-fastapi/app/models/neo4j.py b/web/backend-fastapi/app/shared/models/neo4j.py similarity index 100% rename from web/backend-fastapi/app/models/neo4j.py rename to web/backend-fastapi/app/shared/models/neo4j.py diff --git a/web/backend-fastapi/app/models/rag.py b/web/backend-fastapi/app/shared/models/rag.py similarity index 83% rename from web/backend-fastapi/app/models/rag.py rename to web/backend-fastapi/app/shared/models/rag.py index e5a6b97dc..d3e691ee8 100644 --- a/web/backend-fastapi/app/models/rag.py +++ b/web/backend-fastapi/app/shared/models/rag.py @@ -1,9 +1,13 @@ from trulens.apps.app import instrument -from app.models import bedrock +from . import bedrock import json from typing import List from sentence_transformers import CrossEncoder -from ..common.chat_objects import ChatHistory +from pydantic import BaseModel + +class ChatHistory(BaseModel): + prompt: str + response: str class get_full_rag: @@ -115,3 +119,20 @@ def query(self, query: str, chat_history: List[ChatHistory], kg, state) -> str: bedrock_response = self.get_response(create_prompt, state.kwargs_key) pretty_output = self.formatoutput(context_str, bedrock_response) return json.dumps(pretty_output) + + +class get_top_k: + """RAG class for top-k retrieval used in feedback processing""" + + @instrument + def retrieve(self, question: str, kg, embeddings) -> list: + """Retrieve relevant text from vector store using embeddings""" + # This would typically use the embeddings model to find similar content + # Implementation depends on your specific vector store setup + return [] + + @instrument + def query(self, prompt: str, chat_history, embeddings, kg) -> list: + """Query for top-k results""" + # Simplified implementation - you may need to adjust based on your actual logic + return self.retrieve(prompt, kg, embeddings) diff --git a/web/backend-fastapi/app/models/rag_states/State.py b/web/backend-fastapi/app/shared/models/rag_states/State.py similarity index 100% rename from web/backend-fastapi/app/models/rag_states/State.py rename to web/backend-fastapi/app/shared/models/rag_states/State.py diff --git a/web/backend-fastapi/app/models/rag_states/__init__.py b/web/backend-fastapi/app/shared/models/rag_states/__init__.py similarity index 100% rename from web/backend-fastapi/app/models/rag_states/__init__.py rename to web/backend-fastapi/app/shared/models/rag_states/__init__.py diff --git a/web/backend-fastapi/app/models/rag_states/v2_UpdatedChunks.py b/web/backend-fastapi/app/shared/models/rag_states/v2_UpdatedChunks.py similarity index 98% rename from web/backend-fastapi/app/models/rag_states/v2_UpdatedChunks.py rename to web/backend-fastapi/app/shared/models/rag_states/v2_UpdatedChunks.py index b24cd3a82..0edc17ebd 100644 --- a/web/backend-fastapi/app/models/rag_states/v2_UpdatedChunks.py +++ b/web/backend-fastapi/app/shared/models/rag_states/v2_UpdatedChunks.py @@ -2,7 +2,7 @@ from langchain_community.embeddings import HuggingFaceEmbeddings from .State import State from ..neo4j import neo4j_vector_search -from ...common.chat_objects import ChatHistory +from ..rag import ChatHistory class UpdatedChunks(State): diff --git a/web/backend-fastapi/app/models/rag_states/v3_AtomicIndexing.py b/web/backend-fastapi/app/shared/models/rag_states/v3_AtomicIndexing.py similarity index 99% rename from web/backend-fastapi/app/models/rag_states/v3_AtomicIndexing.py rename to web/backend-fastapi/app/shared/models/rag_states/v3_AtomicIndexing.py index 98d8e1ded..f6961f729 100644 --- a/web/backend-fastapi/app/models/rag_states/v3_AtomicIndexing.py +++ b/web/backend-fastapi/app/shared/models/rag_states/v3_AtomicIndexing.py @@ -3,7 +3,7 @@ from langchain_community.embeddings import HuggingFaceEmbeddings from .State import State from ..neo4j import neo4j_vector_search -from ...common.chat_objects import ChatHistory +from ..rag import ChatHistory class AtomicIndexing(State): diff --git a/web/backend-fastapi/app/models/rag_states/v4_ImagesAndChunks.py b/web/backend-fastapi/app/shared/models/rag_states/v4_ImagesAndChunks.py similarity index 98% rename from web/backend-fastapi/app/models/rag_states/v4_ImagesAndChunks.py rename to web/backend-fastapi/app/shared/models/rag_states/v4_ImagesAndChunks.py index 5dee53bf5..60de4f3f8 100644 --- a/web/backend-fastapi/app/models/rag_states/v4_ImagesAndChunks.py +++ b/web/backend-fastapi/app/shared/models/rag_states/v4_ImagesAndChunks.py @@ -2,7 +2,7 @@ from langchain_community.embeddings import HuggingFaceEmbeddings from .State import State from ..neo4j import neo4j_vector_search -from ...common.chat_objects import ChatHistory +from ..rag import ChatHistory class ImagesAndChunks(State): diff --git a/web/backend-fastapi/app/models/trulens.py b/web/backend-fastapi/app/shared/models/trulens.py similarity index 100% rename from web/backend-fastapi/app/models/trulens.py rename to web/backend-fastapi/app/shared/models/trulens.py diff --git a/web/backend-fastapi/app/shared/utils/response_utils.py b/web/backend-fastapi/app/shared/utils/response_utils.py new file mode 100644 index 000000000..12d744011 --- /dev/null +++ b/web/backend-fastapi/app/shared/utils/response_utils.py @@ -0,0 +1,29 @@ +from fastapi.responses import JSONResponse +from typing import Any, Dict, Optional + + +class ResponseUtils: + """Utility class for standardized API responses""" + + @staticmethod + def success_response(data: Any, message: str = "Success") -> Dict[str, Any]: + """Standard success response format""" + return { + "status": "success", + "message": message, + "data": data + } + + @staticmethod + def error_response(message: str, code: str = "ERROR") -> Dict[str, Any]: + """Standard error response format""" + return { + "status": "error", + "message": message, + "code": code + } + + @staticmethod + def json_response(data: Any, status_code: int = 200) -> JSONResponse: + """Create JSONResponse with standard format""" + return JSONResponse(content=data, status_code=status_code) diff --git a/web/backend-fastapi/main.py b/web/backend-fastapi/main.py index 5c4ef7b29..dd7c52ca8 100644 --- a/web/backend-fastapi/main.py +++ b/web/backend-fastapi/main.py @@ -30,31 +30,28 @@ def apply(self): from fastapi.middleware.cors import CORSMiddleware from app.middleware.authentication import AuthenticationMiddleware from app.middleware.logging import LoggingMiddleware -from app.controllers import feedback, chat_RAG, login, analytics, agent +from app.modules.module_registry import module_registry import warnings import os from dotenv import load_dotenv from fastmcp import FastMCP -from app.mcp_agents import agents_mcp +from app.modules.agent.agents import agent_registry warnings.filterwarnings("ignore") app = FastAPI(root_path="/api") -# Include API routers -app.include_router(login.router) -app.include_router(feedback.router) -app.include_router(chat_RAG.router) -app.include_router(analytics.router) -app.include_router(agent.router) +# Register all HMVC modules +module_registry.register_all_modules(app) # Register middleware app.add_middleware(LoggingMiddleware) app.add_middleware(AuthenticationMiddleware) # Get the ASGI-compliant app from your main MCP server instance -mcp_asgi_app = agents_mcp.http_app(path="/mcp", stateless_http=True, json_response=True) +combined_mcp = agent_registry.get_combined_mcp() +mcp_asgi_app = combined_mcp.http_app(path="/mcp", stateless_http=True, json_response=True) # Mount the MCP app at the '/agents' endpoint # Critical step: MUST pass the lifespan from the FastMCP app to FastAPI. diff --git a/web/backend-fastapi/unittest_backend/__init__.py b/web/backend-fastapi/unittest_backend/__init__.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/web/backend-fastapi/unittest_backend/test_main.py b/web/backend-fastapi/unittest_backend/test_main.py index 5161e15a5..4f445e6df 100644 --- a/web/backend-fastapi/unittest_backend/test_main.py +++ b/web/backend-fastapi/unittest_backend/test_main.py @@ -1,7 +1,10 @@ import sys +import os from fastapi.testclient import TestClient -sys.path.append('../../') -from code.main import app + +# Add the parent directory to sys.path to find main.py +sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..')) +from main import app client = TestClient(app) @@ -10,5 +13,3 @@ def test_read_main(): response = client.get("/") assert response.status_code == 200 assert response.json() == {"msg": "Hello World"} - - diff --git a/web/backend-fastapi/unittest_backend/test_mcp_agents.py b/web/backend-fastapi/unittest_backend/test_mcp_agents.py new file mode 100644 index 000000000..1eca98516 --- /dev/null +++ b/web/backend-fastapi/unittest_backend/test_mcp_agents.py @@ -0,0 +1,170 @@ +import unittest +import asyncio +from unittest.mock import patch, MagicMock +from fastmcp import Client + +# Import your agents +from app.modules.agent.agents import agent_registry +from app.modules.agent.agents.search.semantic_search import semantic_search_mcp +from app.modules.agent.agents.search.explicit_search import explicit_search_mcp +from app.modules.agent.agents.search.community_detection import community_detection_mcp + + +class TestMCPAgents(unittest.TestCase): + """Test individual MCP agents""" + + def test_agent_registry_initialization(self): + """Test that agent registry initializes correctly""" + agents = agent_registry.list_agents() + self.assertIn("semantic_search", agents) + self.assertIn("explicit_search", agents) + self.assertIn("community_detection", agents) + + capabilities = agent_registry.list_capabilities() + self.assertIn("search", capabilities) + + def test_get_agent_by_name(self): + """Test retrieving specific agents""" + semantic_agent = agent_registry.get_agent("semantic_search") + self.assertIsNotNone(semantic_agent) + self.assertEqual(semantic_agent.name, "SemanticSearchAgent") + + explicit_agent = agent_registry.get_agent("explicit_search") + self.assertIsNotNone(explicit_agent) + + # Test non-existent agent + nonexistent = agent_registry.get_agent("nonexistent") + self.assertIsNone(nonexistent) + + def test_get_combined_mcp(self): + """Test that combined MCP server is created""" + combined_mcp = agent_registry.get_combined_mcp() + self.assertIsNotNone(combined_mcp) + self.assertEqual(combined_mcp.name, "MainAgentServer") + + +class TestMCPAgentTools(unittest.TestCase): + """Test MCP agent tools using direct client""" + + def setUp(self): + """Set up test fixtures""" + self.loop = asyncio.new_event_loop() + asyncio.set_event_loop(self.loop) + + def tearDown(self): + """Clean up after tests""" + self.loop.close() + + @patch('app.shared.models.neo4j.neo4j') + @patch('app.shared.models.rag.get_full_rag') + def test_semantic_search_tool(self, mock_rag, mock_neo4j): + """Test semantic search tool""" + # Mock the RAG response + mock_rag.return_value.query.return_value = [ + {"content": "Test result 1"}, + {"content": "Test result 2"} + ] + + async def run_test(): + client = Client(semantic_search_mcp) + async with client: + # List tools + tools = await client.list_tools() + tool_names = [tool.name for tool in tools] + self.assertIn("semantic_search", tool_names) + + # Call the tool + result = await client.call_tool("semantic_search", { + "query": "test query", + "state_key": "v3" + }) + + self.assertIsInstance(result, dict) + # Verify RAG was called + mock_rag.return_value.query.assert_called_once() + + self.loop.run_until_complete(run_test()) + + @patch('app.shared.models.neo4j.neo4j') + def test_explicit_search_tool(self, mock_neo4j): + """Test explicit search tool""" + # Mock Neo4j response + mock_neo4j_instance = MagicMock() + mock_neo4j_instance.query.return_value = [ + {"name": "Test Document", "content": "Test content"} + ] + mock_neo4j.return_value = mock_neo4j_instance + + async def run_test(): + client = Client(explicit_search_mcp) + async with client: + # List tools + tools = await client.list_tools() + tool_names = [tool.name for tool in tools] + self.assertIn("explicit_search", tool_names) + + # Call the tool + result = await client.call_tool("explicit_search", { + "query": "SELECT * FROM Document LIMIT 5", + "state_key": "v3" + }) + + self.assertIsInstance(result, dict) + # Verify Neo4j was called + mock_neo4j_instance.query.assert_called_once() + + self.loop.run_until_complete(run_test()) + + def test_combined_mcp_tools(self): + """Test combined MCP server has all tools""" + async def run_test(): + combined_mcp = agent_registry.get_combined_mcp() + client = Client(combined_mcp) + + async with client: + tools = await client.list_tools() + tool_names = [tool.name for tool in tools] + + # Should have tools from all mounted agents + expected_tools = [ + "semanticsearch_semantic_search", + "explicitsearch_explicit_search", + "communitydetection_community_detection" + ] + + for expected_tool in expected_tools: + self.assertIn(expected_tool, tool_names) + + self.loop.run_until_complete(run_test()) + + +class TestMCPServerHealth(unittest.TestCase): + """Test MCP server health and connectivity""" + + def test_agent_mcp_servers_exist(self): + """Test that all MCP server instances exist""" + self.assertIsNotNone(semantic_search_mcp) + self.assertIsNotNone(explicit_search_mcp) + self.assertIsNotNone(community_detection_mcp) + + # Test server names + self.assertEqual(semantic_search_mcp.name, "SemanticSearchAgent") + self.assertEqual(explicit_search_mcp.name, "ExplicitSearchAgent") + self.assertEqual(community_detection_mcp.name, "CommunityDetectionAgent") + + def test_server_tool_registration(self): + """Test that tools are properly registered on servers""" + # Each server should have exactly one tool + self.assertEqual(len(semantic_search_mcp._tools), 1) + self.assertEqual(len(explicit_search_mcp._tools), 1) + self.assertEqual(len(community_detection_mcp._tools), 1) + + # Check tool names + self.assertIn("semantic_search", semantic_search_mcp._tools) + self.assertIn("explicit_search", explicit_search_mcp._tools) + self.assertIn("community_detection", community_detection_mcp._tools) + + +if __name__ == '__main__': + # Run the tests + unittest.main(verbosity=2)