diff --git a/src/backend/agents/base_agent.py b/src/backend/agents/base_agent.py index 59a6bc1ec..01dedf804 100644 --- a/src/backend/agents/base_agent.py +++ b/src/backend/agents/base_agent.py @@ -21,7 +21,7 @@ Step, StepStatus, ) -from models.messages_kernel import AgentType + from src.backend.event_utils import track_event_if_configured @@ -77,7 +77,7 @@ async def handle_action_request( AssistantMessage(content=message.action, source="GroupChatManager"), UserMessage( content=f"{step.human_feedback}. Now make the function call", - source=AgentType.HUMAN.value, + source="HumanAgent", ), ] ) diff --git a/src/backend/agents/group_chat_manager.py b/src/backend/agents/group_chat_manager.py index 13c67f115..32d7f2386 100644 --- a/src/backend/agents/group_chat_manager.py +++ b/src/backend/agents/group_chat_manager.py @@ -21,7 +21,7 @@ Step, StepStatus, ) -from models.messages_kernel import AgentType + from src.backend.event_utils import track_event_if_configured @@ -57,7 +57,7 @@ async def handle_input_task( user_id=self._user_id, plan_id="", content=f"{message.description}", - source=AgentType.HUMAN.value, + source="HumanAgent", step_id="", ) ) @@ -68,7 +68,7 @@ async def handle_input_task( "session_id": message.session_id, "user_id": self._user_id, "content": message.description, - "source": AgentType.HUMAN.value, + "source": "HumanAgent", }, ) diff --git a/src/backend/agents/human.py b/src/backend/agents/human.py index d43e9f50a..5d1a72d81 100644 --- a/src/backend/agents/human.py +++ b/src/backend/agents/human.py @@ -13,7 +13,6 @@ Step, ) from src.backend.event_utils import track_event_if_configured -from models.messages_kernel import AgentType @default_subscription @@ -53,7 +52,7 @@ async def handle_step_feedback( user_id=self.user_id, plan_id=step.plan_id, content=f"Received feedback for step: {step.action}", - source=AgentType.HUMAN.value, + source="HumanAgent", step_id=message.step_id, ) ) @@ -65,7 +64,7 @@ async def handle_step_feedback( "user_id": self.user_id, "plan_id": step.plan_id, "content": f"Received feedback for step: {step.action}", - "source": AgentType.HUMAN.value, + "source": "HumanAgent", "step_id": message.step_id, }, ) diff --git a/src/backend/agents/planner.py b/src/backend/agents/planner.py index 940397a12..e7975be3f 100644 --- a/src/backend/agents/planner.py +++ b/src/backend/agents/planner.py @@ -25,7 +25,7 @@ StepStatus, HumanFeedbackStatus, ) -from models.messages_kernel import AgentType + from src.backend.event_utils import track_event_if_configured @@ -133,7 +133,7 @@ async def handle_plan_clarification( user_id=self._user_id, plan_id="", content=f"{message.human_clarification}", - source=AgentType.HUMAN.value, + source="HumanAgent", step_id="", ) ) @@ -144,7 +144,7 @@ async def handle_plan_clarification( "session_id": message.session_id, "user_id": self._user_id, "content": f"{message.human_clarification}", - "source": AgentType.HUMAN.value, + "source": "HumanAgent", }, ) diff --git a/src/backend/app_config.py b/src/backend/app_config.py index f97fb9a62..e56961e73 100644 --- a/src/backend/app_config.py +++ b/src/backend/app_config.py @@ -9,110 +9,126 @@ from semantic_kernel.kernel import Kernel from semantic_kernel.contents import ChatHistory from semantic_kernel.agents.azure_ai.azure_ai_agent import AzureAIAgent + # Load environment variables from .env file load_dotenv() + class AppConfig: """Application configuration class that loads settings from environment variables.""" - + def __init__(self): """Initialize the application configuration with environment variables.""" # Azure authentication settings self.AZURE_TENANT_ID = self._get_optional("AZURE_TENANT_ID") self.AZURE_CLIENT_ID = self._get_optional("AZURE_CLIENT_ID") self.AZURE_CLIENT_SECRET = self._get_optional("AZURE_CLIENT_SECRET") - + # CosmosDB settings self.COSMOSDB_ENDPOINT = self._get_optional("COSMOSDB_ENDPOINT") self.COSMOSDB_DATABASE = self._get_optional("COSMOSDB_DATABASE") self.COSMOSDB_CONTAINER = self._get_optional("COSMOSDB_CONTAINER") - + # Azure OpenAI settings - self.AZURE_OPENAI_DEPLOYMENT_NAME = self._get_required("AZURE_OPENAI_DEPLOYMENT_NAME", "gpt-4o") - self.AZURE_OPENAI_API_VERSION = self._get_required("AZURE_OPENAI_API_VERSION", "2024-11-20") + self.AZURE_OPENAI_DEPLOYMENT_NAME = self._get_required( + "AZURE_OPENAI_DEPLOYMENT_NAME", "gpt-4o" + ) + self.AZURE_OPENAI_API_VERSION = self._get_required( + "AZURE_OPENAI_API_VERSION", "2024-11-20" + ) self.AZURE_OPENAI_ENDPOINT = self._get_required("AZURE_OPENAI_ENDPOINT") - self.AZURE_OPENAI_SCOPES = [f"{self._get_optional('AZURE_OPENAI_SCOPE', 'https://cognitiveservices.azure.com/.default')}"] - + self.AZURE_OPENAI_SCOPES = [ + f"{self._get_optional('AZURE_OPENAI_SCOPE', 'https://cognitiveservices.azure.com/.default')}" + ] + # Frontend settings - self.FRONTEND_SITE_NAME = self._get_optional("FRONTEND_SITE_NAME", "http://127.0.0.1:3000") - + self.FRONTEND_SITE_NAME = self._get_optional( + "FRONTEND_SITE_NAME", "http://127.0.0.1:3000" + ) + # Azure AI settings self.AZURE_AI_SUBSCRIPTION_ID = self._get_required("AZURE_AI_SUBSCRIPTION_ID") self.AZURE_AI_RESOURCE_GROUP = self._get_required("AZURE_AI_RESOURCE_GROUP") self.AZURE_AI_PROJECT_NAME = self._get_required("AZURE_AI_PROJECT_NAME") - self.AZURE_AI_AGENT_PROJECT_CONNECTION_STRING = self._get_required("AZURE_AI_AGENT_PROJECT_CONNECTION_STRING") - + self.AZURE_AI_AGENT_PROJECT_CONNECTION_STRING = self._get_required( + "AZURE_AI_AGENT_PROJECT_CONNECTION_STRING" + ) + # Cached clients and resources self._azure_credentials = None self._cosmos_client = None self._cosmos_database = None self._ai_project_client = None - + def _get_required(self, name: str, default: Optional[str] = None) -> str: """Get a required configuration value from environment variables. - + Args: name: The name of the environment variable default: Optional default value if not found - + Returns: The value of the environment variable or default if provided - + Raises: ValueError: If the environment variable is not found and no default is provided """ if name in os.environ: return os.environ[name] if default is not None: - logging.warning("Environment variable %s not found, using default value", name) + logging.warning( + "Environment variable %s not found, using default value", name + ) return default - raise ValueError(f"Environment variable {name} not found and no default provided") - + raise ValueError( + f"Environment variable {name} not found and no default provided" + ) + def _get_optional(self, name: str, default: str = "") -> str: """Get an optional configuration value from environment variables. - + Args: name: The name of the environment variable default: Default value if not found (default: "") - + Returns: The value of the environment variable or the default value """ if name in os.environ: return os.environ[name] return default - + def _get_bool(self, name: str) -> bool: """Get a boolean configuration value from environment variables. - + Args: name: The name of the environment variable - + Returns: True if the environment variable exists and is set to 'true' or '1', False otherwise """ return name in os.environ and os.environ[name].lower() in ["true", "1"] - + def get_azure_credentials(self): """Get Azure credentials using DefaultAzureCredential. - + Returns: DefaultAzureCredential instance for Azure authentication """ # Cache the credentials object if self._azure_credentials is not None: return self._azure_credentials - + try: self._azure_credentials = DefaultAzureCredential() return self._azure_credentials except Exception as exc: logging.warning("Failed to create DefaultAzureCredential: %s", exc) return None - + def get_cosmos_database_client(self): """Get a Cosmos DB client for the configured database. - + Returns: A Cosmos DB database client """ @@ -129,12 +145,15 @@ def get_cosmos_database_client(self): return self._cosmos_database except Exception as exc: - logging.error("Failed to create CosmosDB client: %s. CosmosDB is required for this application.", exc) + logging.error( + "Failed to create CosmosDB client: %s. CosmosDB is required for this application.", + exc, + ) raise - + def create_kernel(self): """Creates a new Semantic Kernel instance. - + Returns: A new Semantic Kernel instance """ @@ -142,114 +161,140 @@ def create_kernel(self): # The agents will be created using Azure AI Agent Project pattern instead kernel = Kernel() return kernel - + def get_ai_project_client(self): """Create and return an AIProjectClient for Azure AI Foundry using from_connection_string. - + Returns: An AIProjectClient instance """ if self._ai_project_client is not None: return self._ai_project_client - + try: credential = self.get_azure_credentials() if credential is None: - raise RuntimeError("Unable to acquire Azure credentials; ensure DefaultAzureCredential is configured") - + raise RuntimeError( + "Unable to acquire Azure credentials; ensure DefaultAzureCredential is configured" + ) + connection_string = self.AZURE_AI_AGENT_PROJECT_CONNECTION_STRING self._ai_project_client = AIProjectClient.from_connection_string( - credential=credential, - conn_str=connection_string + credential=credential, conn_str=connection_string ) logging.info("Successfully created AIProjectClient using connection string") return self._ai_project_client except Exception as exc: logging.error("Failed to create AIProjectClient: %s", exc) raise - + async def create_azure_ai_agent( self, - kernel: Kernel, - agent_name: str, - instructions: str, - agent_type: str = "assistant", + kernel: Kernel, + agent_name: str, + instructions: str, + agent_type: str = "assistant", tools=None, tool_resources=None, response_format=None, - temperature: float = 0.0 + temperature: float = 0.0, ): """ Creates a new Azure AI Agent with the specified name and instructions using AIProjectClient. - + If an agent with the given name (assistant_id) already exists, it tries to retrieve it first. + Args: kernel: The Semantic Kernel instance - agent_name: The name of the agent + agent_name: The name of the agent (will be used as assistant_id) instructions: The system message / instructions for the agent agent_type: The type of agent (defaults to "assistant") tools: Optional tool definitions for the agent tool_resources: Optional tool resources required by the tools response_format: Optional response format to control structured output temperature: The temperature setting for the agent (defaults to 0.0) - + Returns: A new AzureAIAgent instance """ try: # Get the AIProjectClient project_client = self.get_ai_project_client() - + + # First try to get an existing agent with this name as assistant_id + try: + logging.info(f"Trying to retrieve existing agent with ID: {agent_name}") + existing_definition = await project_client.agents.get_agent(agent_name) + logging.info(f"Found existing agent with ID: {agent_name}") + + # Create the agent instance directly with project_client and existing definition + agent = AzureAIAgent( + client=project_client, definition=existing_definition, kernel=kernel + ) + + logging.info( + f"Successfully loaded existing Azure AI Agent for {agent_name}" + ) + return agent + except Exception as e: + # The Azure AI Projects SDK throws an exception when the agent doesn't exist + # (not returning None), so we catch it and proceed to create a new agent + if "ResourceNotFound" in str(e) or "404" in str(e): + logging.info( + f"Agent with ID {agent_name} not found. Will create a new one." + ) + else: + # Log unexpected errors but still try to create a new agent + logging.warning( + f"Unexpected error while retrieving agent {agent_name}: {str(e)}. Attempting to create new agent." + ) + # Tool handling: We need to distinguish between our SK functions and # the tool definitions needed by project_client.agents.create_agent tool_definitions = None kernel_functions = [] - + # If tools are provided and they are SK KernelFunctions, we need to handle them differently # than if they are already tool definitions expected by AIProjectClient if tools: # Check if tools are SK KernelFunctions - if all(hasattr(tool, 'name') and hasattr(tool, 'invoke') for tool in tools): + if all( + hasattr(tool, "name") and hasattr(tool, "invoke") for tool in tools + ): # Store the kernel functions to register with the agent later kernel_functions = tools # For now, we don't extract tool definitions from kernel functions # This would require additional code to convert SK functions to AI Project tool definitions - logging.warning("Kernel functions provided as tools will be registered with the agent after creation") + logging.warning( + "Kernel functions provided as tools will be registered with the agent after creation" + ) else: # Assume these are already proper tool definitions for create_agent tool_definitions = tools - - # Create the agent using the project client - if response_format is not None: - logging.info("Response format provided: %s", response_format) - + logging.info(f"Creating new agent with ID: {agent_name}") + + # Create the agent using the project client with the agent_name as both name and assistantId agent_definition = await project_client.agents.create_agent( model=self.AZURE_OPENAI_DEPLOYMENT_NAME, name=agent_name, instructions=instructions, - tools=tool_definitions, # Only pass tool_definitions, not kernel functions + tools=tool_definitions, tool_resources=tool_resources, temperature=temperature, - response_format=response_format + response_format=response_format, ) - + # Create the agent instance directly with project_client and definition - agent_kwargs = { - "client": project_client, - "definition": agent_definition, - "kernel": kernel - } - - - # For other agents, create using standard AzureAIAgent - agent = AzureAIAgent(**agent_kwargs) - + agent = AzureAIAgent( + client=project_client, definition=agent_definition, kernel=kernel + ) + # Register the kernel functions with the agent if any were provided if kernel_functions: for function in kernel_functions: - if hasattr(agent, 'add_function'): + if hasattr(agent, "add_function"): agent.add_function(function) - + return agent except Exception as exc: logging.error("Failed to create Azure AI Agent: %s", exc) @@ -257,4 +302,4 @@ async def create_azure_ai_agent( # Create a global instance of AppConfig -config = AppConfig() \ No newline at end of file +config = AppConfig() diff --git a/src/backend/app_kernel.py b/src/backend/app_kernel.py index 145b8d552..b8c88fb49 100644 --- a/src/backend/app_kernel.py +++ b/src/backend/app_kernel.py @@ -16,6 +16,7 @@ # Semantic Kernel imports import semantic_kernel as sk + # Updated import for KernelArguments from semantic_kernel.functions.kernel_arguments import KernelArguments @@ -108,42 +109,47 @@ async def input_task_endpoint(input_task: InputTask, request: Request): user_id = authenticated_user["user_principal_id"] if not user_id: - track_event_if_configured("UserIdNotFound", {"status_code": 400, "detail": "no user"}) + track_event_if_configured( + "UserIdNotFound", {"status_code": 400, "detail": "no user"} + ) raise HTTPException(status_code=400, detail="no user") - + # Generate session ID if not provided if not input_task.session_id: input_task.session_id = str(uuid.uuid4()) - + try: # Create all agents instead of just the planner agent # This ensures other agents are created first and the planner has access to them - kernel, memory_store = await initialize_runtime_and_context(input_task.session_id, user_id) + kernel, memory_store = await initialize_runtime_and_context( + input_task.session_id, user_id + ) agents = await AgentFactory.create_all_agents( - session_id=input_task.session_id, - user_id=user_id + session_id=input_task.session_id, user_id=user_id ) - - # Get the planner agent from the created agents - planner_agent = agents[AgentType.PLANNER] - + + group_chat_manager = agents[AgentType.GROUP_CHAT_MANAGER.value] + # Convert input task to JSON for the kernel function, add user_id here - input_task_data = input_task.model_dump() - input_task_data["user_id"] = user_id - input_task_json = json.dumps(input_task_data) - + logging.info(f"Input task: {input_task}") # Use the planner to handle the task - result = await planner_agent.handle_input_task( - input_task - ) - + result = await group_chat_manager.handle_input_task(input_task) + print(f"Result: {result}") # Get plan from memory store plan = await memory_store.get_plan_by_session(input_task.session_id) - print(f"Plan: {plan}") - + if not plan: # If the plan is not found, raise an error + track_event_if_configured( + "PlanNotFound", + { + "status": "Plan not found", + "session_id": input_task.session_id, + "description": input_task.description, + }, + ) + raise HTTPException(status_code=404, detail="Plan not found") # Log custom event for successful input task processing track_event_if_configured( "InputTaskProcessed", @@ -161,23 +167,22 @@ async def input_task_endpoint(input_task: InputTask, request: Request): "plan_id": plan.id, "description": input_task.description, } - + except Exception as e: logging.exception(f"Error handling input task: {e}") track_event_if_configured( - "InputTaskError", + "InputTaskError", { "session_id": input_task.session_id, "description": input_task.description, "error": str(e), - } + }, ) raise HTTPException(status_code=400, detail="Error creating plan") @app.post("/api/human_feedback") async def human_feedback_endpoint(human_feedback: HumanFeedback, request: Request): - """ Receive human feedback on a step. @@ -235,22 +240,23 @@ async def human_feedback_endpoint(human_feedback: HumanFeedback, request: Reques authenticated_user = get_authenticated_user_details(request_headers=request.headers) user_id = authenticated_user["user_principal_id"] if not user_id: - track_event_if_configured("UserIdNotFound", {"status_code": 400, "detail": "no user"}) + track_event_if_configured( + "UserIdNotFound", {"status_code": 400, "detail": "no user"} + ) raise HTTPException(status_code=400, detail="no user") - - # Get the agents for this session - agents = await get_agents(human_feedback.session_id, user_id) - + + kernel, memory_store = await initialize_runtime_and_context( + human_feedback.session_id, user_id + ) + agents = await AgentFactory.create_all_agents( + session_id=human_feedback.session_id, user_id=user_id + ) + # Send the feedback to the human agent human_agent = agents[AgentType.HUMAN.value] - - # Convert feedback to JSON for the kernel function - human_feedback_json = human_feedback.json() - + # Use the human agent to handle the feedback - await human_agent.handle_human_feedback( - KernelArguments(human_feedback_json=human_feedback_json) - ) + await human_agent.handle_human_feedback(human_feedback=human_feedback) track_event_if_configured( "Completed Feedback received", @@ -318,21 +324,24 @@ async def human_clarification_endpoint( authenticated_user = get_authenticated_user_details(request_headers=request.headers) user_id = authenticated_user["user_principal_id"] if not user_id: - track_event_if_configured("UserIdNotFound", {"status_code": 400, "detail": "no user"}) + track_event_if_configured( + "UserIdNotFound", {"status_code": 400, "detail": "no user"} + ) raise HTTPException(status_code=400, detail="no user") - - # Get the agents for this session - agents = await get_agents(human_clarification.session_id, user_id) - - # Send the clarification to the planner agent - planner_agent = agents[AgentType.PLANNER.value] - - # Convert clarification to JSON for proper processing - human_clarification_json = human_clarification.json() - - # Use the planner to handle the clarification - await planner_agent.handle_human_clarification( - KernelArguments(human_clarification_json=human_clarification_json) + + kernel, memory_store = await initialize_runtime_and_context( + human_clarification.session_id, user_id + ) + agents = await AgentFactory.create_all_agents( + session_id=human_clarification.session_id, user_id=user_id + ) + + # Send the feedback to the human agent + human_agent = agents[AgentType.HUMAN.value] + + # Use the human agent to handle the feedback + await human_agent.handle_human_clarification( + human_clarification=human_clarification ) track_event_if_configured( @@ -406,32 +415,24 @@ async def approve_step_endpoint( authenticated_user = get_authenticated_user_details(request_headers=request.headers) user_id = authenticated_user["user_principal_id"] if not user_id: - track_event_if_configured("UserIdNotFound", {"status_code": 400, "detail": "no user"}) + track_event_if_configured( + "UserIdNotFound", {"status_code": 400, "detail": "no user"} + ) raise HTTPException(status_code=400, detail="no user") - + # Get the agents for this session - agents = await get_agents(human_feedback.session_id, user_id) - - # Send the approval to the group chat manager - group_chat_manager = agents[AgentType.GROUP_CHAT_MANAGER.value] - - # Handle the approval - human_feedback_json = human_feedback.json() - - # First process with HumanAgent to update step status - human_agent = agents[AgentType.HUMAN.value] - await human_agent.handle_human_feedback( - KernelArguments(human_feedback_json=human_feedback_json) + kernel, memory_store = await initialize_runtime_and_context( + human_feedback.session_id, user_id ) - - # Then execute the next step with GroupChatManager - await group_chat_manager.execute_next_step( - KernelArguments( - session_id=human_feedback.session_id, - plan_id=human_feedback.plan_id - ) + agents = await AgentFactory.create_all_agents( + session_id=human_feedback.session_id, user_id=user_id ) + # Send the approval to the group chat manager + group_chat_manager = agents[AgentType.GROUP_CHAT_MANAGER.value] + + await group_chat_manager.handle_human_feedback(human_feedback) + # Return a status message if human_feedback.step_id: track_event_if_configured( @@ -517,11 +518,15 @@ async def get_plans( authenticated_user = get_authenticated_user_details(request_headers=request.headers) user_id = authenticated_user["user_principal_id"] if not user_id: - track_event_if_configured("UserIdNotFound", {"status_code": 400, "detail": "no user"}) + track_event_if_configured( + "UserIdNotFound", {"status_code": 400, "detail": "no user"} + ) raise HTTPException(status_code=400, detail="no user") # Initialize memory context - memory_store = CosmosMemoryContext(session_id or "", user_id) + kernel, memory_store = await initialize_runtime_and_context( + session_id or "", user_id + ) if session_id: plan = await memory_store.get_plan_by_session(session_id=session_id) @@ -607,11 +612,13 @@ async def get_steps_by_plan(plan_id: str, request: Request) -> List[Step]: authenticated_user = get_authenticated_user_details(request_headers=request.headers) user_id = authenticated_user["user_principal_id"] if not user_id: - track_event_if_configured("UserIdNotFound", {"status_code": 400, "detail": "no user"}) + track_event_if_configured( + "UserIdNotFound", {"status_code": 400, "detail": "no user"} + ) raise HTTPException(status_code=400, detail="no user") - + # Initialize memory context - memory_store = CosmosMemoryContext("", user_id) + kernel, memory_store = await initialize_runtime_and_context("", user_id) steps = await memory_store.get_steps_for_plan(plan_id=plan_id) return steps @@ -671,11 +678,15 @@ async def get_agent_messages(session_id: str, request: Request) -> List[AgentMes authenticated_user = get_authenticated_user_details(request_headers=request.headers) user_id = authenticated_user["user_principal_id"] if not user_id: - track_event_if_configured("UserIdNotFound", {"status_code": 400, "detail": "no user"}) + track_event_if_configured( + "UserIdNotFound", {"status_code": 400, "detail": "no user"} + ) raise HTTPException(status_code=400, detail="no user") - + # Initialize memory context - memory_store = CosmosMemoryContext(session_id, user_id) + kernel, memory_store = await initialize_runtime_and_context( + session_id or "", user_id + ) agent_messages = await memory_store.get_data_by_type("agent_message") return agent_messages @@ -704,10 +715,10 @@ async def delete_all_messages(request: Request) -> Dict[str, str]: user_id = authenticated_user["user_principal_id"] if not user_id: raise HTTPException(status_code=400, detail="no user") - + # Initialize memory context - memory_store = CosmosMemoryContext(session_id="", user_id=user_id) - + kernel, memory_store = await initialize_runtime_and_context("", user_id) + logging.info("Deleting all plans") await memory_store.delete_all_items("plan") logging.info("Deleting all sessions") @@ -716,10 +727,10 @@ async def delete_all_messages(request: Request) -> Dict[str, str]: await memory_store.delete_all_items("step") logging.info("Deleting all agent_messages") await memory_store.delete_all_items("agent_message") - + # Clear the agent factory cache AgentFactory.clear_cache() - + return {"status": "All messages deleted"} @@ -765,9 +776,9 @@ async def get_all_messages(request: Request): user_id = authenticated_user["user_principal_id"] if not user_id: raise HTTPException(status_code=400, detail="no user") - + # Initialize memory context - memory_store = CosmosMemoryContext(session_id="", user_id=user_id) + kernel, memory_store = await initialize_runtime_and_context("", user_id) message_list = await memory_store.get_all_items() return message_list @@ -804,9 +815,8 @@ async def get_agent_tools(): return [] - # Run the app if __name__ == "__main__": import uvicorn - uvicorn.run("app_kernel:app", host="127.0.0.1", port=8000, reload=True) \ No newline at end of file + uvicorn.run("app_kernel:app", host="127.0.0.1", port=8000, reload=True) diff --git a/src/backend/config_kernel.py b/src/backend/config_kernel.py index 107dbee5c..31ee1ea9a 100644 --- a/src/backend/config_kernel.py +++ b/src/backend/config_kernel.py @@ -3,6 +3,7 @@ import logging import semantic_kernel as sk from semantic_kernel.kernel import Kernel + # Updated imports for compatibility try: # Try newer structure @@ -15,6 +16,7 @@ # Import AppConfig from app_config from app_config import config + # This file is left as a lightweight wrapper around AppConfig for backward compatibility # All configuration is now handled by AppConfig in app_config.py class Config: @@ -39,7 +41,9 @@ class Config: AZURE_AI_SUBSCRIPTION_ID = config.AZURE_AI_SUBSCRIPTION_ID AZURE_AI_RESOURCE_GROUP = config.AZURE_AI_RESOURCE_GROUP AZURE_AI_PROJECT_NAME = config.AZURE_AI_PROJECT_NAME - AZURE_AI_AGENT_PROJECT_CONNECTION_STRING = config.AZURE_AI_AGENT_PROJECT_CONNECTION_STRING + AZURE_AI_AGENT_PROJECT_CONNECTION_STRING = ( + config.AZURE_AI_AGENT_PROJECT_CONNECTION_STRING + ) @staticmethod def GetAzureCredentials(): @@ -55,7 +59,7 @@ def GetCosmosDatabaseClient(): def CreateKernel(): """Creates a new Semantic Kernel instance using the AppConfig implementation.""" return config.create_kernel() - + @staticmethod def GetAIProjectClient(): """Get an AIProjectClient using the AppConfig implementation.""" diff --git a/src/backend/kernel_agents/agent_base.py b/src/backend/kernel_agents/agent_base.py index 14c0cd438..4547c4eb7 100644 --- a/src/backend/kernel_agents/agent_base.py +++ b/src/backend/kernel_agents/agent_base.py @@ -17,6 +17,7 @@ # Fall back to older structure for compatibility class ChatMessageContent: """Compatibility class for older SK versions.""" + def __init__(self, role="", content="", name=None): self.role = role self.content = content @@ -24,9 +25,11 @@ def __init__(self, role="", content="", name=None): class ChatHistory: """Compatibility class for older SK versions.""" + def __init__(self): self.messages = [] + from context.cosmos_memory_kernel import CosmosMemoryContext from models.messages_kernel import ( ActionRequest, @@ -35,6 +38,7 @@ def __init__(self): Step, StepStatus, ) + # Import the new AppConfig instance from app_config import config from event_utils import track_event_if_configured @@ -42,6 +46,7 @@ def __init__(self): # Default formatting instructions used across agents DEFAULT_FORMATTING_INSTRUCTIONS = "Instructions: returning the output of this function call verbatim to the user in markdown. Then write AGENT SUMMARY: and then include a summary of what you did." + class BaseAgent(AzureAIAgent): """BaseAgent implemented using Semantic Kernel with Azure AI Agent support.""" @@ -59,7 +64,7 @@ def __init__( definition=None, ): """Initialize the base agent. - + Args: agent_name: The name of the agent kernel: The semantic kernel instance @@ -78,24 +83,26 @@ def __init__( # If system_message isn't provided, try to get it from config if not system_message: config = self.load_tools_config(agent_type) - system_message = config.get("system_message", self._default_system_message(agent_name)) + system_message = config.get( + "system_message", self._default_system_message(agent_name) + ) else: tools = tools or [] system_message = system_message or self._default_system_message(agent_name) - + # Call AzureAIAgent constructor with required client and definition super().__init__( kernel=kernel, deployment_name=None, # Set as needed - endpoint=None, # Set as needed - api_version=None, # Set as needed - token=None, # Set as needed + endpoint=None, # Set as needed + api_version=None, # Set as needed + token=None, # Set as needed agent_name=agent_name, system_prompt=system_message, client=client, - definition=definition + definition=definition, ) - + # Store instance variables self._agent_name = agent_name self._kernel = kernel @@ -106,48 +113,47 @@ def __init__( self._system_message = system_message self._chat_history = [{"role": "system", "content": self._system_message}] self._agent = None # Will be initialized in async_init - + # Required properties for AgentGroupChat compatibility self.name = agent_name # This is crucial for AgentGroupChat to identify agents - - + # Register the handler functions self._register_functions() def _default_system_message(self, agent_name=None) -> str: - name = agent_name or getattr(self, '_agent_name', 'Agent') + name = agent_name or getattr(self, "_agent_name", "Agent") return f"You are an AI assistant named {name}. Help the user by providing accurate and helpful information." async def async_init(self): """Asynchronously initialize the agent after construction. - + This method must be called after creating the agent to complete initialization. """ # Create Azure AI Agent or fallback self._agent = await config.create_azure_ai_agent( kernel=self._kernel, agent_name=self._agent_name, - instructions=self._system_message + instructions=self._system_message, ) # Tools are registered with the kernel via get_tools_from_config return self async def invoke_async(self, *args, **kwargs): """Invoke this agent asynchronously. - + This method is required for compatibility with AgentGroupChat. - + Args: *args: Positional arguments **kwargs: Keyword arguments - + Returns: The agent's response """ # Ensure agent is initialized if self._agent is None: await self.async_init() - + # Get the text input from args or kwargs text = None if args and isinstance(args[0], str): @@ -156,23 +162,21 @@ async def invoke_async(self, *args, **kwargs): text = kwargs["text"] elif "arguments" in kwargs and hasattr(kwargs["arguments"], "get"): text = kwargs["arguments"].get("text") or kwargs["arguments"].get("input") - + if not text: settings = kwargs.get("settings", {}) if isinstance(settings, dict) and "input" in settings: text = settings["input"] - + # If text is still not found, create a default message if not text: text = "Hello, please assist with a task." - + # Use the text to invoke the agent try: logging.info(f"Invoking {self._agent_name} with text: {text[:100]}...") response = await self._agent.invoke( - self._kernel, - text, - settings=kwargs.get("settings", {}) + self._kernel, text, settings=kwargs.get("settings", {}) ) return response except Exception as e: @@ -184,70 +188,69 @@ def _register_functions(self): # Use the kernel function decorator approach instead of from_native_method # which isn't available in SK 1.28.0 function_name = "handle_action_request" - + # Define the function using the kernel function decorator @kernel_function( description="Handle an action request from another agent or the system", - name=function_name + name=function_name, ) async def handle_action_request_wrapper(*args, **kwargs): # Forward to the instance method return await self.handle_action_request(*args, **kwargs) - + # Wrap the decorated function into a KernelFunction and register under this agent's plugin kernel_func = KernelFunction.from_method(handle_action_request_wrapper) # Use agent name as plugin for handler self._kernel.add_function(self._agent_name, kernel_func) - + # Required method for AgentGroupChat compatibility - async def send_message_async(self, message_content: ChatMessageContent, chat_history: ChatHistory): + async def send_message_async( + self, message_content: ChatMessageContent, chat_history: ChatHistory + ): """Send a message to the agent asynchronously, adding it to chat history. - + Args: message_content: The content of the message chat_history: The chat history - + Returns: None """ # Convert message to format expected by the agent if hasattr(message_content, "role") and hasattr(message_content, "content"): - self._chat_history.append({ - "role": message_content.role, - "content": message_content.content - }) - + self._chat_history.append( + {"role": message_content.role, "content": message_content.content} + ) + # If chat history is provided, update our internal history if chat_history and hasattr(chat_history, "messages"): # Update with the latest messages from chat history - for msg in chat_history.messages[-5:]: # Only use last 5 messages to avoid history getting too long + for msg in chat_history.messages[ + -5: + ]: # Only use last 5 messages to avoid history getting too long if msg not in self._chat_history: - self._chat_history.append({ - "role": msg.role, - "content": msg.content - }) - + self._chat_history.append( + {"role": msg.role, "content": msg.content} + ) + # No need to return anything as we're just updating state return None - async def handle_action_request(self, action_request_json: str) -> str: + async def handle_action_request(self, action_request: ActionRequest) -> str: """Handle an action request from another agent or the system. - + Args: action_request_json: The action request as a JSON string - + Returns: A JSON string containing the action response """ - # Parse the action request - action_request_dict = json.loads(action_request_json) - action_request = ActionRequest(**action_request_dict) - + # Get the step from memory step: Step = await self._memory_store.get_step( action_request.step_id, action_request.session_id ) - + if not step: # Create error response if step not found response = ActionResponse( @@ -256,29 +259,45 @@ async def handle_action_request(self, action_request_json: str) -> str: message="Step not found in memory.", ) return response.json() - + # Add messages to chat history for context # This gives the agent visibility of the conversation history - self._chat_history.extend([ - {"role": "assistant", "content": action_request.action}, - {"role": "user", "content": f"{step.human_feedback}. Now make the function call"} - ]) - + self._chat_history.extend( + [ + {"role": "assistant", "content": action_request.action}, + { + "role": "user", + "content": f"{step.human_feedback}. Now make the function call", + }, + ] + ) + try: # Use the agent to process the action chat_history = self._chat_history.copy() - + # Call the agent to handle the action - agent_response = await self._agent.invoke(self._kernel, f"{action_request.action}\n\nPlease perform this action") - result = str(agent_response) - + async_generator = self._agent.invoke( + self._kernel, f"{action_request.action}\n\nPlease perform this action" + ) + + response_content = "" + + # Collect the response from the async generator + async for chunk in async_generator: + if chunk is not None: + response_content += str(chunk) + + logging.info(f"Response content length: {len(response_content)}") + logging.info(f"Response content: {response_content}") + # Store agent message in cosmos memory await self._memory_store.add_item( AgentMessage( session_id=action_request.session_id, user_id=self._user_id, plan_id=action_request.plan_id, - content=f"{result}", + content=f"{response_content}", source=self._agent_name, step_id=action_request.step_id, ) @@ -291,7 +310,7 @@ async def handle_action_request(self, action_request_json: str) -> str: "session_id": action_request.session_id, "user_id": self._user_id, "plan_id": action_request.plan_id, - "content": f"{result}", + "content": f"{response_content}", "source": self._agent_name, "step_id": action_request.step_id, }, @@ -299,7 +318,7 @@ async def handle_action_request(self, action_request_json: str) -> str: except Exception as e: logging.exception(f"Error during agent execution: {e}") - + # Track error in telemetry track_event_if_configured( "Base agent - Error during agent execution, captured into the cosmos", @@ -312,7 +331,7 @@ async def handle_action_request(self, action_request_json: str) -> str: "step_id": action_request.step_id, }, ) - + # Return an error response response = ActionResponse( step_id=action_request.step_id, @@ -322,12 +341,12 @@ async def handle_action_request(self, action_request_json: str) -> str: status=StepStatus.failed, ) return response.json() - - logging.info(f"Task completed: {result}") + + logging.info(f"Task completed: {response_content}") # Update step status step.status = StepStatus.completed - step.agent_reply = result + step.agent_reply = response_content await self._memory_store.update_step(step) # Track step completion in telemetry @@ -336,10 +355,10 @@ async def handle_action_request(self, action_request_json: str) -> str: { "status": StepStatus.completed, "session_id": action_request.session_id, - "agent_reply": f"{result}", + "agent_reply": f"{response_content}", "user_id": self._user_id, "plan_id": action_request.plan_id, - "content": f"{result}", + "content": f"{response_content}", "source": self._agent_name, "step_id": action_request.step_id, }, @@ -350,80 +369,92 @@ async def handle_action_request(self, action_request_json: str) -> str: step_id=step.id, plan_id=step.plan_id, session_id=action_request.session_id, - result=result, + result=response_content, status=StepStatus.completed, ) - + return response.json() async def invoke_tool(self, tool_name: str, arguments: Dict[str, Any]) -> str: """Invoke a specific tool by name with the provided arguments. - + Args: tool_name: The name of the tool to invoke arguments: A dictionary of arguments to pass to the tool - + Returns: The result of the tool invocation as a string - + Raises: ValueError: If the tool is not found """ # Find the tool by name in the agent's tools list tool = next((t for t in self._tools if t.name == tool_name), None) - + if not tool: # Try looking up the tool in the kernel's plugins plugin_name = f"{self._agent_name.lower().replace('agent', '')}_plugin" try: tool = self._kernel.get_function(plugin_name, tool_name) except Exception: - raise ValueError(f"Tool '{tool_name}' not found in agent tools or kernel plugins") - + raise ValueError( + f"Tool '{tool_name}' not found in agent tools or kernel plugins" + ) + if not tool: raise ValueError(f"Tool '{tool_name}' not found") - + try: # Create kernel arguments from the dictionary kernel_args = KernelArguments() for key, value in arguments.items(): kernel_args[key] = value - + # Invoke the tool logging.info(f"Invoking tool '{tool_name}' with arguments: {arguments}") - + # Use invoke_with_args_dict directly instead of relying on KernelArguments - if hasattr(tool, 'invoke_with_args_dict') and callable(tool.invoke_with_args_dict): + if hasattr(tool, "invoke_with_args_dict") and callable( + tool.invoke_with_args_dict + ): result = await tool.invoke_with_args_dict(arguments) else: # Fall back to standard invoke method result = await tool.invoke(kernel_args) - + # Log telemetry if configured - track_event_if_configured("AgentToolInvocation", { - "agent_name": self._agent_name, - "tool_name": tool_name, - "session_id": self._session_id, - "user_id": self._user_id - }) - + track_event_if_configured( + "AgentToolInvocation", + { + "agent_name": self._agent_name, + "tool_name": tool_name, + "session_id": self._session_id, + "user_id": self._user_id, + }, + ) + return str(result) except Exception as e: logging.error(f"Error invoking tool '{tool_name}': {str(e)}") raise @staticmethod - def create_dynamic_function(name: str, response_template: str, formatting_instr: str = DEFAULT_FORMATTING_INSTRUCTIONS) -> Callable[..., Awaitable[str]]: + def create_dynamic_function( + name: str, + response_template: str, + formatting_instr: str = DEFAULT_FORMATTING_INSTRUCTIONS, + ) -> Callable[..., Awaitable[str]]: """Create a dynamic function for agent tools based on the name and template. - + Args: name: The name of the function to create response_template: The template string to use for the response formatting_instr: Optional formatting instructions to append to the response - + Returns: A dynamic async function that can be registered with the semantic kernel """ + async def dynamic_function(**kwargs) -> str: try: # Format the template with the provided kwargs @@ -436,16 +467,15 @@ async def dynamic_function(**kwargs) -> str: return f"Error: Missing parameter {e} for {name}" except Exception as e: return f"Error processing {name}: {str(e)}" - + # Name the function properly for better debugging dynamic_function.__name__ = name - + # Create a wrapped kernel function that matches the expected signature - @kernel_function( - description=f"Dynamic function: {name}", - name=name - ) - async def kernel_wrapper(kernel_arguments: KernelArguments = None, **kwargs) -> str: + @kernel_function(description=f"Dynamic function: {name}", name=name) + async def kernel_wrapper( + kernel_arguments: KernelArguments = None, **kwargs + ) -> str: # Combine all arguments into one dictionary all_args = {} if kernel_arguments: @@ -453,36 +483,42 @@ async def kernel_wrapper(kernel_arguments: KernelArguments = None, **kwargs) -> all_args[key] = value all_args.update(kwargs) return await dynamic_function(**all_args) - + return kernel_wrapper @staticmethod - def load_tools_config(filename: str, config_path: Optional[str] = None) -> Dict[str, Any]: + def load_tools_config( + filename: str, config_path: Optional[str] = None + ) -> Dict[str, Any]: """Load tools configuration from a JSON file. - + Args: filename: The filename without extension (e.g., "hr", "marketing") config_path: Optional explicit path to the configuration file - + Returns: A dictionary containing the configuration """ if config_path is None: # Default path relative to the tools directory current_dir = os.path.dirname(os.path.abspath(__file__)) - backend_dir = os.path.dirname(current_dir) # Just one level up to get to backend dir - + backend_dir = os.path.dirname( + current_dir + ) # Just one level up to get to backend dir + # Normalize filename to avoid issues with spaces and capitalization # Convert "Hr Agent" to "hr" and "TechSupport Agent" to "tech_support" - logging.debug(f"Normalizing filename: {filename}") + logging.info(f"Normalizing filename: {filename}") normalized_filename = filename.replace(" ", "_").replace("-", "_").lower() # If it ends with "_agent", remove it if normalized_filename.endswith("_agent"): normalized_filename = normalized_filename[:-6] - logging - config_path = os.path.join(backend_dir, "tools", f"{normalized_filename}_tools.json") - logging.debug(f"Looking for tools config at: {config_path}") - + + config_path = os.path.join( + backend_dir, "tools", f"{normalized_filename}_tools.json" + ) + logging.info(f"Looking for tools config at: {config_path}") + try: with open(config_path, "r") as f: return json.load(f) @@ -492,64 +528,77 @@ def load_tools_config(filename: str, config_path: Optional[str] = None) -> Dict[ return { "agent_name": f"{filename.capitalize()}Agent", "system_message": "You are an AI assistant", - "tools": [] + "tools": [], } @classmethod - def get_tools_from_config(cls, kernel: sk.Kernel, agent_type: str, config_path: Optional[str] = None) -> List[KernelFunction]: + def get_tools_from_config( + cls, kernel: sk.Kernel, agent_type: str, config_path: Optional[str] = None + ) -> List[KernelFunction]: """Get the list of tools for an agent from configuration. - + Args: kernel: The semantic kernel instance agent_type: The type of agent (e.g., "marketing", "hr") config_path: Optional explicit path to the configuration file - + Returns: A list of KernelFunction objects representing the tools """ # Load configuration config = cls.load_tools_config(agent_type, config_path) - + # Convert the configured tools to kernel functions kernel_functions = [] plugin_name = f"{agent_type}_plugin" - + # Early return if no tools defined - prevent empty iteration if not config.get("tools"): - logging.info(f"No tools defined for agent type '{agent_type}'. Returning empty list.") + logging.info( + f"No tools defined for agent type '{agent_type}'. Returning empty list." + ) return kernel_functions - + for tool in config.get("tools", []): try: function_name = tool["name"] description = tool.get("description", "") # Create a dynamic function using the JSON response_template - response_template = tool.get("response_template") or tool.get("prompt_template") or "" - + response_template = ( + tool.get("response_template") or tool.get("prompt_template") or "" + ) + # Generate a dynamic function using our improved approach - dynamic_fn = cls.create_dynamic_function(function_name, response_template) - + dynamic_fn = cls.create_dynamic_function( + function_name, response_template + ) + # Create kernel function from the decorated function kernel_func = KernelFunction.from_method(dynamic_fn) - + # Add parameter metadata from JSON to the kernel function for param in tool.get("parameters", []): param_name = param.get("name", "") param_desc = param.get("description", "") param_type = param.get("type", "string") - + # Set this parameter in the function's metadata if param_name: - logging.debug(f"Adding parameter '{param_name}' to function '{function_name}'") - + logging.info( + f"Adding parameter '{param_name}' to function '{function_name}'" + ) + # Register the function with the kernel kernel.add_function(plugin_name, kernel_func) kernel_functions.append(kernel_func) - logging.debug(f"Successfully created dynamic tool '{function_name}' for {agent_type}") + logging.info( + f"Successfully created dynamic tool '{function_name}' for {agent_type}" + ) except Exception as e: - logging.error(f"Failed to create tool '{tool.get('name', 'unknown')}': {str(e)}") - - + logging.error( + f"Failed to create tool '{tool.get('name', 'unknown')}': {str(e)}" + ) + return kernel_functions def save_state(self) -> Mapping[str, Any]: @@ -558,4 +607,4 @@ def save_state(self) -> Mapping[str, Any]: def load_state(self, state: Mapping[str, Any]) -> None: """Load the state of this agent.""" - self._memory_store.load_state(state["memory"]) \ No newline at end of file + self._memory_store.load_state(state["memory"]) diff --git a/src/backend/kernel_agents/agent_factory.py b/src/backend/kernel_agents/agent_factory.py index 73d8ec8a9..eaedf1521 100644 --- a/src/backend/kernel_agents/agent_factory.py +++ b/src/backend/kernel_agents/agent_factory.py @@ -121,19 +121,31 @@ async def create_agent( ) -> BaseAgent: """Create an agent of the specified type. + This method creates and initializes an agent instance of the specified type. If an agent + of the same type already exists for the session, it returns the cached instance. The method + handles the complete initialization process including: + 1. Creating a memory store for the agent + 2. Setting up the Semantic Kernel + 3. Loading appropriate tools from JSON configuration files + 4. Creating an Azure AI agent definition using the AI Project client + 5. Initializing the agent with all required parameters + 6. Running any asynchronous initialization if needed + 7. Caching the agent for future use + Args: - agent_type: The type of agent to create - session_id: The session ID - user_id: The user ID - temperature: The temperature to use for the agent - system_message: Optional system message for the agent + agent_type: The type of agent to create (from AgentType enum) + session_id: The unique identifier for the current session + user_id: The user identifier for the current user + temperature: The temperature parameter for the agent's responses (0.0-1.0) + system_message: Optional custom system message to override default + response_format: Optional response format configuration for structured outputs **kwargs: Additional parameters to pass to the agent constructor Returns: - An instance of the specified agent type + An initialized instance of the specified agent type Raises: - ValueError: If the agent type is unknown + ValueError: If the agent type is unknown or initialization fails """ # Check if we already have an agent in the cache if ( @@ -174,24 +186,23 @@ async def create_agent( client = config.get_ai_project_client() except Exception as client_exc: logger.error(f"Error creating AIProjectClient: {client_exc}") - if agent_type == AgentType.GROUP_CHAT_MANAGER: - logger.info( - f"Continuing with GroupChatManager creation despite AIProjectClient error" - ) - else: - raise + raise try: # Create the agent definition using the AIProjectClient (project-based pattern) # For GroupChatManager, create a definition with minimal configuration if client is not None: - definition = await client.agents.create_agent( - model=config.AZURE_OPENAI_DEPLOYMENT_NAME, - name=agent_type_str, - instructions=system_message, - temperature=temperature, - response_format=response_format, # Add response_format if required - ) + try: + definition = await client.agents.get_agent(agent_type_str) + + except Exception as get_agent_exc: + definition = await client.agents.create_agent( + model=config.AZURE_OPENAI_DEPLOYMENT_NAME, + name=agent_type_str, + instructions=system_message, + temperature=temperature, + response_format=response_format, # Add response_format if required + ) logger.info( f"Successfully created agent definition for {agent_type_str}" ) @@ -199,12 +210,8 @@ async def create_agent( logger.error( f"Error creating agent definition with AIProjectClient for {agent_type_str}: {agent_exc}" ) - if agent_type == AgentType.GROUP_CHAT_MANAGER: - logger.info( - f"Continuing with GroupChatManager creation despite definition error" - ) - else: - raise + + raise # Create the agent instance using the project-based pattern try: @@ -228,13 +235,13 @@ async def create_agent( if k in valid_keys } agent = agent_class(**filtered_kwargs) - logger.debug(f"[DEBUG] Agent object after instantiation: {agent}") + logger.info(f"[DEBUG] Agent object after instantiation: {agent}") # Initialize the agent asynchronously if it has async_init if hasattr(agent, "async_init") and inspect.iscoroutinefunction( agent.async_init ): init_result = await agent.async_init() - logger.debug(f"[DEBUG] Result of agent.async_init(): {init_result}") + logger.info(f"[DEBUG] Result of agent.async_init(): {init_result}") # Register tools with Azure AI Agent for LLM function calls if ( hasattr(agent, "_agent") @@ -321,21 +328,24 @@ async def _load_tools_for_agent( async def create_all_agents( cls, session_id: str, user_id: str, temperature: float = 0.0 ) -> Dict[AgentType, BaseAgent]: - """Create all agent types for a session. + """Create all agent types for a session in a specific order. + + This method creates all agent instances for a session in a multi-phase approach: + 1. First, it creates all basic agent types except for the Planner and GroupChatManager + 2. Then it creates the Planner agent, providing it with references to all other agents + 3. Finally, it creates the GroupChatManager with references to all agents including the Planner + + This ordered creation ensures that dependencies between agents are properly established, + particularly for the Planner and GroupChatManager which need to coordinate other agents. Args: - session_id: The session ID - user_id: The user ID - temperature: The temperature to use for the agents + session_id: The unique identifier for the current session + user_id: The user identifier for the current user + temperature: The temperature parameter for agent responses (0.0-1.0) Returns: - Dictionary mapping agent types to agent instances + Dictionary mapping agent types (from AgentType enum) to initialized agent instances """ - # Check if we already have all agents in the cache - if session_id in cls._agent_cache and len(cls._agent_cache[session_id]) == len( - cls._agent_classes - ): - return cls._agent_cache[session_id] # Create each agent type in two phases # First, create all agents except PlannerAgent and GroupChatManager @@ -363,16 +373,15 @@ async def create_all_agents( # Create agent name to instance mapping for the planner agent_instances = {} for agent_type, agent in agents.items(): - agent_name = ( - cls._agent_type_strings.get(agent_type).replace("_", "") + "Agent" + agent_name = agent_type.value + + logging.info( + f"Creating agent instance for {agent_name} with type {agent_type}" ) - agent_name = ( - agent_name[0].upper() + agent_name[1:] - ) # Capitalize first letter agent_instances[agent_name] = agent # Log the agent instances for debugging - logger.debug( + logger.info( f"Created {len(agent_instances)} agent instances for planner: {', '.join(agent_instances.keys())}" ) @@ -391,6 +400,9 @@ async def create_all_agents( ) ), ) + agent_instances[AgentType.PLANNER.value] = ( + planner_agent # to pass it to group chat manager + ) agents[planner_agent_type] = planner_agent # Phase 3: Create group chat manager with all agents including the planner @@ -399,7 +411,7 @@ async def create_all_agents( session_id=session_id, user_id=user_id, temperature=temperature, - available_agents=agent_instances, # Pass all agents to group chat manager + agent_instances=agent_instances, # Pass agent instances to the planner ) agents[group_chat_manager_type] = group_chat_manager diff --git a/src/backend/kernel_agents/agent_utils.py b/src/backend/kernel_agents/agent_utils.py index 4228737f9..ee16c3dba 100644 --- a/src/backend/kernel_agents/agent_utils.py +++ b/src/backend/kernel_agents/agent_utils.py @@ -13,6 +13,7 @@ class FSMStateAndTransition(BaseModel): """Model for state and transition in a finite state machine.""" + identifiedTargetState: str identifiedTargetTransition: str @@ -25,61 +26,65 @@ async def extract_and_update_transition_states( kernel: sk.Kernel, ) -> Optional[Step]: """ - This function extracts the identified target state and transition from the LLM response and updates + This function extracts the identified target state and transition from the LLM response and updates the step with the identified target state and transition. This is reliant on the agent_reply already being present. - + Args: step: The step to update session_id: The current session ID user_id: The user ID planner_dynamic_or_workflow: Type of planner kernel: The semantic kernel instance - + Returns: The updated step or None if extraction fails """ planner_dynamic_or_workflow = "workflow" if planner_dynamic_or_workflow == "workflow": cosmos = CosmosMemoryContext(session_id=session_id, user_id=user_id) - + # Create chat history for the semantic kernel completion messages = [ {"role": "assistant", "content": step.action}, {"role": "assistant", "content": step.agent_reply}, - {"role": "assistant", "content": "Based on the above conversation between two agents, I need you to identify the identifiedTargetState and identifiedTargetTransition values. Only return these values. Do not make any function calls. If you are unable to work out the next transition state, return ERROR."} + { + "role": "assistant", + "content": "Based on the above conversation between two agents, I need you to identify the identifiedTargetState and identifiedTargetTransition values. Only return these values. Do not make any function calls. If you are unable to work out the next transition state, return ERROR.", + }, ] # Get the LLM response using semantic kernel completion_service = kernel.get_service("completion") - + try: completion_result = await completion_service.complete_chat_async( messages=messages, - execution_settings={ - "response_format": {"type": "json_object"} - } + execution_settings={"response_format": {"type": "json_object"}}, ) - + content = completion_result - + # Parse the LLM response parsed_result = json.loads(content) structured_plan = FSMStateAndTransition(**parsed_result) - + # Update the step step.identified_target_state = structured_plan.identifiedTargetState - step.identified_target_transition = structured_plan.identifiedTargetTransition - + step.identified_target_transition = ( + structured_plan.identifiedTargetTransition + ) + await cosmos.update_step(step) return step - + except Exception as e: print(f"Error extracting transition states: {e}") return None + # The commented-out functions below would be implemented when needed # async def set_next_viable_step_to_runnable(session_id): # pass # async def initiate_replanning(session_id): -# pass \ No newline at end of file +# pass diff --git a/src/backend/kernel_agents/generic_agent.py b/src/backend/kernel_agents/generic_agent.py index fef2d3d26..cde553010 100644 --- a/src/backend/kernel_agents/generic_agent.py +++ b/src/backend/kernel_agents/generic_agent.py @@ -8,6 +8,7 @@ from context.cosmos_memory_kernel import CosmosMemoryContext from models.messages_kernel import AgentType + class GenericAgent(BaseAgent): """Generic agent implementation using Semantic Kernel.""" @@ -25,7 +26,7 @@ def __init__( definition=None, ) -> None: """Initialize the Generic Agent. - + Args: kernel: The semantic kernel instance session_id: The current session identifier @@ -43,17 +44,19 @@ def __init__( # Load the generic tools configuration config = self.load_tools_config("generic", config_path) tools = self.get_tools_from_config(kernel, "generic", config_path) - + # Use system message from config if not explicitly provided if not system_message: - system_message = config.get("system_message", + system_message = config.get( + "system_message", "You are a generic agent. You are used to handle generic tasks that a general Large Language Model can assist with. " "You are being called as a fallback, when no other agents are able to use their specialised functions in order to solve " - "the user's task. Summarize back to the user what was done.") - + "the user's task. Summarize back to the user what was done.", + ) + # Use agent name from config if available agent_name = AgentType.GENERIC.value - + # Call the parent initializer super().__init__( agent_name=agent_name, @@ -64,20 +67,20 @@ def __init__( tools=tools, system_message=system_message, client=client, - definition=definition + definition=definition, ) - + # Explicitly inherit handle_action_request from the parent class # This is not technically necessary but makes the inheritance explicit async def handle_action_request(self, action_request_json: str) -> str: """Handle an action request from another agent or the system. - + This method is inherited from BaseAgent but explicitly included here for clarity. - + Args: action_request_json: The action request as a JSON string - + Returns: A JSON string containing the action response """ - return await super().handle_action_request(action_request_json) \ No newline at end of file + return await super().handle_action_request(action_request_json) diff --git a/src/backend/kernel_agents/group_chat_manager.py b/src/backend/kernel_agents/group_chat_manager.py index 288ba60eb..39e65142b 100644 --- a/src/backend/kernel_agents/group_chat_manager.py +++ b/src/backend/kernel_agents/group_chat_manager.py @@ -6,29 +6,13 @@ import semantic_kernel as sk from semantic_kernel.functions.kernel_arguments import KernelArguments +from semantic_kernel.functions.kernel_function import KernelFunction from semantic_kernel.agents import AgentGroupChat # pylint: disable=E0611 from semantic_kernel.agents.strategies import ( SequentialSelectionStrategy, TerminationStrategy, ) -# Updated imports for compatibility -try: - # Try importing from newer structure first - from semantic_kernel.contents import ChatMessageContent, ChatHistory -except ImportError: - # Fall back to older structure for compatibility - class ChatMessageContent: - """Compatibility class for older SK versions.""" - def __init__(self, role="", content="", name=None): - self.role = role - self.content = content - self.name = name - - class ChatHistory: - """Compatibility class for older SK versions.""" - def __init__(self): - self.messages = [] from kernel_agents.agent_base import BaseAgent from context.cosmos_memory_kernel import CosmosMemoryContext @@ -36,6 +20,7 @@ def __init__(self): ActionRequest, ActionResponse, AgentMessage, + HumanFeedback, Step, StepStatus, PlanStatus, @@ -47,32 +32,11 @@ def __init__(self): from event_utils import track_event_if_configured -class GroupChatManagerClass: - """A class for service compatibility with Semantic Kernel.""" - # Defining properties needed by Semantic Kernel - service_id = "" - - def __init__(self, manager): - self.manager = manager - self.service_id = f"group_chat_manager_{manager._session_id}" - - async def execute_next_step(self, kernel_arguments: KernelArguments) -> str: - """Execute the next step in the plan. - - Args: - kernel_arguments: KernelArguments that should contain session_id and plan_id - - Returns: - Status message - """ - return await self.manager.execute_next_step(kernel_arguments) - +class GroupChatManager(BaseAgent): + """GroupChatManager agent implementation using Semantic Kernel. -class GroupChatManager: - """Group Chat Manager implementation using Semantic Kernel's AgentGroupChat. - - This manager coordinates conversations between different agents and ensures - the plan executes smoothly by orchestrating agent interactions. + This agent creates and manages plans based on user tasks, breaking them down into steps + that can be executed by specialized agents to achieve the user's goal. """ def __init__( @@ -81,358 +45,251 @@ def __init__( session_id: str, user_id: str, memory_store: CosmosMemoryContext, - available_agents: Optional[Dict[str, Any]] = None, + tools: Optional[List[KernelFunction]] = None, + system_message: Optional[str] = None, + agent_name: str = AgentType.GROUP_CHAT_MANAGER.value, + agent_tools_list: List[str] = None, + agent_instances: Optional[Dict[str, BaseAgent]] = None, + client=None, + definition=None, ) -> None: - """Initialize the Group Chat Manager. - + """Initialize the GroupChatManager Agent. + Args: kernel: The semantic kernel instance session_id: The current session identifier user_id: The user identifier memory_store: The Cosmos memory context - config_path: Optional path to the group_chat_manager tools configuration file - available_agents: Dictionary of available agents mapped by their name + tools: Optional list of tools for this agent + system_message: Optional system message for the agent + agent_name: Optional name for the agent (defaults to "GroupChatManagerAgent") + config_path: Optional path to the configuration file + available_agents: List of available agent names for creating steps + agent_tools_list: List of available tools across all agents + agent_instances: Dictionary of agent instances available to the GroupChatManager + client: Optional client instance (passed to BaseAgent) + definition: Optional definition instance (passed to BaseAgent) """ - self._kernel = kernel - self._session_id = session_id - self._user_id = user_id - self._memory_store = memory_store - - # Store available agents - self._agent_instances = available_agents or {} - - # Initialize the AgentGroupChat later when all agents are registered - self._agent_group_chat = None - self._initialized = False - - # Create a wrapper class for service registration - service_wrapper = GroupChatManagerClass(self) - - try: - # Register with kernel using the service_wrapper - if hasattr(kernel, "register_services"): - kernel.register_services({service_wrapper.service_id: service_wrapper}) - logging.info(f"Registered GroupChatManager as kernel service with ID: {service_wrapper.service_id}") - elif hasattr(kernel, "services") and hasattr(kernel.services, "register_service"): - kernel.services.register_service(service_wrapper.service_id, service_wrapper) - logging.info(f"Registered GroupChatManager as kernel service with ID: {service_wrapper.service_id}") - elif hasattr(kernel, "services") and isinstance(kernel.services, dict): - # Last resort: directly add to services dictionary - kernel.services[service_wrapper.service_id] = service_wrapper - logging.info(f"Added GroupChatManager to kernel services dictionary with ID: {service_wrapper.service_id}") - else: - logging.warning("Could not register GroupChatManager service. Semantic Kernel version might be incompatible.") - except Exception as e: - logging.error(f"Error registering GroupChatManager service: {e}") - # Continue without crashing - - async def initialize_group_chat(self) -> None: - """Initialize the AgentGroupChat with registered agents and strategies.""" - if self._initialized: - return - - # Create the AgentGroupChat with registered agents and strategies - self._agent_group_chat = AgentGroupChat( - agents=list(self._agent_instances.values()), - termination_strategy=self.PlanTerminationStrategy(agents=list(self._agent_instances.values())), - selection_strategy=self.PlanSelectionStrategy(agents=list(self._agent_instances.values())), + # Default system message if not provided + if not system_message: + system_message = "You are a GroupChatManager agent responsible for creating and managing plans. You analyze tasks, break them down into steps, and assign them to the appropriate specialized agents." + + # Initialize the base agent + super().__init__( + agent_name=agent_name, + kernel=kernel, + session_id=session_id, + user_id=user_id, + memory_store=memory_store, + tools=tools, + system_message=system_message, + agent_type=AgentType.GROUP_CHAT_MANAGER.value, # Use GroupChatManager_tools.json if available + client=client, + definition=definition, ) - - self._initialized = True - logging.info(f"Initialized AgentGroupChat with {len(self._agent_instances)} agents") - async def register_agent(self, agent_name: str, agent: BaseAgent) -> None: - """Register an agent with the Group Chat Manager. - - Args: - agent_name: The name of the agent - agent: The agent instance - """ - self._agent_instances[agent_name] = agent - self._initialized = False # Need to re-initialize after adding new agents - logging.info(f"Registered agent {agent_name} with Group Chat Manager") - - class PlanSelectionStrategy(SequentialSelectionStrategy): - """Strategy for determining which agent should take the next turn in the chat. - - This strategy follows the progression of a plan, selecting agents based on - the current step or phase of the plan execution. - """ + # Store additional GroupChatManager-specific attributes + self._available_agents = [ + AgentType.HUMAN.value, + AgentType.HR.value, + AgentType.MARKETING.value, + AgentType.PRODUCT.value, + AgentType.PROCUREMENT.value, + AgentType.TECH_SUPPORT.value, + AgentType.GENERIC.value, + ] + self._agent_tools_list = agent_tools_list or [] + self._agent_instances = agent_instances or {} - async def select_agent(self, agents, history): - """Select the next agent that should take the turn in the chat. - - Args: - agents: List of available agents - history: Chat history (ChatHistory object) - - Returns: - The next agent to take the turn - """ - # If no history, start with the PlannerAgent - if not history or not history.messages: - return next((agent for agent in agents if agent.name == AgentType.PLANNER.value), None) - - # Get the last message - last_message = history.messages[-1] - - # Extract name from the message - in SK ChatMessageContent - last_sender = last_message.role - if hasattr(last_message, 'name') and last_message.name: - last_sender = last_message.name - - # Route based on the last sender - if last_sender == AgentType.PLANNER.value: - # After the planner creates a plan, HumanAgent should review it - return next((agent for agent in agents if agent.name == AgentType.HUMAN.value), None) - elif last_sender == AgentType.HUMAN.value: - # After human feedback, the specific agent for the step should proceed - # For simplicity, use GenericAgent as fallback - return next((agent for agent in agents if agent.name == AgentType.GENERIC.value), None) - elif last_sender == AgentType.GROUP_CHAT_MANAGER.value: - # If the manager just assigned a step, find the agent that should execute it - # For simplicity, just rotate to the next agent - agent_names = [agent.name for agent in agents] - try: - current_index = agent_names.index(last_sender) - next_index = (current_index + 1) % len(agents) - return agents[next_index] - except ValueError: - return agents[0] if agents else None - else: - # Default to the Group Chat Manager - return next((agent for agent in agents if agent.name ==AgentType.GROUP_CHAT_MANAGER.value), - agents[0] if agents else None) - - class PlanTerminationStrategy(TerminationStrategy): - """Strategy for determining when the agent group chat should terminate. - - This strategy decides when the plan is complete or when a human needs to - provide additional input to continue. + # Create the Azure AI Agent for group chat operations + # This will be initialized in async_init + self._azure_ai_agent = None + + async def handle_input_task(self, message: InputTask) -> Plan: """ - - def __init__(self, agents, maximum_iterations=10, automatic_reset=True): - """Initialize the termination strategy. - - Args: - agents: List of agents in the group chat - maximum_iterations: Maximum number of iterations before termination - automatic_reset: Whether to reset the agent after termination - """ - super().__init__(maximum_iterations, automatic_reset) - self._agents = agents - - async def should_terminate(self, history, agents=None) -> bool: - """Check if the chat should terminate. - - Args: - history: Chat history as a ChatHistory object - agents: List of agents (optional, uses self._agents if not provided) - - Returns: - True if the chat should terminate, False otherwise - """ - # Default termination conditions from parent class - if await super().should_terminate(history, agents or self._agents): - return True - - # If no history, continue the chat - if not history or not history.messages: - return False - - # Get the last message - last_message = history.messages[-1] - - # End the chat if the plan is completed or if human intervention is required - if "plan completed" in last_message.content.lower(): - return True - - if "human intervention required" in last_message.content.lower(): - return True - - # Terminate if we encounter a specific error condition - if "error" in last_message.content.lower() and "cannot proceed" in last_message.content.lower(): - return True - - # Otherwise, continue the chat - return False - - async def handle_input_task(self, input_task_json: str) -> str: - """Handle the initial input task from the user. - - Args: - input_task_json: Input task in JSON format - - Returns: - Status message + Handles the input task from the user. This is the initial message that starts the conversation. + This method should create a new plan. """ - # Parse the input task - input_task = InputTask.parse_raw(input_task_json) - - # Store the user's message + logging.info(f"Received input task: {message}") await self._memory_store.add_item( AgentMessage( - session_id=input_task.session_id, + session_id=message.session_id, user_id=self._user_id, plan_id="", - content=f"{input_task.description}", + content=f"{message.description}", source=AgentType.HUMAN.value, step_id="", ) ) - + track_event_if_configured( "Group Chat Manager - Received and added input task into the cosmos", { - "session_id": input_task.session_id, + "session_id": message.session_id, "user_id": self._user_id, - "content": input_task.description, + "content": message.description, "source": AgentType.HUMAN.value, }, ) - - # Ensure the planner agent is registered - if AgentType.PLANNER.value not in self._agent_instances: - return f"{AgentType.PLANNER.value} not registered. Cannot create plan." - - # Get the planner agent + + # Send the InputTask to the PlannerAgent planner_agent = self._agent_instances[AgentType.PLANNER.value] - - # Forward the input task to the planner agent to create a plan - planner_args = KernelArguments(input_task_json=input_task_json) - plan_result = await planner_agent.handle_input_task(planner_args) - - return f"Plan creation initiated: {plan_result}" - - async def handle_human_feedback(self, human_feedback_json: str) -> str: - """Handle human feedback on steps. - - Args: - human_feedback_json: Human feedback in JSON format - - Returns: - Status message + result = await planner_agent.handle_input_task(message) + logging.info(f"Plan created: {result}") + return result + + async def handle_human_feedback(self, message: HumanFeedback) -> None: + """ + Handles the human approval feedback for a single step or all steps. + Updates the step status and stores the feedback in the session context. + + class HumanFeedback(BaseModel): + step_id: str + plan_id: str + session_id: str + approved: bool + human_feedback: Optional[str] = None + updated_action: Optional[str] = None + + class Step(BaseDataModel): + + data_type: Literal["step"] = Field("step", Literal=True) + plan_id: str + action: str + agent: BAgentType + status: StepStatus = StepStatus.planned + agent_reply: Optional[str] = None + human_feedback: Optional[str] = None + human_approval_status: Optional[HumanFeedbackStatus] = HumanFeedbackStatus.requested + updated_action: Optional[str] = None + session_id: ( + str # Added session_id to the Step model to partition the steps by session_id + ) + ts: Optional[int] = None """ - # Parse the human feedback - human_feedback = json.loads(human_feedback_json) - - session_id = human_feedback.get("session_id", "") - plan_id = human_feedback.get("plan_id", "") - step_id = human_feedback.get("step_id", "") - approved = human_feedback.get("approved", False) - feedback_text = human_feedback.get("human_feedback", "") - - # Get general information + # Need to retrieve all the steps for the plan + logging.info(f"GroupChatManager Received human feedback: {message}") + + steps: List[Step] = await self._memory_store.get_steps_by_plan(message.plan_id) + # Filter for steps that are planned or awaiting feedback + + # Get the first step assigned to HumanAgent for feedback + human_feedback_step: Step = next( + (s for s in steps if s.agent == AgentType.HUMAN), None + ) + + # Determine the feedback to use + if human_feedback_step and human_feedback_step.human_feedback: + # Use the provided human feedback if available + received_human_feedback_on_step = human_feedback_step.human_feedback + else: + received_human_feedback_on_step = "" + + # Provide generic context to the model general_information = f"Today's date is {datetime.now().date()}." - - # Get the plan - plan = await self._memory_store.get_plan(plan_id) - if not plan: - return f"Plan {plan_id} not found" - - # Get plan human clarification if available - if hasattr(plan, 'human_clarification_response') and plan.human_clarification_response: + + # Get the general background information provided by the user in regards to the overall plan (not the steps) to add as context. + plan = await self._memory_store.get_plan_by_session( + session_id=message.session_id + ) + if plan.human_clarification_response: received_human_feedback_on_plan = ( plan.human_clarification_response + " This information may or may not be relevant to the step you are executing - it was feedback provided by the human user on the overall plan, which includes multiple steps, not just the one you are actioning now." ) else: - received_human_feedback_on_plan = "No human feedback provided on the overall plan." - + received_human_feedback_on_plan = ( + "No human feedback provided on the overall plan." + ) # Combine all feedback into a single string received_human_feedback = ( - f"{feedback_text} " + f"{received_human_feedback_on_step} " f"{general_information} " f"{received_human_feedback_on_plan}" ) - - # Get all steps for the plan - steps = await self._memory_store.get_steps_for_plan(plan_id, session_id) - + # Update and execute the specific step if step_id is provided - if step_id: - step = next((s for s in steps if s.id == step_id), None) + if message.step_id: + step = next((s for s in steps if s.id == message.step_id), None) if step: - await self._update_step_status(step, approved, received_human_feedback) - if approved: - return await self._execute_step(session_id, step) + await self._update_step_status( + step, message.approved, received_human_feedback + ) + if message.approved: + await self._execute_step(message.session_id, step) else: - # Handle rejected step + # Notify the GroupChatManager that the step has been rejected + # TODO: Implement this logic later step.status = StepStatus.rejected - if hasattr(step, 'human_approval_status'): - step.human_approval_status = HumanFeedbackStatus.rejected - await self._memory_store.update_step(step) - + step.human_approval_status = HumanFeedbackStatus.rejected + self._memory_store.update_step(step) track_event_if_configured( - "Group Chat Manager - Step has been rejected and updated into the cosmos", + "Group Chat Manager - Steps has been rejected and updated into the cosmos", { "status": StepStatus.rejected, - "session_id": session_id, + "session_id": message.session_id, "user_id": self._user_id, - "human_approval_status": "rejected", + "human_approval_status": HumanFeedbackStatus.rejected, "source": step.agent, }, ) - return f"Step {step_id} rejected" - else: - return f"Step {step_id} not found" else: - # Update all steps if no specific step_id is provided - updates_count = 0 + # Update and execute all steps if no specific step_id is provided for step in steps: - if step.status == StepStatus.planned: - await self._update_step_status(step, approved, received_human_feedback) - if approved: - await self._execute_step(session_id, step) - updates_count += 1 - - return f"Updated {updates_count} steps with human feedback" - - async def _update_step_status(self, step: Step, approved: bool, received_human_feedback: str) -> None: - """Update a step's status based on human feedback. - - Args: - step: The step to update - approved: Whether the step is approved - received_human_feedback: Feedback from human - """ + await self._update_step_status( + step, message.approved, received_human_feedback + ) + if message.approved: + await self._execute_step(message.session_id, step) + else: + # Notify the GroupChatManager that the step has been rejected + # TODO: Implement this logic later + step.status = StepStatus.rejected + step.human_approval_status = HumanFeedbackStatus.rejected + self._memory_store.update_step(step) + track_event_if_configured( + "Group Chat Manager - Step has been rejected and updated into the cosmos", + { + "status": StepStatus.rejected, + "session_id": message.session_id, + "user_id": self._user_id, + "human_approval_status": HumanFeedbackStatus.rejected, + "source": step.agent, + }, + ) + + # Function to update step status and add feedback + async def _update_step_status( + self, step: Step, approved: bool, received_human_feedback: str + ): if approved: step.status = StepStatus.approved - if hasattr(step, 'human_approval_status'): - step.human_approval_status = HumanFeedbackStatus.accepted + step.human_approval_status = HumanFeedbackStatus.accepted else: step.status = StepStatus.rejected - if hasattr(step, 'human_approval_status'): - step.human_approval_status = HumanFeedbackStatus.rejected - + step.human_approval_status = HumanFeedbackStatus.rejected + step.human_feedback = received_human_feedback + step.status = StepStatus.completed await self._memory_store.update_step(step) - track_event_if_configured( "Group Chat Manager - Received human feedback, Updating step and updated into the cosmos", { - "status": step.status, + "status": StepStatus.completed, "session_id": step.session_id, "user_id": self._user_id, "human_feedback": received_human_feedback, "source": step.agent, }, ) - - async def _execute_step(self, session_id: str, step: Step) -> str: - """Execute a step by sending an action request to the appropriate agent. - - Args: - session_id: The session identifier - step: The step to execute - - Returns: - Status message + + async def _execute_step(self, session_id: str, step: Step): """ - # Update step status + Executes the given step by sending an ActionRequest to the appropriate agent. + """ + # Update step status to 'action_requested' step.status = StepStatus.action_requested await self._memory_store.update_step(step) - track_event_if_configured( "Group Chat Manager - Update step to action_requested and updated into the cosmos", { @@ -442,55 +299,80 @@ async def _execute_step(self, session_id: str, step: Step) -> str: "source": step.agent, }, ) - - # Generate conversation history for context - plan = await self._memory_store.get_plan(step.plan_id) - steps = await self._memory_store.get_steps_for_plan(step.plan_id, session_id) - conversation_history = await self._generate_conversation_history(steps, step.id, plan) - - # Create action request with conversation history for context - action_with_history = f"{conversation_history} Here is the step to action: {step.action}. ONLY perform the steps and actions required to complete this specific step, the other steps have already been completed. Only use the conversational history for additional information, if it's required to complete the step you have been assigned." - - # Format agent name for display - if hasattr(step, 'agent') and step.agent: - agent_name = step.agent - formatted_agent = re.sub(r"([a-z])([A-Z])", r"\1 \2", agent_name) + + # generate conversation history for the invoked agent + plan = await self._memory_store.get_plan_by_session(session_id=session_id) + steps: List[Step] = await self._memory_store.get_steps_by_plan(plan.id) + + current_step_id = step.id + # Initialize the formatted string + formatted_string = "" + formatted_string += "Here is the conversation history so far for the current plan. This information may or may not be relevant to the step you have been asked to execute." + formatted_string += f"The user's task was:\n{plan.summary}\n\n" + formatted_string += ( + "The conversation between the previous agents so far is below:\n" + ) + + # Iterate over the steps until the current_step_id + for i, step in enumerate(steps): + if step.id == current_step_id: + break + formatted_string += f"Step {i}\n" + formatted_string += f"Group chat manager: {step.action}\n" + formatted_string += f"{step.agent.name}: {step.agent_reply}\n" + formatted_string += "" + + logging.info(f"Formatted string: {formatted_string}") + + action_with_history = f"{formatted_string}. Here is the step to action: {step.action}. ONLY perform the steps and actions required to complete this specific step, the other steps have already been completed. Only use the conversational history for additional information, if it's required to complete the step you have been assigned." + + # Send action request to the appropriate agent + action_request = ActionRequest( + step_id=step.id, + plan_id=step.plan_id, + session_id=session_id, + action=action_with_history, + agent=step.agent, + ) + logging.info(f"Sending ActionRequest to {step.agent.value}") + + if step.agent != "": + agent_name = step.agent.value + formatted_agent = agent_name else: - # Default to GenericAgent if none specified - agent_name = AgentType.GENERIC.value - formatted_agent = AgentType.GENERIC.value - - # Store the agent message + raise ValueError(f"Check {step.agent} is missing") + await self._memory_store.add_item( AgentMessage( session_id=session_id, user_id=self._user_id, plan_id=step.plan_id, content=f"Requesting {formatted_agent} to perform action: {step.action}", - source="GroupChatManager", + source=AgentType.GROUP_CHAT_MANAGER.value, step_id=step.id, ) ) - + track_event_if_configured( - f"Group Chat Manager - Requesting {agent_name} to perform the action and added into the cosmos", + f"Group Chat Manager - Requesting {formatted_agent} to perform the action and added into the cosmos", { "session_id": session_id, "user_id": self._user_id, "plan_id": step.plan_id, - "content": f"Requesting {agent_name} to perform action: {step.action}", + "content": f"Requesting {formatted_agent} to perform action: {step.action}", "source": "GroupChatManager", "step_id": step.id, }, ) - - # Special handling for HumanAgent - if agent_name == AgentType.HUMAN.value: - # Mark as completed since we have received the human feedback + + if step.agent == AgentType.HUMAN.value: + # we mark the step as complete since we have received the human feedback + # Update step status to 'completed' step.status = StepStatus.completed await self._memory_store.update_step(step) - - logging.info("Marking the step as complete - Since we have received the human feedback") + logging.info( + "Marking the step as complete - Since we have received the human feedback" + ) track_event_if_configured( "Group Chat Manager - Steps completed - Received the human feedback and updated into the cosmos", { @@ -498,183 +380,14 @@ async def _execute_step(self, session_id: str, step: Step) -> str: "user_id": self._user_id, "plan_id": step.plan_id, "content": "Marking the step as complete - Since we have received the human feedback", - "source": agent_name, + "source": step.agent, "step_id": step.id, }, ) - return f"Step {step.id} for HumanAgent marked as completed" - - # Check if agent is registered - if agent_name not in self._agent_instances: - logging.warning(f"Agent {agent_name} not found. Using GenericAgent instead.") - agent_name = "GenericAgent" - if agent_name not in self._agent_instances: - return f"No agent found to handle step {step.id}" - - # Create action request - action_request = ActionRequest( - step_id=step.id, - plan_id=step.plan_id, - session_id=session_id, - action=action_with_history - ) - - # Send action request to the agent - agent = self._agent_instances[agent_name] - result = await agent.handle_action_request(action_request.json()) - - return f"Step {step.id} execution started with {agent_name}: {result}" - - async def run_group_chat(self, user_input: str, plan_id: str = "", step_id: str = "") -> str: - """Run the AgentGroupChat with a given input. - - Args: - user_input: The user input to start the conversation - plan_id: Optional plan ID for context - step_id: Optional step ID for context - - Returns: - Result of the group chat - """ - # Ensure the group chat is initialized - await self.initialize_group_chat() - - try: - # Run the group chat with Semantic Kernel - result = await self._agent_group_chat.invoke_async(user_input) - - # Process the result which could be a ChatHistory object or something else - if hasattr(result, "messages") and result.messages: - messages = result.messages - elif hasattr(result, "value") and isinstance(result.value, list): - messages = result.value - else: - # Fallback for other formats - messages = [] - if isinstance(result, str): - # If it's just a string response - logging.debug(f"Group chat returned a string: {result[:100]}...") - await self._memory_store.add_item( - AgentMessage( - session_id=self._session_id, - user_id=self._user_id, - plan_id=plan_id, - content=result, - source="GroupChatManager", - step_id=step_id, - ) - ) - return result - - # Process the messages from the chat result - final_response = None - - for msg in messages: - # Skip the initial user message - if hasattr(msg, "role") and msg.role == "user" and msg.content == user_input: - continue - - # Determine the source/agent name - source = "assistant" - if hasattr(msg, "name") and msg.name: - source = msg.name - elif hasattr(msg, "role") and msg.role: - source = msg.role - - # Get the message content - content = msg.content if hasattr(msg, "content") else str(msg) - - # Store the message in memory - await self._memory_store.add_item( - AgentMessage( - session_id=self._session_id, - user_id=self._user_id, - plan_id=plan_id, - content=content, - source=source, - step_id=step_id, - ) - ) - - # Keep track of the final response - final_response = content - - # Return the final message from the chat - if final_response: - return final_response - return "Group chat completed with no messages." - - except Exception as e: - logging.exception(f"Error running group chat: {e}") - return f"Error running group chat: {str(e)}" - - async def execute_next_step(self, kernel_arguments: KernelArguments) -> str: - """Execute the next step in the plan. - - Args: - kernel_arguments: KernelArguments that should contain session_id and plan_id - - Returns: - Status message - """ - # Extract arguments - session_id = kernel_arguments.get("session_id", "") - plan_id = kernel_arguments.get("plan_id", "") - - if not session_id or not plan_id: - return "Missing session_id or plan_id in arguments" - - # Get all steps for the plan - steps = await self._memory_store.get_steps_for_plan(plan_id, session_id) - - # Find the next step to execute (first approved or planned step) - next_step = None - for step in steps: - if step.status == StepStatus.approved or step.status == StepStatus.planned: - next_step = step - break - - if not next_step: - # All steps are completed, mark plan as completed - plan = await self._memory_store.get_plan(plan_id) - if plan: - plan.overall_status = PlanStatus.completed - await self._memory_store.update_plan(plan) - return "All steps completed. Plan execution finished." - - return await self._execute_step(session_id, next_step) - - async def _generate_conversation_history(self, steps: List[Step], current_step_id: str, plan: Any) -> str: - """Generate conversation history for context. - - Args: - steps: List of all steps - current_step_id: ID of the current step - plan: The plan object - - Returns: - Formatted conversation history - """ - # Initialize the formatted string - formatted_string = "Here is the conversation history so far for the current plan. This information may or may not be relevant to the step you have been asked to execute." - - # Add plan summary if available - if hasattr(plan, 'summary') and plan.summary: - formatted_string += f"The user's task was:\n{plan.summary}\n\n" - elif hasattr(plan, 'initial_goal') and plan.initial_goal: - formatted_string += f"The user's task was:\n{plan.initial_goal}\n\n" - - formatted_string += "The conversation between the previous agents so far is below:\n" - - # Iterate over the steps until the current_step_id - for i, step in enumerate(steps): - if step.id == current_step_id: - break - - if step.status == StepStatus.completed and hasattr(step, 'agent_reply') and step.agent_reply: - formatted_string += f"Step {i}\n" - formatted_string += f"Group chat manager: {step.action}\n" - formatted_string += f"{step.agent}: {step.agent_reply}\n" - - formatted_string += "" - return formatted_string \ No newline at end of file + else: + # Use the agent from the step to determine which agent to send to + agent = self._agent_instances[step.agent.value] + await agent.handle_action_request( + action_request + ) # this function is in base_agent.py + logging.info(f"Sent ActionRequest to {step.agent.value}") diff --git a/src/backend/kernel_agents/hr_agent.py b/src/backend/kernel_agents/hr_agent.py index 8c595ec86..256aef000 100644 --- a/src/backend/kernel_agents/hr_agent.py +++ b/src/backend/kernel_agents/hr_agent.py @@ -7,9 +7,10 @@ from context.cosmos_memory_kernel import CosmosMemoryContext from models.messages_kernel import AgentType + class HrAgent(BaseAgent): """HR agent implementation using Semantic Kernel. - + This agent provides HR-related functions such as onboarding, benefits management, and employee administration. All tools are loaded from hr_tools.json. """ @@ -28,7 +29,7 @@ def __init__( definition=None, ) -> None: """Initialize the HR Agent. - + Args: kernel: The semantic kernel instance session_id: The current session identifier @@ -46,17 +47,17 @@ def __init__( # Load the HR tools configuration config = self.load_tools_config("hr", config_path) tools = self.get_tools_from_config(kernel, "hr", config_path) - + # Use system message from config if not explicitly provided if not system_message: system_message = config.get( - "system_message", - "You are an AI Agent. You have knowledge about HR (e.g., human resources), policies, procedures, and onboarding guidelines." + "system_message", + "You are an AI Agent. You have knowledge about HR (e.g., human resources), policies, procedures, and onboarding guidelines.", ) - + # Use agent name from config if available agent_name = AgentType.HR.value - + super().__init__( agent_name=agent_name, kernel=kernel, @@ -66,5 +67,5 @@ def __init__( tools=tools, system_message=system_message, client=client, - definition=definition - ) \ No newline at end of file + definition=definition, + ) diff --git a/src/backend/kernel_agents/human_agent.py b/src/backend/kernel_agents/human_agent.py index ee99c49fb..7a0f1e3ef 100644 --- a/src/backend/kernel_agents/human_agent.py +++ b/src/backend/kernel_agents/human_agent.py @@ -7,12 +7,22 @@ from kernel_agents.agent_base import BaseAgent from context.cosmos_memory_kernel import CosmosMemoryContext -from models.messages_kernel import AgentType, HumanFeedback, Step, StepStatus, AgentMessage, ActionRequest +from models.messages_kernel import ( + AgentType, + ApprovalRequest, + HumanClarification, + HumanFeedback, + Step, + StepStatus, + AgentMessage, + ActionRequest, +) from event_utils import track_event_if_configured + class HumanAgent(BaseAgent): """Human agent implementation using Semantic Kernel. - + This agent represents a human user in the system, receiving and processing feedback from humans and passing it to other agents for further action. """ @@ -31,7 +41,7 @@ def __init__( definition=None, ) -> None: """Initialize the Human Agent. - + Args: kernel: The semantic kernel instance session_id: The current session identifier @@ -50,11 +60,11 @@ def __init__( tools = self.get_tools_from_config(kernel, "human", config_path) if not system_message: system_message = config.get( - "system_message", - "You are representing a human user in the conversation. You handle interactions that require human feedback or input." + "system_message", + "You are representing a human user in the conversation. You handle interactions that require human feedback or input.", ) agent_name = AgentType.HUMAN.value - + super().__init__( agent_name=agent_name, kernel=kernel, @@ -64,56 +74,48 @@ def __init__( tools=tools, system_message=system_message, client=client, - definition=definition + definition=definition, ) - - async def handle_human_feedback(self, kernel_arguments: KernelArguments) -> str: + + async def handle_human_feedback(self, human_feedback: HumanFeedback) -> str: """Handle human feedback on a step. - + + This method processes feedback provided by a human user on a specific step in a plan. + It updates the step with the feedback, marks the step as completed, and notifies the + GroupChatManager by creating an ApprovalRequest in the memory store. + Args: - kernel_arguments: Contains the human_feedback_json string - + human_feedback: The HumanFeedback object containing feedback details + including step_id, session_id, and human_feedback text + Returns: - Status message + Status message indicating success or failure of processing the feedback """ - # Parse the human feedback - human_feedback_json = kernel_arguments["human_feedback_json"] - human_feedback = HumanFeedback.parse_raw(human_feedback_json) - + # Get the step - step = await self._memory_store.get_step(human_feedback.step_id, human_feedback.session_id) + step = await self._memory_store.get_step( + human_feedback.step_id, human_feedback.session_id + ) if not step: return f"Step {human_feedback.step_id} not found" - + # Update the step with the feedback step.human_feedback = human_feedback.human_feedback - step.updated_action = human_feedback.updated_action - - if human_feedback.approved: - step.status = StepStatus.approved - else: - step.status = StepStatus.needs_update - + step.status = StepStatus.completed + # Save the updated step await self._memory_store.update_step(step) - - # If approved and updated action is provided, update the step's action - if human_feedback.approved and human_feedback.updated_action: - step.action = human_feedback.updated_action - await self._memory_store.update_step(step) - - # Add a record of the feedback to the memory store await self._memory_store.add_item( AgentMessage( session_id=human_feedback.session_id, - user_id=self._user_id, + user_id=step.user_id, plan_id=step.plan_id, content=f"Received feedback for step: {step.action}", source=AgentType.HUMAN.value, step_id=human_feedback.step_id, ) ) - + # Track the event track_event_if_configured( f"Human Agent - Received feedback for step and added into the cosmos", @@ -126,58 +128,60 @@ async def handle_human_feedback(self, kernel_arguments: KernelArguments) -> str: "step_id": human_feedback.step_id, }, ) - - # Notify the GroupChatManager - if human_feedback.approved: - # Create a request to execute the next step - group_chat_manager_id = f"group_chat_manager_{human_feedback.session_id}" - - # Use GroupChatManager's execute_next_step method - if hasattr(self._kernel, 'get_service'): - group_chat_manager = self._kernel.get_service(group_chat_manager_id) - if group_chat_manager: - await group_chat_manager.execute_next_step( - KernelArguments( - session_id=human_feedback.session_id, - plan_id=step.plan_id - ) - ) - - # Track the approval request event - track_event_if_configured( - f"Human Agent - Approval request sent for step and added into the cosmos", - { - "session_id": human_feedback.session_id, - "user_id": self._user_id, - "plan_id": step.plan_id, - "step_id": human_feedback.step_id, - "agent_id": "GroupChatManager", - }, + + # Notify the GroupChatManager that the step has been completed + await self._memory_store.add_item( + ApprovalRequest( + session_id=human_feedback.session_id, + user_id=self._user_id, + plan_id=step.plan_id, + step_id=human_feedback.step_id, + agent_id=AgentType.GROUP_CHAT_MANAGER.value, ) - + ) + + # Track the approval request event + track_event_if_configured( + f"Human Agent - Approval request sent for step and added into the cosmos", + { + "session_id": human_feedback.session_id, + "user_id": self._user_id, + "plan_id": step.plan_id, + "step_id": human_feedback.step_id, + "agent_id": "GroupChatManager", + }, + ) + return "Human feedback processed successfully" - - async def provide_clarification(self, kernel_arguments: KernelArguments) -> str: + + async def provide_clarification( + self, human_clarification: HumanClarification + ) -> str: """Provide clarification on a plan. - + + This method stores human clarification information for a plan associated with a session. + It retrieves the plan from memory, updates it with the clarification text, and records + the event in telemetry. + Args: - kernel_arguments: Contains session_id and clarification_text - + human_clarification: The HumanClarification object containing the session_id + and clarification_text provided by the human user + Returns: - Status message + Status message indicating success or failure of adding the clarification """ - session_id = kernel_arguments["session_id"] - clarification_text = kernel_arguments["clarification_text"] - + session_id = human_clarification.session_id + clarification_text = human_clarification.clarification_text + # Get the plan associated with this session plan = await self._memory_store.get_plan_by_session(session_id) if not plan: return f"No plan found for session {session_id}" - + # Update the plan with the clarification plan.human_clarification_response = clarification_text await self._memory_store.update_plan(plan) - + # Track the event track_event_if_configured( "Human Agent - Provided clarification for plan", @@ -188,5 +192,5 @@ async def provide_clarification(self, kernel_arguments: KernelArguments) -> str: "clarification": clarification_text, }, ) - - return f"Clarification provided for plan {plan.id}" \ No newline at end of file + + return f"Clarification provided for plan {plan.id}" diff --git a/src/backend/kernel_agents/marketing_agent.py b/src/backend/kernel_agents/marketing_agent.py index bc5047c4e..3b3f7dfcc 100644 --- a/src/backend/kernel_agents/marketing_agent.py +++ b/src/backend/kernel_agents/marketing_agent.py @@ -7,9 +7,10 @@ from context.cosmos_memory_kernel import CosmosMemoryContext from models.messages_kernel import AgentType + class MarketingAgent(BaseAgent): """Marketing agent implementation using Semantic Kernel. - + This agent specializes in marketing strategies, campaign development, content creation, and market analysis. It can create effective marketing campaigns, analyze market trends, develop promotional content, and more. @@ -30,7 +31,7 @@ def __init__( definition=None, ) -> None: """Initialize the Marketing Agent. - + Args: kernel: The semantic kernel instance session_id: The current session identifier @@ -48,17 +49,17 @@ def __init__( # Load the marketing tools configuration config = self.load_tools_config("marketing", config_path) tools = self.get_tools_from_config(kernel, "marketing", config_path) - + # Use system message from config if not explicitly provided if not system_message: system_message = config.get( - "system_message", - "You are an AI Agent. You have knowledge about marketing, including campaigns, market research, and promotional activities." + "system_message", + "You are an AI Agent. You have knowledge about marketing, including campaigns, market research, and promotional activities.", ) - + # Use agent name from config if available agent_name = AgentType.MARKETING.value - + super().__init__( agent_name=agent_name, kernel=kernel, @@ -68,5 +69,5 @@ def __init__( tools=tools, system_message=system_message, client=client, - definition=definition - ) \ No newline at end of file + definition=definition, + ) diff --git a/src/backend/kernel_agents/planner_agent.py b/src/backend/kernel_agents/planner_agent.py index 36ce2546d..cf3148d59 100644 --- a/src/backend/kernel_agents/planner_agent.py +++ b/src/backend/kernel_agents/planner_agent.py @@ -93,7 +93,7 @@ def __init__( AgentType.HUMAN.value, AgentType.HR.value, AgentType.MARKETING.value, - AgentType.PRODUCT, + AgentType.PRODUCT.value, AgentType.PROCUREMENT.value, AgentType.TECH_SUPPORT.value, AgentType.GENERIC.value, @@ -105,48 +105,6 @@ def __init__( # This will be initialized in async_init self._azure_ai_agent = None - def _get_response_format_schema(self) -> dict: - """ - Returns a JSON schema that defines the expected structure of the response. - This ensures responses from the agent will match the required format exactly. - """ - return { - "type": "object", - "properties": { - "initial_goal": { - "type": "string", - "description": "The primary goal extracted from the user's input task", - }, - "steps": { - "type": "array", - "description": "List of steps required to complete the task", - "items": { - "type": "object", - "properties": { - "action": { - "type": "string", - "description": "A clear instruction for the agent including the function name to use", - }, - "agent": { - "type": "string", - "description": "The name of the agent responsible for this step", - }, - }, - "required": ["action", "agent"], - }, - }, - "summary_plan_and_steps": { - "type": "string", - "description": "A concise summary of the overall plan and its steps in less than 50 words", - }, - "human_clarification_request": { - "type": ["string", "null"], - "description": "Optional request for additional information needed from the user", - }, - }, - "required": ["initial_goal", "steps", "summary_plan_and_steps"], - } - async def async_init(self) -> None: """Asynchronously initialize the PlannerAgent. @@ -168,12 +126,12 @@ async def async_init(self) -> None: instructions=instructions, # Pass the formatted string, not an object temperature=0.0, response_format=ResponseFormatJsonSchemaType( - json_schema=ResponseFormatJsonSchema( - name=PlannerResponsePlan.__name__, - description=f"respond with {PlannerResponsePlan.__name__.lower()}", - schema=PlannerResponsePlan.model_json_schema(), - ) - ), + json_schema=ResponseFormatJsonSchema( + name=PlannerResponsePlan.__name__, + description=f"respond with {PlannerResponsePlan.__name__.lower()}", + schema=PlannerResponsePlan.model_json_schema(), + ) + ), ) logging.info("Successfully created Azure AI Agent for PlannerAgent") return True @@ -331,12 +289,12 @@ async def _create_structured_plan( try: # Generate the instruction for the LLM logging.info("Generating instruction for the LLM") - logging.debug(f"Input: {input_task}") - logging.debug(f"Available agents: {self._available_agents}") + logging.info(f"Input: {input_task}") + logging.info(f"Available agents: {self._available_agents}") # Get template variables as a dictionary args = self._generate_args(input_task.description) - + logging.info(f"Generated args: {args}") logging.info(f"Creating plan for task: '{input_task.description}'") logging.info(f"Using available agents: {self._available_agents}") @@ -355,10 +313,9 @@ async def _create_structured_plan( kernel_args = KernelArguments(**args) # kernel_args["input"] = f"TASK: {input_task.description}\n\n{instruction}" - logging.debug(f"Kernel arguments: {kernel_args}") + logging.info(f"Kernel arguments: {kernel_args}") # Get the schema for our expected response format - response_format_schema = self._get_response_format_schema() # Ensure we're using the right pattern for Azure AI agents with semantic kernel # Properly handle async generation @@ -367,10 +324,6 @@ async def _create_structured_plan( settings={ "temperature": 0.0, # Keep temperature low for consistent planning "max_tokens": 10096, # Ensure we have enough tokens for the full plan - "response_format": { - "type": "json_object", - "schema": response_format_schema, - }, }, ) @@ -383,7 +336,6 @@ async def _create_structured_plan( response_content += str(chunk) logging.info(f"Response content length: {len(response_content)}") - logging.debug(f"Response content: {response_content[:500]}...") # Check if response is empty or whitespace if not response_content or response_content.isspace(): @@ -398,6 +350,9 @@ async def _create_structured_plan( try: parsed_result = PlannerResponsePlan.parse_raw(response_content) logging.info("Successfully parsed response with direct parsing") + logging.info(f"\n\n\n\n") + logging.info(f"Parsed result: {parsed_result}") + logging.info(f"\n\n\n\n") except Exception as parse_error: logging.warning(f"Failed direct parse: {parse_error}") @@ -727,6 +682,7 @@ def _generate_args(self, objective: str) -> any: if hasattr(self, "_agent_instances") and self._agent_instances: # Process each agent to get their tools for agent_name, agent in self._agent_instances.items(): + if hasattr(agent, "_tools") and agent._tools: # Add each tool from this agent for tool in agent._tools: @@ -814,7 +770,7 @@ def _generate_args(self, objective: str) -> any: tools_list.append(tool_entry) - logging.debug(f"Generated {len(tools_list)} tools from agent instances") + logging.info(f"Generated {len(tools_list)} tools from agent instances") # If we couldn't extract tools from agent instances, create a simplified format if not tools_list: @@ -833,7 +789,7 @@ def _generate_args(self, objective: str) -> any: # Extract agent name if format is "Agent: AgentName" agent_name = agent_part.replace("Agent", "").strip() if not agent_name: - agent_name = "GenericAgent" + agent_name = AgentType.GENERIC.value tools_list.append( { diff --git a/src/backend/kernel_agents/procurement_agent.py b/src/backend/kernel_agents/procurement_agent.py index e84c578a1..9100425cd 100644 --- a/src/backend/kernel_agents/procurement_agent.py +++ b/src/backend/kernel_agents/procurement_agent.py @@ -7,9 +7,10 @@ from context.cosmos_memory_kernel import CosmosMemoryContext from models.messages_kernel import AgentType + class ProcurementAgent(BaseAgent): """Procurement agent implementation using Semantic Kernel. - + This agent specializes in purchasing, vendor management, supply chain operations, and inventory control. It can create purchase orders, manage vendors, track orders, and ensure efficient procurement processes. @@ -29,7 +30,7 @@ def __init__( definition=None, ) -> None: """Initialize the Procurement Agent. - + Args: kernel: The semantic kernel instance session_id: The current session identifier @@ -47,17 +48,17 @@ def __init__( # Load the procurement tools configuration config = self.load_tools_config("procurement", config_path) tools = self.get_tools_from_config(kernel, "procurement", config_path) - + # Use system message from config if not explicitly provided if not system_message: system_message = config.get( - "system_message", - "You are an AI Agent. You are able to assist with procurement enquiries and order items. If you need additional information from the human user asking the question in order to complete a request, ask before calling a function." + "system_message", + "You are an AI Agent. You are able to assist with procurement enquiries and order items. If you need additional information from the human user asking the question in order to complete a request, ask before calling a function.", ) - + # Use agent name from config if available agent_name = AgentType.PROCUREMENT.value - + super().__init__( agent_name=agent_name, kernel=kernel, @@ -67,5 +68,5 @@ def __init__( tools=tools, system_message=system_message, client=client, - definition=definition - ) \ No newline at end of file + definition=definition, + ) diff --git a/src/backend/kernel_agents/product_agent.py b/src/backend/kernel_agents/product_agent.py index c6b62916e..a107d0084 100644 --- a/src/backend/kernel_agents/product_agent.py +++ b/src/backend/kernel_agents/product_agent.py @@ -7,9 +7,10 @@ from context.cosmos_memory_kernel import CosmosMemoryContext from models.messages_kernel import AgentType + class ProductAgent(BaseAgent): """Product agent implementation using Semantic Kernel. - + This agent specializes in product management, development, and related tasks. It can provide information about products, manage inventory, handle product launches, analyze sales data, and coordinate with other teams like marketing @@ -30,7 +31,7 @@ def __init__( definition=None, ) -> None: """Initialize the Product Agent. - + Args: kernel: The semantic kernel instance session_id: The current session identifier @@ -48,17 +49,17 @@ def __init__( # Load the product tools configuration config = self.load_tools_config("product", config_path) tools = self.get_tools_from_config(kernel, "product", config_path) - + # Use system message from config if not explicitly provided if not system_message: system_message = config.get( - "system_message", - "You are a Product agent. You have knowledge about product management, development, and compliance guidelines. When asked to call a function, you should summarize back what was done." + "system_message", + "You are a Product agent. You have knowledge about product management, development, and compliance guidelines. When asked to call a function, you should summarize back what was done.", ) - + # Use agent name from config if available agent_name = AgentType.PRODUCT.value - + super().__init__( agent_name=agent_name, kernel=kernel, @@ -68,5 +69,5 @@ def __init__( tools=tools, system_message=system_message, client=client, - definition=definition - ) \ No newline at end of file + definition=definition, + ) diff --git a/src/backend/kernel_agents/tech_support_agent.py b/src/backend/kernel_agents/tech_support_agent.py index dd3cec031..2a69ec014 100644 --- a/src/backend/kernel_agents/tech_support_agent.py +++ b/src/backend/kernel_agents/tech_support_agent.py @@ -7,11 +7,12 @@ from context.cosmos_memory_kernel import CosmosMemoryContext from models.messages_kernel import AgentType + class TechSupportAgent(BaseAgent): """Tech Support agent implementation using Semantic Kernel. - + This agent specializes in IT troubleshooting, system administration, network issues, - software installation, and general technical support. It can help with setting up software, + software installation, and general technical support. It can help with setting up software, accounts, devices, and other IT-related tasks. """ @@ -29,7 +30,7 @@ def __init__( definition=None, ) -> None: """Initialize the Tech Support Agent. - + Args: kernel: The semantic kernel instance session_id: The current session identifier @@ -47,17 +48,17 @@ def __init__( # Load the tech support tools configuration config = self.load_tools_config("tech_support", config_path) tools = self.get_tools_from_config(kernel, "tech_support", config_path) - + # Use system message from config if not explicitly provided if not system_message: system_message = config.get( - "system_message", - "You are an AI Agent who is knowledgeable about Information Technology. You are able to help with setting up software, accounts, devices, and other IT-related tasks. If you need additional information from the human user asking the question in order to complete a request, ask before calling a function." + "system_message", + "You are an AI Agent who is knowledgeable about Information Technology. You are able to help with setting up software, accounts, devices, and other IT-related tasks. If you need additional information from the human user asking the question in order to complete a request, ask before calling a function.", ) - + # Use agent name from config if available agent_name = AgentType.TECH_SUPPORT.value - + super().__init__( agent_name=agent_name, kernel=kernel, @@ -67,5 +68,5 @@ def __init__( tools=tools, system_message=system_message, client=client, - definition=definition - ) \ No newline at end of file + definition=definition, + ) diff --git a/src/backend/models/messages.py b/src/backend/models/messages.py index ebfd83aa6..60453cb57 100644 --- a/src/backend/models/messages.py +++ b/src/backend/models/messages.py @@ -2,7 +2,13 @@ from enum import Enum from typing import Literal, Optional -from .messages_kernel import AgentType +from autogen_core.components.models import ( + AssistantMessage, + FunctionExecutionResultMessage, + LLMMessage, + SystemMessage, + UserMessage, +) from pydantic import BaseModel, Field @@ -17,15 +23,15 @@ class DataType(str, Enum): class BAgentType(str, Enum): """Enumeration of agent types.""" - AgentType.HUMAN.value = "HumanAgent" - AgentType.HR.value = "HrAgent" - AgentType.MARKETING.value = "MarketingAgent" - AgentType.PROCUREMENT.value = "ProcurementAgent" - AgentType.PRODUCT.value = "ProductAgent" - AgentType.GENERIC.value = "GenericAgent" - AgentType.TECH_SUPPORT.value = "TechSupportAgent" - AgentType.GROUP_CHAT_MANAGER.value = "GroupChatManager" - AgentType.PLANNER.value = "PlannerAgent" + human_agent = "HumanAgent" + hr_agent = "HrAgent" + marketing_agent = "MarketingAgent" + procurement_agent = "ProcurementAgent" + product_agent = "ProductAgent" + generic_agent = "GenericAgent" + tech_support_agent = "TechSupportAgent" + group_chat_manager = "GroupChatManager" + planner_agent = "PlannerAgent" # Add other agents as needed diff --git a/src/backend/models/messages_kernel.py b/src/backend/models/messages_kernel.py index c5ea55805..f9aaec79d 100644 --- a/src/backend/models/messages_kernel.py +++ b/src/backend/models/messages_kernel.py @@ -13,24 +13,27 @@ # Classes specifically for handling runtime interrupts class GetHumanInputMessage(KernelBaseModel): """Message requesting input from a human.""" + content: str - + + class GroupChatMessage(KernelBaseModel): """Message in a group chat.""" + body: Any source: str session_id: str target: str = "" id: str = Field(default_factory=lambda: str(uuid.uuid4())) - + def __str__(self): - content = self.body.content if hasattr(self.body, 'content') else str(self.body) + content = self.body.content if hasattr(self.body, "content") else str(self.body) return f"GroupChatMessage(source={self.source}, content={content})" class DataType(str, Enum): """Enumeration of possible data types for documents in the database.""" - + session = "session" plan = "plan" step = "step" @@ -39,23 +42,23 @@ class DataType(str, Enum): class AgentType(str, Enum): """Enumeration of agent types.""" - - HUMAN= "Human_Agent" + + HUMAN = "Human_Agent" HR = "Hr_Agent" MARKETING = "Marketing_Agent" PROCUREMENT = "Procurement_Agent" PRODUCT = "Product_Agent" GENERIC = "Generic_Agent" - TECH_SUPPORT= "Tech_Support_Agent" + TECH_SUPPORT = "Tech_Support_Agent" GROUP_CHAT_MANAGER = "Group_Chat_Manager" PLANNER = "Planner_Agent" - + # Add other agents as needed class StepStatus(str, Enum): """Enumeration of possible statuses for a step.""" - + planned = "planned" awaiting_feedback = "awaiting_feedback" approved = "approved" @@ -67,7 +70,7 @@ class StepStatus(str, Enum): class PlanStatus(str, Enum): """Enumeration of possible statuses for a plan.""" - + in_progress = "in_progress" completed = "completed" failed = "failed" @@ -75,7 +78,7 @@ class PlanStatus(str, Enum): class HumanFeedbackStatus(str, Enum): """Enumeration of human feedback statuses.""" - + requested = "requested" accepted = "accepted" rejected = "rejected" @@ -83,7 +86,7 @@ class HumanFeedbackStatus(str, Enum): class MessageRole(str, Enum): """Message roles compatible with Semantic Kernel.""" - + system = "system" user = "user" assistant = "assistant" @@ -92,7 +95,7 @@ class MessageRole(str, Enum): class BaseDataModel(KernelBaseModel): """Base data model with common fields.""" - + id: str = Field(default_factory=lambda: str(uuid.uuid4())) timestamp: Optional[datetime] = Field(default_factory=datetime.utcnow) @@ -100,23 +103,23 @@ class BaseDataModel(KernelBaseModel): # Basic message class for Semantic Kernel compatibility class ChatMessage(KernelBaseModel): """Base class for chat messages in Semantic Kernel format.""" - + role: MessageRole content: str metadata: Dict[str, Any] = Field(default_factory=dict) - + def to_semantic_kernel_dict(self) -> Dict[str, Any]: """Convert to format expected by Semantic Kernel.""" return { "role": self.role.value, "content": self.content, - "metadata": self.metadata + "metadata": self.metadata, } class StoredMessage(BaseDataModel): """Message stored in the database with additional metadata.""" - + data_type: Literal["message"] = Field("message", Literal=True) session_id: str user_id: str @@ -126,7 +129,7 @@ class StoredMessage(BaseDataModel): step_id: Optional[str] = None source: Optional[str] = None metadata: Dict[str, Any] = Field(default_factory=dict) - + def to_chat_message(self) -> ChatMessage: """Convert to ChatMessage format.""" return ChatMessage( @@ -139,14 +142,14 @@ def to_chat_message(self) -> ChatMessage: "session_id": self.session_id, "user_id": self.user_id, "message_id": self.id, - **self.metadata - } + **self.metadata, + }, ) class AgentMessage(BaseDataModel): """Base class for messages sent between agents.""" - + data_type: Literal["agent_message"] = Field("agent_message", Literal=True) session_id: str user_id: str @@ -158,7 +161,7 @@ class AgentMessage(BaseDataModel): class Session(BaseDataModel): """Represents a user session.""" - + data_type: Literal["session"] = Field("session", Literal=True) user_id: str current_status: str @@ -167,7 +170,7 @@ class Session(BaseDataModel): class Plan(BaseDataModel): """Represents a plan containing multiple steps.""" - + data_type: Literal["plan"] = Field("plan", Literal=True) session_id: str user_id: str @@ -181,7 +184,7 @@ class Plan(BaseDataModel): class Step(BaseDataModel): """Represents an individual step (task) within a plan.""" - + data_type: Literal["step"] = Field("step", Literal=True) plan_id: str session_id: str # Partition key @@ -197,7 +200,7 @@ class Step(BaseDataModel): class PlanWithSteps(Plan): """Plan model that includes the associated steps.""" - + steps: List[Step] = Field(default_factory=list) total_steps: int = 0 planned: int = 0 @@ -207,7 +210,7 @@ class PlanWithSteps(Plan): action_requested: int = 0 completed: int = 0 failed: int = 0 - + def update_step_counts(self): """Update the counts of steps by their status.""" status_counts = { @@ -219,10 +222,10 @@ def update_step_counts(self): StepStatus.completed: 0, StepStatus.failed: 0, } - + for step in self.steps: status_counts[step.status] += 1 - + self.total_steps = len(self.steps) self.planned = status_counts[StepStatus.planned] self.awaiting_feedback = status_counts[StepStatus.awaiting_feedback] @@ -231,7 +234,7 @@ def update_step_counts(self): self.action_requested = status_counts[StepStatus.action_requested] self.completed = status_counts[StepStatus.completed] self.failed = status_counts[StepStatus.failed] - + # Mark the plan as complete if the sum of completed and failed steps equals the total number of steps if self.completed + self.failed == self.total_steps: self.overall_status = PlanStatus.completed @@ -240,14 +243,14 @@ def update_step_counts(self): # Message classes for communication between agents class InputTask(KernelBaseModel): """Message representing the initial input task from the user.""" - + session_id: str description: str # Initial goal class ApprovalRequest(KernelBaseModel): """Message sent to HumanAgent to request approval for a step.""" - + step_id: str plan_id: str session_id: str @@ -258,7 +261,7 @@ class ApprovalRequest(KernelBaseModel): class HumanFeedback(KernelBaseModel): """Message containing human feedback on a step.""" - + step_id: Optional[str] = None plan_id: str session_id: str @@ -269,7 +272,7 @@ class HumanFeedback(KernelBaseModel): class HumanClarification(KernelBaseModel): """Message containing human clarification on a plan.""" - + plan_id: str session_id: str human_clarification: str @@ -277,7 +280,7 @@ class HumanClarification(KernelBaseModel): class ActionRequest(KernelBaseModel): """Message sent to an agent to perform an action.""" - + step_id: str plan_id: str session_id: str @@ -287,7 +290,7 @@ class ActionRequest(KernelBaseModel): class ActionResponse(KernelBaseModel): """Message containing the response from an agent after performing an action.""" - + step_id: str plan_id: str session_id: str @@ -297,97 +300,105 @@ class ActionResponse(KernelBaseModel): class PlanStateUpdate(KernelBaseModel): """Optional message for updating the plan state.""" - + plan_id: str session_id: str overall_status: PlanStatus - + # Semantic Kernel chat message handler class SKChatHistory: """Helper class to work with Semantic Kernel chat history.""" - + def __init__(self, memory_store): """Initialize with a memory store.""" self.memory_store = memory_store - - async def add_system_message(self, session_id: str, user_id: str, content: str, **kwargs): + + async def add_system_message( + self, session_id: str, user_id: str, content: str, **kwargs + ): """Add a system message to the chat history.""" message = StoredMessage( session_id=session_id, user_id=user_id, role=MessageRole.system, content=content, - **kwargs + **kwargs, ) await self._store_message(message) return message - - async def add_user_message(self, session_id: str, user_id: str, content: str, **kwargs): + + async def add_user_message( + self, session_id: str, user_id: str, content: str, **kwargs + ): """Add a user message to the chat history.""" message = StoredMessage( session_id=session_id, user_id=user_id, role=MessageRole.user, content=content, - **kwargs + **kwargs, ) await self._store_message(message) return message - - async def add_assistant_message(self, session_id: str, user_id: str, content: str, **kwargs): + + async def add_assistant_message( + self, session_id: str, user_id: str, content: str, **kwargs + ): """Add an assistant message to the chat history.""" message = StoredMessage( session_id=session_id, user_id=user_id, role=MessageRole.assistant, content=content, - **kwargs + **kwargs, ) await self._store_message(message) return message - - async def add_function_message(self, session_id: str, user_id: str, content: str, **kwargs): + + async def add_function_message( + self, session_id: str, user_id: str, content: str, **kwargs + ): """Add a function result message to the chat history.""" message = StoredMessage( session_id=session_id, user_id=user_id, role=MessageRole.function, content=content, - **kwargs + **kwargs, ) await self._store_message(message) return message - + async def _store_message(self, message: StoredMessage): """Store a message in the memory store.""" # Convert to dictionary for storage message_dict = message.model_dump() - + # Use memory store to save the message # This assumes your memory store has an upsert_async method that takes a collection name and data await self.memory_store.upsert_async( - f"message_{message.session_id}", - message_dict + f"message_{message.session_id}", message_dict ) - - async def get_chat_history(self, session_id: str, limit: int = 100) -> List[ChatMessage]: + + async def get_chat_history( + self, session_id: str, limit: int = 100 + ) -> List[ChatMessage]: """Retrieve chat history for a session.""" # Query messages from the memory store # This assumes your memory store has a method to query items messages = await self.memory_store.query_items( - f"message_{session_id}", - limit=limit + f"message_{session_id}", limit=limit ) - + # Convert to ChatMessage objects chat_messages = [] for msg_dict in messages: msg = StoredMessage.model_validate(msg_dict) chat_messages.append(msg.to_chat_message()) - + return chat_messages - + async def clear_history(self, session_id: str): """Clear chat history for a session.""" # This assumes your memory store has a method to delete a collection @@ -397,7 +408,8 @@ async def clear_history(self, session_id: str): # Define the expected structure of the LLM response class PlannerResponseStep(KernelBaseModel): action: str - agent: AgentType + agent: AgentType + class PlannerResponsePlan(KernelBaseModel): initial_goal: str @@ -405,34 +417,35 @@ class PlannerResponsePlan(KernelBaseModel): summary_plan_and_steps: str human_clarification_request: Optional[str] = None + # Helper class for Semantic Kernel function calling class SKFunctionRegistry: """Helper class to register and execute functions in Semantic Kernel.""" - + def __init__(self, kernel): """Initialize with a Semantic Kernel instance.""" self.kernel = kernel self.functions = {} - + def register_function(self, name: str, function_obj, description: str = None): """Register a function with the kernel.""" self.functions[name] = { "function": function_obj, - "description": description or "" + "description": description or "", } - + # Register with the kernel's function registry # The exact implementation depends on Semantic Kernel's API # This is a placeholder - adjust according to the actual SK API if hasattr(self.kernel, "register_function"): self.kernel.register_function(name, function_obj, description) - + async def execute_function(self, name: str, **kwargs): """Execute a registered function.""" if name not in self.functions: raise ValueError(f"Function {name} not registered") - + function_obj = self.functions[name]["function"] # Execute the function # This might vary based on SK's execution model - return await function_obj(**kwargs) \ No newline at end of file + return await function_obj(**kwargs) diff --git a/src/backend/utils_kernel.py b/src/backend/utils_kernel.py index dfe637a6a..ad0aa1bac 100644 --- a/src/backend/utils_kernel.py +++ b/src/backend/utils_kernel.py @@ -18,20 +18,22 @@ from models.messages_kernel import AgentType from kernel_agents.hr_agent import HrAgent -from kernel_agents.human_agent import HumanAgent +from kernel_agents.human_agent import HumanAgent from kernel_agents.marketing_agent import MarketingAgent from kernel_agents.generic_agent import GenericAgent from kernel_agents.tech_support_agent import TechSupportAgent from kernel_agents.procurement_agent import ProcurementAgent from kernel_agents.product_agent import ProductAgent -from kernel_agents.planner_agent import PlannerAgent +from kernel_agents.planner_agent import PlannerAgent from kernel_agents.group_chat_manager import GroupChatManager + logging.basicConfig(level=logging.INFO) # Cache for agent instances by session agent_instances: Dict[str, Dict[str, Any]] = {} azure_agent_instances: Dict[str, Dict[str, AzureAIAgent]] = {} + async def initialize_runtime_and_context( session_id: Optional[str] = None, user_id: str = None ) -> Tuple[sk.Kernel, CosmosMemoryContext]: @@ -46,41 +48,44 @@ async def initialize_runtime_and_context( Tuple containing the kernel and memory context """ if user_id is None: - raise ValueError("The 'user_id' parameter cannot be None. Please provide a valid user ID.") + raise ValueError( + "The 'user_id' parameter cannot be None. Please provide a valid user ID." + ) if session_id is None: session_id = str(uuid.uuid4()) - + # Create a kernel and memory store using the AppConfig instance kernel = config.create_kernel() memory_store = CosmosMemoryContext(session_id, user_id) - + return kernel, memory_store + async def get_agents(session_id: str, user_id: str) -> Dict[str, Any]: """ Get or create agent instances for a session. - + Args: session_id: The session identifier user_id: The user identifier - + Returns: Dictionary of agent instances mapped by their names """ cache_key = f"{session_id}_{user_id}" - + if cache_key in agent_instances: return agent_instances[cache_key] - + try: # Create all agents for this session using the factory raw_agents = await AgentFactory.create_all_agents( session_id=session_id, user_id=user_id, - temperature=0.0 # Default temperature + temperature=0.0, # Default temperature ) - + # Get mapping of agent types to class names agent_classes = { AgentType.HR: HrAgent.__name__, @@ -93,28 +98,31 @@ async def get_agents(session_id: str, user_id: str) -> Dict[str, Any]: AgentType.PLANNER: PlannerAgent.__name__, AgentType.GROUP_CHAT_MANAGER: GroupChatManager.__name__, } - + # Convert to the agent name dictionary format used by the rest of the app - agents = {agent_type.value: agent for agent_type, agent in raw_agents.items()} - + agents = { + agent_classes[agent_type]: agent for agent_type, agent in raw_agents.items() + } + # Cache the agents agent_instances[cache_key] = agents - + return agents except Exception as e: logging.error(f"Error creating agents: {str(e)}") raise + def load_tools_from_json_files() -> List[Dict[str, Any]]: """ Load tool definitions from JSON files in the tools directory. - + Returns: List of dictionaries containing tool information """ tools_dir = os.path.join(os.path.dirname(__file__), "tools") functions = [] - + try: if os.path.exists(tools_dir): for file in os.listdir(tools_dir): @@ -123,35 +131,40 @@ def load_tools_from_json_files() -> List[Dict[str, Any]]: try: with open(tool_path, "r") as f: tool_data = json.load(f) - + # Extract agent name from filename (e.g., hr_tools.json -> HR) agent_name = file.split("_")[0].capitalize() - + # Process each tool in the file for tool in tool_data.get("tools", []): try: - functions.append({ - "agent": agent_name, - "function": tool.get("name", ""), - "description": tool.get("description", ""), - "parameters": str(tool.get("parameters", {})) - }) + functions.append( + { + "agent": agent_name, + "function": tool.get("name", ""), + "description": tool.get("description", ""), + "parameters": str(tool.get("parameters", {})), + } + ) except Exception as e: - logging.warning(f"Error processing tool in {file}: {str(e)}") + logging.warning( + f"Error processing tool in {file}: {str(e)}" + ) except Exception as e: logging.error(f"Error loading tool file {file}: {str(e)}") except Exception as e: logging.error(f"Error reading tools directory: {str(e)}") - + return functions + async def rai_success(description: str) -> bool: """ Checks if a description passes the RAI (Responsible AI) check. - + Args: description: The text to check - + Returns: True if it passes, False otherwise """ @@ -161,16 +174,16 @@ async def rai_success(description: str) -> bool: access_token = credential.get_token( "https://cognitiveservices.azure.com/.default" ).token - + CHECK_ENDPOINT = os.getenv("AZURE_OPENAI_ENDPOINT") API_VERSION = os.getenv("AZURE_OPENAI_API_VERSION") DEPLOYMENT_NAME = os.getenv("AZURE_OPENAI_MODEL_NAME") - + if not all([CHECK_ENDPOINT, API_VERSION, DEPLOYMENT_NAME]): logging.error("Missing required environment variables for RAI check") # Default to allowing the operation if config is missing return True - + url = f"{CHECK_ENDPOINT}/openai/deployments/{DEPLOYMENT_NAME}/chat/completions?api-version={API_VERSION}" headers = { "Authorization": f"Bearer {access_token}", @@ -195,12 +208,12 @@ async def rai_success(description: str) -> bool: "top_p": 0.95, "max_tokens": 800, } - + # Send request response = requests.post(url, headers=headers, json=payload, timeout=30) if response.status_code == 400 or response.status_code == 200: response_json = response.json() - + if ( response_json.get("choices") and "message" in response_json["choices"][0] @@ -212,8 +225,8 @@ async def rai_success(description: str) -> bool: return False response.raise_for_status() # Raise exception for non-200 status codes including 400 but not content_filter return True - + except Exception as e: logging.error(f"Error in RAI check: {str(e)}") # Default to allowing the operation if RAI check fails - return True \ No newline at end of file + return True