Skip to content

Commit 891853a

Browse files
Refactor user ID handling across multiple modules to remove context variable dependency
1 parent 1540324 commit 891853a

File tree

6 files changed

+38
-46
lines changed

6 files changed

+38
-46
lines changed

src/backend/v3/callbacks/response_handlers.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010

1111
from semantic_kernel.contents import (ChatMessageContent,
1212
StreamingChatMessageContent)
13-
from v3.config.settings import connection_config, current_user_id
13+
from v3.config.settings import connection_config
1414
from v3.models.messages import (AgentMessage, AgentMessageStreaming,
1515
AgentToolCall, AgentToolMessage, WebsocketMessageType)
1616

src/backend/v3/config/settings.py

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
"""
55

66
import asyncio
7-
import contextvars
87
import json
98
import logging
109
from typing import Dict, Optional
@@ -19,11 +18,6 @@
1918

2019
logger = logging.getLogger(__name__)
2120

22-
# Create a context variable to track current user
23-
current_user_id: contextvars.ContextVar[Optional[str]] = contextvars.ContextVar(
24-
"current_user_id", default=None
25-
)
26-
2721

2822
class AzureConfig:
2923
"""Azure OpenAI and authentication configuration."""
@@ -177,13 +171,10 @@ async def close_connection(self, process_id):
177171
async def send_status_update_async(
178172
self,
179173
message: any,
180-
user_id: Optional[str] = None,
174+
user_id: str,
181175
message_type: WebsocketMessageType = WebsocketMessageType.SYSTEM_MESSAGE,
182176
):
183177
"""Send a status update to a specific client."""
184-
# If no process_id provided, get from context
185-
if user_id is None:
186-
user_id = current_user_id.get()
187178

188179
if not user_id:
189180
logger.warning("No user_id available for WebSocket message")

src/backend/v3/magentic_agents/magentic_agent_factory.py

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010

1111
from common.config.app_config import config
1212
from common.models.messages_kernel import TeamConfiguration
13-
from v3.config.settings import current_user_id
1413
from v3.magentic_agents.foundry_agent import FoundryAgentTemplate
1514
from v3.magentic_agents.models.agent_models import MCPConfig, SearchConfig
1615
# from v3.magentic_agents.models.agent_models import (BingConfig, MCPConfig,
@@ -42,12 +41,13 @@ def __init__(self):
4241
# with open(file_path, 'r') as f:
4342
# data = json.load(f)
4443
# return json.loads(json.dumps(data), object_hook=lambda d: SimpleNamespace(**d))
45-
46-
async def create_agent_from_config(self, agent_obj: SimpleNamespace) -> Union[FoundryAgentTemplate, ReasoningAgentTemplate, ProxyAgent]:
44+
45+
async def create_agent_from_config(self, user_id: str, agent_obj: SimpleNamespace) -> Union[FoundryAgentTemplate, ReasoningAgentTemplate, ProxyAgent]:
4746
"""
4847
Create an agent from configuration object.
4948
5049
Args:
50+
user_id: User ID
5151
agent_obj: Agent object from parsed JSON (SimpleNamespace)
5252
team_model: Model name to determine which template to use
5353
@@ -63,7 +63,6 @@ async def create_agent_from_config(self, agent_obj: SimpleNamespace) -> Union[Fo
6363

6464
if not deployment_name and agent_obj.name.lower() == "proxyagent":
6565
self.logger.info("Creating ProxyAgent")
66-
user_id = current_user_id.get()
6766
return ProxyAgent(user_id=user_id)
6867

6968
# Validate supported models
@@ -124,11 +123,12 @@ async def create_agent_from_config(self, agent_obj: SimpleNamespace) -> Union[Fo
124123
self.logger.info(f"Successfully created and initialized agent '{agent_obj.name}'")
125124
return agent
126125

127-
async def get_agents(self, team_config_input: TeamConfiguration) -> List:
126+
async def get_agents(self, user_id: str, team_config_input: TeamConfiguration) -> List:
128127
"""
129128
Create and return a team of agents from JSON configuration.
130129
131130
Args:
131+
user_id: User ID
132132
team_config_input: team configuration object from cosmos db
133133
134134
Returns:
@@ -143,8 +143,8 @@ async def get_agents(self, team_config_input: TeamConfiguration) -> List:
143143
for i, agent_cfg in enumerate(team_config_input.agents, 1):
144144
try:
145145
self.logger.info(f"Creating agent {i}/{len(team_config_input.agents)}: {agent_cfg.name}")
146-
147-
agent = await self.create_agent_from_config(agent_cfg)
146+
147+
agent = await self.create_agent_from_config(user_id, agent_cfg)
148148
initalized_agents.append(agent)
149149
self._agent_list.append(agent) # Keep track for cleanup
150150

src/backend/v3/magentic_agents/proxy_agent.py

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,7 @@
2121
from typing_extensions import override
2222
from v3.callbacks.response_handlers import (agent_response_callback,
2323
streaming_agent_response_callback)
24-
from v3.config.settings import (connection_config, current_user_id,
25-
orchestration_config)
24+
from v3.config.settings import connection_config, orchestration_config
2625
from v3.models.messages import (UserClarificationRequest,
2726
UserClarificationResponse, WebsocketMessageType)
2827

@@ -94,11 +93,11 @@ class ProxyAgent(Agent):
9493
"""Simple proxy agent that prompts for human clarification."""
9594

9695
# Declare as Pydantic field
97-
user_id: Optional[str] = Field(default=None, description="User ID for WebSocket messaging")
96+
user_id: str = Field(default=None, description="User ID for WebSocket messaging")
9897

99-
def __init__(self, user_id: str = None, **kwargs):
100-
# Get user_id from parameter or context, fallback to empty string
101-
effective_user_id = user_id or current_user_id.get() or ""
98+
def __init__(self, user_id: str, **kwargs):
99+
# Get user_id from parameter, fallback to empty string
100+
effective_user_id = user_id or ""
102101
super().__init__(
103102
name="ProxyAgent",
104103
description="Call this agent when you need to clarify requests by asking the human user for more information. Ask it for more details about any unclear requirements, missing information, or if you need the user to elaborate on any aspect of the task.",
@@ -119,15 +118,15 @@ def _create_message_content(self, content: str, thread_id: str = None) -> ChatMe
119118
async def _trigger_response_callbacks(self, message_content: ChatMessageContent):
120119
"""Manually trigger the same response callbacks used by other agents."""
121120
# Get current user_id dynamically instead of using stored value
122-
current_user = current_user_id.get() or self.user_id or ""
121+
current_user = self.user_id or ""
123122

124123
# Trigger the standard agent response callback
125124
agent_response_callback(message_content, current_user)
126125

127126
async def _trigger_streaming_callbacks(self, content: str, is_final: bool = False):
128127
"""Manually trigger streaming callbacks for real-time updates."""
129128
# Get current user_id dynamically instead of using stored value
130-
current_user = current_user_id.get() or self.user_id or ""
129+
current_user = self.user_id or ""
131130
streaming_message = StreamingChatMessageContent(
132131
role=AuthorRole.ASSISTANT,
133132
content=content,
@@ -158,7 +157,7 @@ async def invoke(self, message: str,*, thread: AgentThread | None = None,**kwarg
158157
await connection_config.send_status_update_async({
159158
"type": WebsocketMessageType.USER_CLARIFICATION_REQUEST,
160159
"data": clarification_message
161-
}, user_id=current_user_id.get(), message_type=WebsocketMessageType.USER_CLARIFICATION_REQUEST)
160+
}, user_id=self.user_id, message_type=WebsocketMessageType.USER_CLARIFICATION_REQUEST)
162161

163162
# Get human input
164163
human_response = await self._wait_for_user_clarification(clarification_message.request_id)
@@ -206,7 +205,7 @@ async def invoke_stream(self, messages, thread=None, **kwargs) -> AsyncIterator[
206205
await connection_config.send_status_update_async({
207206
"type": WebsocketMessageType.USER_CLARIFICATION_REQUEST,
208207
"data": clarification_message
209-
}, user_id=current_user_id.get(), message_type=WebsocketMessageType.USER_CLARIFICATION_REQUEST)
208+
}, user_id=self.user_id, message_type=WebsocketMessageType.USER_CLARIFICATION_REQUEST)
210209

211210
# Get human input - replace with websocket call when available
212211
human_response = await self._wait_for_user_clarification(clarification_message.request_id)

src/backend/v3/orchestration/human_approval_manager.py

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,7 @@
1616
ORCHESTRATOR_FINAL_ANSWER_PROMPT, ORCHESTRATOR_TASK_LEDGER_PLAN_PROMPT,
1717
ORCHESTRATOR_TASK_LEDGER_PLAN_UPDATE_PROMPT)
1818
from semantic_kernel.contents import ChatMessageContent
19-
from v3.config.settings import (connection_config, current_user_id,
20-
orchestration_config)
19+
from v3.config.settings import connection_config, orchestration_config
2120
from v3.models.models import MPlan, MStep
2221
from v3.orchestration.helper.plan_to_mplan_converter import \
2322
PlanToMPlanConverter
@@ -35,9 +34,17 @@ class HumanApprovalMagenticManager(StandardMagenticManager):
3534
# Define Pydantic fields to avoid validation errors
3635
approval_enabled: bool = True
3736
magentic_plan: Optional[MPlan] = None
38-
current_user_id: Optional[str] = None
37+
current_user_id: str
38+
39+
def __init__(self, user_id: str, *args, **kwargs):
40+
"""
41+
Initialize the HumanApprovalMagenticManager.
42+
Args:
43+
user_id: ID of the user to associate with this orchestration instance.
44+
*args: Additional positional arguments for the parent StandardMagenticManager.
45+
**kwargs: Additional keyword arguments for the parent StandardMagenticManager.
46+
"""
3947

40-
def __init__(self, *args, **kwargs):
4148
# Remove any custom kwargs before passing to parent
4249

4350
plan_append = """
@@ -70,6 +77,8 @@ def __init__(self, *args, **kwargs):
7077
kwargs['task_ledger_plan_update_prompt'] = ORCHESTRATOR_TASK_LEDGER_PLAN_UPDATE_PROMPT + plan_append
7178
kwargs['final_answer_prompt'] = ORCHESTRATOR_FINAL_ANSWER_PROMPT + final_append
7279

80+
kwargs['current_user_id'] = user_id
81+
7382
super().__init__(*args, **kwargs)
7483

7584
async def plan(self, magentic_context: MagenticContext) -> Any:
@@ -94,7 +103,7 @@ async def plan(self, magentic_context: MagenticContext) -> Any:
94103

95104
self.magentic_plan = self.plan_to_obj( magentic_context, self.task_ledger)
96105

97-
self.magentic_plan.user_id = current_user_id.get()
106+
self.magentic_plan.user_id = self.current_user_id
98107

99108
# Request approval from the user before executing the plan
100109
approval_message = messages.PlanApprovalRequest(
@@ -115,7 +124,7 @@ async def plan(self, magentic_context: MagenticContext) -> Any:
115124
# The user_id will be automatically retrieved from context
116125
await connection_config.send_status_update_async(
117126
message=approval_message,
118-
user_id=current_user_id.get(),
127+
user_id=self.current_user_id,
119128
message_type=messages.WebsocketMessageType.PLAN_APPROVAL_REQUEST)
120129

121130
# Wait for user approval
@@ -129,7 +138,7 @@ async def plan(self, magentic_context: MagenticContext) -> Any:
129138
await connection_config.send_status_update_async({
130139
"type": messages.WebsocketMessageType.PLAN_APPROVAL_RESPONSE,
131140
"data": approval_response
132-
}, user_id=current_user_id.get(), message_type=messages.WebsocketMessageType.PLAN_APPROVAL_RESPONSE)
141+
}, user_id=self.current_user_id, message_type=messages.WebsocketMessageType.PLAN_APPROVAL_RESPONSE)
133142
raise Exception("Plan execution cancelled by user")
134143

135144
async def replan(self,magentic_context: MagenticContext) -> Any:
@@ -154,7 +163,7 @@ async def create_progress_ledger(self, magentic_context: MagenticContext) -> Pro
154163

155164
await connection_config.send_status_update_async(
156165
message= final_message,
157-
user_id=current_user_id.get(),
166+
user_id=self.current_user_id,
158167
message_type=messages.WebsocketMessageType.FINAL_RESULT_MESSAGE)
159168

160169
return ProgressLedger(

src/backend/v3/orchestration/orchestration_manager.py

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,9 @@
22
""" Orchestration manager to handle the orchestration logic. """
33

44
import asyncio
5-
import contextvars
65
import logging
76
import os
87
import uuid
9-
from contextvars import ContextVar
108
from typing import List, Optional
119

1210
from azure.identity import DefaultAzureCredential as SyncDefaultAzureCredential
@@ -21,16 +19,12 @@
2119
StreamingChatMessageContent)
2220
from v3.callbacks.response_handlers import (agent_response_callback,
2321
streaming_agent_response_callback)
24-
from v3.config.settings import (config, connection_config, current_user_id,
25-
orchestration_config)
22+
from v3.config.settings import config, connection_config, orchestration_config
2623
from v3.magentic_agents.magentic_agent_factory import MagenticAgentFactory
2724
from v3.models.messages import WebsocketMessageType
2825
from v3.orchestration.human_approval_manager import \
2926
HumanApprovalMagenticManager
3027

31-
# Context variable to hold the current user ID
32-
current_user_id: ContextVar[Optional[str]] = contextvars.ContextVar("current_user_id", default=None)
33-
3428
class OrchestrationManager:
3529
"""Manager for handling orchestration logic."""
3630

@@ -62,6 +56,7 @@ def get_token():
6256
magentic_orchestration = MagenticOrchestration(
6357
members=agents,
6458
manager=HumanApprovalMagenticManager(
59+
user_id=user_id,
6560
chat_completion_service=AzureChatCompletion(
6661
deployment_name=config.AZURE_OPENAI_DEPLOYMENT_NAME,
6762
endpoint=config.AZURE_OPENAI_ENDPOINT,
@@ -101,13 +96,12 @@ async def get_current_or_new_orchestration(cls, user_id: str, team_config: TeamC
10196
except Exception as e:
10297
cls.logger.error("Error closing agent: %s", e)
10398
factory = MagenticAgentFactory()
104-
agents = await factory.get_agents(team_config_input=team_config)
99+
agents = await factory.get_agents(user_id=user_id, team_config_input=team_config)
105100
orchestration_config.orchestrations[user_id] = await cls.init_orchestration(agents, user_id)
106101
return orchestration_config.get_current_orchestration(user_id)
107102

108103
async def run_orchestration(self, user_id, input_task) -> None:
109104
""" Run the orchestration with user input loop."""
110-
token = current_user_id.set(user_id)
111105

112106
job_id = str(uuid.uuid4())
113107
orchestration_config.approvals[job_id] = None
@@ -161,5 +155,4 @@ async def run_orchestration(self, user_id, input_task) -> None:
161155
self.logger.error(f"Unexpected error: {e}")
162156
finally:
163157
await runtime.stop_when_idle()
164-
current_user_id.reset(token)
165158

0 commit comments

Comments
 (0)