Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
318 changes: 91 additions & 227 deletions src/backend/kernel_agents/planner_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -263,9 +263,16 @@ async def _create_structured_plan(self, input_task: InputTask) -> Tuple[Plan, Li
"""
try:
# Generate the instruction for the LLM
logging.info("Generating instruction for the LLM")
logging.debug(f"Input: {input_task}")
logging.debug(f"Available agents: {self._available_agents}")

instruction = self._generate_instruction(input_task.description)

logging.info(f"Generated instruction: {instruction}")
# Log the input task for debugging
logging.info(f"Creating plan for task: '{input_task.description}'")
logging.info(f"Using available agents: {self._available_agents}")

# Use the Azure AI Agent instead of direct function invocation
if self._azure_ai_agent is None:
Expand Down Expand Up @@ -293,6 +300,7 @@ async def _create_structured_plan(self, input_task: InputTask) -> Tuple[Plan, Li
arguments=kernel_args,
settings={
"temperature": 0.0, # Keep temperature low for consistent planning
"max_tokens": 4096 # Ensure we have enough tokens for the full plan
}
)

Expand All @@ -301,267 +309,123 @@ async def _create_structured_plan(self, input_task: InputTask) -> Tuple[Plan, Li
if chunk is not None:
response_content += str(chunk)


logging.info(f"Response content: {response_content}")

# Check if response is empty or whitespace
if not response_content or response_content.isspace():
raise ValueError("Received empty response from Azure AI Agent")

# Parse the JSON response using the structured output model
# Parse the JSON response directly to PlannerResponsePlan
parsed_result = None

# Try to parse the raw response first
try:
# First try to parse using Pydantic model
try:
parsed_result = PlannerResponsePlan.parse_raw(response_content)
except Exception as e1:
logging.warning(f"Failed to parse direct JSON with Pydantic: {str(e1)}")

# If direct parsing fails, try to extract JSON first
json_match = re.search(r'```json\s*(.*?)\s*```', response_content, re.DOTALL)
if json_match:
json_content = json_match.group(1)
logging.info(f"Found JSON content in markdown code block, length: {len(json_content)}")
try:
parsed_result = PlannerResponsePlan.parse_raw(json_content)
except Exception as e2:
logging.warning(f"Failed to parse extracted JSON with Pydantic: {str(e2)}")
# Try conventional JSON parsing as fallback
json_data = json.loads(json_content)
parsed_result = PlannerResponsePlan.parse_obj(json_data)
else:
# Try to extract JSON without code blocks - maybe it's embedded in text
# Look for patterns like { ... } that contain "initial_goal" and "steps"
json_pattern = r'\{.*?"initial_goal".*?"steps".*?\}'
alt_match = re.search(json_pattern, response_content, re.DOTALL)

if alt_match:
potential_json = alt_match.group(0)
logging.info(f"Found potential JSON in text, length: {len(potential_json)}")
try:
json_data = json.loads(potential_json)
parsed_result = PlannerResponsePlan.parse_obj(json_data)
except Exception as e3:
logging.warning(f"Failed to parse potential JSON: {str(e3)}")
# If all extraction attempts fail, try parsing the whole response as JSON
json_data = json.loads(response_content)
parsed_result = PlannerResponsePlan.parse_obj(json_data)
else:
# If we can't find JSON patterns, create a fallback plan from the text
logging.info("Using fallback plan creation from text response")
return await self._create_fallback_plan_from_text(input_task, response_content)
parsed_result = PlannerResponsePlan.parse_raw(response_content)
except Exception as e:
logging.warning(f"Failed to parse raw response: {e}")

# Extract plan details and log for debugging
initial_goal = parsed_result.initial_goal
steps_data = parsed_result.steps
summary = parsed_result.summary_plan_and_steps
human_clarification_request = parsed_result.human_clarification_request
# Try to extract JSON from markdown code blocks
json_match = re.search(r'```(?:json)?\s*(.*?)\s*```', response_content, re.DOTALL)
if json_match:
json_content = json_match.group(1)
logging.info(f"Found JSON in code block, attempting to parse")
parsed_result = PlannerResponsePlan.parse_raw(json_content)
else:
# If still not parsed, raise the error to be handled by outer exception
raise ValueError(f"Failed to parse response as PlannerResponsePlan: {e}")

# At this point, we have a valid parsed_result or an exception was raised

# Extract plan details
initial_goal = parsed_result.initial_goal
steps_data = parsed_result.steps
summary = parsed_result.summary_plan_and_steps
human_clarification_request = parsed_result.human_clarification_request

# Create the Plan instance
plan = Plan(
id=str(uuid.uuid4()),
session_id=input_task.session_id,
user_id=self._user_id,
initial_goal=initial_goal,
overall_status=PlanStatus.in_progress,
summary=summary,
human_clarification_request=human_clarification_request
)

# Store the plan
await self._memory_store.add_plan(plan)

track_event_if_configured(
"Planner - Initial plan and added into the cosmos",
{
"session_id": input_task.session_id,
"user_id": self._user_id,
"initial_goal": initial_goal,
"overall_status": PlanStatus.in_progress,
"source": "PlannerAgent",
"summary": summary,
"human_clarification_request": human_clarification_request,
},
)

# Create steps from the parsed data
steps = []
for step_data in steps_data:
action = step_data.action
agent_name = step_data.agent

# Log potential mismatches between task and plan for debugging
if "onboard" in input_task.description.lower() and "marketing" in initial_goal.lower():
logging.warning(f"Potential mismatch: Task was about onboarding but plan goal mentions marketing: {initial_goal}")

# Log the steps and agent assignments for debugging
for i, step in enumerate(steps_data):
logging.info(f"Step {i+1} - Agent: {step.agent}, Action: {step.action}")
# Validate agent name
if agent_name not in self._available_agents:
logging.warning(f"Invalid agent name: {agent_name}, defaulting to GenericAgent")
agent_name = "GenericAgent"

# Create the Plan instance
plan = Plan(
# Create the step
step = Step(
id=str(uuid.uuid4()),
plan_id=plan.id,
session_id=input_task.session_id,
user_id=self._user_id,
initial_goal=initial_goal,
overall_status=PlanStatus.in_progress,
summary=summary,
human_clarification_request=human_clarification_request
action=action,
agent=agent_name,
status=StepStatus.planned,
human_approval_status=HumanFeedbackStatus.requested
)

# Store the plan
await self._memory_store.add_plan(plan)
# Store the step
await self._memory_store.add_step(step)
steps.append(step)

track_event_if_configured(
"Planner - Initial plan and added into the cosmos",
"Planner - Added planned individual step into the cosmos",
{
"plan_id": plan.id,
"action": action,
"agent": agent_name,
"status": StepStatus.planned,
"session_id": input_task.session_id,
"user_id": self._user_id,
"initial_goal": initial_goal,
"overall_status": PlanStatus.in_progress,
"source": "PlannerAgent",
"summary": summary,
"human_clarification_request": human_clarification_request,
"human_approval_status": HumanFeedbackStatus.requested,
},
)

# Create steps from the parsed data
steps = []
for step_data in steps_data:
action = step_data.action
agent_name = step_data.agent

# Log any unusual agent assignments for debugging
if "onboard" in input_task.description.lower() and agent_name != "HrAgent":
logging.warning(f"UNUSUAL AGENT ASSIGNMENT: Task contains 'onboard' but assigned to {agent_name} instead of HrAgent")

# Validate agent name
if agent_name not in self._available_agents:
logging.warning(f"Invalid agent name: {agent_name}, defaulting to GenericAgent")
agent_name = "GenericAgent"

# Create the step
step = Step(
id=str(uuid.uuid4()),
plan_id=plan.id,
session_id=input_task.session_id,
user_id=self._user_id,
action=action,
agent=agent_name,
status=StepStatus.planned,
human_approval_status=HumanFeedbackStatus.requested
)

# Store the step
await self._memory_store.add_step(step)
steps.append(step)

track_event_if_configured(
"Planner - Added planned individual step into the cosmos",
{
"plan_id": plan.id,
"action": action,
"agent": agent_name,
"status": StepStatus.planned,
"session_id": input_task.session_id,
"user_id": self._user_id,
"human_approval_status": HumanFeedbackStatus.requested,
},
)

return plan, steps

except Exception as e:
# If JSON parsing fails, log error and create error plan
logging.exception(f"Failed to parse JSON response: {e}")
logging.info(f"Raw response was: {response_content[:1000]}...")
# Try a fallback approach
return await self._create_fallback_plan_from_text(input_task, response_content)

return plan, steps

except Exception as e:
logging.exception(f"Error creating structured plan: {e}")

track_event_if_configured(
f"Planner - Error in create_structured_plan: {e} into the cosmos",
f"Planner - Error in create_structured_plan: {e}",
{
"session_id": input_task.session_id,
"user_id": self._user_id,
"initial_goal": "Error generating plan",
"overall_status": PlanStatus.failed,
"error": str(e),
"source": "PlannerAgent",
"summary": f"Error generating plan: {e}",
},
)

# Create an error plan
error_plan = Plan(
id=str(uuid.uuid4()),
session_id=input_task.session_id,
user_id=self._user_id,
initial_goal="Error generating plan",
overall_status=PlanStatus.failed,
summary=f"Error generating plan: {str(e)}"
)

await self._memory_store.add_plan(error_plan)
return error_plan, []

async def _create_fallback_plan_from_text(self, input_task: InputTask, text_content: str) -> Tuple[Plan, List[Step]]:
"""Create a plan from unstructured text when JSON parsing fails.

Args:
input_task: The input task
text_content: The text content from the LLM

Returns:
Tuple containing the created plan and list of steps
"""
logging.info("Creating fallback plan from text content")

# Extract goal from the text (first line or use input task description)
goal_match = re.search(r"(?:Goal|Initial Goal|Plan):\s*(.+?)(?:\n|$)", text_content)
goal = goal_match.group(1).strip() if goal_match else input_task.description

# Create the plan
plan = Plan(
id=str(uuid.uuid4()),
session_id=input_task.session_id,
user_id=self._user_id,
initial_goal=goal,
overall_status=PlanStatus.in_progress,
summary=f"Plan created from {input_task.description}"
)

# Store the plan
await self._memory_store.add_plan(plan)

# Parse steps using regex
step_pattern = re.compile(r'(?:Step|)\s*(\d+)[:.]\s*\*?\*?(?:Agent|):\s*\*?([^:*\n]+)\*?[:\s]*(.+?)(?=(?:Step|)\s*\d+[:.]\s*|$)', re.DOTALL)
matches = step_pattern.findall(text_content)

if not matches:
# Fallback to simpler pattern
step_pattern = re.compile(r'(\d+)[.:\)]\s*([^:]*?):\s*(.*?)(?=\d+[.:\)]|$)', re.DOTALL)
matches = step_pattern.findall(text_content)

# If still no matches, look for bullet points or numbered lists
if not matches:
step_pattern = re.compile(r'[•\-*]\s*([^:]*?):\s*(.*?)(?=[•\-*]|$)', re.DOTALL)
bullet_matches = step_pattern.findall(text_content)
if bullet_matches:
# Convert bullet matches to our expected format (number, agent, action)
matches = []
for i, (agent_text, action) in enumerate(bullet_matches, 1):
matches.append((str(i), agent_text.strip(), action.strip()))

steps = []
# If we found no steps at all, create at least one generic step
if not matches:
generic_step = Step(
id=str(uuid.uuid4()),
plan_id=plan.id,
session_id=input_task.session_id,
user_id=self._user_id,
action=f"Process the request: {input_task.description}",
agent="GenericAgent",
status=StepStatus.planned,
human_approval_status=HumanFeedbackStatus.requested
)
await self._memory_store.add_step(generic_step)
steps.append(generic_step)
else:
for match in matches:
number = match[0].strip()
agent_text = match[1].strip()
action = match[2].strip()

# Clean up agent name
agent = re.sub(r'\s+', '', agent_text)
if not agent or agent not in self._available_agents:
agent = "GenericAgent" # Default to GenericAgent if not recognized

# Create and store the step
step = Step(
id=str(uuid.uuid4()),
plan_id=plan.id,
session_id=input_task.session_id,
user_id=self._user_id,
action=action,
agent=agent,
status=StepStatus.planned,
human_approval_status=HumanFeedbackStatus.requested
)

await self._memory_store.add_step(step)
steps.append(step)

return plan, steps
# Re-raise the exception to be handled by the calling method
raise

def _generate_instruction(self, objective: str) -> str:
"""Generate instruction for the LLM to create a plan.
Expand Down
Loading