diff --git a/src/backend/.env.sample b/src/backend/.env.sample index 911b771ff..6009c6a48 100644 --- a/src/backend/.env.sample +++ b/src/backend/.env.sample @@ -12,6 +12,8 @@ AZURE_AI_PROJECT_ENDPOINT= AZURE_AI_SUBSCRIPTION_ID= AZURE_AI_RESOURCE_GROUP= AZURE_AI_PROJECT_NAME= +AZURE_AI_AGENT_PROJECT_CONNECTION_STRING= +AZURE_AI_MODEL_DEPLOYMENT_NAME=gpt-4o APPLICATIONINSIGHTS_CONNECTION_STRING= diff --git a/src/backend/app_config.py b/src/backend/app_config.py new file mode 100644 index 000000000..e37eb19e8 --- /dev/null +++ b/src/backend/app_config.py @@ -0,0 +1,257 @@ +# app_config.py +import os +import logging +from typing import Optional, List, Dict, Any +from dotenv import load_dotenv +from azure.identity import DefaultAzureCredential, ClientSecretCredential +from azure.cosmos.aio import CosmosClient +from azure.ai.projects.aio import AIProjectClient +from semantic_kernel.kernel import Kernel +from semantic_kernel.contents import ChatHistory +from semantic_kernel.agents.azure_ai.azure_ai_agent import AzureAIAgent +# Load environment variables from .env file +load_dotenv() + +class AppConfig: + """Application configuration class that loads settings from environment variables.""" + + def __init__(self): + """Initialize the application configuration with environment variables.""" + # Azure authentication settings + self.AZURE_TENANT_ID = self._get_optional("AZURE_TENANT_ID") + self.AZURE_CLIENT_ID = self._get_optional("AZURE_CLIENT_ID") + self.AZURE_CLIENT_SECRET = self._get_optional("AZURE_CLIENT_SECRET") + + # CosmosDB settings + self.COSMOSDB_ENDPOINT = self._get_optional("COSMOSDB_ENDPOINT") + self.COSMOSDB_DATABASE = self._get_optional("COSMOSDB_DATABASE") + self.COSMOSDB_CONTAINER = self._get_optional("COSMOSDB_CONTAINER") + + # Azure OpenAI settings + self.AZURE_OPENAI_DEPLOYMENT_NAME = self._get_required("AZURE_OPENAI_DEPLOYMENT_NAME", "gpt-4o") + self.AZURE_OPENAI_API_VERSION = self._get_required("AZURE_OPENAI_API_VERSION", "2024-11-20") + self.AZURE_OPENAI_ENDPOINT = self._get_required("AZURE_OPENAI_ENDPOINT") + self.AZURE_OPENAI_SCOPES = [f"{self._get_optional('AZURE_OPENAI_SCOPE', 'https://cognitiveservices.azure.com/.default')}"] + + # Frontend settings + self.FRONTEND_SITE_NAME = self._get_optional("FRONTEND_SITE_NAME", "http://127.0.0.1:3000") + + # Azure AI settings + self.AZURE_AI_SUBSCRIPTION_ID = self._get_required("AZURE_AI_SUBSCRIPTION_ID") + self.AZURE_AI_RESOURCE_GROUP = self._get_required("AZURE_AI_RESOURCE_GROUP") + self.AZURE_AI_PROJECT_NAME = self._get_required("AZURE_AI_PROJECT_NAME") + self.AZURE_AI_AGENT_PROJECT_CONNECTION_STRING = self._get_required("AZURE_AI_AGENT_PROJECT_CONNECTION_STRING") + + # Cached clients and resources + self._azure_credentials = None + self._cosmos_client = None + self._cosmos_database = None + self._ai_project_client = None + + def _get_required(self, name: str, default: Optional[str] = None) -> str: + """Get a required configuration value from environment variables. 
+ + Args: + name: The name of the environment variable + default: Optional default value if not found + + Returns: + The value of the environment variable or default if provided + + Raises: + ValueError: If the environment variable is not found and no default is provided + """ + if name in os.environ: + return os.environ[name] + if default is not None: + logging.warning("Environment variable %s not found, using default value", name) + return default + raise ValueError(f"Environment variable {name} not found and no default provided") + + def _get_optional(self, name: str, default: str = "") -> str: + """Get an optional configuration value from environment variables. + + Args: + name: The name of the environment variable + default: Default value if not found (default: "") + + Returns: + The value of the environment variable or the default value + """ + if name in os.environ: + return os.environ[name] + return default + + def _get_bool(self, name: str) -> bool: + """Get a boolean configuration value from environment variables. + + Args: + name: The name of the environment variable + + Returns: + True if the environment variable exists and is set to 'true' or '1', False otherwise + """ + return name in os.environ and os.environ[name].lower() in ["true", "1"] + + def get_azure_credentials(self): + """Get Azure credentials using DefaultAzureCredential. + + Returns: + DefaultAzureCredential instance for Azure authentication + """ + # Cache the credentials object + if self._azure_credentials is not None: + return self._azure_credentials + + try: + self._azure_credentials = DefaultAzureCredential() + return self._azure_credentials + except Exception as exc: + logging.warning("Failed to create DefaultAzureCredential: %s", exc) + return None + + def get_cosmos_database_client(self): + """Get a Cosmos DB client for the configured database. + + Returns: + A Cosmos DB database client + """ + try: + if self._cosmos_client is None: + self._cosmos_client = CosmosClient( + self.COSMOSDB_ENDPOINT, credential=self.get_azure_credentials() + ) + + if self._cosmos_database is None: + self._cosmos_database = self._cosmos_client.get_database_client( + self.COSMOSDB_DATABASE + ) + + return self._cosmos_database + except Exception as exc: + logging.error("Failed to create CosmosDB client: %s. CosmosDB is required for this application.", exc) + raise + + def create_kernel(self): + """Creates a new Semantic Kernel instance. + + Returns: + A new Semantic Kernel instance + """ + # Create a new kernel instance without manually configuring OpenAI services + # The agents will be created using Azure AI Agent Project pattern instead + kernel = Kernel() + return kernel + + def get_ai_project_client(self): + """Create and return an AIProjectClient for Azure AI Foundry using from_connection_string. 
+ + Returns: + An AIProjectClient instance + """ + if self._ai_project_client is not None: + return self._ai_project_client + + try: + credential = self.get_azure_credentials() + if credential is None: + raise RuntimeError("Unable to acquire Azure credentials; ensure DefaultAzureCredential is configured") + + connection_string = self.AZURE_AI_AGENT_PROJECT_CONNECTION_STRING + self._ai_project_client = AIProjectClient.from_connection_string( + credential=credential, + conn_str=connection_string + ) + logging.info("Successfully created AIProjectClient using connection string") + return self._ai_project_client + except Exception as exc: + logging.error("Failed to create AIProjectClient: %s", exc) + raise + + async def create_azure_ai_agent( + self, + kernel: Kernel, + agent_name: str, + instructions: str, + agent_type: str = "assistant", + tools=None, + tool_resources=None, + response_format=None, + temperature: float = 0.0 + ): + """ + Creates a new Azure AI Agent with the specified name and instructions using AIProjectClient. + + Args: + kernel: The Semantic Kernel instance + agent_name: The name of the agent + instructions: The system message / instructions for the agent + agent_type: The type of agent (defaults to "assistant") + tools: Optional tool definitions for the agent + tool_resources: Optional tool resources required by the tools + response_format: Optional response format to control structured output + temperature: The temperature setting for the agent (defaults to 0.0) + + Returns: + A new AzureAIAgent instance + """ + try: + # Get the AIProjectClient + project_client = self.get_ai_project_client() + + # Tool handling: We need to distinguish between our SK functions and + # the tool definitions needed by project_client.agents.create_agent + tool_definitions = None + kernel_functions = [] + + # If tools are provided and they are SK KernelFunctions, we need to handle them differently + # than if they are already tool definitions expected by AIProjectClient + if tools: + # Check if tools are SK KernelFunctions + if all(hasattr(tool, 'name') and hasattr(tool, 'invoke') for tool in tools): + # Store the kernel functions to register with the agent later + kernel_functions = tools + # For now, we don't extract tool definitions from kernel functions + # This would require additional code to convert SK functions to AI Project tool definitions + logging.warning("Kernel functions provided as tools will be registered with the agent after creation") + else: + # Assume these are already proper tool definitions for create_agent + tool_definitions = tools + + # Create the agent using the project client + logging.info("Creating agent '%s' with model '%s'", agent_name, self.AZURE_OPENAI_DEPLOYMENT_NAME) + agent_definition = await project_client.agents.create_agent( + model=self.AZURE_OPENAI_DEPLOYMENT_NAME, + name=agent_name, + instructions=instructions, + tools=tool_definitions, # Only pass tool_definitions, not kernel functions + tool_resources=tool_resources, + temperature=temperature, + response_format=response_format + ) + + # Create the agent instance directly with project_client and definition + agent_kwargs = { + "client": project_client, + "definition": agent_definition, + "kernel": kernel + } + + + # For other agents, create using standard AzureAIAgent + agent = AzureAIAgent(**agent_kwargs) + + # Register the kernel functions with the agent if any were provided + if kernel_functions: + for function in kernel_functions: + if hasattr(agent, 'add_function'): + agent.add_function(function) 
+ + return agent + except Exception as exc: + logging.error("Failed to create Azure AI Agent: %s", exc) + raise + + +# Create a global instance of AppConfig +config = AppConfig() \ No newline at end of file diff --git a/src/backend/app_kernel.py b/src/backend/app_kernel.py index 798aafe76..01fb21256 100644 --- a/src/backend/app_kernel.py +++ b/src/backend/app_kernel.py @@ -3,6 +3,8 @@ import logging import os import uuid +import re +import json from typing import List, Dict, Optional, Any # FastAPI imports @@ -85,52 +87,9 @@ async def input_task_endpoint(input_task: InputTask, request: Request): """ Receive the initial input task from the user. - - --- - tags: - - Input Task - parameters: - - name: user_principal_id - in: header - type: string - required: true - description: User ID extracted from the authentication header - - name: body - in: body - required: true - schema: - type: object - properties: - session_id: - type: string - description: Optional session ID, generated if not provided - description: - type: string - description: The task description - user_id: - type: string - description: The user ID associated with the task - responses: - 200: - description: Task created successfully - schema: - type: object - properties: - status: - type: string - session_id: - type: string - plan_id: - type: string - description: - type: string - user_id: - type: string - 400: - description: Missing or invalid user information """ - - if not rai_success(input_task.description): + # Fix 1: Properly await the async rai_success function + if not await rai_success(input_task.description): print("RAI failed") track_event_if_configured( @@ -150,67 +109,91 @@ async def input_task_endpoint(input_task: InputTask, request: Request): if not user_id: track_event_if_configured("UserIdNotFound", {"status_code": 400, "detail": "no user"}) - raise HTTPException(status_code=400, detail="no user") + + # Generate session ID if not provided if not input_task.session_id: input_task.session_id = str(uuid.uuid4()) - - # Get the agents for this session - agents = await get_agents(input_task.session_id, user_id) - # Send the task to the planner agent - planner_agent = agents["PlannerAgent"] - - # Convert input task to JSON for the kernel function - input_task_json = input_task.json() - - # Use the planner to handle the task - result = await planner_agent.handle_input_task( - KernelArguments(input_task_json=input_task_json) - ) - - # Extract plan ID from the result - # This is a simplified approach - in a real system, - # we would properly parse the result to get the plan ID - memory_store = planner_agent._memory_store - plan = await memory_store.get_plan_by_session(input_task.session_id) + # Fix 2: Don't try to set user_id on InputTask directly since it doesn't have that field + # Instead, include it in the JSON we'll pass to the planner - if not plan or not plan.id: + try: + # Create just the planner agent instead of all agents + kernel, memory_store = await initialize_runtime_and_context(input_task.session_id, user_id) + planner_agent = await AgentFactory.create_agent( + agent_type=AgentType.PLANNER, + session_id=input_task.session_id, + user_id=user_id + ) + + # Convert input task to JSON for the kernel function, add user_id here + input_task_data = input_task.model_dump() + input_task_data["user_id"] = user_id + input_task_json = json.dumps(input_task_data) + + # Use the planner to handle the task + result = await planner_agent.handle_input_task( + KernelArguments(input_task_json=input_task_json) + ) + + # Get plan 
from memory store + plan = await memory_store.get_plan_by_session(input_task.session_id) + + if not plan or not plan.id: + # If plan not found by session, try to extract plan ID from result + plan_id_match = re.search(r"Plan '([^']+)'", result) + + if plan_id_match: + plan_id = plan_id_match.group(1) + plan = await memory_store.get_plan(plan_id) + + # If still no plan found, handle the failure + if not plan or not plan.id: + track_event_if_configured( + "PlanCreationFailed", + { + "session_id": input_task.session_id, + "description": input_task.description, + } + ) + raise HTTPException(status_code=400, detail="Error: Failed to create plan") + + # Log custom event for successful input task processing track_event_if_configured( - "PlanCreationFailed", + "InputTaskProcessed", { + "status": f"Plan created with ID: {plan.id}", "session_id": input_task.session_id, + "plan_id": plan.id, "description": input_task.description, - } + }, ) + return { - "status": "Error: Failed to create plan", - "session_id": input_task.session_id, - "plan_id": "", - "description": input_task.description, - } - - # Log custom event for successful input task processing - track_event_if_configured( - "InputTaskProcessed", - { "status": f"Plan created with ID: {plan.id}", "session_id": input_task.session_id, "plan_id": plan.id, "description": input_task.description, - }, - ) + } + + except Exception as e: + logging.exception(f"Error handling input task: {e}") + track_event_if_configured( + "InputTaskError", + { + "session_id": input_task.session_id, + "description": input_task.description, + "error": str(e), + } + ) + raise HTTPException(status_code=400, detail="Error creating plan") - return { - "status": f"Plan created with ID: {plan.id}", - "session_id": input_task.session_id, - "plan_id": plan.id, - "description": input_task.description, - } @app.post("/human_feedback") async def human_feedback_endpoint(human_feedback: HumanFeedback, request: Request): + """ Receive human feedback on a step. 
@@ -565,7 +548,8 @@ async def get_plans( ) raise HTTPException(status_code=404, detail="Plan not found") - steps = await memory_store.get_steps_for_plan(plan.id, session_id) + # Use get_steps_by_plan to match the original implementation + steps = await memory_store.get_steps_by_plan(plan_id=plan.id) plan_with_steps = PlanWithSteps(**plan.model_dump(), steps=steps) plan_with_steps.update_step_counts() return [plan_with_steps] @@ -573,7 +557,7 @@ async def get_plans( all_plans = await memory_store.get_all_plans() # Fetch steps for all plans concurrently steps_for_all_plans = await asyncio.gather( - *[memory_store.get_steps_for_plan(plan.id, plan.session_id) for plan in all_plans] + *[memory_store.get_steps_by_plan(plan_id=plan.id) for plan in all_plans] ) # Create list of PlanWithSteps and update step counts list_of_plans_with_steps = [] @@ -631,7 +615,7 @@ async def get_steps_by_plan(plan_id: str, request: Request) -> List[Step]: updated_action: type: string description: Optional modified action based on feedback - 400: + 400: description: Missing or invalid user information 404: description: Plan or steps not found @@ -658,6 +642,9 @@ async def get_agent_messages(session_id: str, request: Request) -> List[AgentMes - Agent Messages parameters: - name: session_id + in: path + type: string + required: true in: path type: string required: true diff --git a/src/backend/config_kernel.py b/src/backend/config_kernel.py index 03055041e..f65c0a4f0 100644 --- a/src/backend/config_kernel.py +++ b/src/backend/config_kernel.py @@ -1,163 +1,59 @@ # config_kernel.py import os import logging -from typing import Optional - -# Import Semantic Kernel and Azure AI Agent -from semantic_kernel import Kernel +import semantic_kernel as sk +from semantic_kernel.kernel import Kernel +from semantic_kernel.contents import ChatHistory from semantic_kernel.agents.azure_ai.azure_ai_agent import AzureAIAgent -from azure.cosmos.aio import CosmosClient -from azure.identity.aio import DefaultAzureCredential -from azure.ai.projects.aio import AIProjectClient -from dotenv import load_dotenv - -load_dotenv() - - -def GetRequiredConfig(name, default=None): - if name in os.environ: - return os.environ[name] - if default is not None: - logging.warning("Environment variable %s not found, using default value", name) - return default - raise ValueError(f"Environment variable {name} not found and no default provided") - - -def GetOptionalConfig(name, default=""): - if name in os.environ: - return os.environ[name] - return default - - -def GetBoolConfig(name): - return name in os.environ and os.environ[name].lower() in ["true", "1"] +# Import AppConfig from app_config +from app_config import config +# This file is left as a lightweight wrapper around AppConfig for backward compatibility +# All configuration is now handled by AppConfig in app_config.py class Config: - # Try to get required config with defaults to allow local development - AZURE_TENANT_ID = GetOptionalConfig("AZURE_TENANT_ID") - AZURE_CLIENT_ID = GetOptionalConfig("AZURE_CLIENT_ID") - AZURE_CLIENT_SECRET = GetOptionalConfig("AZURE_CLIENT_SECRET") - - COSMOSDB_ENDPOINT = GetOptionalConfig("COSMOSDB_ENDPOINT", "https://localhost:8081") - COSMOSDB_DATABASE = GetOptionalConfig("COSMOSDB_DATABASE", "macae-database") - COSMOSDB_CONTAINER = GetOptionalConfig("COSMOSDB_CONTAINER", "macae-container") - - AZURE_OPENAI_DEPLOYMENT_NAME = GetRequiredConfig("AZURE_OPENAI_DEPLOYMENT_NAME", "gpt-35-turbo") - AZURE_OPENAI_API_VERSION = GetRequiredConfig("AZURE_OPENAI_API_VERSION", 
"2023-12-01-preview") - AZURE_OPENAI_ENDPOINT = GetRequiredConfig("AZURE_OPENAI_ENDPOINT", "https://api.openai.com/v1") - - # Azure OpenAI scopes for token-based authentication - AZURE_OPENAI_SCOPES = [f"{GetOptionalConfig('AZURE_OPENAI_SCOPE', 'https://cognitiveservices.azure.com/.default')}"] - - FRONTEND_SITE_NAME = GetOptionalConfig( - "FRONTEND_SITE_NAME", "http://127.0.0.1:3000" - ) - - AZURE_AI_SUBSCRIPTION_ID = GetRequiredConfig("AZURE_AI_SUBSCRIPTION_ID") - AZURE_AI_RESOURCE_GROUP = GetRequiredConfig("AZURE_AI_RESOURCE_GROUP") - AZURE_AI_PROJECT_NAME = GetRequiredConfig("AZURE_AI_PROJECT_NAME") - AZURE_AI_AGENT_PROJECT_CONNECTION_STRING = GetRequiredConfig("AZURE_AI_AGENT_PROJECT_CONNECTION_STRING") - - __azure_credentials = None - __comos_client = None - __cosmos_database = None - __ai_project_client = None + # Use values from AppConfig + AZURE_TENANT_ID = config.AZURE_TENANT_ID + AZURE_CLIENT_ID = config.AZURE_CLIENT_ID + AZURE_CLIENT_SECRET = config.AZURE_CLIENT_SECRET + + # CosmosDB settings + COSMOSDB_ENDPOINT = config.COSMOSDB_ENDPOINT + COSMOSDB_DATABASE = config.COSMOSDB_DATABASE + COSMOSDB_CONTAINER = config.COSMOSDB_CONTAINER + + # Azure OpenAI settings + AZURE_OPENAI_DEPLOYMENT_NAME = config.AZURE_OPENAI_DEPLOYMENT_NAME + AZURE_OPENAI_API_VERSION = config.AZURE_OPENAI_API_VERSION + AZURE_OPENAI_ENDPOINT = config.AZURE_OPENAI_ENDPOINT + AZURE_OPENAI_SCOPES = config.AZURE_OPENAI_SCOPES + + # Other settings + FRONTEND_SITE_NAME = config.FRONTEND_SITE_NAME + AZURE_AI_SUBSCRIPTION_ID = config.AZURE_AI_SUBSCRIPTION_ID + AZURE_AI_RESOURCE_GROUP = config.AZURE_AI_RESOURCE_GROUP + AZURE_AI_PROJECT_NAME = config.AZURE_AI_PROJECT_NAME + AZURE_AI_AGENT_PROJECT_CONNECTION_STRING = config.AZURE_AI_AGENT_PROJECT_CONNECTION_STRING @staticmethod def GetAzureCredentials(): - """Get Azure credentials using DefaultAzureCredential. - - Returns: - DefaultAzureCredential instance for Azure authentication - """ - # Cache the credentials object - if Config.__azure_credentials is not None: - return Config.__azure_credentials - try: - Config.__azure_credentials = DefaultAzureCredential() - return Config.__azure_credentials - except Exception as exc: - logging.warning("Failed to create DefaultAzureCredential: %s", exc) - return None + """Get Azure credentials using the AppConfig implementation.""" + return config.get_azure_credentials() @staticmethod def GetCosmosDatabaseClient(): - """Get a Cosmos DB client for the configured database. - - Returns: - A Cosmos DB database client - """ - try: - if Config.__comos_client is None: - Config.__comos_client = CosmosClient( - Config.COSMOSDB_ENDPOINT, credential=Config.GetAzureCredentials() - ) + """Get a Cosmos DB client using the AppConfig implementation.""" + return config.get_cosmos_database_client() - if Config.__cosmos_database is None: - Config.__cosmos_database = Config.__comos_client.get_database_client( - Config.COSMOSDB_DATABASE - ) - - return Config.__cosmos_database - except Exception as exc: - logging.error("Failed to create CosmosDB client: %s. CosmosDB is required for this application.", exc) - raise - - @staticmethod - async def GetAzureOpenAIToken() -> Optional[str]: - """Get an Azure AD token for Azure OpenAI. 
- - Returns: - A bearer token or None if token could not be obtained - """ - try: - credential = Config.GetAzureCredentials() - if credential is None: - logging.warning("No Azure credentials available") - return None - token = await credential.get_token(*Config.AZURE_OPENAI_SCOPES) - return token.token - except Exception as exc: - logging.error("Failed to get Azure OpenAI token: %s", exc) - return None - @staticmethod def CreateKernel(): - """ - Creates a new Semantic Kernel instance. - - Returns: - A new Semantic Kernel instance - """ - kernel = Kernel() - return kernel + """Creates a new Semantic Kernel instance using the AppConfig implementation.""" + return config.create_kernel() @staticmethod def GetAIProjectClient(): - """Create and return an AIProjectClient for Azure AI Foundry using from_connection_string. - - Returns: - An AIProjectClient instance - """ - if Config.__ai_project_client is not None: - return Config.__ai_project_client - - try: - credential = Config.GetAzureCredentials() - if credential is None: - raise RuntimeError("Unable to acquire Azure credentials; ensure DefaultAzureCredential is configured") - - connection_string = Config.AZURE_AI_AGENT_PROJECT_CONNECTION_STRING - Config.__ai_project_client = AIProjectClient.from_connection_string( - credential=credential, - conn_str=connection_string - ) - logging.info("Successfully created AIProjectClient using connection string") - return Config.__ai_project_client - except Exception as exc: - logging.error("Failed to create AIProjectClient: %s", exc) - raise + """Get an AIProjectClient using the AppConfig implementation.""" + return config.get_ai_project_client() @staticmethod async def CreateAzureAIAgent( @@ -170,73 +66,14 @@ async def CreateAzureAIAgent( response_format=None, temperature: float = 0.0 ): - """ - Creates a new Azure AI Agent with the specified name and instructions using AIProjectClient. 
- - Args: - kernel: The Semantic Kernel instance - agent_name: The name of the agent - instructions: The system message / instructions for the agent - agent_type: The type of agent (defaults to "assistant") - tools: Optional tool definitions for the agent - tool_resources: Optional tool resources required by the tools - response_format: Optional response format to control structured output - temperature: The temperature setting for the agent (defaults to 0.0) - - Returns: - A new AzureAIAgent instance - """ - try: - # Get the AIProjectClient - project_client = Config.GetAIProjectClient() - - # Tool handling: We need to distinguish between our SK functions and - # the tool definitions needed by project_client.agents.create_agent - tool_definitions = None - kernel_functions = [] - - # If tools are provided and they are SK KernelFunctions, we need to handle them differently - # than if they are already tool definitions expected by AIProjectClient - if tools: - # Check if tools are SK KernelFunctions - if all(hasattr(tool, 'name') and hasattr(tool, 'invoke') for tool in tools): - # Store the kernel functions to register with the agent later - kernel_functions = tools - # For now, we don't extract tool definitions from kernel functions - # This would require additional code to convert SK functions to AI Project tool definitions - logging.warning("Kernel functions provided as tools will be registered with the agent after creation") - else: - # Assume these are already proper tool definitions for create_agent - tool_definitions = tools - - # Create the agent using the project client - logging.info("Creating agent '%s' with model '%s'", agent_name, Config.AZURE_OPENAI_DEPLOYMENT_NAME) - agent_definition = await project_client.agents.create_agent( - model=Config.AZURE_OPENAI_DEPLOYMENT_NAME, - name=agent_name, - instructions=instructions, - tools=tool_definitions, # Only pass tool_definitions, not kernel functions - tool_resources=tool_resources, - temperature=temperature, - response_format=response_format - ) - - # Create the agent instance directly with project_client and definition - agent_kwargs = { - "client": project_client, - "definition": agent_definition, - "kernel": kernel - } - - agent = AzureAIAgent(**agent_kwargs) - - # Register the kernel functions with the agent if any were provided - if kernel_functions: - for function in kernel_functions: - if hasattr(agent, 'add_function'): - agent.add_function(function) - - return agent - except Exception as exc: - logging.error("Failed to create Azure AI Agent: %s", exc) - raise \ No newline at end of file + """Creates a new Azure AI Agent using the AppConfig implementation.""" + return await config.create_azure_ai_agent( + kernel=kernel, + agent_name=agent_name, + instructions=instructions, + agent_type=agent_type, + tools=tools, + tool_resources=tool_resources, + response_format=response_format, + temperature=temperature + ) \ No newline at end of file diff --git a/src/backend/context/cosmos_memory_kernel.py b/src/backend/context/cosmos_memory_kernel.py index fc47245f0..de4639fc1 100644 --- a/src/backend/context/cosmos_memory_kernel.py +++ b/src/backend/context/cosmos_memory_kernel.py @@ -3,17 +3,29 @@ import asyncio import logging import uuid +import json +import datetime from typing import Any, Dict, List, Optional, Type, Tuple import numpy as np from azure.cosmos.partition_key import PartitionKey +from azure.cosmos.aio import CosmosClient +from azure.identity import DefaultAzureCredential from semantic_kernel.memory.memory_record import 
MemoryRecord from semantic_kernel.memory.memory_store_base import MemoryStoreBase from semantic_kernel.contents import ChatMessageContent, ChatHistory, AuthorRole -from config_kernel import Config +# Import the AppConfig instance +from app_config import config from models.messages_kernel import BaseDataModel, Plan, Session, Step, AgentMessage +# Add custom JSON encoder class for datetime objects +class DateTimeEncoder(json.JSONEncoder): + """Custom JSON encoder for handling datetime objects.""" + def default(self, obj): + if isinstance(obj, datetime.datetime): + return obj.isoformat() + return super().default(obj) class CosmosMemoryContext(MemoryStoreBase): """A buffered chat completion context that saves messages and data models to Cosmos DB.""" @@ -30,13 +42,21 @@ def __init__( self, session_id: str, user_id: str, + cosmos_container: str = None, + cosmos_endpoint: str = None, + cosmos_database: str = None, buffer_size: int = 100, initial_messages: Optional[List[ChatMessageContent]] = None, ) -> None: self._buffer_size = buffer_size self._messages = initial_messages or [] - self._cosmos_container = Config.COSMOSDB_CONTAINER - self._database = Config.GetCosmosDatabaseClient() + + # Use values from AppConfig instance if not provided + self._cosmos_container = cosmos_container or config.COSMOSDB_CONTAINER + self._cosmos_endpoint = cosmos_endpoint or config.COSMOSDB_ENDPOINT + self._cosmos_database = cosmos_database or config.COSMOSDB_DATABASE + + self._database = None self._container = None self.session_id = session_id self.user_id = user_id @@ -47,8 +67,14 @@ def __init__( async def initialize(self): """Initialize the memory context using CosmosDB.""" try: - if self._database is None: - raise ValueError("CosmosDB client is not available. Please check CosmosDB configuration.") + if not self._database: + # Create Cosmos client + cosmos_client = CosmosClient( + self._cosmos_endpoint, + credential=DefaultAzureCredential() + ) + self._database = cosmos_client.get_database_client(self._cosmos_database) + # Set up CosmosDB container self._container = await self._database.create_container_if_not_exists( id=self._cosmos_container, @@ -65,16 +91,36 @@ async def initialize(self): # Helper method for awaiting initialization async def ensure_initialized(self): """Ensure that the container is initialized.""" - await self._initialized.wait() + if not self._initialized.is_set(): + # If the initialization hasn't been done, do it now + await self.initialize() + + # If after initialization the container is still None, that means initialization failed if self._container is None: - raise RuntimeError("CosmosDB container is not available. Initialization failed.") + # Re-attempt initialization once in case the previous attempt failed + try: + await self.initialize() + except Exception as e: + logging.error(f"Re-initialization attempt failed: {e}") + + # If still not initialized, raise error + if self._container is None: + raise RuntimeError("CosmosDB container is not available. 
Initialization failed.") async def add_item(self, item: BaseDataModel) -> None: """Add a data model item to Cosmos DB.""" await self.ensure_initialized() try: + # Convert the model to a dict document = item.model_dump() + + # Handle datetime objects by converting them to ISO format strings + for key, value in list(document.items()): + if isinstance(value, datetime.datetime): + document[key] = value.isoformat() + + # Now create the item with the serialized datetime values await self._container.create_item(body=document) logging.info(f"Item added to Cosmos DB - {document['id']}") except Exception as e: @@ -86,7 +132,15 @@ async def update_item(self, item: BaseDataModel) -> None: await self.ensure_initialized() try: + # Convert the model to a dict document = item.model_dump() + + # Handle datetime objects by converting them to ISO format strings + for key, value in list(document.items()): + if isinstance(value, datetime.datetime): + document[key] = value.isoformat() + + # Now upsert the item with the serialized datetime values await self._container.upsert_item(body=document) except Exception as e: logging.exception(f"Failed to update item in Cosmos DB: {e}") @@ -170,8 +224,17 @@ async def get_plan_by_session(self, session_id: str) -> Optional[Plan]: return plans[0] if plans else None async def get_plan(self, plan_id: str) -> Optional[Plan]: + """Retrieve a plan by its ID. + + Args: + plan_id: The ID of the plan to retrieve + + Returns: + The Plan object or None if not found + """ + # Use the session_id as the partition key since that's how we're partitioning our data return await self.get_item_by_id( - plan_id, partition_key=plan_id, model_class=Plan + plan_id, partition_key=self.session_id, model_class=Plan ) async def get_all_plans(self) -> List[Plan]: diff --git a/src/backend/kernel_agents/agent_base.py b/src/backend/kernel_agents/agent_base.py index c444a814d..a0381779a 100644 --- a/src/backend/kernel_agents/agent_base.py +++ b/src/backend/kernel_agents/agent_base.py @@ -17,7 +17,8 @@ Step, StepStatus, ) -from config_kernel import Config +# Import the new AppConfig instance +from app_config import config from event_utils import track_event_if_configured # Default formatting instructions used across agents @@ -98,7 +99,7 @@ async def async_init(self): This method must be called after creating the agent to complete initialization. """ # Create Azure AI Agent or fallback - self._agent = await Config.CreateAzureAIAgent( + self._agent = await config.create_azure_ai_agent( kernel=self._kernel, agent_name=self._agent_name, instructions=self._system_message @@ -126,6 +127,132 @@ async def handle_action_request_wrapper(*args, **kwargs): # Use agent name as plugin for handler self._kernel.add_function(self._agent_name, kernel_func) + async def handle_action_request(self, action_request_json: str) -> str: + """Handle an action request from another agent or the system. 
+ + Args: + action_request_json: The action request as a JSON string + + Returns: + A JSON string containing the action response + """ + # Parse the action request + action_request_dict = json.loads(action_request_json) + action_request = ActionRequest(**action_request_dict) + + # Get the step from memory + step: Step = await self._memory_store.get_step( + action_request.step_id, action_request.session_id + ) + + if not step: + # Create error response if step not found + response = ActionResponse( + step_id=action_request.step_id, + status=StepStatus.failed, + message="Step not found in memory.", + ) + return response.json() + + # Add messages to chat history for context + # This gives the agent visibility of the conversation history + self._chat_history.extend([ + {"role": "assistant", "content": action_request.action}, + {"role": "user", "content": f"{step.human_feedback}. Now make the function call"} + ]) + + try: + # Use the agent to process the action + chat_history = self._chat_history.copy() + + # Call the agent to handle the action + agent_response = await self._agent.invoke(self._kernel, f"{action_request.action}\n\nPlease perform this action") + result = str(agent_response) + + # Store agent message in cosmos memory + await self._memory_store.add_item( + AgentMessage( + session_id=action_request.session_id, + user_id=self._user_id, + plan_id=action_request.plan_id, + content=f"{result}", + source=self._agent_name, + step_id=action_request.step_id, + ) + ) + + # Track telemetry + track_event_if_configured( + "Base agent - Added into the cosmos", + { + "session_id": action_request.session_id, + "user_id": self._user_id, + "plan_id": action_request.plan_id, + "content": f"{result}", + "source": self._agent_name, + "step_id": action_request.step_id, + }, + ) + + except Exception as e: + logging.exception(f"Error during agent execution: {e}") + + # Track error in telemetry + track_event_if_configured( + "Base agent - Error during agent execution, captured into the cosmos", + { + "session_id": action_request.session_id, + "user_id": self._user_id, + "plan_id": action_request.plan_id, + "content": f"{e}", + "source": self._agent_name, + "step_id": action_request.step_id, + }, + ) + + # Return an error response + response = ActionResponse( + step_id=action_request.step_id, + plan_id=action_request.plan_id, + session_id=action_request.session_id, + result=f"Error: {str(e)}", + status=StepStatus.failed, + ) + return response.json() + + logging.info(f"Task completed: {result}") + + # Update step status + step.status = StepStatus.completed + step.agent_reply = result + await self._memory_store.update_step(step) + + # Track step completion in telemetry + track_event_if_configured( + "Base agent - Updated step and updated into the cosmos", + { + "status": StepStatus.completed, + "session_id": action_request.session_id, + "agent_reply": f"{result}", + "user_id": self._user_id, + "plan_id": action_request.plan_id, + "content": f"{result}", + "source": self._agent_name, + "step_id": action_request.step_id, + }, + ) + + # Create and return action response + response = ActionResponse( + step_id=step.id, + plan_id=step.plan_id, + session_id=action_request.session_id, + result=result, + status=StepStatus.completed, + ) + + return response.json() + async def invoke_tool(self, tool_name: str, arguments: Dict[str, Any]) -> str: """Invoke a specific tool by name with the provided arguments. 
@@ -274,6 +401,11 @@ def get_tools_from_config(cls, kernel: sk.Kernel, agent_type: str, config_path: kernel_functions = [] plugin_name = f"{agent_type}_plugin" + # Early return if no tools defined - prevent empty iteration + if not config.get("tools"): + logging.info(f"No tools defined for agent type '{agent_type}'. Returning empty list.") + return kernel_functions + for tool in config.get("tools", []): try: function_name = tool["name"] @@ -300,8 +432,22 @@ def get_tools_from_config(cls, kernel: sk.Kernel, agent_type: str, config_path: # Register the function with the kernel kernel.add_function(plugin_name, kernel_func) kernel_functions.append(kernel_func) - logging.info(f"Successfully created dynamic tool '{function_name}' for {agent_type}") + logging.debug(f"Successfully created dynamic tool '{function_name}' for {agent_type}") except Exception as e: logging.error(f"Failed to create tool '{tool.get('name', 'unknown')}': {str(e)}") - return kernel_functions \ No newline at end of file + # Log the total number of tools created + if kernel_functions: + logging.info(f"Created {len(kernel_functions)} tools for agent type '{agent_type}'") + else: + logging.info(f"No tools were successfully created for agent type '{agent_type}'") + + return kernel_functions + + def save_state(self) -> Mapping[str, Any]: + """Save the state of this agent.""" + return {"memory": self._memory_store.save_state()} + + def load_state(self, state: Mapping[str, Any]) -> None: + """Load the state of this agent.""" + self._memory_store.load_state(state["memory"]) \ No newline at end of file diff --git a/src/backend/kernel_agents/agent_factory.py b/src/backend/kernel_agents/agent_factory.py index f3e2e4524..47b654589 100644 --- a/src/backend/kernel_agents/agent_factory.py +++ b/src/backend/kernel_agents/agent_factory.py @@ -6,24 +6,26 @@ from semantic_kernel import Kernel from semantic_kernel.functions import KernelFunction from semantic_kernel.agents.azure_ai.azure_ai_agent import AzureAIAgent +import inspect from models.agent_types import AgentType from kernel_agents.agent_base import BaseAgent -from config_kernel import Config +# Import the new AppConfig instance +from app_config import config # Import all specialized agent implementations from kernel_agents.hr_agent import HrAgent from kernel_agents.human_agent import HumanAgent from kernel_agents.marketing_agent import MarketingAgent from kernel_agents.generic_agent import GenericAgent -from kernel_agents.planner_agent import PlannerAgent from kernel_agents.tech_support_agent import TechSupportAgent from kernel_agents.procurement_agent import ProcurementAgent from kernel_agents.product_agent import ProductAgent +from kernel_agents.planner_agent import PlannerAgent # Add PlannerAgent import from kernel_agents.group_chat_manager import GroupChatManager from context.cosmos_memory_kernel import CosmosMemoryContext -from azure.ai.projects.models import Agent + logger = logging.getLogger(__name__) @@ -40,8 +42,8 @@ class AgentFactory: AgentType.TECH_SUPPORT: TechSupportAgent, AgentType.GENERIC: GenericAgent, AgentType.HUMAN: HumanAgent, - AgentType.PLANNER: PlannerAgent, - AgentType.GROUP_CHAT_MANAGER: GroupChatManager, + AgentType.PLANNER: PlannerAgent, # Add PlannerAgent + AgentType.GROUP_CHAT_MANAGER: GroupChatManager, # Add GroupChatManager } # Mapping of agent types to their string identifiers (for automatic tool loading) @@ -53,8 +55,8 @@ class AgentFactory: AgentType.TECH_SUPPORT: "tech_support", AgentType.GENERIC: "generic", AgentType.HUMAN: "human", - 
AgentType.PLANNER: "planner", - AgentType.GROUP_CHAT_MANAGER: "group_chat_manager", + AgentType.PLANNER: "planner", # Add planner + AgentType.GROUP_CHAT_MANAGER: "group_chat_manager", # Add group_chat_manager } # System messages for each agent type @@ -66,8 +68,8 @@ class AgentFactory: AgentType.TECH_SUPPORT: "You are a technical support expert helping with technical issues.", AgentType.GENERIC: "You are a helpful assistant ready to help with various tasks.", AgentType.HUMAN: "You are representing a human user in the conversation.", - AgentType.PLANNER: "You are a planner agent responsible for creating and managing plans.", - AgentType.GROUP_CHAT_MANAGER: "You are a group chat manager coordinating the conversation between different agents.", + AgentType.PLANNER: "You are a Planner agent responsible for creating and managing plans. You analyze tasks, break them down into steps, and assign them to the appropriate specialized agents.", + AgentType.GROUP_CHAT_MANAGER: "You are a Group Chat Manager coordinating conversations between different agents to execute plans efficiently.", } # Cache of agent instances by session_id and agent_type @@ -127,7 +129,7 @@ async def create_agent( # Check if we already have an agent in the cache if session_id in cls._agent_cache and agent_type in cls._agent_cache[session_id]: return cls._agent_cache[session_id][agent_type] - + # Get the agent class agent_class = cls._agent_classes.get(agent_type) if not agent_class: @@ -136,8 +138,8 @@ async def create_agent( # Create memory store memory_store = CosmosMemoryContext(session_id, user_id) - # Create a kernel - kernel = Config.CreateKernel() + # Create a kernel using the AppConfig instance + kernel = config.create_kernel() # Use default system message if none provided if system_message is None: @@ -154,7 +156,7 @@ async def create_agent( definition = None client = None try: - client = Config.GetAIProjectClient() + client = config.get_ai_project_client() except Exception as client_exc: logger.error(f"Error creating AIProjectClient: {client_exc}") raise @@ -162,7 +164,7 @@ async def create_agent( if tools: # Create the agent definition using the AIProjectClient (project-based pattern) definition = await client.agents.create_agent( - model=Config.AZURE_OPENAI_DEPLOYMENT_NAME, + model=config.AZURE_OPENAI_DEPLOYMENT_NAME, name=agent_type_str, instructions=system_message, temperature=temperature, @@ -176,27 +178,31 @@ async def create_agent( # Create the agent instance using the project-based pattern try: - agent = agent_class( - agent_name=agent_type_str, - kernel=kernel, - session_id=session_id, - user_id=user_id, - memory_store=memory_store, - tools=tools, - system_message=system_message, - client=client, - definition=definition, + # Filter kwargs to only those accepted by the agent's __init__ + agent_init_params = inspect.signature(agent_class.__init__).parameters + valid_keys = set(agent_init_params.keys()) - {"self"} + filtered_kwargs = {k: v for k, v in { + "agent_name": agent_type_str, + "kernel": kernel, + "session_id": session_id, + "user_id": user_id, + "memory_store": memory_store, + "tools": tools, + "system_message": system_message, + "client": client, + "definition": definition, **kwargs - ) + }.items() if k in valid_keys} + agent = agent_class(**filtered_kwargs) logger.debug(f"[DEBUG] Agent object after instantiation: {agent}") - # Initialize the agent asynchronously - init_result = await agent.async_init() - logger.debug(f"[DEBUG] Result of agent.async_init(): {init_result}") + # Initialize the agent 
asynchronously if it has async_init + if hasattr(agent, 'async_init') and inspect.iscoroutinefunction(agent.async_init): + init_result = await agent.async_init() + logger.debug(f"[DEBUG] Result of agent.async_init(): {init_result}") # Register tools with Azure AI Agent for LLM function calls - if hasattr(agent._agent, 'add_function') and tools: + if hasattr(agent, '_agent') and hasattr(agent._agent, 'add_function') and tools: for fn in tools: agent._agent.add_function(fn) - except Exception as e: logger.error( f"Error creating agent of type {agent_type} with parameters: {e}" @@ -239,11 +245,11 @@ async def create_azure_ai_agent( agent.add_function(tool) return agent - # Create a kernel - kernel = Config.CreateKernel() + # Create a kernel using the AppConfig instance + kernel = config.create_kernel() - # Await creation since CreateAzureAIAgent is async - agent = await Config.CreateAzureAIAgent( + # Await creation since create_azure_ai_agent is async + agent = await config.create_azure_ai_agent( kernel=kernel, agent_name=agent_name, instructions=system_prompt diff --git a/src/backend/kernel_agents/group_chat_manager.py b/src/backend/kernel_agents/group_chat_manager.py index 0c4b69f7a..03f775039 100644 --- a/src/backend/kernel_agents/group_chat_manager.py +++ b/src/backend/kernel_agents/group_chat_manager.py @@ -41,7 +41,6 @@ def __init__( session_id: str, user_id: str, memory_store: CosmosMemoryContext, - config_path: Optional[str] = None, available_agents: Optional[Dict[str, Any]] = None, ) -> None: """Initialize the Group Chat Manager. @@ -58,7 +57,6 @@ def __init__( self._session_id = session_id self._user_id = user_id self._memory_store = memory_store - self._config_path = config_path # Store available agents self._agent_instances = available_agents or {} diff --git a/src/backend/kernel_agents/planner_agent.py b/src/backend/kernel_agents/planner_agent.py index d434266fb..46fe508c9 100644 --- a/src/backend/kernel_agents/planner_agent.py +++ b/src/backend/kernel_agents/planner_agent.py @@ -8,6 +8,7 @@ import semantic_kernel as sk from semantic_kernel.functions import KernelFunction from semantic_kernel.functions.kernel_arguments import KernelArguments +from semantic_kernel.agents.azure_ai.azure_ai_agent import AzureAIAgent from kernel_agents.agent_base import BaseAgent from context.cosmos_memory_kernel import CosmosMemoryContext @@ -21,6 +22,7 @@ HumanFeedbackStatus, ) from event_utils import track_event_if_configured +from app_config import config # Define structured output models class StructuredOutputStep(BaseModel): @@ -33,7 +35,7 @@ class StructuredOutputPlan(BaseModel): summary_plan_and_steps: str = Field(description="Brief summary of the plan and steps") human_clarification_request: Optional[str] = Field(None, description="Any additional information needed from the human") -class PlannerAgent: +class PlannerAgent(BaseAgent): """Planner agent implementation using Semantic Kernel. This agent creates and manages plans based on user tasks, breaking them down into steps @@ -46,9 +48,14 @@ def __init__( session_id: str, user_id: str, memory_store: CosmosMemoryContext, + tools: Optional[List[KernelFunction]] = None, + system_message: Optional[str] = None, + agent_name: str = "PlannerAgent", config_path: Optional[str] = None, available_agents: List[str] = None, - agent_tools_list: List[str] = None + agent_tools_list: List[str] = None, + client=None, + definition=None, ) -> None: """Initialize the Planner Agent. 
@@ -57,35 +64,64 @@ def __init__( session_id: The current session identifier user_id: The user identifier memory_store: The Cosmos memory context - config_path: Optional path to the Planner tools configuration file + tools: Optional list of tools for this agent + system_message: Optional system message for the agent + agent_name: Optional name for the agent (defaults to "PlannerAgent") + config_path: Optional path to the configuration file available_agents: List of available agent names for creating steps agent_tools_list: List of available tools across all agents + client: Optional client instance (passed to BaseAgent) + definition: Optional definition instance (passed to BaseAgent) """ - self._kernel = kernel - self._session_id = session_id - self._user_id = user_id - self._memory_store = memory_store - self._config_path = config_path + # Default system message if not provided + if not system_message: + system_message = "You are a Planner agent responsible for creating and managing plans. You analyze tasks, break them down into steps, and assign them to the appropriate specialized agents." - # Store the available agents and their tools + # Initialize the base agent + super().__init__( + agent_name=agent_name, + kernel=kernel, + session_id=session_id, + user_id=user_id, + memory_store=memory_store, + tools=tools, + system_message=system_message, + agent_type="planner", # Use planner_tools.json if available + client=client, + definition=definition + ) + + # Store additional planner-specific attributes self._available_agents = available_agents or ["HumanAgent", "HrAgent", "MarketingAgent", - "ProductAgent", "ProcurementAgent", - "TechSupportAgent", "GenericAgent"] + "ProductAgent", "ProcurementAgent", + "TechSupportAgent", "GenericAgent"] self._agent_tools_list = agent_tools_list or [] - # Load configuration - config = BaseAgent.load_tools_config("planner", config_path) - self._system_message = config.get( - "system_message", - "You are a Planner agent responsible for creating and managing plans. You analyze tasks, break them down into steps, and assign them to the appropriate specialized agents." - ) + # Create the Azure AI Agent for planning operations + # This will be initialized in async_init + self._azure_ai_agent = None - # Create the agent - self._agent = kernel.create_semantic_function( - function_name="PlannerFunction", - prompt=self._system_message, - description="Creates and manages execution plans" - ) + async def async_init(self) -> None: + """Asynchronously initialize the PlannerAgent. + + Creates the Azure AI Agent for planning operations. + + Returns: + None + """ + try: + # Create the Azure AI Agent using AppConfig + self._azure_ai_agent = await config.create_azure_ai_agent( + kernel=self._kernel, + agent_name="PlannerAgent", + instructions=self._generate_instruction(""), + temperature=0.0 + ) + logging.info("Successfully created Azure AI Agent for PlannerAgent") + return True + except Exception as e: + logging.error(f"Failed to create Azure AI Agent for PlannerAgent: {e}") + raise async def handle_input_task(self, kernel_arguments: KernelArguments) -> str: """Handle the initial input task from the user. 
@@ -232,38 +268,105 @@ async def _create_structured_plan(self, input_task: InputTask) -> Tuple[Plan, Li # Generate the instruction for the LLM instruction = self._generate_instruction(input_task.description) - # Ask the LLM to generate a structured plan - args = KernelArguments(input=instruction) - result = await self._agent.invoke_async(kernel_arguments=args) - response_content = result.value.strip() + # Log the input task for debugging + logging.info(f"Creating plan for task: '{input_task.description}'") + logging.info(f"Using available agents: {self._available_agents}") + + # Use the Azure AI Agent instead of direct function invocation + if self._azure_ai_agent is None: + # Initialize the agent if it's not already done + await self.async_init() + + if self._azure_ai_agent is None: + raise RuntimeError("Failed to initialize Azure AI Agent for planning") + + # Get response from the Azure AI Agent + # Based on the method signature, invoke takes only named arguments, not positional ones + logging.info(f"Invoking PlannerAgent with instruction length: {len(instruction)}") + + # Create kernel arguments + kernel_args = KernelArguments() + kernel_args["input"] = instruction + + # Call invoke with proper keyword arguments + response_content = "" + + # Use keyword arguments instead of positional arguments + # Based on the method signature, we need to pass 'arguments' and possibly 'kernel' + async_generator = self._azure_ai_agent.invoke(arguments=kernel_args) + + # Collect the response from the async generator + async for chunk in async_generator: + if chunk is not None: + response_content += str(chunk) + + # Debug the response + logging.info(f"Response content length: {len(response_content)}") + logging.debug(f"Response content first 500 chars: {response_content[:500]}") + # Log more of the response for debugging + logging.info(f"Full response: {response_content}") + + # Check if response is empty or whitespace + if not response_content or response_content.isspace(): + raise ValueError("Received empty response from Azure AI Agent") # Parse the JSON response using the structured output model try: # First try to parse using Pydantic model try: parsed_result = StructuredOutputPlan.parse_raw(response_content) - except Exception: + except Exception as e1: + logging.warning(f"Failed to parse direct JSON with Pydantic: {str(e1)}") + # If direct parsing fails, try to extract JSON first json_match = re.search(r'```json\s*(.*?)\s*```', response_content, re.DOTALL) if json_match: json_content = json_match.group(1) - parsed_result = StructuredOutputPlan.parse_raw(json_content) + logging.info(f"Found JSON content in markdown code block, length: {len(json_content)}") + try: + parsed_result = StructuredOutputPlan.parse_raw(json_content) + except Exception as e2: + logging.warning(f"Failed to parse extracted JSON with Pydantic: {str(e2)}") + # Try conventional JSON parsing as fallback + json_data = json.loads(json_content) + parsed_result = StructuredOutputPlan.parse_obj(json_data) else: - # Try parsing as regular JSON then convert to Pydantic model - json_data = json.loads(response_content) - parsed_result = StructuredOutputPlan.parse_obj(json_data) + # Try to extract JSON without code blocks - maybe it's embedded in text + # Look for patterns like { ... 
} that contain "initial_goal" and "steps" + json_pattern = r'\{.*?"initial_goal".*?"steps".*?\}' + alt_match = re.search(json_pattern, response_content, re.DOTALL) + + if alt_match: + potential_json = alt_match.group(0) + logging.info(f"Found potential JSON in text, length: {len(potential_json)}") + try: + json_data = json.loads(potential_json) + parsed_result = StructuredOutputPlan.parse_obj(json_data) + except Exception as e3: + logging.warning(f"Failed to parse potential JSON: {str(e3)}") + # If all extraction attempts fail, try parsing the whole response as JSON + json_data = json.loads(response_content) + parsed_result = StructuredOutputPlan.parse_obj(json_data) + else: + # If we can't find JSON patterns, create a fallback plan from the text + logging.info("Using fallback plan creation from text response") + return await self._create_fallback_plan_from_text(input_task, response_content) - # Extract plan details + # Extract plan details and log for debugging initial_goal = parsed_result.initial_goal steps_data = parsed_result.steps summary = parsed_result.summary_plan_and_steps human_clarification_request = parsed_result.human_clarification_request + # Log the steps and agent assignments for debugging + for i, step in enumerate(steps_data): + logging.info(f"Step {i+1} - Agent: {step.agent}, Action: {step.action}") + # Create the Plan instance plan = Plan( id=str(uuid.uuid4()), session_id=input_task.session_id, - user_id=input_task.user_id, + user_id=self._user_id, initial_goal=initial_goal, overall_status=PlanStatus.in_progress, summary=summary, @@ -277,7 +380,7 @@ async def _create_structured_plan(self, input_task: InputTask) -> Tuple[Plan, Li "Planner - Initial plan and added into the cosmos", { "session_id": input_task.session_id, - "user_id": input_task.user_id, + "user_id": self._user_id, "initial_goal": initial_goal, "overall_status": PlanStatus.in_progress, "source": "PlannerAgent", @@ -292,6 +395,10 @@ async def _create_structured_plan(self, input_task: InputTask) -> Tuple[Plan, Li action = step_data.action agent_name = step_data.agent + # Log any unusual agent assignments for debugging + if "onboard" in input_task.description.lower() and agent_name != "HrAgent": + logging.warning(f"UNUSUAL AGENT ASSIGNMENT: Task contains 'onboard' but assigned to {agent_name} instead of HrAgent") + # Validate agent name if agent_name not in self._available_agents: logging.warning(f"Invalid agent name: {agent_name}, defaulting to GenericAgent") @@ -302,6 +409,7 @@ async def _create_structured_plan(self, input_task: InputTask) -> Tuple[Plan, Li id=str(uuid.uuid4()), plan_id=plan.id, session_id=input_task.session_id, + user_id=self._user_id, action=action, agent=agent_name, status=StepStatus.planned, @@ -320,7 +428,7 @@ async def _create_structured_plan(self, input_task: InputTask) -> Tuple[Plan, Li "agent": agent_name, "status": StepStatus.planned, "session_id": input_task.session_id, - "user_id": input_task.user_id, + "user_id": self._user_id, "human_approval_status": HumanFeedbackStatus.requested, }, ) @@ -328,9 +436,11 @@ async def _create_structured_plan(self, input_task: InputTask) -> Tuple[Plan, Li return plan, steps except Exception as e: - # If JSON parsing fails, use regex to extract steps - logging.warning(f"Failed to parse JSON response: {e}. 
Falling back to text parsing.") - return await self._create_plan_from_text(input_task, response_content) + # If JSON parsing fails, log error and create error plan + logging.exception(f"Failed to parse JSON response: {e}") + logging.info(f"Raw response was: {response_content[:1000]}...") + # Try a fallback approach + return await self._create_fallback_plan_from_text(input_task, response_content) except Exception as e: logging.exception(f"Error creating structured plan: {e}") @@ -339,7 +449,7 @@ async def _create_structured_plan(self, input_task: InputTask) -> Tuple[Plan, Li f"Planner - Error in create_structured_plan: {e} into the cosmos", { "session_id": input_task.session_id, - "user_id": input_task.user_id, + "user_id": self._user_id, "initial_goal": "Error generating plan", "overall_status": PlanStatus.failed, "source": "PlannerAgent", @@ -351,7 +461,7 @@ async def _create_structured_plan(self, input_task: InputTask) -> Tuple[Plan, Li error_plan = Plan( id=str(uuid.uuid4()), session_id=input_task.session_id, - user_id=input_task.user_id, + user_id=self._user_id, initial_goal="Error generating plan", overall_status=PlanStatus.failed, summary=f"Error generating plan: {str(e)}" @@ -359,8 +469,8 @@ async def _create_structured_plan(self, input_task: InputTask) -> Tuple[Plan, Li await self._memory_store.add_plan(error_plan) return error_plan, [] - - async def _create_plan_from_text(self, input_task: InputTask, text_content: str) -> Tuple[Plan, List[Step]]: + + async def _create_fallback_plan_from_text(self, input_task: InputTask, text_content: str) -> Tuple[Plan, List[Step]]: """Create a plan from unstructured text when JSON parsing fails. Args: @@ -370,6 +480,8 @@ async def _create_plan_from_text(self, input_task: InputTask, text_content: str) Returns: Tuple containing the created plan and list of steps """ + logging.info("Creating fallback plan from text content") + # Extract goal from the text (first line or use input task description) goal_match = re.search(r"(?:Goal|Initial Goal|Plan):\s*(.+?)(?:\n|$)", text_content) goal = goal_match.group(1).strip() if goal_match else input_task.description @@ -378,9 +490,10 @@ async def _create_plan_from_text(self, input_task: InputTask, text_content: str) plan = Plan( id=str(uuid.uuid4()), session_id=input_task.session_id, - user_id=input_task.user_id, + user_id=self._user_id, initial_goal=goal, - overall_status=PlanStatus.in_progress + overall_status=PlanStatus.in_progress, + summary=f"Plan created from {input_task.description}" ) # Store the plan @@ -395,31 +508,57 @@ async def _create_plan_from_text(self, input_task: InputTask, text_content: str) step_pattern = re.compile(r'(\d+)[.:\)]\s*([^:]*?):\s*(.*?)(?=\d+[.:\)]|$)', re.DOTALL) matches = step_pattern.findall(text_content) + # If still no matches, look for bullet points or numbered lists + if not matches: + step_pattern = re.compile(r'[•\-*]\s*([^:]*?):\s*(.*?)(?=[•\-*]|$)', re.DOTALL) + bullet_matches = step_pattern.findall(text_content) + if bullet_matches: + # Convert bullet matches to our expected format (number, agent, action) + matches = [] + for i, (agent_text, action) in enumerate(bullet_matches, 1): + matches.append((str(i), agent_text.strip(), action.strip())) + steps = [] - for match in matches: - number = match[0].strip() - agent_text = match[1].strip() - action = match[2].strip() - - # Clean up agent name - agent = re.sub(r'\s+', '', agent_text) - if not agent or agent not in self._available_agents: - agent = "GenericAgent" # Default to GenericAgent if not recognized - - # 
Create and store the step - step = Step( + # If we found no steps at all, create at least one generic step + if not matches: + generic_step = Step( id=str(uuid.uuid4()), plan_id=plan.id, session_id=input_task.session_id, - action=action, - agent=agent, + user_id=self._user_id, + action=f"Process the request: {input_task.description}", + agent="GenericAgent", status=StepStatus.planned, human_approval_status=HumanFeedbackStatus.requested ) - - await self._memory_store.add_step(step) - steps.append(step) - + await self._memory_store.add_step(generic_step) + steps.append(generic_step) + else: + for match in matches: + number = match[0].strip() + agent_text = match[1].strip() + action = match[2].strip() + + # Clean up agent name + agent = re.sub(r'\s+', '', agent_text) + if not agent or agent not in self._available_agents: + agent = "GenericAgent" # Default to GenericAgent if not recognized + + # Create and store the step + step = Step( + id=str(uuid.uuid4()), + plan_id=plan.id, + session_id=input_task.session_id, + user_id=self._user_id, + action=action, + agent=agent, + status=StepStatus.planned, + human_approval_status=HumanFeedbackStatus.requested + ) + + await self._memory_store.add_step(step) + steps.append(step) + return plan, steps def _generate_instruction(self, objective: str) -> str: @@ -437,6 +576,9 @@ def _generate_instruction(self, objective: str) -> str: # Create list of available tools tools_str = "\n".join(self._agent_tools_list) if self._agent_tools_list else "Various specialized tools" + # Build the instruction, avoiding backslashes in f-string expressions + objective_part = f"Your objective is:\n{objective}" if objective else "When given an objective, analyze it and create a plan to accomplish it." + return f""" You are the Planner, an AI orchestrator that manages a group of AI agents to accomplish tasks. @@ -445,9 +587,8 @@ def _generate_instruction(self, objective: str) -> str: The result of the final step should be the final answer. Make sure that each step has all the information needed - do not skip steps. These actions are passed to the specific agent. Make sure the action contains all the information required for the agent to execute the task. - - Your objective is: - {objective} + + {objective_part} The agents you have access to are: {agents_str} @@ -455,6 +596,15 @@ def _generate_instruction(self, objective: str) -> str: These agents have access to the following functions: {tools_str} + IMPORTANT AGENT SELECTION GUIDANCE: + - HrAgent: ALWAYS use for ALL employee-related tasks like onboarding, hiring, benefits, payroll, training, employee records, ID cards, mentoring, background checks, etc. + - MarketingAgent: Use for marketing campaigns, branding, market research, content creation, social media, etc. + - ProcurementAgent: Use for purchasing, vendor management, supply chain, asset management, etc. + - ProductAgent: Use for product development, roadmaps, features, product feedback, etc. + - TechSupportAgent: Use for technical issues, software/hardware setup, troubleshooting, IT support, etc. + - GenericAgent: Use only for general knowledge tasks that don't fit other categories + - HumanAgent: Use only when human input is absolutely required and no other agent can handle the task + The first step of your plan should be to ask the user for any additional information required to progress the rest of steps planned. Only use the functions provided as part of your plan. 
If the task is not possible with the agents and tools provided, create a step with the agent of type Exception and mark the overall status as completed. @@ -463,31 +613,33 @@ def _generate_instruction(self, objective: str) -> str: If there is a single function call that can directly solve the task, only generate a plan with a single step. For example, if someone asks to be granted access to a database, generate a plan with only one step involving the grant_database_access function, with no additional steps. - When generating the action in the plan, frame the action as an instruction you are passing to the agent to execute. It should be a short, single sentence. Include the function to use. For example, "Set up an Office 365 Account for Jessica Smith. Function: set_up_office_365_account" - - Ensure the summary of the plan and the overall steps is less than 50 words. - - Identify any additional information that might be required to complete the task. Include this information in the plan in the human_clarification_request field of the plan. If it is not required, leave it as null. Do not include information that you are waiting for clarification on in the string of the action field, as this otherwise won't get updated. - You must prioritise using the provided functions to accomplish each step. First evaluate each and every function the agents have access too. Only if you cannot find a function needed to complete the task, and you have reviewed each and every function, and determined why each are not suitable, there are two options you can take when generating the plan. First evaluate whether the step could be handled by a typical large language model, without any specialised functions. For example, tasks such as "add 32 to 54", or "convert this SQL code to a python script", or "write a 200 word story about a fictional product strategy". + If a general Large Language Model CAN handle the step/required action, add a step to the plan with the action you believe would be needed, and add "EXCEPTION: No suitable function found. A generic LLM model is being used for this step." to the end of the action. Assign these steps to the GenericAgent. For example, if the task is to convert the following SQL into python code (SELECT * FROM employees;), and there is no function to convert SQL to python, write a step with the action "convert the following SQL into python code (SELECT * FROM employees;) EXCEPTION: No suitable function found. A generic LLM model is being used for this step." and assign it to the GenericAgent. + Alternatively, if a general Large Language Model CAN NOT handle the step/required action, add a step to the plan with the action you believe would be needed, and add "EXCEPTION: Human support required to do this step, no suitable function found." to the end of the action. Assign these steps to the HumanAgent. For example, if the task is to find the best way to get from A to B, and there is no function to calculate the best route, write a step with the action "Calculate the best route from A to B. EXCEPTION: Human support required, no suitable function found." and assign it to the HumanAgent. Limit the plan to 6 steps or less. Choose from {agents_str} ONLY for planning your steps. + When generating the action in the plan, frame the action as an instruction you are passing to the agent to execute. It should be a short, single sentence. Include the function to use. For example, "Set up an Office 365 Account for Jessica Smith. 
Function: set_up_office_365_account" + + Ensure the summary of the plan and the overall steps is less than 50 words. + + Identify any additional information that might be required to complete the task. Include this information in the plan in the human_clarification_request field of the plan. If it is not required, leave it as null. Do not include information that you are waiting for clarification on in the string of the action field, as this otherwise won't get updated. + Return your response as a JSON object with the following structure: - { + {{ "initial_goal": "The goal of the plan", "steps": [ - { + {{ "action": "Detailed description of the step action", "agent": "AgentName" - } + }} ], "summary_plan_and_steps": "Brief summary of the plan and steps", "human_clarification_request": "Any additional information needed from the human" - } + }} """ \ No newline at end of file diff --git a/src/backend/tests/test_planner_agent_integration.py b/src/backend/tests/test_planner_agent_integration.py index 9edb6f5c3..b7aa87087 100644 --- a/src/backend/tests/test_planner_agent_integration.py +++ b/src/backend/tests/test_planner_agent_integration.py @@ -350,6 +350,102 @@ async def test_create_structured_plan(self): print(f"\nCreated technical webinar plan with {len(steps)} steps") print(f"Steps assigned to: {', '.join(set(step.agent for step in steps))}") + async def test_hr_agent_selection(self): + """Test that the planner correctly assigns employee onboarding tasks to the HR agent.""" + # Initialize components + await self.initialize_planner_agent() + + # Create an onboarding task + input_task = InputTask( + session_id=self.session_id, + user_id=self.user_id, + description="Onboard a new employee, Jessica Smith." + ) + + print("\n\n==== TESTING HR AGENT SELECTION FOR ONBOARDING ====") + print(f"Task: '{input_task.description}'") + + # Call handle_input_task + args = KernelArguments(input_task_json=input_task.json()) + result = await self.planner_agent.handle_input_task(args) + + # Check that result contains a success message + self.assertIn("created successfully", result) + + # Verify plan was created in memory store + plan = await self.memory_store.get_plan_by_session(self.session_id) + self.assertIsNotNone(plan) + + # Verify steps were created + steps = await self.memory_store.get_steps_for_plan(plan.id, self.session_id) + self.assertGreater(len(steps), 0) + + # Log plan details + print(f"\nšŸ“‹ Created onboarding plan with ID: {plan.id}") + print(f"šŸŽÆ Goal: {plan.initial_goal}") + print(f"šŸ“ Summary: {plan.summary}") + + print("\nšŸ“ Steps:") + for i, step in enumerate(steps): + print(f" {i+1}. 
šŸ‘¤ Agent: {step.agent}, šŸ”§ Action: {step.action}") + + # Count agents used in the plan + agent_counts = {} + for step in steps: + agent_counts[step.agent] = agent_counts.get(step.agent, 0) + 1 + + print("\nšŸ“Š Agent Distribution:") + for agent, count in agent_counts.items(): + print(f" {agent}: {count} step(s)") + + # The critical test: verify that at least one step is assigned to HrAgent + hr_steps = [step for step in steps if step.agent == "HrAgent"] + has_hr_steps = len(hr_steps) > 0 + self.assertTrue(has_hr_steps, "No steps assigned to HrAgent for an onboarding task") + + if has_hr_steps: + print("\nāœ… TEST PASSED: HrAgent is used for onboarding task") + else: + print("\nāŒ TEST FAILED: HrAgent is not used for onboarding task") + + # Verify that no steps are incorrectly assigned to MarketingAgent + marketing_steps = [step for step in steps if step.agent == "MarketingAgent"] + no_marketing_steps = len(marketing_steps) == 0 + self.assertEqual(len(marketing_steps), 0, + f"Found {len(marketing_steps)} steps incorrectly assigned to MarketingAgent for an onboarding task") + + if no_marketing_steps: + print("āœ… TEST PASSED: No MarketingAgent steps for onboarding task") + else: + print(f"āŒ TEST FAILED: Found {len(marketing_steps)} steps incorrectly assigned to MarketingAgent") + + # Verify that the first step or a step containing "onboard" is assigned to HrAgent + first_agent = steps[0].agent if steps else None + onboarding_steps = [step for step in steps if "onboard" in step.action.lower()] + + if onboarding_steps: + onboard_correct = onboarding_steps[0].agent == "HrAgent" + self.assertEqual(onboarding_steps[0].agent, "HrAgent", + "The step containing 'onboard' was not assigned to HrAgent") + if onboard_correct: + print("āœ… TEST PASSED: Steps containing 'onboard' are assigned to HrAgent") + else: + print(f"āŒ TEST FAILED: Step containing 'onboard' assigned to {onboarding_steps[0].agent}, not HrAgent") + + # If no specific "onboard" step but we have steps, the first should likely be HrAgent + elif steps and "hr" not in first_agent.lower(): + first_step_correct = first_agent == "HrAgent" + self.assertEqual(first_agent, "HrAgent", + f"The first step was assigned to {first_agent}, not HrAgent") + if first_step_correct: + print("āœ… TEST PASSED: First step is assigned to HrAgent") + else: + print(f"āŒ TEST FAILED: First step assigned to {first_agent}, not HrAgent") + + print("\n==== END HR AGENT SELECTION TEST ====\n") + + return plan, steps + async def run_all_tests(self): """Run all tests in sequence.""" # Call setUp explicitly to ensure environment is properly initialized @@ -372,6 +468,10 @@ async def run_all_tests(self): print("\n===== Testing _create_structured_plan directly =====") await self.test_create_structured_plan() + # Test 5: Verify HR agent selection for onboarding tasks + print("\n===== Testing HR agent selection =====") + await self.test_hr_agent_selection() + print("\nAll tests completed successfully!") except Exception as e: diff --git a/src/backend/utils_kernel.py b/src/backend/utils_kernel.py index f5fb8b8c9..d6874414e 100644 --- a/src/backend/utils_kernel.py +++ b/src/backend/utils_kernel.py @@ -11,9 +11,9 @@ from semantic_kernel.functions import KernelFunction from semantic_kernel.agents.azure_ai.azure_ai_agent import AzureAIAgent -# Import agent factory and config +# Import agent factory and the new AppConfig from kernel_agents.agent_factory import AgentFactory -from config_kernel import Config +from app_config import config from context.cosmos_memory_kernel 
import CosmosMemoryContext
 from models.agent_types import AgentType
 
@@ -42,8 +42,8 @@ async def initialize_runtime_and_context(
     if session_id is None:
         session_id = str(uuid.uuid4())
 
-    # Create a kernel and memory store
-    kernel = Config.CreateKernel()
+    # Create a kernel and memory store using the AppConfig instance
+    kernel = config.create_kernel()
     memory_store = CosmosMemoryContext(session_id, user_id)
 
     return kernel, memory_store
@@ -81,8 +81,8 @@ async def get_agents(session_id: str, user_id: str) -> Dict[str, Any]:
         AgentType.TECH_SUPPORT: "TechSupportAgent",
         AgentType.GENERIC: "GenericAgent",
         AgentType.HUMAN: "HumanAgent",
-        AgentType.PLANNER: "PlannerAgent",
-        AgentType.GROUP_CHAT_MANAGER: "GroupChatManager",
+        AgentType.PLANNER: "PlannerAgent",  # Planner orchestrates the other agents
+        AgentType.GROUP_CHAT_MANAGER: "GroupChatManager",  # Manages the group chat between agents
     }
 
     # Convert to the agent name dictionary format used by the rest of the app
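
Note on the text-parsing fallback: the step-extraction logic in `_create_fallback_plan_from_text` above relies on two regexes, one for numbered steps and one for bullet lists. The following is a minimal, standalone sketch of that idea; the sample text, agent set, and printed output are invented for illustration and are not part of the change itself.

```python
import re

AVAILABLE_AGENTS = {"HrAgent", "TechSupportAgent", "GenericAgent"}  # illustrative subset

text = """
1. HrAgent: Onboard Jessica Smith. Function: onboard_employee
2. TechSupportAgent: Set up a laptop for Jessica Smith. Function: set_up_laptop
"""

# Numbered-step pattern used in the diff: captures (number, agent, action)
step_pattern = re.compile(r'(\d+)[.:\)]\s*([^:]*?):\s*(.*?)(?=\d+[.:\)]|$)', re.DOTALL)
matches = step_pattern.findall(text)

# Bullet-list fallback, normalised to the same (number, agent, action) shape
if not matches:
    bullet_pattern = re.compile(r'[•\-*]\s*([^:]*?):\s*(.*?)(?=[•\-*]|$)', re.DOTALL)
    matches = [(str(i), agent.strip(), action.strip())
               for i, (agent, action) in enumerate(bullet_pattern.findall(text), 1)]

for number, agent_text, action in matches:
    agent = re.sub(r'\s+', '', agent_text)   # strip whitespace from the captured agent name
    if agent not in AVAILABLE_AGENTS:
        agent = "GenericAgent"               # default when the name is not recognised
    print(f"Step {number}: {agent} -> {action.strip()}")
```

If neither pattern matches, the new code falls back to a single GenericAgent step built from the task description, so the planner always returns at least one step.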
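
Note on the doubled braces in `_generate_instruction`: the JSON template at the end of the prompt is embedded in an f-string, so the diff replaces `{` and `}` with `{{` and `}}` to render literal braces instead of being treated as format fields. A small sketch of that behaviour, with an invented `agents_str` value:

```python
agents_str = "HrAgent, TechSupportAgent, GenericAgent"  # illustrative value

template = f"""
Choose from {agents_str} ONLY for planning your steps.

Return your response as a JSON object with the following structure:
{{
    "initial_goal": "The goal of the plan",
    "steps": [
        {{
            "action": "Detailed description of the step action",
            "agent": "AgentName"
        }}
    ],
    "summary_plan_and_steps": "Brief summary of the plan and steps",
    "human_clarification_request": "Any additional information needed from the human"
}}
"""
print(template)  # the doubled braces render as single literal braces in the prompt text
```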