diff --git a/docs/images/readme/agent_flow.png b/docs/images/readme/agent_flow.png index 9b8b93701..1ba7c9fe6 100644 Binary files a/docs/images/readme/agent_flow.png and b/docs/images/readme/agent_flow.png differ diff --git a/docs/images/readme/architecture.png b/docs/images/readme/architecture.png index c0fce0f0d..12304dda5 100644 Binary files a/docs/images/readme/architecture.png and b/docs/images/readme/architecture.png differ diff --git a/src/backend/v3/orchestration/helper/plan_to_mplan_converter.py b/src/backend/v3/orchestration/helper/plan_to_mplan_converter.py new file mode 100644 index 000000000..13c0d7a69 --- /dev/null +++ b/src/backend/v3/orchestration/helper/plan_to_mplan_converter.py @@ -0,0 +1,194 @@ +import logging +import re +from typing import Iterable, List, Optional + +from v3.models.models import MPlan, MStep + +logger = logging.getLogger(__name__) + + +class PlanToMPlanConverter: + """ + Convert a free-form, bullet-style plan string into an MPlan object. + + Bullet parsing rules: + 1. Recognizes lines starting (optionally with indentation) followed by -, *, or • + 2. Attempts to resolve the agent in priority order: + a. First bolded token (**AgentName**) if within detection window and in team + b. Any team agent name appearing (case-insensitive) within the first detection window chars + c. Fallback agent name (default 'MagenticAgent') + 3. Removes the matched agent token from the action text + 4. Ignores bullet lines whose remaining action is blank + + Notes: + - This does not mutate MPlan.user_id (caller can assign after parsing). + - You can supply task text (becomes user_request) and facts text. + - Optionally detect sub-bullets (indent > 0). If enabled, a `level` integer is + returned alongside each MStep in an auxiliary `step_levels` list (since the + current MStep model doesn’t have a level field). 
+ + Example: + converter = PlanToMPlanConverter(team=["ResearchAgent","AnalysisAgent"], task="Analyze Q4", facts="Some facts") + mplan = converter.parse(plan_text=raw) + + """ + + BULLET_RE = re.compile(r'^(?P<indent>\s*)[-•*]\s+(?P<body>.+)$') + BOLD_AGENT_RE = re.compile(r'\*\*([A-Za-z0-9_]+)\*\*') + STRIP_BULLET_MARKER_RE = re.compile(r'^[-•*]\s+') + + def __init__( + self, + team: Iterable[str], + task: str = "", + facts: str = "", + detection_window: int = 25, + fallback_agent: str = "MagenticAgent", + enable_sub_bullets: bool = False, + trim_actions: bool = True, + collapse_internal_whitespace: bool = True, + ): + self.team: List[str] = list(team) + self.task = task + self.facts = facts + self.detection_window = detection_window + self.fallback_agent = fallback_agent + self.enable_sub_bullets = enable_sub_bullets + self.trim_actions = trim_actions + self.collapse_internal_whitespace = collapse_internal_whitespace + + # Map for faster case-insensitive lookups while preserving canonical form + self._team_lookup = {t.lower(): t for t in self.team} + + # ---------------- Public API ---------------- # + + def parse(self, plan_text: str) -> MPlan: + """ + Parse the supplied bullet-style plan text into an MPlan. + + Returns: + MPlan with team, user_request, facts, steps populated. + + Side channel (if sub-bullets enabled): + self.last_step_levels: List[int] parallel to steps (0 = top, 1 = sub, etc.) 
+ """ + mplan = MPlan() + mplan.team = self.team.copy() + mplan.user_request = self.task or mplan.user_request + mplan.facts = self.facts or mplan.facts + + lines = self._preprocess_lines(plan_text) + + step_levels: List[int] = [] + for raw_line in lines: + bullet_match = self.BULLET_RE.match(raw_line) + if not bullet_match: + continue # ignore non-bullet lines entirely + + indent = bullet_match.group("indent") or "" + body = bullet_match.group("body").strip() + + level = 0 + if self.enable_sub_bullets and indent: + # Simple heuristic: any indentation => level 1 (could extend to deeper) + level = 1 + + agent, action = self._extract_agent_and_action(body) + + if not action: + continue + + mplan.steps.append(MStep(agent=agent, action=action)) + if self.enable_sub_bullets: + step_levels.append(level) + + if self.enable_sub_bullets: + # Expose levels so caller can correlate (parallel list) + self.last_step_levels = step_levels # type: ignore[attr-defined] + + return mplan + + # ---------------- Internal Helpers ---------------- # + + def _preprocess_lines(self, plan_text: str) -> List[str]: + lines = plan_text.splitlines() + cleaned: List[str] = [] + for line in lines: + stripped = line.rstrip() + if stripped: + cleaned.append(stripped) + return cleaned + + def _extract_agent_and_action(self, body: str) -> (str, str): + """ + Apply bold-first strategy, then window scan fallback. + Returns (agent, action_text). + """ + original = body + + # 1. Try bold token + agent, body_after = self._try_bold_agent(original) + if agent: + action = self._finalize_action(body_after) + return agent, action + + # 2. Try window scan + agent2, body_after2 = self._try_window_agent(original) + if agent2: + action = self._finalize_action(body_after2) + return agent2, action + + # 3. 
Fallback + action = self._finalize_action(original) + return self.fallback_agent, action + + def _try_bold_agent(self, text: str) -> (Optional[str], str): + m = self.BOLD_AGENT_RE.search(text) + if not m: + return None, text + if m.start() <= self.detection_window: + candidate = m.group(1) + canonical = self._team_lookup.get(candidate.lower()) + if canonical: # valid agent + cleaned = text[:m.start()] + text[m.end():] + return canonical, cleaned.strip() + return None, text + + def _try_window_agent(self, text: str) -> (Optional[str], str): + head_segment = text[: self.detection_window].lower() + for canonical in self.team: + if canonical.lower() in head_segment: + # Remove first occurrence (case-insensitive) + pattern = re.compile(re.escape(canonical), re.IGNORECASE) + cleaned = pattern.sub("", text, count=1) + cleaned = cleaned.replace("*", "") + return canonical, cleaned.strip() + return None, text + + def _finalize_action(self, action: str) -> str: + if self.trim_actions: + action = action.strip() + if self.collapse_internal_whitespace: + action = re.sub(r'\s+', ' ', action) + return action + + # --------------- Convenience (static) --------------- # + + @staticmethod + def convert( + plan_text: str, + team: Iterable[str], + task: str = "", + facts: str = "", + **kwargs, + ) -> MPlan: + """ + One-shot convenience method: + mplan = PlanToMPlanConverter.convert(plan_text, team, task="X") + """ + return PlanToMPlanConverter( + team=team, + task=task, + facts=facts, + **kwargs, + ).parse(plan_text) \ No newline at end of file diff --git a/src/backend/v3/orchestration/human_approval_manager.py b/src/backend/v3/orchestration/human_approval_manager.py index 3cedf9f76..5d91133cd 100644 --- a/src/backend/v3/orchestration/human_approval_manager.py +++ b/src/backend/v3/orchestration/human_approval_manager.py @@ -4,32 +4,34 @@ """ import asyncio +import logging import re -from typing import Any, List, Optional +from typing import Any, Optional import v3.models.messages as 
messages -from semantic_kernel.agents import Agent from semantic_kernel.agents.orchestration.magentic import ( MagenticContext, ProgressLedger, ProgressLedgerItem, StandardMagenticManager) from semantic_kernel.agents.orchestration.prompts._magentic_prompts import ( - ORCHESTRATOR_TASK_LEDGER_FACTS_PROMPT, - ORCHESTRATOR_TASK_LEDGER_PLAN_PROMPT, + ORCHESTRATOR_FINAL_ANSWER_PROMPT, ORCHESTRATOR_TASK_LEDGER_PLAN_PROMPT, ORCHESTRATOR_TASK_LEDGER_PLAN_UPDATE_PROMPT) from semantic_kernel.contents import ChatMessageContent from v3.config.settings import (connection_config, current_user_id, orchestration_config) from v3.models.models import MPlan, MStep +from v3.orchestration.helper.plan_to_mplan_converter import \ + PlanToMPlanConverter -# Create a progress ledger that indicates the request is satisfied (task complete) - +# Using a module level logger to avoid pydantic issues around inherited fields +logger = logging.getLogger(__name__) +# Create a progress ledger that indicates the request is satisfied (task completed) class HumanApprovalMagenticManager(StandardMagenticManager): """ Extended Magentic manager that requires human approval before executing plan steps. Provides interactive approval for each step in the orchestration plan. """ - + # Define Pydantic fields to avoid validation errors approval_enabled: bool = True magentic_plan: Optional[MPlan] = None @@ -38,13 +40,6 @@ class HumanApprovalMagenticManager(StandardMagenticManager): def __init__(self, *args, **kwargs): # Remove any custom kwargs before passing to parent - # Use object.__setattr__ to bypass Pydantic validation - # object.__setattr__(self, 'current_user_id', None) - - facts_append = """ - - """ - plan_append = """ IMPORTANT: Never ask the user for information or clarification until all agents on the team have been asked first. @@ -64,12 +59,18 @@ def __init__(self, *args, **kwargs): - **ProxyAgent** to review the drafted onboarding plan for clarity and completeness. 
- **MagenticManager** to finalize the onboarding plan and prepare it for presentation to stakeholders. +""" + + final_append = """ +The final answer should not include any offers of further conversation or assistance. The application will not allow further interaction with the user. +The final answer should be a complete and final response to the user's original request. """ # kwargs["task_ledger_facts_prompt"] = ORCHESTRATOR_TASK_LEDGER_FACTS_PROMPT + facts_append kwargs['task_ledger_plan_prompt'] = ORCHESTRATOR_TASK_LEDGER_PLAN_PROMPT + plan_append kwargs['task_ledger_plan_update_prompt'] = ORCHESTRATOR_TASK_LEDGER_PLAN_UPDATE_PROMPT + plan_append - + kwargs['final_answer_prompt'] = ORCHESTRATOR_FINAL_ANSWER_PROMPT + final_append + super().__init__(*args, **kwargs) async def plan(self, magentic_context: MagenticContext) -> Any: @@ -82,15 +83,16 @@ async def plan(self, magentic_context: MagenticContext) -> Any: task_text = task_text.content elif not isinstance(task_text, str): task_text = str(task_text) - - print(f"\n Human-in-the-Loop Magentic Manager Creating Plan:") - print(f" Task: {task_text}") - print("-" * 60) - + + logger.info("\n Human-in-the-Loop Magentic Manager Creating Plan:") + logger.info(" Task: %s", task_text) + logger.info("-" * 60) + # First, let the parent create the actual plan - print(" Creating execution plan...") + logger.info(" Creating execution plan...") plan = await super().plan(magentic_context) - print(f" Plan created: {plan}") + logger.info(" Plan created: %s",plan) + self.magentic_plan = self.plan_to_obj( magentic_context, self.task_ledger) self.magentic_plan.user_id = current_user_id.get() @@ -107,7 +109,7 @@ async def plan(self, magentic_context: MagenticContext) -> Any: try: orchestration_config.plans[self.magentic_plan.id] = self.magentic_plan except Exception as e: - print(f"Error processing plan approval: {e}") + logger.error("Error processing plan approval: %s", e) # Send the approval request to the user's WebSocket @@ -116,31 
+118,31 @@ async def plan(self, magentic_context: MagenticContext) -> Any: message=approval_message, user_id=current_user_id.get(), message_type=messages.WebsocketMessageType.PLAN_APPROVAL_REQUEST) - + # Wait for user approval approval_response = await self._wait_for_user_approval(approval_message.plan.id) - + if approval_response and approval_response.approved: - print("Plan approved - proceeding with execution...") + logger.info("Plan approved - proceeding with execution...") return plan else: - print("Plan execution cancelled by user") + logger.debug("Plan execution cancelled by user") await connection_config.send_status_update_async({ "type": messages.WebsocketMessageType.PLAN_APPROVAL_RESPONSE, "data": approval_response }, user_id=current_user_id.get(), message_type=messages.WebsocketMessageType.PLAN_APPROVAL_RESPONSE) raise Exception("Plan execution cancelled by user") - + async def replan(self,magentic_context: MagenticContext) -> Any: """ Override to add websocket messages for replanning events. """ - print(f"\nHuman-in-the-Loop Magentic Manager replanned:") + logger.info("\nHuman-in-the-Loop Magentic Manager replanned:") replan = await super().replan(magentic_context=magentic_context) - print("Replanned: %s", replan) + logger.info("Replanned: %s", replan) return replan - + async def create_progress_ledger(self, magentic_context: MagenticContext) -> ProgressLedger: """ Check for max rounds exceeded and send final message if so. 
""" if magentic_context.round_count >= orchestration_config.max_rounds: @@ -150,12 +152,12 @@ async def create_progress_ledger(self, magentic_context: MagenticContext) -> Pro status="terminated", summary=f"Stopped after {magentic_context.round_count} rounds (max: {orchestration_config.max_rounds})" ) - + await connection_config.send_status_update_async( message= final_message, user_id=current_user_id.get(), message_type=messages.WebsocketMessageType.FINAL_RESULT_MESSAGE) - + return ProgressLedger( is_request_satisfied=ProgressLedgerItem(reason="Maximum rounds exceeded", answer=True), is_in_loop=ProgressLedgerItem(reason="Terminating", answer=False), @@ -163,12 +165,13 @@ async def create_progress_ledger(self, magentic_context: MagenticContext) -> Pro next_speaker=ProgressLedgerItem(reason="Task complete", answer=""), instruction_or_question=ProgressLedgerItem(reason="Task complete", answer="Process terminated due to maximum rounds exceeded") ) - + return await super().create_progress_ledger(magentic_context) - - async def _wait_for_user_approval(self, m_plan_id: Optional[str] = None) -> Optional[messages.PlanApprovalResponse]: # plan_id will not be optional in future + + # plan_id will not be optional in future + async def _wait_for_user_approval(self, m_plan_id: Optional[str] = None) -> Optional[messages.PlanApprovalResponse]: """Wait for user approval response.""" - + # To do: implement timeout and error handling if m_plan_id not in orchestration_config.approvals: orchestration_config.approvals[m_plan_id] = None @@ -180,85 +183,72 @@ async def prepare_final_answer(self, magentic_context: MagenticContext) -> ChatM """ Override to ensure final answer is prepared after all steps are executed. 
""" - print("\n Magentic Manager - Preparing final answer...") + logger.info("\n Magentic Manager - Preparing final answer...") return await super().prepare_final_answer(magentic_context) - - - async def _get_plan_approval_with_details(self, task: str, participant_descriptions: dict, plan: Any) -> bool: - while True: - approval = input("\ Approve this execution plan? [y/n/details]: ").strip().lower() - - if approval in ['y', 'yes']: - print(" Plan approved by user") - return True - elif approval in ['n', 'no']: - print(" Plan rejected by user") - return False - # elif approval in ['d', 'details']: - # self._show_detailed_plan_info(task, participant_descriptions, plan) - else: - print("Please enter 'y' for yes, 'n' for no, or 'details' for more info") def plan_to_obj(self, magentic_context, ledger) -> MPlan: """ Convert the generated plan from the ledger into a structured MPlan object. """ - - return_plan: MPlan = MPlan() - - # get the request text from the ledger - if hasattr(magentic_context, 'task'): - return_plan.user_request = magentic_context.task - - return_plan.team = list(magentic_context.participant_descriptions.keys()) - - # Get the facts content from the ledger - if hasattr(ledger, 'facts') and ledger.facts.content: - return_plan.facts = ledger.facts.content - - # Get the plan / steps content from the ledger - # Split the description into lines and clean them - lines = [line.strip() for line in ledger.plan.content.strip().split('\n') if line.strip()] - - found_agent = None - prefix = None - - for line in lines: - # match lines that look like bullet points - if re.match(r'^[-•*]\s+', line): - # Remove the bullet point marker - line = re.sub(r'^[-•*]\s+', '', line).strip() - - # Look for agent names in the line - - for agent_name in return_plan.team: - # Check if agent name appears in the line (case insensitive) - if agent_name.lower() in line.lower(): - found_agent = agent_name - line = line.split(agent_name, 1) - line = line[1].strip() if len(line) > 1 
else "" - line = line.replace('*', '').strip() - break - - if not found_agent: - # If no agent found, assign to ProxyAgent if available - found_agent = "MagenticAgent" - # If line indicates a following list of actions (e.g. "Assign **EnhancedResearchAgent** - # to gather authoritative data on:") save and prefix to the steps - if line.endswith(':'): - line = line.replace(':', '').strip() - prefix = line + " " - - # Don't create a step if action is blank - if line.strip() != "": - if prefix: - line = prefix + line - # Create the step object - step = MStep(agent=found_agent, action=line) - - # add the step to the plan - return_plan.steps.append(step) # pylint: disable=E1101 - - return return_plan + return_plan: MPlan = PlanToMPlanConverter.convert(plan_text=ledger.plan.content,facts=ledger.facts.content, team=list(magentic_context.participant_descriptions.keys()), task=magentic_context.task) + + # # get the request text from the ledger + # if hasattr(magentic_context, 'task'): + # return_plan.user_request = magentic_context.task + + # return_plan.team = list(magentic_context.participant_descriptions.keys()) + + # # Get the facts content from the ledger + # if hasattr(ledger, 'facts') and ledger.facts.content: + # return_plan.facts = ledger.facts.content + + # # Get the plan / steps content from the ledger + # # Split the description into lines and clean them + # lines = [line.strip() for line in ledger.plan.content.strip().split('\n') if line.strip()] + + # found_agent = None + # prefix = None + + # for line in lines: + # found_agent = None + # prefix = None + # # log the line for troubleshooting + # logger.debug("Processing plan line: %s", line) + + # # match only lines that have bullet points + # if re.match(r'^[-•*]\s+', line): + # # Remove the bullet point marker + # line = re.sub(r'^[-•*]\s+', '', line).strip() + + # # Look for agent names in the line + + # for agent_name in return_plan.team: + # # Check if agent name appears in the line (case insensitive) + # if 
agent_name.lower() in line[:20].lower(): + # found_agent = agent_name + # line = line.split(agent_name, 1) + # line = line[1].strip() if len(line) > 1 else "" + # line = line.replace('*', '').strip() + # break + + # if not found_agent: + # # If no agent found, assign to ProxyAgent if available + # found_agent = "MagenticAgent" + # # If line indicates a following list of actions (e.g. "Assign **EnhancedResearchAgent** + # # to gather authoritative data on:") save and prefix to the steps + # # if line.endswith(':'): + # # line = line.replace(':', '').strip() + # # prefix = line + " " + + # # Don't create a step if action is blank + # if line.strip() != "": + # if prefix: + # line = prefix + line + # # Create the step object + # step = MStep(agent=found_agent, action=line) + + # # add the step to the plan + # return_plan.steps.append(step) # pylint: disable=E1101 + return return_plan diff --git a/src/backend/v3/orchestration/orchestration_manager.py b/src/backend/v3/orchestration/orchestration_manager.py index 026c754ec..82f2eeb9d 100644 --- a/src/backend/v3/orchestration/orchestration_manager.py +++ b/src/backend/v3/orchestration/orchestration_manager.py @@ -3,6 +3,7 @@ import asyncio import contextvars +import logging import os import uuid from contextvars import ContextVar @@ -18,12 +19,12 @@ AzureChatCompletion, OpenAIChatPromptExecutionSettings) from semantic_kernel.contents import (ChatMessageContent, StreamingChatMessageContent) -from v3.models.messages import WebsocketMessageType from v3.callbacks.response_handlers import (agent_response_callback, streaming_agent_response_callback) from v3.config.settings import (config, connection_config, current_user_id, orchestration_config) from v3.magentic_agents.magentic_agent_factory import MagenticAgentFactory +from v3.models.messages import WebsocketMessageType from v3.orchestration.human_approval_manager import \ HumanApprovalMagenticManager @@ -33,8 +34,13 @@ class OrchestrationManager: """Manager for handling 
orchestration logic.""" + # Class-scoped logger (always available to classmethods) + logger = logging.getLogger(f"{__name__}.OrchestrationManager") + def __init__(self): self.user_id: Optional[str] = None + # Optional alias (helps with autocomplete) + self.logger = self.__class__.logger @classmethod async def init_orchestration(cls, agents: List, user_id: str = None)-> MagenticOrchestration: @@ -83,7 +89,7 @@ async def callback(streaming_message: StreamingChatMessageContent, is_final: boo return callback @classmethod - async def get_current_or_new_orchestration(self, user_id: str, team_config: TeamConfiguration, team_switched: bool) -> MagenticOrchestration: # add team_switched: bool parameter + async def get_current_or_new_orchestration(cls, user_id: str, team_config: TeamConfiguration, team_switched: bool) -> MagenticOrchestration: # add team_switched: bool parameter """get existing orchestration instance.""" current_orchestration = orchestration_config.get_current_orchestration(user_id) if current_orchestration is None or team_switched: # add check for team_switched flag @@ -93,10 +99,10 @@ async def get_current_or_new_orchestration(self, user_id: str, team_config: Team try: await agent.close() except Exception as e: - print(f"Error closing agent: {e}") + cls.logger.error("Error closing agent: %s", e) factory = MagenticAgentFactory() agents = await factory.get_agents(team_config_input=team_config) - orchestration_config.orchestrations[user_id] = await self.init_orchestration(agents, user_id) + orchestration_config.orchestrations[user_id] = await cls.init_orchestration(agents, user_id) return orchestration_config.get_current_orchestration(user_id) async def run_orchestration(self, user_id, input_task) -> None: @@ -114,9 +120,9 @@ async def run_orchestration(self, user_id, input_task) -> None: try: if hasattr(magentic_orchestration, '_manager') and hasattr(magentic_orchestration._manager, 'current_user_id'): magentic_orchestration._manager.current_user_id = 
user_id - print(f"🔍 DEBUG: Set user_id on manager = {user_id}") + self.logger.debug(f"DEBUG: Set user_id on manager = {user_id}") except Exception as e: - print(f"Error setting user_id on manager: {e}") + self.logger.error(f"Error setting user_id on manager: {e}") runtime = InProcessRuntime() runtime.start() @@ -129,10 +135,10 @@ async def run_orchestration(self, user_id, input_task) -> None: ) try: - print("\nAgent responses:") + self.logger.info("\nAgent responses:") value = await orchestration_result.get() - print(f"\nFinal result:\n{value}") - print("=" * 50) + self.logger.info(f"\nFinal result:\n{value}") + self.logger.info("=" * 50) # Send final result via WebSocket await connection_config.send_status_update_async({ @@ -143,16 +149,16 @@ async def run_orchestration(self, user_id, input_task) -> None: "timestamp": asyncio.get_event_loop().time() } }, user_id, message_type=WebsocketMessageType.FINAL_RESULT_MESSAGE) - print(f"Final result sent via WebSocket to user {user_id}") + self.logger.info(f"Final result sent via WebSocket to user {user_id}") except Exception as e: - print(f"Error: {e}") - print(f"Error type: {type(e).__name__}") + self.logger.info(f"Error: {e}") + self.logger.info(f"Error type: {type(e).__name__}") if hasattr(e, '__dict__'): - print(f"Error attributes: {e.__dict__}") - print("=" * 50) + self.logger.info(f"Error attributes: {e.__dict__}") + self.logger.info("=" * 50) except Exception as e: - print(f"Unexpected error: {e}") + self.logger.error(f"Unexpected error: {e}") finally: await runtime.stop_when_idle() current_user_id.reset(token) diff --git a/src/frontend/package-lock.json b/src/frontend/package-lock.json index c239c8a62..3e17847f7 100644 --- a/src/frontend/package-lock.json +++ b/src/frontend/package-lock.json @@ -3991,9 +3991,9 @@ } }, "node_modules/axios": { - "version": "1.11.0", - "resolved": "https://registry.npmjs.org/axios/-/axios-1.11.0.tgz", - "integrity": 
"sha512-1Lx3WLFQWm3ooKDYZD1eXmoGO9fxYQjrycfHFC8P0sCfQVXyROp0p9PFWBehewBOdCwHc+f/b8I0fMto5eSfwA==", + "version": "1.12.2", + "resolved": "https://registry.npmjs.org/axios/-/axios-1.12.2.tgz", + "integrity": "sha512-vMJzPewAlRyOgxV2dU0Cuz2O8zzzx9VYtbJOaBgXFeLc4IV/Eg50n4LowmehOOR61S8ZMpc2K5Sa7g6A4jfkUw==", "license": "MIT", "dependencies": { "follow-redirects": "^1.15.6", diff --git a/src/tests/agents/test_foundry_integration.py b/src/tests/agents/test_foundry_integration.py index c59ae2da3..b35661d6a 100644 --- a/src/tests/agents/test_foundry_integration.py +++ b/src/tests/agents/test_foundry_integration.py @@ -4,10 +4,7 @@ """ # pylint: disable=E0401, E0611, C0413 -import asyncio -import os import sys -import time from pathlib import Path import pytest diff --git a/src/tests/agents/test_human_approval_manager.py b/src/tests/agents/test_human_approval_manager.py new file mode 100644 index 000000000..0cba88424 --- /dev/null +++ b/src/tests/agents/test_human_approval_manager.py @@ -0,0 +1,213 @@ +import sys +from pathlib import Path + +import pytest + +# Add the backend path to sys.path so we can import v3 modules +backend_path = Path(__file__).parent.parent.parent / "backend" +sys.path.insert(0, str(backend_path)) + +from v3.models.models import MPlan, MStep +from v3.orchestration.human_approval_manager import \ + HumanApprovalMagenticManager + +# +# Helper dummies to simulate the minimal shape required by plan_to_obj +# + +class _Obj: + def __init__(self, content: str): + self.content = content + +class DummyLedger: + def __init__(self, plan_content: str, facts_content: str = ""): + self.plan = _Obj(plan_content) + self.facts = _Obj(facts_content) + +class DummyContext: + def __init__(self, task: str, participant_descriptions: dict[str, str]): + self.task = task + self.participant_descriptions = participant_descriptions + + +def _make_manager(): + """ + Create a HumanApprovalMagenticManager instance without calling its __init__ + (avoids needing the full semantic 
kernel dependencies for this focused unit test). + """ + return HumanApprovalMagenticManager.__new__(HumanApprovalMagenticManager) + +def test_plan_to_obj_basic_parsing(): + plan_text = """ +- **ProductAgent** to provide detailed information about the company's current products. +- **MarketingAgent** to gather relevant market positioning insights, key messaging strategies. +- **MarketingAgent** to draft an initial press release outline based on the product details. +- **ProductAgent** to review the press release outline for technical accuracy and completeness of product details. +- **MarketingAgent** to finalize the press release draft incorporating the ProductAgent’s feedback. +- **ProxyAgent** to step in and request additional clarification or missing details from ProductAgent and MarketingAgent. +""" + ctx = DummyContext( + task="Analyze Q4 performance", + participant_descriptions={ + "ProductAgent": "Provide product info", + "MarketingAgent": "Handle marketing", + "ProxyAgent": "Ask user for missing info", + }, + ) + ledger = DummyLedger(plan_text) + mgr = _make_manager() + + mplan = mgr.plan_to_obj(ctx, ledger) + + assert isinstance(mplan, MPlan) + assert mplan.user_request == "Analyze Q4 performance" + assert len(mplan.steps) == 6 + + agents = [s.agent for s in mplan.steps] + assert agents == ["ProductAgent", "MarketingAgent", "MarketingAgent","ProductAgent", "MarketingAgent", "ProxyAgent"] + + actions = [s.action for s in mplan.steps] + assert "to provide detailed information about the company's current products" in actions[0] + assert "to gather relevant market positioning insights, key messaging strategies" in actions[1].lower() + assert "to draft an initial press release outline based on the product details" in actions[2] + assert "to review the press release outline for technical accuracy and completeness of product details" in actions[3] + assert "to finalize the press release draft incorporating the productagent’s feedback" in actions[4].lower() + 
assert "to step in and request additional clarification or missing details from productagent and marketingagent" in actions[5].lower() + + +def test_plan_to_obj_ignores_non_bullet_lines_and_uses_fallback(): + plan_text = """ +Introduction line that should be ignored +- **ResearchAgent** to collect competitor pricing +Some trailing commentary +- finalize compiled dataset +""" + ctx = DummyContext( + task="Compile competitive pricing dataset", + participant_descriptions={ + "ResearchAgent": "Collect data", + }, + ) + ledger = DummyLedger(plan_text) + mgr = _make_manager() + + mplan = mgr.plan_to_obj(ctx, ledger) + + # Only 2 bullet lines + assert len(mplan.steps) == 2 + assert mplan.steps[0].agent == "ResearchAgent" + # Second bullet has no recognizable agent => fallback + assert mplan.steps[1].agent == "MagenticAgent" + assert "finalize compiled dataset" in mplan.steps[1].action.lower() + + +def test_plan_to_obj_resets_agent_each_line(): + plan_text = """ +- **ResearchAgent** to gather initial statistics +- finalize normalizing collected values +""" + ctx = DummyContext( + task="Normalize stats", + participant_descriptions={ + "ResearchAgent": "Collect data", + }, + ) + ledger = DummyLedger(plan_text) + mgr = _make_manager() + + mplan = mgr.plan_to_obj(ctx, ledger) + + assert len(mplan.steps) == 2 + assert mplan.steps[0].agent == "ResearchAgent" + # Ensure no leakage of previous agent + assert mplan.steps[1].agent == "MagenticAgent" + + +@pytest.mark.xfail(reason="Current implementation duplicates text when a line ends with ':' due to prefix handling.") +def test_plan_to_obj_colon_prefix_current_behavior(): + plan_text = """ +- **ResearchAgent** to gather quarterly metrics: +""" + ctx = DummyContext( + task="Quarterly metrics", + participant_descriptions={ + "ResearchAgent": "Collect metrics", + }, + ) + ledger = DummyLedger(plan_text) + mgr = _make_manager() + + mplan = mgr.plan_to_obj(ctx, ledger) + + # Expect 1 step + assert len(mplan.steps) == 1 + # Current code 
creates duplicated phrase if colon is present (likely a bug) + action = mplan.steps[0].action + # This assertion documents present behavior; adjust when you fix prefix logic. + assert action.count("gather quarterly metrics") == 1 # Will fail until fixed + + +def test_plan_to_obj_empty_or_whitespace_plan(): + plan_text = " \n \n" + ctx = DummyContext( + task="Empty plan test", + participant_descriptions={ + "AgentA": "A", + }, + ) + ledger = DummyLedger(plan_text) + mgr = _make_manager() + + mplan = mgr.plan_to_obj(ctx, ledger) + assert len(mplan.steps) == 0 + assert mplan.user_request == "Empty plan test" + + +def test_plan_to_obj_multiple_agents_case_insensitive(): + plan_text = """ +- **researchagent** to collect raw feeds +- **ANALYSISAGENT** to process raw feeds +""" + ctx = DummyContext( + task="Case insensitivity test", + participant_descriptions={ + "ResearchAgent": "Collect", + "AnalysisAgent": "Process", + }, + ) + ledger = DummyLedger(plan_text) + mgr = _make_manager() + + mplan = mgr.plan_to_obj(ctx, ledger) + assert [s.agent for s in mplan.steps] == ["ResearchAgent", "AnalysisAgent"] + + +def test_plan_to_obj_facts_copied(): + plan_text = "- **ResearchAgent** to gather X" + facts_text = "Known constraints: Budget capped." + ctx = DummyContext( + task="Gather X", + participant_descriptions={"ResearchAgent": "Collect"}, + ) + ledger = DummyLedger(plan_text, facts_text) + mgr = _make_manager() + + mplan = mgr.plan_to_obj(ctx, ledger) + assert mplan.facts == "Known constraints: Budget capped." 
+ assert len(mplan.steps) == 1 + assert mplan.steps[0].agent == "ResearchAgent" + + +def test_plan_to_obj_fallback_when_agent_not_in_team(): + plan_text = "- **UnknownAgent** to do something unusual" + ctx = DummyContext( + task="Unknown agent test", + participant_descriptions={"ResearchAgent": "Collect"}, + ) + ledger = DummyLedger(plan_text) + mgr = _make_manager() + + mplan = mgr.plan_to_obj(ctx, ledger) + assert len(mplan.steps) == 1 + assert mplan.steps[0].agent == "MagenticAgent" + assert "do something unusual" in mplan.steps[0].action.lower() \ No newline at end of file