Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 52 additions & 15 deletions src/backend/af/orchestration/human_approval_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,12 @@ def __init__(self, user_id: str, *args, **kwargs):
DO NOT EVER OFFER TO HELP FURTHER IN THE FINAL ANSWER! Just provide the final answer and end with a polite closing.
"""

kwargs["task_ledger_plan_prompt"] = ORCHESTRATOR_TASK_LEDGER_PLAN_PROMPT + plan_append
kwargs["task_ledger_plan_update_prompt"] = ORCHESTRATOR_TASK_LEDGER_PLAN_UPDATE_PROMPT + plan_append
kwargs["task_ledger_plan_prompt"] = (
ORCHESTRATOR_TASK_LEDGER_PLAN_PROMPT + plan_append
)
kwargs["task_ledger_plan_update_prompt"] = (
ORCHESTRATOR_TASK_LEDGER_PLAN_UPDATE_PROMPT + plan_append
)
kwargs["final_answer_prompt"] = ORCHESTRATOR_FINAL_ANSWER_PROMPT + final_append
kwargs["current_user_id"] = user_id # retained for downstream usage if needed

Expand All @@ -90,7 +94,10 @@ async def plan(self, magentic_context: MagenticContext) -> Any:

logger.info(" Creating execution plan...")
plan_message = await super().plan(magentic_context)
logger.info(" Plan created (assistant message length=%d)", len(plan_message.text) if plan_message and plan_message.text else 0)
logger.info(
" Plan created (assistant message length=%d)",
len(plan_message.text) if plan_message and plan_message.text else 0,
)

# Build structured MPlan from task ledger
if self.task_ledger is None:
Expand Down Expand Up @@ -148,10 +155,15 @@ async def replan(self, magentic_context: MagenticContext) -> Any:
"""
logger.info("\nHuman-in-the-Loop Magentic Manager replanned:")
replan_message = await super().replan(magentic_context=magentic_context)
logger.info("Replanned message length: %d", len(replan_message.text) if replan_message and replan_message.text else 0)
logger.info(
"Replanned message length: %d",
len(replan_message.text) if replan_message and replan_message.text else 0,
)
return replan_message

async def create_progress_ledger(self, magentic_context: MagenticContext) -> ProgressLedger:
async def create_progress_ledger(
self, magentic_context: MagenticContext
) -> ProgressLedger:
"""
Check for max rounds exceeded and send final message if so, else defer to base.
"""
Expand All @@ -169,9 +181,13 @@ async def create_progress_ledger(self, magentic_context: MagenticContext) -> Pro
)

return ProgressLedger(
is_request_satisfied=ProgressLedgerItem(reason="Maximum rounds exceeded", answer=True),
is_request_satisfied=ProgressLedgerItem(
reason="Maximum rounds exceeded", answer=True
),
is_in_loop=ProgressLedgerItem(reason="Terminating", answer=False),
is_progress_being_made=ProgressLedgerItem(reason="Terminating", answer=False),
is_progress_being_made=ProgressLedgerItem(
reason="Terminating", answer=False
),
next_speaker=ProgressLedgerItem(reason="Task complete", answer=""),
instruction_or_question=ProgressLedgerItem(
reason="Task complete",
Expand Down Expand Up @@ -202,7 +218,10 @@ async def _wait_for_user_approval(
return messages.PlanApprovalResponse(approved=approved, m_plan_id=m_plan_id)

except asyncio.TimeoutError:
logger.debug("Approval timeout for plan %s - notifying user and terminating process", m_plan_id)
logger.debug(
"Approval timeout for plan %s - notifying user and terminating process",
m_plan_id,
)

timeout_message = messages.TimeoutNotification(
timeout_type="approval",
Expand All @@ -218,7 +237,11 @@ async def _wait_for_user_approval(
user_id=self.current_user_id,
message_type=messages.WebsocketMessageType.TIMEOUT_NOTIFICATION,
)
logger.info("Timeout notification sent to user %s for plan %s", self.current_user_id, m_plan_id)
logger.info(
"Timeout notification sent to user %s for plan %s",
self.current_user_id,
m_plan_id,
)
except Exception as e: # noqa: BLE001
logger.error("Failed to send timeout notification: %s", e)

Expand All @@ -235,16 +258,24 @@ async def _wait_for_user_approval(
return None

except Exception as e: # noqa: BLE001
logger.debug("Unexpected error waiting for approval: %s - terminating process silently", e)
logger.debug(
"Unexpected error waiting for approval: %s - terminating process silently",
e,
)
orchestration_config.cleanup_approval(m_plan_id)
return None

finally:
if m_plan_id in orchestration_config.approvals and orchestration_config.approvals[m_plan_id] is None:
if (
m_plan_id in orchestration_config.approvals
and orchestration_config.approvals[m_plan_id] is None
):
logger.debug("Final cleanup for pending approval plan %s", m_plan_id)
orchestration_config.cleanup_approval(m_plan_id)

async def prepare_final_answer(self, magentic_context: MagenticContext) -> ChatMessage:
async def prepare_final_answer(
self, magentic_context: MagenticContext
) -> ChatMessage:
"""
Override to ensure final answer is prepared after all steps are executed.
"""
Expand All @@ -253,8 +284,14 @@ async def prepare_final_answer(self, magentic_context: MagenticContext) -> ChatM

def plan_to_obj(self, magentic_context: MagenticContext, ledger) -> MPlan:
"""Convert the generated plan from the ledger into a structured MPlan object."""
if ledger is None or not hasattr(ledger, "plan") or not hasattr(ledger, "facts"):
raise ValueError("Invalid ledger structure; expected plan and facts attributes.")
if (
ledger is None
or not hasattr(ledger, "plan")
or not hasattr(ledger, "facts")
):
raise ValueError(
"Invalid ledger structure; expected plan and facts attributes."
)

task_text = getattr(magentic_context.task, "text", str(magentic_context.task))

Expand All @@ -265,4 +302,4 @@ def plan_to_obj(self, magentic_context: MagenticContext, ledger) -> MPlan:
task=task_text,
)

return return_plan
return return_plan
53 changes: 36 additions & 17 deletions src/backend/af/orchestration/orchestration_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,16 @@
import uuid
from typing import List, Optional, Callable, Awaitable

from common.config.app_config import config
from common.models.messages_af import TeamConfiguration

# agent_framework imports
from agent_framework import ChatMessage, Role, ChatOptions
from agent_framework.azure import AzureOpenAIChatClient
from agent_framework._workflows import (
MagenticBuilder
)
from agent_framework import ChatMessage, ChatOptions
from agent_framework._workflows import MagenticBuilder
from agent_framework._workflows._magentic import AgentRunResponseUpdate # type: ignore


from common.config.app_config import config
from common.models.messages_af import TeamConfiguration

# Existing (legacy) callbacks expecting SK content; we'll adapt to them.
# If you've created af-native callbacks (e.g. response_handlers_af) import those instead.
from af.callbacks.response_handlers import (
Expand All @@ -40,15 +39,19 @@ def __init__(self):
# Internal callback adapters
# ---------------------------
@staticmethod
def _user_aware_agent_callback(user_id: str) -> Callable[[str, ChatMessage], Awaitable[None]]:
def _user_aware_agent_callback(
user_id: str,
) -> Callable[[str, ChatMessage], Awaitable[None]]:
"""Adapts agent_framework final agent ChatMessage to legacy agent_response_callback signature."""

async def _cb(agent_id: str, message: ChatMessage):
# Reuse existing callback expecting (ChatMessageContent, user_id). We pass text directly.
try:
agent_response_callback(message, user_id) # existing callback is sync
except Exception as e: # noqa: BLE001
logging.getLogger(__name__).error("agent_response_callback error: %s", e)
logging.getLogger(__name__).error(
"agent_response_callback error: %s", e
)

return _cb

Expand All @@ -72,7 +75,9 @@ def __init__(self, agent_id: str, update: AgentRunResponseUpdate):
try:
await streaming_agent_response_callback(shim, is_final, user_id)
except Exception as e: # noqa: BLE001
logging.getLogger(__name__).error("streaming_agent_response_callback error: %s", e)
logging.getLogger(__name__).error(
"streaming_agent_response_callback error: %s", e
)

return _cb

Expand Down Expand Up @@ -146,14 +151,19 @@ def get_token():
"_agent_response_callback",
cls._user_aware_agent_callback(user_id),
)
if getattr(orchestrator, "_streaming_agent_response_callback", None) is None:
if (
getattr(orchestrator, "_streaming_agent_response_callback", None)
is None
):
setattr(
orchestrator,
"_streaming_agent_response_callback",
cls._user_aware_streaming_callback(user_id),
)
except Exception as e: # noqa: BLE001
cls.logger.warning("Could not attach callbacks to workflow orchestrator: %s", e)
cls.logger.warning(
"Could not attach callbacks to workflow orchestrator: %s", e
)

return workflow

Expand All @@ -174,7 +184,10 @@ async def get_current_or_new_orchestration(
if current is not None and team_switched:
# Close prior agents (skip ProxyAgent if desired)
for agent in getattr(current, "_participants", {}).values():
if getattr(agent, "agent_name", getattr(agent, "name", "")) != "ProxyAgent":
if (
getattr(agent, "agent_name", getattr(agent, "name", ""))
!= "ProxyAgent"
):
close_coro = getattr(agent, "close", None)
if callable(close_coro):
try:
Expand All @@ -183,10 +196,14 @@ async def get_current_or_new_orchestration(
cls.logger.error("Error closing agent: %s", e)

# Build new participants via existing factory)
from af.magentic_agents.magentic_agent_factory import MagenticAgentFactory # local import to avoid circular
from af.magentic_agents.magentic_agent_factory import (
MagenticAgentFactory,
) # local import to avoid circular

factory = MagenticAgentFactory()
agents = await factory.get_agents(user_id=user_id, team_config_input=team_config)
agents = await factory.get_agents(
user_id=user_id, team_config_input=team_config
)
orchestration_config.orchestrations[user_id] = await cls.init_orchestration(
agents, user_id
)
Expand Down Expand Up @@ -225,7 +242,9 @@ async def run_orchestration(self, user_id: str, input_task) -> None:

try:
# Invoke orchestrator; API may be workflow.invoke(task=..., chat_options=...)
result_msg: ChatMessage = await workflow.invoke(task=task_text, chat_options=chat_options)
result_msg: ChatMessage = await workflow.invoke(
task=task_text, chat_options=chat_options
)

final_text = result_msg.text if result_msg else ""
self.logger.info("Final result:\n%s", final_text)
Expand All @@ -245,4 +264,4 @@ async def run_orchestration(self, user_id: str, input_task) -> None:
)
self.logger.info("Final result sent via WebSocket to user %s", user_id)
except Exception as e: # noqa: BLE001
self.logger.error("Unexpected orchestration error: %s", e)
self.logger.error("Unexpected orchestration error: %s", e)
65 changes: 0 additions & 65 deletions src/mcp_server/auth.py

This file was deleted.

Loading