diff --git a/src/backend/kernel_agents/planner_agent.py b/src/backend/kernel_agents/planner_agent.py index e247cce88..45695d595 100644 --- a/src/backend/kernel_agents/planner_agent.py +++ b/src/backend/kernel_agents/planner_agent.py @@ -263,9 +263,16 @@ async def _create_structured_plan(self, input_task: InputTask) -> Tuple[Plan, Li """ try: # Generate the instruction for the LLM + logging.info("Generating instruction for the LLM") + logging.debug(f"Input: {input_task}") + logging.debug(f"Available agents: {self._available_agents}") instruction = self._generate_instruction(input_task.description) + logging.info(f"Generated instruction: {instruction}") + # Log the input task for debugging + logging.info(f"Creating plan for task: '{input_task.description}'") + logging.info(f"Using available agents: {self._available_agents}") # Use the Azure AI Agent instead of direct function invocation if self._azure_ai_agent is None: @@ -293,6 +300,7 @@ async def _create_structured_plan(self, input_task: InputTask) -> Tuple[Plan, Li arguments=kernel_args, settings={ "temperature": 0.0, # Keep temperature low for consistent planning + "max_tokens": 4096 # Ensure we have enough tokens for the full plan } ) @@ -301,267 +309,123 @@ async def _create_structured_plan(self, input_task: InputTask) -> Tuple[Plan, Li if chunk is not None: response_content += str(chunk) - logging.info(f"Response content: {response_content}") # Check if response is empty or whitespace if not response_content or response_content.isspace(): raise ValueError("Received empty response from Azure AI Agent") - # Parse the JSON response using the structured output model + # Parse the JSON response directly to PlannerResponsePlan + parsed_result = None + + # Try to parse the raw response first try: - # First try to parse using Pydantic model - try: - parsed_result = PlannerResponsePlan.parse_raw(response_content) - except Exception as e1: - logging.warning(f"Failed to parse direct JSON with Pydantic: {str(e1)}") - - # If direct parsing fails, try to extract JSON first - json_match = re.search(r'```json\s*(.*?)\s*```', response_content, re.DOTALL) - if json_match: - json_content = json_match.group(1) - logging.info(f"Found JSON content in markdown code block, length: {len(json_content)}") - try: - parsed_result = PlannerResponsePlan.parse_raw(json_content) - except Exception as e2: - logging.warning(f"Failed to parse extracted JSON with Pydantic: {str(e2)}") - # Try conventional JSON parsing as fallback - json_data = json.loads(json_content) - parsed_result = PlannerResponsePlan.parse_obj(json_data) - else: - # Try to extract JSON without code blocks - maybe it's embedded in text - # Look for patterns like { ... } that contain "initial_goal" and "steps" - json_pattern = r'\{.*?"initial_goal".*?"steps".*?\}' - alt_match = re.search(json_pattern, response_content, re.DOTALL) - - if alt_match: - potential_json = alt_match.group(0) - logging.info(f"Found potential JSON in text, length: {len(potential_json)}") - try: - json_data = json.loads(potential_json) - parsed_result = PlannerResponsePlan.parse_obj(json_data) - except Exception as e3: - logging.warning(f"Failed to parse potential JSON: {str(e3)}") - # If all extraction attempts fail, try parsing the whole response as JSON - json_data = json.loads(response_content) - parsed_result = PlannerResponsePlan.parse_obj(json_data) - else: - # If we can't find JSON patterns, create a fallback plan from the text - logging.info("Using fallback plan creation from text response") - return await self._create_fallback_plan_from_text(input_task, response_content) + parsed_result = PlannerResponsePlan.parse_raw(response_content) + except Exception as e: + logging.warning(f"Failed to parse raw response: {e}") - # Extract plan details and log for debugging - initial_goal = parsed_result.initial_goal - steps_data = parsed_result.steps - summary = parsed_result.summary_plan_and_steps - human_clarification_request = parsed_result.human_clarification_request + # Try to extract JSON from markdown code blocks + json_match = re.search(r'```(?:json)?\s*(.*?)\s*```', response_content, re.DOTALL) + if json_match: + json_content = json_match.group(1) + logging.info(f"Found JSON in code block, attempting to parse") + parsed_result = PlannerResponsePlan.parse_raw(json_content) + else: + # If still not parsed, raise the error to be handled by outer exception + raise ValueError(f"Failed to parse response as PlannerResponsePlan: {e}") + + # At this point, we have a valid parsed_result or an exception was raised + + # Extract plan details + initial_goal = parsed_result.initial_goal + steps_data = parsed_result.steps + summary = parsed_result.summary_plan_and_steps + human_clarification_request = parsed_result.human_clarification_request + + # Create the Plan instance + plan = Plan( + id=str(uuid.uuid4()), + session_id=input_task.session_id, + user_id=self._user_id, + initial_goal=initial_goal, + overall_status=PlanStatus.in_progress, + summary=summary, + human_clarification_request=human_clarification_request + ) + + # Store the plan + await self._memory_store.add_plan(plan) + + track_event_if_configured( + "Planner - Initial plan and added into the cosmos", + { + "session_id": input_task.session_id, + "user_id": self._user_id, + "initial_goal": initial_goal, + "overall_status": PlanStatus.in_progress, + "source": "PlannerAgent", + "summary": summary, + "human_clarification_request": human_clarification_request, + }, + ) + + # Create steps from the parsed data + steps = [] + for step_data in steps_data: + action = step_data.action + agent_name = step_data.agent - # Log potential mismatches between task and plan for debugging - if "onboard" in input_task.description.lower() and "marketing" in initial_goal.lower(): - logging.warning(f"Potential mismatch: Task was about onboarding but plan goal mentions marketing: {initial_goal}") - - # Log the steps and agent assignments for debugging - for i, step in enumerate(steps_data): - logging.info(f"Step {i+1} - Agent: {step.agent}, Action: {step.action}") + # Validate agent name + if agent_name not in self._available_agents: + logging.warning(f"Invalid agent name: {agent_name}, defaulting to GenericAgent") + agent_name = "GenericAgent" - # Create the Plan instance - plan = Plan( + # Create the step + step = Step( id=str(uuid.uuid4()), + plan_id=plan.id, session_id=input_task.session_id, user_id=self._user_id, - initial_goal=initial_goal, - overall_status=PlanStatus.in_progress, - summary=summary, - human_clarification_request=human_clarification_request + action=action, + agent=agent_name, + status=StepStatus.planned, + human_approval_status=HumanFeedbackStatus.requested ) - # Store the plan - await self._memory_store.add_plan(plan) + # Store the step + await self._memory_store.add_step(step) + steps.append(step) track_event_if_configured( - "Planner - Initial plan and added into the cosmos", + "Planner - Added planned individual step into the cosmos", { + "plan_id": plan.id, + "action": action, + "agent": agent_name, + "status": StepStatus.planned, "session_id": input_task.session_id, "user_id": self._user_id, - "initial_goal": initial_goal, - "overall_status": PlanStatus.in_progress, - "source": "PlannerAgent", - "summary": summary, - "human_clarification_request": human_clarification_request, + "human_approval_status": HumanFeedbackStatus.requested, }, ) - - # Create steps from the parsed data - steps = [] - for step_data in steps_data: - action = step_data.action - agent_name = step_data.agent - - # Log any unusual agent assignments for debugging - if "onboard" in input_task.description.lower() and agent_name != "HrAgent": - logging.warning(f"UNUSUAL AGENT ASSIGNMENT: Task contains 'onboard' but assigned to {agent_name} instead of HrAgent") - - # Validate agent name - if agent_name not in self._available_agents: - logging.warning(f"Invalid agent name: {agent_name}, defaulting to GenericAgent") - agent_name = "GenericAgent" - - # Create the step - step = Step( - id=str(uuid.uuid4()), - plan_id=plan.id, - session_id=input_task.session_id, - user_id=self._user_id, - action=action, - agent=agent_name, - status=StepStatus.planned, - human_approval_status=HumanFeedbackStatus.requested - ) - - # Store the step - await self._memory_store.add_step(step) - steps.append(step) - - track_event_if_configured( - "Planner - Added planned individual step into the cosmos", - { - "plan_id": plan.id, - "action": action, - "agent": agent_name, - "status": StepStatus.planned, - "session_id": input_task.session_id, - "user_id": self._user_id, - "human_approval_status": HumanFeedbackStatus.requested, - }, - ) - - return plan, steps - - except Exception as e: - # If JSON parsing fails, log error and create error plan - logging.exception(f"Failed to parse JSON response: {e}") - logging.info(f"Raw response was: {response_content[:1000]}...") - # Try a fallback approach - return await self._create_fallback_plan_from_text(input_task, response_content) + + return plan, steps except Exception as e: logging.exception(f"Error creating structured plan: {e}") track_event_if_configured( - f"Planner - Error in create_structured_plan: {e} into the cosmos", + f"Planner - Error in create_structured_plan: {e}", { "session_id": input_task.session_id, "user_id": self._user_id, - "initial_goal": "Error generating plan", - "overall_status": PlanStatus.failed, + "error": str(e), "source": "PlannerAgent", - "summary": f"Error generating plan: {e}", }, ) - # Create an error plan - error_plan = Plan( - id=str(uuid.uuid4()), - session_id=input_task.session_id, - user_id=self._user_id, - initial_goal="Error generating plan", - overall_status=PlanStatus.failed, - summary=f"Error generating plan: {str(e)}" - ) - - await self._memory_store.add_plan(error_plan) - return error_plan, [] - - async def _create_fallback_plan_from_text(self, input_task: InputTask, text_content: str) -> Tuple[Plan, List[Step]]: - """Create a plan from unstructured text when JSON parsing fails. - - Args: - input_task: The input task - text_content: The text content from the LLM - - Returns: - Tuple containing the created plan and list of steps - """ - logging.info("Creating fallback plan from text content") - - # Extract goal from the text (first line or use input task description) - goal_match = re.search(r"(?:Goal|Initial Goal|Plan):\s*(.+?)(?:\n|$)", text_content) - goal = goal_match.group(1).strip() if goal_match else input_task.description - - # Create the plan - plan = Plan( - id=str(uuid.uuid4()), - session_id=input_task.session_id, - user_id=self._user_id, - initial_goal=goal, - overall_status=PlanStatus.in_progress, - summary=f"Plan created from {input_task.description}" - ) - - # Store the plan - await self._memory_store.add_plan(plan) - - # Parse steps using regex - step_pattern = re.compile(r'(?:Step|)\s*(\d+)[:.]\s*\*?\*?(?:Agent|):\s*\*?([^:*\n]+)\*?[:\s]*(.+?)(?=(?:Step|)\s*\d+[:.]\s*|$)', re.DOTALL) - matches = step_pattern.findall(text_content) - - if not matches: - # Fallback to simpler pattern - step_pattern = re.compile(r'(\d+)[.:\)]\s*([^:]*?):\s*(.*?)(?=\d+[.:\)]|$)', re.DOTALL) - matches = step_pattern.findall(text_content) - - # If still no matches, look for bullet points or numbered lists - if not matches: - step_pattern = re.compile(r'[•\-*]\s*([^:]*?):\s*(.*?)(?=[•\-*]|$)', re.DOTALL) - bullet_matches = step_pattern.findall(text_content) - if bullet_matches: - # Convert bullet matches to our expected format (number, agent, action) - matches = [] - for i, (agent_text, action) in enumerate(bullet_matches, 1): - matches.append((str(i), agent_text.strip(), action.strip())) - - steps = [] - # If we found no steps at all, create at least one generic step - if not matches: - generic_step = Step( - id=str(uuid.uuid4()), - plan_id=plan.id, - session_id=input_task.session_id, - user_id=self._user_id, - action=f"Process the request: {input_task.description}", - agent="GenericAgent", - status=StepStatus.planned, - human_approval_status=HumanFeedbackStatus.requested - ) - await self._memory_store.add_step(generic_step) - steps.append(generic_step) - else: - for match in matches: - number = match[0].strip() - agent_text = match[1].strip() - action = match[2].strip() - - # Clean up agent name - agent = re.sub(r'\s+', '', agent_text) - if not agent or agent not in self._available_agents: - agent = "GenericAgent" # Default to GenericAgent if not recognized - - # Create and store the step - step = Step( - id=str(uuid.uuid4()), - plan_id=plan.id, - session_id=input_task.session_id, - user_id=self._user_id, - action=action, - agent=agent, - status=StepStatus.planned, - human_approval_status=HumanFeedbackStatus.requested - ) - - await self._memory_store.add_step(step) - steps.append(step) - - return plan, steps + # Re-raise the exception to be handled by the calling method + raise def _generate_instruction(self, objective: str) -> str: """Generate instruction for the LLM to create a plan.