diff --git a/src/backend/app_kernel.py b/src/backend/app_kernel.py index 04313507..dd38c9a0 100644 --- a/src/backend/app_kernel.py +++ b/src/backend/app_kernel.py @@ -35,20 +35,20 @@ ActionRequest, ActionResponse, ) -from utils_kernel import initialize_runtime_and_context, get_agents, retrieve_all_agent_tools, rai_success +from utils_kernel import initialize_runtime_and_context, get_agents, rai_success from event_utils import track_event_if_configured from models.agent_types import AgentType from kernel_agents.agent_factory import AgentFactory -# Check if the Application Insights Instrumentation Key is set in the environment variables -instrumentation_key = os.getenv("APPLICATIONINSIGHTS_INSTRUMENTATION_KEY") -if instrumentation_key: - # Configure Application Insights if the Instrumentation Key is found - configure_azure_monitor(connection_string=instrumentation_key) - logging.info("Application Insights configured with the provided Instrumentation Key") -else: - # Log a warning if the Instrumentation Key is not found - logging.warning("No Application Insights Instrumentation Key found. Skipping configuration") +# # Check if the Application Insights Instrumentation Key is set in the environment variables +# instrumentation_key = os.getenv("APPLICATIONINSIGHTS_INSTRUMENTATION_KEY") +# if instrumentation_key: +# # Configure Application Insights if the Instrumentation Key is found +# configure_azure_monitor(connection_string=instrumentation_key) +# logging.info("Application Insights configured with the provided Instrumentation Key") +# else: +# # Log a warning if the Instrumentation Key is not found +# logging.warning("No Application Insights Instrumentation Key found. Skipping configuration") # Configure logging logging.basicConfig(level=logging.INFO) @@ -59,10 +59,10 @@ ) logging.getLogger("azure.identity.aio._internal").setLevel(logging.WARNING) -# Suppress info logs from OpenTelemetry exporter -logging.getLogger("azure.monitor.opentelemetry.exporter.export._base").setLevel( - logging.WARNING -) +# # Suppress info logs from OpenTelemetry exporter +# logging.getLogger("azure.monitor.opentelemetry.exporter.export._base").setLevel( +# logging.WARNING +# ) # Initialize the FastAPI app app = FastAPI() @@ -132,9 +132,10 @@ async def input_task_endpoint(input_task: InputTask, request: Request): input_task_data["user_id"] = user_id input_task_json = json.dumps(input_task_data) + logging.info(f"Input task: {input_task}") # Use the planner to handle the task result = await planner_agent.handle_input_task( - KernelArguments(input_task_json=input_task_json) + input_task ) print(f"Result: {result}") @@ -819,7 +820,7 @@ async def get_agent_tools(): type: string description: Arguments required by the tool function """ - return retrieve_all_agent_tools() + return [] # Initialize the application when it starts diff --git a/src/backend/config_kernel.py b/src/backend/config_kernel.py index f65c0a4f..107dbee5 100644 --- a/src/backend/config_kernel.py +++ b/src/backend/config_kernel.py @@ -3,7 +3,13 @@ import logging import semantic_kernel as sk from semantic_kernel.kernel import Kernel -from semantic_kernel.contents import ChatHistory +# Updated imports for compatibility +try: + # Try newer structure + from semantic_kernel.contents import ChatHistory +except ImportError: + # Fall back to older structure for compatibility + from semantic_kernel.connectors.ai.chat_completion_client import ChatHistory from semantic_kernel.agents.azure_ai.azure_ai_agent import AzureAIAgent # Import AppConfig from app_config @@ -54,26 +60,3 @@ def CreateKernel(): def GetAIProjectClient(): """Get an AIProjectClient using the AppConfig implementation.""" return config.get_ai_project_client() - - @staticmethod - async def CreateAzureAIAgent( - kernel: Kernel, - agent_name: str, - instructions: str, - agent_type: str = "assistant", - tools=None, - tool_resources=None, - response_format=None, - temperature: float = 0.0 - ): - """Creates a new Azure AI Agent using the AppConfig implementation.""" - return await config.create_azure_ai_agent( - kernel=kernel, - agent_name=agent_name, - instructions=instructions, - agent_type=agent_type, - tools=tools, - tool_resources=tool_resources, - response_format=response_format, - temperature=temperature - ) \ No newline at end of file diff --git a/src/backend/kernel_agents/agent_base.py b/src/backend/kernel_agents/agent_base.py index a0381779..0d6352d7 100644 --- a/src/backend/kernel_agents/agent_base.py +++ b/src/backend/kernel_agents/agent_base.py @@ -9,6 +9,24 @@ from semantic_kernel.functions.kernel_arguments import KernelArguments from semantic_kernel.agents.azure_ai.azure_ai_agent import AzureAIAgent +# Updated imports for compatibility +try: + # Try importing from newer structure first + from semantic_kernel.contents import ChatMessageContent, ChatHistory +except ImportError: + # Fall back to older structure for compatibility + class ChatMessageContent: + """Compatibility class for older SK versions.""" + def __init__(self, role="", content="", name=None): + self.role = role + self.content = content + self.name = name + + class ChatHistory: + """Compatibility class for older SK versions.""" + def __init__(self): + self.messages = [] + from context.cosmos_memory_kernel import CosmosMemoryContext from models.messages_kernel import ( ActionRequest, @@ -64,6 +82,7 @@ def __init__( else: tools = tools or [] system_message = system_message or self._default_system_message(agent_name) + # Call AzureAIAgent constructor with required client and definition super().__init__( kernel=kernel, @@ -76,6 +95,8 @@ def __init__( client=client, definition=definition ) + + # Store instance variables self._agent_name = agent_name self._kernel = kernel self._session_id = session_id @@ -84,8 +105,14 @@ def __init__( self._tools = tools self._system_message = system_message self._chat_history = [{"role": "system", "content": self._system_message}] + self._agent = None # Will be initialized in async_init + + # Required properties for AgentGroupChat compatibility + self.name = agent_name # This is crucial for AgentGroupChat to identify agents + # Log initialization logging.info(f"Initialized {agent_name} with {len(self._tools)} tools") + # Register the handler functions self._register_functions() @@ -107,6 +134,53 @@ async def async_init(self): # Tools are registered with the kernel via get_tools_from_config return self + async def invoke_async(self, *args, **kwargs): + """Invoke this agent asynchronously. + + This method is required for compatibility with AgentGroupChat. + + Args: + *args: Positional arguments + **kwargs: Keyword arguments + + Returns: + The agent's response + """ + # Ensure agent is initialized + if self._agent is None: + await self.async_init() + + # Get the text input from args or kwargs + text = None + if args and isinstance(args[0], str): + text = args[0] + elif "text" in kwargs: + text = kwargs["text"] + elif "arguments" in kwargs and hasattr(kwargs["arguments"], "get"): + text = kwargs["arguments"].get("text") or kwargs["arguments"].get("input") + + if not text: + settings = kwargs.get("settings", {}) + if isinstance(settings, dict) and "input" in settings: + text = settings["input"] + + # If text is still not found, create a default message + if not text: + text = "Hello, please assist with a task." + + # Use the text to invoke the agent + try: + logging.info(f"Invoking {self._agent_name} with text: {text[:100]}...") + response = await self._agent.invoke( + self._kernel, + text, + settings=kwargs.get("settings", {}) + ) + return response + except Exception as e: + logging.error(f"Error invoking {self._agent_name}: {e}") + return f"Error: {str(e)}" + def _register_functions(self): """Register this agent's functions with the kernel.""" # Use the kernel function decorator approach instead of from_native_method @@ -126,6 +200,37 @@ async def handle_action_request_wrapper(*args, **kwargs): kernel_func = KernelFunction.from_method(handle_action_request_wrapper) # Use agent name as plugin for handler self._kernel.add_function(self._agent_name, kernel_func) + + # Required method for AgentGroupChat compatibility + async def send_message_async(self, message_content: ChatMessageContent, chat_history: ChatHistory): + """Send a message to the agent asynchronously, adding it to chat history. + + Args: + message_content: The content of the message + chat_history: The chat history + + Returns: + None + """ + # Convert message to format expected by the agent + if hasattr(message_content, "role") and hasattr(message_content, "content"): + self._chat_history.append({ + "role": message_content.role, + "content": message_content.content + }) + + # If chat history is provided, update our internal history + if chat_history and hasattr(chat_history, "messages"): + # Update with the latest messages from chat history + for msg in chat_history.messages[-5:]: # Only use last 5 messages to avoid history getting too long + if msg not in self._chat_history: + self._chat_history.append({ + "role": msg.role, + "content": msg.content + }) + + # No need to return anything as we're just updating state + return None async def handle_action_request(self, action_request_json: str) -> str: """Handle an action request from another agent or the system. diff --git a/src/backend/kernel_agents/agent_factory.py b/src/backend/kernel_agents/agent_factory.py index 574385c8..8e540aad 100644 --- a/src/backend/kernel_agents/agent_factory.py +++ b/src/backend/kernel_agents/agent_factory.py @@ -23,10 +23,14 @@ from kernel_agents.product_agent import ProductAgent from kernel_agents.planner_agent import PlannerAgent # Add PlannerAgent import from kernel_agents.group_chat_manager import GroupChatManager - +from semantic_kernel.prompt_template.prompt_template_config import PromptTemplateConfig from context.cosmos_memory_kernel import CosmosMemoryContext +from models.messages_kernel import PlannerResponsePlan - +from azure.ai.projects.models import ( + ResponseFormatJsonSchema, + ResponseFormatJsonSchemaType, +) logger = logging.getLogger(__name__) @@ -108,6 +112,7 @@ async def create_agent( user_id: str, temperature: float = 0.0, system_message: Optional[str] = None, + response_format: Optional[Any] = None, **kwargs ) -> BaseAgent: """Create an agent of the specified type. @@ -174,7 +179,7 @@ async def create_agent( name=agent_type_str, instructions=system_message, temperature=temperature, - response_format=None # Add response_format if required + response_format=response_format # Add response_format if required ) logger.info(f"Successfully created agent definition for {agent_type_str}") except Exception as agent_exc: @@ -224,57 +229,6 @@ async def create_agent( return agent - @classmethod - async def create_azure_ai_agent( - cls, - agent_name: str, - session_id: str, - system_prompt: str, - tools: List[KernelFunction] = None - ) -> AzureAIAgent: - """Create an Azure AI Agent. - - Args: - agent_name: The name of the agent - session_id: The session ID - system_prompt: The system prompt for the agent - tools: Optional list of tools for the agent - - Returns: - An Azure AI Agent instance - """ - # Check if we already have an agent in the cache - cache_key = f"{session_id}_{agent_name}" - if session_id in cls._azure_ai_agent_cache and cache_key in cls._azure_ai_agent_cache[session_id]: - # If tools are provided, make sure they are registered with the cached agent - agent = cls._azure_ai_agent_cache[session_id][cache_key] - if tools: - for tool in tools: - agent.add_function(tool) - return agent - - # Create a kernel using the AppConfig instance - kernel = config.create_kernel() - - # Await creation since create_azure_ai_agent is async - agent = await config.create_azure_ai_agent( - kernel=kernel, - agent_name=agent_name, - instructions=system_prompt - ) - - # Register tools if provided - if tools: - for tool in tools: - agent.add_function(tool) - - # Cache the agent instance - if session_id not in cls._azure_ai_agent_cache: - cls._azure_ai_agent_cache[session_id] = {} - cls._azure_ai_agent_cache[session_id][cache_key] = agent - - return agent - @classmethod async def _load_tools_for_agent(cls, kernel: Kernel, agent_type: str) -> List[KernelFunction]: """Load tools for an agent from the tools directory. @@ -310,7 +264,7 @@ async def _load_tools_for_agent(cls, kernel: Kernel, agent_type: str) -> List[Ke # For other agent types, try to create a simple fallback tool try: # Use PromptTemplateConfig to create a simple tool - from semantic_kernel.prompt_template.prompt_template_config import PromptTemplateConfig + # Simple minimal prompt prompt = f"""You are a helpful assistant specialized in {agent_type} tasks. @@ -398,7 +352,14 @@ async def create_all_agents( session_id=session_id, user_id=user_id, temperature=temperature, - agent_instances=agent_instances # Pass agent instances to the planner + agent_instances=agent_instances, # Pass agent instances to the planner + response_format=ResponseFormatJsonSchemaType( + json_schema=ResponseFormatJsonSchema( + name=PlannerResponsePlan.__name__, + description=f"respond with {PlannerResponsePlan.__name__.lower()}", + schema=PlannerResponsePlan.model_json_schema(), + ) + ) ) agents[planner_agent_type] = planner_agent diff --git a/src/backend/kernel_agents/group_chat_manager.py b/src/backend/kernel_agents/group_chat_manager.py index 03f77503..ea15bb08 100644 --- a/src/backend/kernel_agents/group_chat_manager.py +++ b/src/backend/kernel_agents/group_chat_manager.py @@ -5,11 +5,30 @@ from typing import Dict, List, Optional, Any, Tuple import semantic_kernel as sk -from semantic_kernel.agents import AgentGroupChat +from semantic_kernel.functions.kernel_arguments import KernelArguments +from semantic_kernel.agents import AgentGroupChat # pylint: disable=E0611 + from semantic_kernel.agents.strategies import ( SequentialSelectionStrategy, TerminationStrategy, ) +# Updated imports for compatibility +try: + # Try importing from newer structure first + from semantic_kernel.contents import ChatMessageContent, ChatHistory +except ImportError: + # Fall back to older structure for compatibility + class ChatMessageContent: + """Compatibility class for older SK versions.""" + def __init__(self, role="", content="", name=None): + self.role = role + self.content = content + self.name = name + + class ChatHistory: + """Compatibility class for older SK versions.""" + def __init__(self): + self.messages = [] from kernel_agents.agent_base import BaseAgent from context.cosmos_memory_kernel import CosmosMemoryContext @@ -28,6 +47,27 @@ from event_utils import track_event_if_configured +class GroupChatManagerClass: + """A class for service compatibility with Semantic Kernel.""" + # Defining properties needed by Semantic Kernel + service_id = "" + + def __init__(self, manager): + self.manager = manager + self.service_id = f"group_chat_manager_{manager._session_id}" + + async def execute_next_step(self, kernel_arguments: KernelArguments) -> str: + """Execute the next step in the plan. + + Args: + kernel_arguments: KernelArguments that should contain session_id and plan_id + + Returns: + Status message + """ + return await self.manager.execute_next_step(kernel_arguments) + + class GroupChatManager: """Group Chat Manager implementation using Semantic Kernel's AgentGroupChat. @@ -64,6 +104,27 @@ def __init__( # Initialize the AgentGroupChat later when all agents are registered self._agent_group_chat = None self._initialized = False + + # Create a wrapper class for service registration + service_wrapper = GroupChatManagerClass(self) + + try: + # Register with kernel using the service_wrapper + if hasattr(kernel, "register_services"): + kernel.register_services({service_wrapper.service_id: service_wrapper}) + logging.info(f"Registered GroupChatManager as kernel service with ID: {service_wrapper.service_id}") + elif hasattr(kernel, "services") and hasattr(kernel.services, "register_service"): + kernel.services.register_service(service_wrapper.service_id, service_wrapper) + logging.info(f"Registered GroupChatManager as kernel service with ID: {service_wrapper.service_id}") + elif hasattr(kernel, "services") and isinstance(kernel.services, dict): + # Last resort: directly add to services dictionary + kernel.services[service_wrapper.service_id] = service_wrapper + logging.info(f"Added GroupChatManager to kernel services dictionary with ID: {service_wrapper.service_id}") + else: + logging.warning("Could not register GroupChatManager service. Semantic Kernel version might be incompatible.") + except Exception as e: + logging.error(f"Error registering GroupChatManager service: {e}") + # Continue without crashing async def initialize_group_chat(self) -> None: """Initialize the AgentGroupChat with registered agents and strategies.""" @@ -103,42 +164,46 @@ async def select_agent(self, agents, history): Args: agents: List of available agents - history: Chat history + history: Chat history (ChatHistory object) Returns: The next agent to take the turn """ # If no history, start with the PlannerAgent - if not history: + if not history or not history.messages: return next((agent for agent in agents if agent.name == "PlannerAgent"), None) # Get the last message - last_message = history[-1] + last_message = history.messages[-1] - match last_message.name: - case "PlannerAgent": - # After the planner creates a plan, HumanAgent should review it - return next((agent for agent in agents if agent.name == "HumanAgent"), None) - - case "HumanAgent": - # After human feedback, the specific agent for the step should proceed - # Need to extract which agent should be next from the plan - # For demo purposes, going with a simple approach - # In a real implementation, we would look up the next step in the plan - return next((agent for agent in agents if agent.name == "GenericAgent"), None) - - case "GroupChatManager": - # If the manager just assigned a step, the specific agent should execute it - # For demo purposes, we'll just use the next agent in a simple rotation - current_agent_index = next((i for i, agent in enumerate(agents) - if agent.name == last_message.name), 0) - next_index = (current_agent_index + 1) % len(agents) + # Extract name from the message - in SK ChatMessageContent + last_sender = last_message.role + if hasattr(last_message, 'name') and last_message.name: + last_sender = last_message.name + + # Route based on the last sender + if last_sender == "PlannerAgent": + # After the planner creates a plan, HumanAgent should review it + return next((agent for agent in agents if agent.name == "HumanAgent"), None) + elif last_sender == "HumanAgent": + # After human feedback, the specific agent for the step should proceed + # For simplicity, use GenericAgent as fallback + return next((agent for agent in agents if agent.name == "GenericAgent"), None) + elif last_sender == "GroupChatManager": + # If the manager just assigned a step, find the agent that should execute it + # For simplicity, just rotate to the next agent + agent_names = [agent.name for agent in agents] + try: + current_index = agent_names.index(last_sender) + next_index = (current_index + 1) % len(agents) return agents[next_index] - - case _: - # Default to the Group Chat Manager to coordinate next steps - return next((agent for agent in agents if agent.name == "GroupChatManager"), None) - + except ValueError: + return agents[0] if agents else None + else: + # Default to the Group Chat Manager + return next((agent for agent in agents if agent.name == "GroupChatManager"), + agents[0] if agents else None) + class PlanTerminationStrategy(TerminationStrategy): """Strategy for determining when the agent group chat should terminate. @@ -154,23 +219,29 @@ def __init__(self, agents, maximum_iterations=10, automatic_reset=True): maximum_iterations: Maximum number of iterations before termination automatic_reset: Whether to reset the agent after termination """ - super().__init__(agents, maximum_iterations, automatic_reset) + super().__init__(maximum_iterations, automatic_reset) + self._agents = agents - async def should_agent_terminate(self, agent, history): - """Check if the agent should terminate. + async def should_terminate(self, history, agents=None) -> bool: + """Check if the chat should terminate. Args: - agent: The current agent - history: Chat history + history: Chat history as a ChatHistory object + agents: List of agents (optional, uses self._agents if not provided) Returns: - True if the agent should terminate, False otherwise + True if the chat should terminate, False otherwise """ - # Default termination conditions - if not history: + # Default termination conditions from parent class + if await super().should_terminate(history, agents or self._agents): + return True + + # If no history, continue the chat + if not history or not history.messages: return False - last_message = history[-1] + # Get the last message + last_message = history.messages[-1] # End the chat if the plan is completed or if human intervention is required if "plan completed" in last_message.content.lower(): @@ -469,47 +540,90 @@ async def run_group_chat(self, user_input: str, plan_id: str = "", step_id: str await self.initialize_group_chat() try: - # Run the group chat - chat_result = await self._agent_group_chat.invoke_async(user_input) + # Run the group chat with Semantic Kernel + result = await self._agent_group_chat.invoke_async(user_input) + + # Process the result which could be a ChatHistory object or something else + if hasattr(result, "messages") and result.messages: + messages = result.messages + elif hasattr(result, "value") and isinstance(result.value, list): + messages = result.value + else: + # Fallback for other formats + messages = [] + if isinstance(result, str): + # If it's just a string response + logging.debug(f"Group chat returned a string: {result[:100]}...") + await self._memory_store.add_item( + AgentMessage( + session_id=self._session_id, + user_id=self._user_id, + plan_id=plan_id, + content=result, + source="GroupChatManager", + step_id=step_id, + ) + ) + return result + + # Process the messages from the chat result + final_response = None - # Process and store results - messages = chat_result.value for msg in messages: # Skip the initial user message - if msg.role == "user" and msg.content == user_input: + if hasattr(msg, "role") and msg.role == "user" and msg.content == user_input: continue - - # Store agent messages in the memory + + # Determine the source/agent name + source = "assistant" + if hasattr(msg, "name") and msg.name: + source = msg.name + elif hasattr(msg, "role") and msg.role: + source = msg.role + + # Get the message content + content = msg.content if hasattr(msg, "content") else str(msg) + + # Store the message in memory await self._memory_store.add_item( AgentMessage( session_id=self._session_id, user_id=self._user_id, plan_id=plan_id, - content=msg.content, - source=msg.name if hasattr(msg, "name") else msg.role, + content=content, + source=source, step_id=step_id, ) ) + + # Keep track of the final response + final_response = content # Return the final message from the chat - if messages: - return messages[-1].content + if final_response: + return final_response return "Group chat completed with no messages." except Exception as e: - logging.error(f"Error running group chat: {str(e)}") + logging.exception(f"Error running group chat: {e}") return f"Error running group chat: {str(e)}" - async def execute_next_step(self, session_id: str, plan_id: str) -> str: + async def execute_next_step(self, kernel_arguments: KernelArguments) -> str: """Execute the next step in the plan. Args: - session_id: The session identifier - plan_id: The plan identifier + kernel_arguments: KernelArguments that should contain session_id and plan_id Returns: Status message """ + # Extract arguments + session_id = kernel_arguments.get("session_id", "") + plan_id = kernel_arguments.get("plan_id", "") + + if not session_id or not plan_id: + return "Missing session_id or plan_id in arguments" + # Get all steps for the plan steps = await self._memory_store.get_steps_for_plan(plan_id, session_id) diff --git a/src/backend/kernel_agents/planner_agent.py b/src/backend/kernel_agents/planner_agent.py index 12d76016..fd41dfb0 100644 --- a/src/backend/kernel_agents/planner_agent.py +++ b/src/backend/kernel_agents/planner_agent.py @@ -8,7 +8,6 @@ import semantic_kernel as sk from semantic_kernel.functions import KernelFunction from semantic_kernel.functions.kernel_arguments import KernelArguments -from semantic_kernel.agents.azure_ai.azure_ai_agent import AzureAIAgent from kernel_agents.agent_base import BaseAgent from context.cosmos_memory_kernel import CosmosMemoryContext @@ -16,6 +15,7 @@ AgentMessage, InputTask, Plan, + PlannerResponsePlan, Step, StepStatus, PlanStatus, @@ -24,16 +24,6 @@ from event_utils import track_event_if_configured from app_config import config -# Define structured output models -class StructuredOutputStep(BaseModel): - action: str = Field(description="Detailed description of the step action") - agent: str = Field(description="Name of the agent to execute this step") - -class StructuredOutputPlan(BaseModel): - initial_goal: str = Field(description="The goal of the plan") - steps: List[StructuredOutputStep] = Field(description="List of steps to achieve the goal") - summary_plan_and_steps: str = Field(description="Brief summary of the plan and steps") - human_clarification_request: Optional[str] = Field(None, description="Any additional information needed from the human") class PlannerAgent(BaseAgent): """Planner agent implementation using Semantic Kernel. @@ -126,7 +116,7 @@ async def async_init(self) -> None: logging.error(f"Failed to create Azure AI Agent for PlannerAgent: {e}") raise - async def handle_input_task(self, kernel_arguments: KernelArguments) -> str: + async def handle_input_task(self, input_task: InputTask) -> str: """Handle the initial input task from the user. Args: @@ -136,15 +126,19 @@ async def handle_input_task(self, kernel_arguments: KernelArguments) -> str: Status message """ # Parse the input task - input_task_json = kernel_arguments["input_task_json"] - input_task = InputTask.parse_raw(input_task_json) + logging.info("Handling input task") + + logging.info(f"Parsed input task: {input_task}") # Generate a structured plan with steps + + logging.info(f"Received input task: {input_task.description}") + logging.info(f"Session ID: {input_task.session_id}, User ID: {self._user_id}") plan, steps = await self._create_structured_plan(input_task) - print(f"Plan created: {plan}") + logging.info(f"Plan created: {plan}") + logging.info(f"Steps created: {steps}") - print(f"Steps created: {steps}") if steps: @@ -274,8 +268,13 @@ async def _create_structured_plan(self, input_task: InputTask) -> Tuple[Plan, Li """ try: # Generate the instruction for the LLM + logging.info("Generating instruction for the LLM") + logging.debug(f"Input: {input_task}") + logging.debug(f"Available agents: {self._available_agents}") + instruction = self._generate_instruction(input_task.description) + logging.info(f"Generated instruction: {instruction}") # Log the input task for debugging logging.info(f"Creating plan for task: '{input_task.description}'") logging.info(f"Using available agents: {self._available_agents}") @@ -295,17 +294,17 @@ async def _create_structured_plan(self, input_task: InputTask) -> Tuple[Plan, Li kernel_args = KernelArguments() kernel_args["input"] = f"TASK: {input_task.description}\n\n{instruction}" - print(f"Kernel arguments: {kernel_args}") + logging.debug(f"Kernel arguments: {kernel_args}") # Call invoke with proper keyword arguments response_content = "" - # Use keyword arguments instead of positional arguments - # Set a lower temperature to ensure consistent results + # Ensure we're using the right pattern for Azure AI agents with semantic kernel + # Properly handle async generation async_generator = self._azure_ai_agent.invoke( arguments=kernel_args, settings={ - "temperature": 0.0 + "temperature": 0.0, # Keep temperature low for consistent planning } ) @@ -314,13 +313,8 @@ async def _create_structured_plan(self, input_task: InputTask) -> Tuple[Plan, Li if chunk is not None: response_content += str(chunk) - print(f"Response content: {response_content}") - - # Debug the response - logging.info(f"Response content length: {len(response_content)}") - logging.debug(f"Response content first 500 chars: {response_content[:500]}") - # Log more of the response for debugging - logging.info(f"Full response: {response_content}") + + logging.info(f"Response content: {response_content}") # Check if response is empty or whitespace if not response_content or response_content.isspace(): @@ -330,7 +324,7 @@ async def _create_structured_plan(self, input_task: InputTask) -> Tuple[Plan, Li try: # First try to parse using Pydantic model try: - parsed_result = StructuredOutputPlan.parse_raw(response_content) + parsed_result = PlannerResponsePlan.parse_raw(response_content) except Exception as e1: logging.warning(f"Failed to parse direct JSON with Pydantic: {str(e1)}") @@ -340,12 +334,12 @@ async def _create_structured_plan(self, input_task: InputTask) -> Tuple[Plan, Li json_content = json_match.group(1) logging.info(f"Found JSON content in markdown code block, length: {len(json_content)}") try: - parsed_result = StructuredOutputPlan.parse_raw(json_content) + parsed_result = PlannerResponsePlan.parse_raw(json_content) except Exception as e2: logging.warning(f"Failed to parse extracted JSON with Pydantic: {str(e2)}") # Try conventional JSON parsing as fallback json_data = json.loads(json_content) - parsed_result = StructuredOutputPlan.parse_obj(json_data) + parsed_result = PlannerResponsePlan.parse_obj(json_data) else: # Try to extract JSON without code blocks - maybe it's embedded in text # Look for patterns like { ... } that contain "initial_goal" and "steps" @@ -357,12 +351,12 @@ async def _create_structured_plan(self, input_task: InputTask) -> Tuple[Plan, Li logging.info(f"Found potential JSON in text, length: {len(potential_json)}") try: json_data = json.loads(potential_json) - parsed_result = StructuredOutputPlan.parse_obj(json_data) + parsed_result = PlannerResponsePlan.parse_obj(json_data) except Exception as e3: logging.warning(f"Failed to parse potential JSON: {str(e3)}") # If all extraction attempts fail, try parsing the whole response as JSON json_data = json.loads(response_content) - parsed_result = StructuredOutputPlan.parse_obj(json_data) + parsed_result = PlannerResponsePlan.parse_obj(json_data) else: # If we can't find JSON patterns, create a fallback plan from the text logging.info("Using fallback plan creation from text response") @@ -680,23 +674,5 @@ def _generate_instruction(self, objective: str) -> str: Limit the plan to 6 steps or less. Choose from {agents_str} ONLY for planning your steps. - - When generating the action in the plan, frame the action as an instruction you are passing to the agent to execute. It should be a short, single sentence. Include the function to use. For example, "Set up an Office 365 Account for Jessica Smith. Function: set_up_office_365_account" - - Ensure the summary of the plan and the overall steps is less than 50 words. - Identify any additional information that might be required to complete the task. Include this information in the plan in the human_clarification_request field of the plan. If it is not required, leave it as null. Do not include information that you are waiting for clarification on in the string of the action field, as this otherwise won't get updated. - - Return your response as a JSON object with the following structure: - {{ - "initial_goal": "The goal of the plan", - "steps": [ - {{ - "action": "Detailed description of the step action", - "agent": "AgentName" - }} - ], - "summary_plan_and_steps": "Brief summary of the plan and steps", - "human_clarification_request": "Any additional information needed from the human" - }} """ \ No newline at end of file diff --git a/src/backend/models/messages_kernel.py b/src/backend/models/messages_kernel.py index 69965fb9..0da1bb7d 100644 --- a/src/backend/models/messages_kernel.py +++ b/src/backend/models/messages_kernel.py @@ -393,6 +393,18 @@ async def clear_history(self, session_id: str): # This assumes your memory store has a method to delete a collection await self.memory_store.delete_collection_async(f"message_{session_id}") + +# Define the expected structure of the LLM response +class PlannerResponseStep(KernelBaseModel): + action: str + agent: AgentType + +class PlannerResponsePlan(KernelBaseModel): + initial_goal: str + steps: List[PlannerResponseStep] + summary_plan_and_steps: str + human_clarification_request: Optional[str] = None + # Helper class for Semantic Kernel function calling class SKFunctionRegistry: """Helper class to register and execute functions in Semantic Kernel.""" diff --git a/src/backend/utils_kernel.py b/src/backend/utils_kernel.py index d6874414..b1bbb517 100644 --- a/src/backend/utils_kernel.py +++ b/src/backend/utils_kernel.py @@ -96,109 +96,6 @@ async def get_agents(session_id: str, user_id: str) -> Dict[str, Any]: logging.error(f"Error creating agents: {str(e)}") raise -async def get_azure_ai_agent( - session_id: str, - agent_name: str, - system_prompt: str, - tools: List[KernelFunction] = None -) -> AzureAIAgent: - """ - Get or create an Azure AI Agent instance. - - Args: - session_id: The session identifier - agent_name: The name for the agent - system_prompt: The system prompt for the agent - tools: Optional list of tools for the agent - - Returns: - An Azure AI Agent instance - """ - cache_key = f"{session_id}_{agent_name}" - - if session_id in azure_agent_instances and cache_key in azure_agent_instances[session_id]: - agent = azure_agent_instances[session_id][cache_key] - # Add any new tools if provided - if tools: - for tool in tools: - agent.add_function(tool) - return agent - - try: - # Create the agent using the factory - agent = await AgentFactory.create_azure_ai_agent( - agent_name=agent_name, - session_id=session_id, - system_prompt=system_prompt, - tools=tools - ) - - # Cache the agent - if session_id not in azure_agent_instances: - azure_agent_instances[session_id] = {} - azure_agent_instances[session_id][cache_key] = agent - - return agent - except Exception as e: - logging.error(f"Error creating Azure AI Agent '{agent_name}': {str(e)}") - raise - -async def retrieve_all_agent_tools() -> List[Dict[str, Any]]: - """ - Retrieves all agent tools information. - - Returns: - List of dictionaries containing tool information - """ - functions = [] - - try: - # Create a temporary session for tool discovery - temp_session_id = "tools-discovery-session" - temp_user_id = "tools-discovery-user" - - # Create all agents for this session to extract their tools - agents = await get_agents(temp_session_id, temp_user_id) - - # Process each agent's tools - for agent_name, agent in agents.items(): - if not hasattr(agent, '_tools') or agent._tools is None: - continue - - # Make agent name more readable for display - display_name = agent_name.replace('Agent', '') - - # Extract tool information from the agent - for tool in agent._tools: - try: - # Extract parameters information - parameters_info = {} - if hasattr(tool, 'metadata') and tool.metadata.get('parameters'): - parameters_info = tool.metadata.get('parameters', {}) - - # Create tool info dictionary - tool_info = { - "agent": display_name, - "function": tool.name, - "description": tool.description if hasattr(tool, 'description') and tool.description else "", - "parameters": str(parameters_info) - } - functions.append(tool_info) - except Exception as e: - logging.warning(f"Error extracting tool information from {agent_name}.{tool.name}: {str(e)}") - - # Clean up cache - cache_key = f"{temp_session_id}_{temp_user_id}" - if cache_key in agent_instances: - del agent_instances[cache_key] - - except Exception as e: - logging.error(f"Error retrieving agent tools: {str(e)}") - # Fallback to loading tool information from JSON files - functions = load_tools_from_json_files() - - return functions - def load_tools_from_json_files() -> List[Dict[str, Any]]: """ Load tool definitions from JSON files in the tools directory.