From 75b51cd04ecbdda82003ece76b01a4564a0a2d27 Mon Sep 17 00:00:00 2001 From: Francia Riesco Date: Thu, 23 Oct 2025 12:17:12 -0400 Subject: [PATCH 1/2] Remove MCP authentication and plugin management module Deleted src/mcp_server/auth.py, which contained MCP authentication and HostedMCPTool setup logic for the employee onboarding system. This change may indicate a refactor, deprecation, or migration of authentication and plugin management functionality. --- src/mcp_server/auth.py | 65 ------------------------------------------ 1 file changed, 65 deletions(-) delete mode 100644 src/mcp_server/auth.py diff --git a/src/mcp_server/auth.py b/src/mcp_server/auth.py deleted file mode 100644 index 06aa7486..00000000 --- a/src/mcp_server/auth.py +++ /dev/null @@ -1,65 +0,0 @@ -""" -MCP authentication and plugin (tool) management for employee onboarding system. - -""" - -from azure.identity import InteractiveBrowserCredential -from agent_framework import HostedMCPTool # agent_framework substitute -from config.settings import TENANT_ID, CLIENT_ID, mcp_config - - -async def setup_mcp_authentication(): - """ - Set up MCP authentication and return an access token string for downstream header use. - Returns: - str | None: Access token (bearer) or None if authentication fails. - """ - try: - interactive_credential = InteractiveBrowserCredential( - tenant_id=TENANT_ID, - client_id=CLIENT_ID, - ) - token = interactive_credential.get_token(f"api://{CLIENT_ID}/access_as_user") - print("✅ Successfully obtained MCP authentication token") - return token.token - except Exception as e: # noqa: BLE001 - print(f"❌ Failed to get MCP token: {e}") - print("🔄 Continuing without MCP authentication...") - return None - - -async def create_mcp_plugin(token: str | None = None): - """ - Create and initialize an MCP tool (agent_framework HostedMCPTool) for onboarding tools. - - Parameters: - token: Optional bearer token string for authenticated MCP requests. - - Returns: - HostedMCPTool | None - """ - if not token: - print("⚠️ No MCP token available, skipping MCP tool creation") - return None - - try: - headers = mcp_config.get_headers(token) - # HostedMCPTool currently doesn’t require headers directly in its constructor in some builds; - # if your version supports passing headers, include them. We store them on the instance for later use. - mcp_tool = HostedMCPTool( - name=mcp_config.name, - description=mcp_config.description, - server_label=mcp_config.name.replace(" ", "_"), - url=mcp_config.url, - ) - # Optionally attach headers for downstream custom transport layers - try: - setattr(mcp_tool, "headers", headers) - except Exception: - pass - - print("✅ MCP tool (HostedMCPTool) created successfully for employee onboarding") - return mcp_tool - except Exception as e: # noqa: BLE001 - print(f"⚠️ Warning: Could not create MCP tool: {e}") - return None \ No newline at end of file From 3fed251124459ee890e140a2702bd0f9aca89206 Mon Sep 17 00:00:00 2001 From: Francia Riesco Date: Thu, 23 Oct 2025 14:07:50 -0400 Subject: [PATCH 2/2] Refactor for improved readability and formatting Reformatted code in human_approval_manager.py and orchestration_manager.py for better readability, including consistent indentation, line breaks, and argument formatting. No functional changes were made; this refactor aims to enhance maintainability and clarity. --- .../orchestration/human_approval_manager.py | 67 ++++++++++++++----- .../af/orchestration/orchestration_manager.py | 53 ++++++++++----- 2 files changed, 88 insertions(+), 32 deletions(-) diff --git a/src/backend/af/orchestration/human_approval_manager.py b/src/backend/af/orchestration/human_approval_manager.py index 40eff479..9af0c020 100644 --- a/src/backend/af/orchestration/human_approval_manager.py +++ b/src/backend/af/orchestration/human_approval_manager.py @@ -68,8 +68,12 @@ def __init__(self, user_id: str, *args, **kwargs): DO NOT EVER OFFER TO HELP FURTHER IN THE FINAL ANSWER! Just provide the final answer and end with a polite closing. """ - kwargs["task_ledger_plan_prompt"] = ORCHESTRATOR_TASK_LEDGER_PLAN_PROMPT + plan_append - kwargs["task_ledger_plan_update_prompt"] = ORCHESTRATOR_TASK_LEDGER_PLAN_UPDATE_PROMPT + plan_append + kwargs["task_ledger_plan_prompt"] = ( + ORCHESTRATOR_TASK_LEDGER_PLAN_PROMPT + plan_append + ) + kwargs["task_ledger_plan_update_prompt"] = ( + ORCHESTRATOR_TASK_LEDGER_PLAN_UPDATE_PROMPT + plan_append + ) kwargs["final_answer_prompt"] = ORCHESTRATOR_FINAL_ANSWER_PROMPT + final_append kwargs["current_user_id"] = user_id # retained for downstream usage if needed @@ -90,7 +94,10 @@ async def plan(self, magentic_context: MagenticContext) -> Any: logger.info(" Creating execution plan...") plan_message = await super().plan(magentic_context) - logger.info(" Plan created (assistant message length=%d)", len(plan_message.text) if plan_message and plan_message.text else 0) + logger.info( + " Plan created (assistant message length=%d)", + len(plan_message.text) if plan_message and plan_message.text else 0, + ) # Build structured MPlan from task ledger if self.task_ledger is None: @@ -148,10 +155,15 @@ async def replan(self, magentic_context: MagenticContext) -> Any: """ logger.info("\nHuman-in-the-Loop Magentic Manager replanned:") replan_message = await super().replan(magentic_context=magentic_context) - logger.info("Replanned message length: %d", len(replan_message.text) if replan_message and replan_message.text else 0) + logger.info( + "Replanned message length: %d", + len(replan_message.text) if replan_message and replan_message.text else 0, + ) return replan_message - async def create_progress_ledger(self, magentic_context: MagenticContext) -> ProgressLedger: + async def create_progress_ledger( + self, magentic_context: MagenticContext + ) -> ProgressLedger: """ Check for max rounds exceeded and send final message if so, else defer to base. """ @@ -169,9 +181,13 @@ async def create_progress_ledger(self, magentic_context: MagenticContext) -> Pro ) return ProgressLedger( - is_request_satisfied=ProgressLedgerItem(reason="Maximum rounds exceeded", answer=True), + is_request_satisfied=ProgressLedgerItem( + reason="Maximum rounds exceeded", answer=True + ), is_in_loop=ProgressLedgerItem(reason="Terminating", answer=False), - is_progress_being_made=ProgressLedgerItem(reason="Terminating", answer=False), + is_progress_being_made=ProgressLedgerItem( + reason="Terminating", answer=False + ), next_speaker=ProgressLedgerItem(reason="Task complete", answer=""), instruction_or_question=ProgressLedgerItem( reason="Task complete", @@ -202,7 +218,10 @@ async def _wait_for_user_approval( return messages.PlanApprovalResponse(approved=approved, m_plan_id=m_plan_id) except asyncio.TimeoutError: - logger.debug("Approval timeout for plan %s - notifying user and terminating process", m_plan_id) + logger.debug( + "Approval timeout for plan %s - notifying user and terminating process", + m_plan_id, + ) timeout_message = messages.TimeoutNotification( timeout_type="approval", @@ -218,7 +237,11 @@ async def _wait_for_user_approval( user_id=self.current_user_id, message_type=messages.WebsocketMessageType.TIMEOUT_NOTIFICATION, ) - logger.info("Timeout notification sent to user %s for plan %s", self.current_user_id, m_plan_id) + logger.info( + "Timeout notification sent to user %s for plan %s", + self.current_user_id, + m_plan_id, + ) except Exception as e: # noqa: BLE001 logger.error("Failed to send timeout notification: %s", e) @@ -235,16 +258,24 @@ async def _wait_for_user_approval( return None except Exception as e: # noqa: BLE001 - logger.debug("Unexpected error waiting for approval: %s - terminating process silently", e) + logger.debug( + "Unexpected error waiting for approval: %s - terminating process silently", + e, + ) orchestration_config.cleanup_approval(m_plan_id) return None finally: - if m_plan_id in orchestration_config.approvals and orchestration_config.approvals[m_plan_id] is None: + if ( + m_plan_id in orchestration_config.approvals + and orchestration_config.approvals[m_plan_id] is None + ): logger.debug("Final cleanup for pending approval plan %s", m_plan_id) orchestration_config.cleanup_approval(m_plan_id) - async def prepare_final_answer(self, magentic_context: MagenticContext) -> ChatMessage: + async def prepare_final_answer( + self, magentic_context: MagenticContext + ) -> ChatMessage: """ Override to ensure final answer is prepared after all steps are executed. """ @@ -253,8 +284,14 @@ async def prepare_final_answer(self, magentic_context: MagenticContext) -> ChatM def plan_to_obj(self, magentic_context: MagenticContext, ledger) -> MPlan: """Convert the generated plan from the ledger into a structured MPlan object.""" - if ledger is None or not hasattr(ledger, "plan") or not hasattr(ledger, "facts"): - raise ValueError("Invalid ledger structure; expected plan and facts attributes.") + if ( + ledger is None + or not hasattr(ledger, "plan") + or not hasattr(ledger, "facts") + ): + raise ValueError( + "Invalid ledger structure; expected plan and facts attributes." + ) task_text = getattr(magentic_context.task, "text", str(magentic_context.task)) @@ -265,4 +302,4 @@ def plan_to_obj(self, magentic_context: MagenticContext, ledger) -> MPlan: task=task_text, ) - return return_plan \ No newline at end of file + return return_plan diff --git a/src/backend/af/orchestration/orchestration_manager.py b/src/backend/af/orchestration/orchestration_manager.py index 80e25e0d..c76a6b67 100644 --- a/src/backend/af/orchestration/orchestration_manager.py +++ b/src/backend/af/orchestration/orchestration_manager.py @@ -5,17 +5,16 @@ import uuid from typing import List, Optional, Callable, Awaitable -from common.config.app_config import config -from common.models.messages_af import TeamConfiguration - # agent_framework imports -from agent_framework import ChatMessage, Role, ChatOptions from agent_framework.azure import AzureOpenAIChatClient -from agent_framework._workflows import ( - MagenticBuilder -) +from agent_framework import ChatMessage, ChatOptions +from agent_framework._workflows import MagenticBuilder from agent_framework._workflows._magentic import AgentRunResponseUpdate # type: ignore + +from common.config.app_config import config +from common.models.messages_af import TeamConfiguration + # Existing (legacy) callbacks expecting SK content; we'll adapt to them. # If you've created af-native callbacks (e.g. response_handlers_af) import those instead. from af.callbacks.response_handlers import ( @@ -40,7 +39,9 @@ def __init__(self): # Internal callback adapters # --------------------------- @staticmethod - def _user_aware_agent_callback(user_id: str) -> Callable[[str, ChatMessage], Awaitable[None]]: + def _user_aware_agent_callback( + user_id: str, + ) -> Callable[[str, ChatMessage], Awaitable[None]]: """Adapts agent_framework final agent ChatMessage to legacy agent_response_callback signature.""" async def _cb(agent_id: str, message: ChatMessage): @@ -48,7 +49,9 @@ async def _cb(agent_id: str, message: ChatMessage): try: agent_response_callback(message, user_id) # existing callback is sync except Exception as e: # noqa: BLE001 - logging.getLogger(__name__).error("agent_response_callback error: %s", e) + logging.getLogger(__name__).error( + "agent_response_callback error: %s", e + ) return _cb @@ -72,7 +75,9 @@ def __init__(self, agent_id: str, update: AgentRunResponseUpdate): try: await streaming_agent_response_callback(shim, is_final, user_id) except Exception as e: # noqa: BLE001 - logging.getLogger(__name__).error("streaming_agent_response_callback error: %s", e) + logging.getLogger(__name__).error( + "streaming_agent_response_callback error: %s", e + ) return _cb @@ -146,14 +151,19 @@ def get_token(): "_agent_response_callback", cls._user_aware_agent_callback(user_id), ) - if getattr(orchestrator, "_streaming_agent_response_callback", None) is None: + if ( + getattr(orchestrator, "_streaming_agent_response_callback", None) + is None + ): setattr( orchestrator, "_streaming_agent_response_callback", cls._user_aware_streaming_callback(user_id), ) except Exception as e: # noqa: BLE001 - cls.logger.warning("Could not attach callbacks to workflow orchestrator: %s", e) + cls.logger.warning( + "Could not attach callbacks to workflow orchestrator: %s", e + ) return workflow @@ -174,7 +184,10 @@ async def get_current_or_new_orchestration( if current is not None and team_switched: # Close prior agents (skip ProxyAgent if desired) for agent in getattr(current, "_participants", {}).values(): - if getattr(agent, "agent_name", getattr(agent, "name", "")) != "ProxyAgent": + if ( + getattr(agent, "agent_name", getattr(agent, "name", "")) + != "ProxyAgent" + ): close_coro = getattr(agent, "close", None) if callable(close_coro): try: @@ -183,10 +196,14 @@ async def get_current_or_new_orchestration( cls.logger.error("Error closing agent: %s", e) # Build new participants via existing factory) - from af.magentic_agents.magentic_agent_factory import MagenticAgentFactory # local import to avoid circular + from af.magentic_agents.magentic_agent_factory import ( + MagenticAgentFactory, + ) # local import to avoid circular factory = MagenticAgentFactory() - agents = await factory.get_agents(user_id=user_id, team_config_input=team_config) + agents = await factory.get_agents( + user_id=user_id, team_config_input=team_config + ) orchestration_config.orchestrations[user_id] = await cls.init_orchestration( agents, user_id ) @@ -225,7 +242,9 @@ async def run_orchestration(self, user_id: str, input_task) -> None: try: # Invoke orchestrator; API may be workflow.invoke(task=..., chat_options=...) - result_msg: ChatMessage = await workflow.invoke(task=task_text, chat_options=chat_options) + result_msg: ChatMessage = await workflow.invoke( + task=task_text, chat_options=chat_options + ) final_text = result_msg.text if result_msg else "" self.logger.info("Final result:\n%s", final_text) @@ -245,4 +264,4 @@ async def run_orchestration(self, user_id: str, input_task) -> None: ) self.logger.info("Final result sent via WebSocket to user %s", user_id) except Exception as e: # noqa: BLE001 - self.logger.error("Unexpected orchestration error: %s", e) \ No newline at end of file + self.logger.error("Unexpected orchestration error: %s", e)