Skip to content

Commit 6039bf7

Browse files
Merge pull request #437 from microsoft/macae-v3-dev-marktayl
feat: Macae v3 dev marktayl
2 parents 4016f9b + 7d43fb4 commit 6039bf7

File tree

5 files changed

+92
-49
lines changed

5 files changed

+92
-49
lines changed

src/backend/v3/api/router.py

Lines changed: 28 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -306,26 +306,48 @@ async def plan_approval(human_feedback: messages.PlanApprovalResponse, request:
306306
status_code=401, detail="Missing or invalid user information"
307307
)
308308
# Set the approval in the orchestration config
309-
if user_id and human_feedback.plan_id:
310-
if orchestration_config and human_feedback.plan_id in orchestration_config.approvals:
311-
orchestration_config.approvals[human_feedback.plan_id] = human_feedback.approved
309+
if user_id and human_feedback.plan_dot_id:
310+
if orchestration_config and human_feedback.plan_dot_id in orchestration_config.approvals:
311+
orchestration_config.approvals[human_feedback.plan_dot_id] = human_feedback.approved
312312
track_event_if_configured(
313313
"PlanApprovalReceived",
314314
{
315-
"plan_id": human_feedback.plan_id,
315+
"plan_id": human_feedback.plan_dot_id,
316316
"approved": human_feedback.approved,
317317
"user_id": user_id,
318318
"feedback": human_feedback.feedback
319319
},
320320
)
321321
return {"status": "approval recorded"}
322322
else:
323-
logging.warning(f"No orchestration or plan found for plan_id: {human_feedback.plan_id}")
323+
logging.warning(f"No orchestration or plan found for plan_id: {human_feedback.plan_dot_id}")
324324
raise HTTPException(status_code=404, detail="No active plan found for approval")
325325

326326
@app_v3.post("/user_clarification")
327327
async def user_clarification(human_feedback: messages.UserClarificationResponse, request: Request):
328-
pass
328+
""" Endpoint to receive plan approval or rejection from the user. """
329+
authenticated_user = get_authenticated_user_details(request_headers=request.headers)
330+
user_id = authenticated_user["user_principal_id"]
331+
if not user_id:
332+
raise HTTPException(
333+
status_code=401, detail="Missing or invalid user information"
334+
)
335+
# Set the approval in the orchestration config
336+
if user_id and human_feedback.request_id:
337+
if orchestration_config and human_feedback.request_id in orchestration_config.clarifications:
338+
orchestration_config.clarifications[human_feedback.request_id] = human_feedback.answer
339+
track_event_if_configured(
340+
"PlanApprovalReceived",
341+
{
342+
"request_id": human_feedback.request_id,
343+
"answer": human_feedback.answer,
344+
"user_id": user_id,
345+
},
346+
)
347+
return {"status": "clarification recorded"}
348+
else:
349+
logging.warning(f"No orchestration or plan found for request_id: {human_feedback.request_id}")
350+
raise HTTPException(status_code=404, detail="No active plan found for clarification")
329351

330352

331353
@app_v3.post("/upload_team_config")

src/backend/v3/config/settings.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77
import contextvars
88
import json
99
import logging
10-
import uuid
1110
from typing import Dict, Optional
1211

1312
from common.config.app_config import config
@@ -76,10 +75,11 @@ class OrchestrationConfig:
7675
"""Configuration for orchestration settings."""
7776

7877
def __init__(self):
79-
self.orchestrations = {}
80-
self.plans = {} # job_id -> current plan
81-
self.approvals = {} # job_id -> True/False/None
82-
self.sockets = {} # job_id -> WebSocket
78+
self.orchestrations: Dict[str, MagenticOrchestration] = {} # user_id -> orchestration instance
79+
self.plans: Dict[str, any] = {} # plan_id -> plan details
80+
self.approvals: Dict[str, bool] = {} # plan_dot_id -> approval status
81+
self.sockets: Dict[str, WebSocket] = {} # user_id -> WebSocket
82+
self.clarifications: Dict[str, str] = {} # plan_dot_id -> clarification response
8383

8484
def get_current_orchestration(self, user_id: str) -> MagenticOrchestration:
8585
"""get existing orchestration instance."""

src/backend/v3/magentic_agents/proxy_agent.py

Lines changed: 44 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
# Copyright (c) Microsoft. All rights reserved.
22
""" Proxy agent that prompts for human clarification."""
33

4+
import asyncio
45
import logging
56
import uuid
67
from collections.abc import AsyncIterable
@@ -20,7 +21,8 @@
2021
from typing_extensions import override
2122
from v3.callbacks.response_handlers import (agent_response_callback,
2223
streaming_agent_response_callback)
23-
from v3.config.settings import current_user_id
24+
from v3.config.settings import (connection_config, current_user_id,
25+
orchestration_config)
2426
from v3.models.messages import (UserClarificationRequest,
2527
UserClarificationResponse)
2628

@@ -144,24 +146,30 @@ async def invoke(self, message: str,*, thread: AgentThread | None = None,**kwarg
144146
construct_thread=lambda: DummyAgentThread(),
145147
expected_type=DummyAgentThread,
146148
)
147-
# Send clarification request via response handlers
149+
150+
# Send clarification request via streaming callbacks
148151
clarification_request = f"I need clarification about: {message}"
152+
153+
clarification_message = UserClarificationRequest(
154+
question=clarification_request,
155+
request_id=str(uuid.uuid4()) # Unique ID for the request
156+
)
157+
158+
# Send the approval request to the user's WebSocket
159+
await connection_config.send_status_update_async({
160+
"type": "user_clarification_request",
161+
"data": clarification_message
162+
})
149163

150-
clarification_message = self._create_message_content(clarification_request, thread.id)
151-
await self._trigger_response_callbacks(clarification_message)
152-
153-
# Get human input - replace this with awaiting a websocket call or API handler when available
154-
human_response = input("Please provide clarification: ").strip()
164+
# Get human input
165+
human_response = await self._wait_for_user_clarification(clarification_message.request_id)
155166

156167
if not human_response:
157168
human_response = "No additional clarification provided."
158169

159170
response = f"Human clarification: {human_response}"
160171

161-
# Send response via response handlers
162-
response_message = self._create_message_content(response, thread.id)
163-
164-
chat_message = response_message
172+
chat_message = self._create_message_content(response, thread.id)
165173

166174
yield AgentResponseItem(
167175
message=chat_message,
@@ -188,11 +196,23 @@ async def invoke_stream(self, messages, thread=None, **kwargs) -> AsyncIterator[
188196

189197
# Send clarification request via streaming callbacks
190198
clarification_request = f"I need clarification about: {message}"
191-
self._create_message_content(clarification_request, thread.id)
192-
await self._trigger_streaming_callbacks(clarification_request)
199+
#self._create_message_content(clarification_request, thread.id)
200+
# await self._trigger_streaming_callbacks(clarification_request)
201+
202+
clarification_message = UserClarificationRequest(
203+
question=clarification_request,
204+
request_id=str(uuid.uuid4()) # Unique ID for the request
205+
)
206+
207+
# Send the approval request to the user's WebSocket
208+
# The user_id will be automatically retrieved from context
209+
await connection_config.send_status_update_async({
210+
"type": "user_clarification_request",
211+
"data": clarification_message
212+
})
193213

194214
# Get human input - replace with websocket call when available
195-
human_response = input("Please provide clarification: ").strip()
215+
human_response = await self._wait_for_user_clarification(clarification_message.request_id)
196216

197217
if not human_response:
198218
human_response = "No additional clarification provided."
@@ -206,6 +226,15 @@ async def invoke_stream(self, messages, thread=None, **kwargs) -> AsyncIterator[
206226
thread=thread
207227
)
208228

229+
async def _wait_for_user_clarification(self, request_id:str) -> Optional[UserClarificationResponse]:
230+
"""Wait for user clarification response."""
231+
# To do: implement timeout and error handling
232+
if request_id not in orchestration_config.clarifications:
233+
orchestration_config.clarifications[request_id] = None
234+
while orchestration_config.clarifications[request_id] is None:
235+
await asyncio.sleep(0.2)
236+
return UserClarificationResponse(request_id=request_id,answer=orchestration_config.clarifications[request_id])
237+
209238
async def get_response(self, chat_history, **kwargs):
210239
"""Get response from the agent - required by Agent base class."""
211240
# Extract the latest user message
@@ -220,6 +249,7 @@ async def get_response(self, chat_history, **kwargs):
220249
role=AuthorRole.ASSISTANT,
221250
content="No clarification provided."
222251
)
252+
223253

224254
async def create_proxy_agent(user_id: str = None):
225255
"""Factory function for human proxy agent."""

src/backend/v3/models/messages.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
"""Messages from the backend to the frontend via WebSocket."""
22

3+
import uuid
34
from dataclasses import dataclass
45
from typing import Any, Dict, List, Literal, Optional
56

@@ -48,7 +49,7 @@ class PlanApprovalRequest:
4849
@dataclass(slots=True)
4950
class PlanApprovalResponse:
5051
"""Response for plan approval from the frontend."""
51-
plan_id: str
52+
plan_dot_id: str
5253
approved: bool
5354
feedback: str | None = None
5455

@@ -70,20 +71,19 @@ class ReplanApprovalResponse:
7071
class UserClarificationRequest:
7172
"""Request for user clarification from the frontend."""
7273
question: str
73-
context: dict | None = None
74+
request_id: str
7475

7576
@dataclass(slots=True)
7677
class UserClarificationResponse:
7778
"""Response for user clarification from the frontend."""
78-
plan_id: str | None
79+
request_id: str
7980
answer: str = ""
8081

8182
@dataclass(slots=True)
8283
class FinalResultMessage:
8384
"""Final result message from the backend to the frontend."""
8485
result: str
8586
summary: str | None = None
86-
context: dict | None = None
8787

8888
@dataclass(slots=True)
8989
class HumanFeedback(KernelBaseModel):

src/backend/v3/orchestration/human_approval_manager.py

Lines changed: 11 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,8 @@ async def plan(self, magentic_context: MagenticContext) -> Any:
6767
plan = await super().plan(magentic_context)
6868
self.magentic_plan = self.plan_to_obj( magentic_context, self.task_ledger)
6969

70+
self.magentic_plan.user_id = current_user_id.get()
71+
7072
# Request approval from the user before executing the plan
7173
approval_message = messages.PlanApprovalRequest(
7274
plan=self.magentic_plan,
@@ -76,9 +78,6 @@ async def plan(self, magentic_context: MagenticContext) -> Any:
7678
"participant_descriptions": magentic_context.participant_descriptions
7779
} if hasattr(magentic_context, 'participant_descriptions') else {}
7880
)
79-
80-
# Send the current plan to the frontend via WebSocket
81-
#await connection_config.send_status_update_async(approval_message,)
8281

8382
# Send the approval request to the user's WebSocket
8483
# The user_id will be automatically retrieved from context
@@ -87,8 +86,8 @@ async def plan(self, magentic_context: MagenticContext) -> Any:
8786
"data": approval_message
8887
})
8988

90-
# Wait for user approval (you'll need to implement this)
91-
approval_response = await self._wait_for_user_approval()
89+
# Wait for user approval
90+
approval_response = await self._wait_for_user_approval(approval_message.plan.id)
9291

9392
if approval_response and approval_response.approved:
9493
print("Plan approved - proceeding with execution...")
@@ -106,23 +105,15 @@ async def plan(self, magentic_context: MagenticContext) -> Any:
106105
# )
107106

108107

109-
async def _wait_for_user_approval(self, plan_id: Optional[str] = None) -> Optional[messages.PlanApprovalResponse]: # plan_id will not be optional in future
108+
async def _wait_for_user_approval(self, plan_dot_id: Optional[str] = None) -> Optional[messages.PlanApprovalResponse]: # plan_id will not be optional in future
110109
"""Wait for user approval response."""
111-
# Temporarily use console input for approval - will switch to WebSocket or API in future
112-
# response = input("\nApprove this execution plan? [y/n]: ").strip().lower()
113-
# if response in ['y', 'yes']:
114-
# return messages.PlanApprovalResponse(approved=True, plan_id=plan_id if plan_id else "input")
115-
# elif response in ['n', 'no']:
116-
# return messages.PlanApprovalResponse(approved=False, plan_id=plan_id if plan_id else "input")
117-
# else:
118-
# print("Invalid input. Please enter 'y' for yes or 'n' for no.")
119-
# return await self._wait_for_user_approval()
120-
# In future, implement actual waiting for WebSocket or API response here
121-
if plan_id not in orchestration_config.approvals:
122-
orchestration_config.approvals[plan_id] = None
123-
while orchestration_config.approvals[plan_id] is None:
110+
111+
# To do: implement timeout and error handling
112+
if plan_dot_id not in orchestration_config.approvals:
113+
orchestration_config.approvals[plan_dot_id] = None
114+
while orchestration_config.approvals[plan_dot_id] is None:
124115
await asyncio.sleep(0.2)
125-
return messages.PlanApprovalResponse(approved=orchestration_config.approvals[plan_id], plan_id=plan_id)
116+
return messages.PlanApprovalResponse(approved=orchestration_config.approvals[plan_dot_id], plan_dot_id=plan_dot_id)
126117

127118

128119
async def prepare_final_answer(self, magentic_context: MagenticContext) -> ChatMessageContent:

0 commit comments

Comments
 (0)