Skip to content

Commit 065f152

Browse files
committed
clean up user safe websocket calling across contexts
1 parent 67733d3 commit 065f152

File tree

3 files changed

+51
-41
lines changed

3 files changed

+51
-41
lines changed

src/backend/v3/callbacks/response_handlers.py

Lines changed: 30 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -9,61 +9,61 @@
99
StreamingChatMessageContent)
1010
from v3.config.settings import connection_config, current_user_id
1111

12-
coderagent = False
1312

14-
# Module-level variable to store current user_id
15-
_current_user_id: str = None
16-
17-
def set_user_context(user_id: str):
18-
"""Set the user context for callbacks in this module."""
19-
global _current_user_id
20-
_current_user_id = user_id
21-
22-
def agent_response_callback(message: ChatMessageContent) -> None:
13+
def agent_response_callback(message: ChatMessageContent, user_id: str = None) -> None:
2314
"""Observer function to print detailed information about streaming messages."""
24-
global coderagent
2515
# import sys
2616

2717
# Get agent name to determine handling
2818
agent_name = message.name or "Unknown Agent"
2919

20+
3021
# Debug information about the message
31-
message_type = type(message).__name__
32-
metadata = getattr(message, 'metadata', {})
33-
# when streaming code - list the coder info first once -
34-
if 'code' in metadata and metadata['code'] is True:
35-
if coderagent == False:
36-
print(f"\n🧠 **{agent_name}** [{message_type}]")
37-
print("-" * (len(agent_name) + len(message_type) + 10))
38-
coderagent = True
39-
print(message.content, end='', flush=False)
40-
return
41-
elif coderagent == True:
42-
coderagent = False
22+
# message_type = type(message).__name__
23+
# metadata = getattr(message, 'metadata', {})
24+
# # when streaming code - list the coder info first once -
25+
# if 'code' in metadata and metadata['code'] is True:
26+
# if coderagent == False:
27+
# print(f"\n **{agent_name}** [{message_type}]")
28+
# print("-" * (len(agent_name) + len(message_type) + 10))
29+
# coderagent = True
30+
# print(message.content, end='', flush=False)
31+
# return
32+
# elif coderagent == True:
33+
# coderagent = False
4334

4435
role = getattr(message, 'role', 'unknown')
4536

4637
# Send to WebSocket
47-
if _current_user_id:
38+
if user_id:
4839
try:
4940
asyncio.create_task(connection_config.send_status_update_async({
5041
"type": "agent_message",
5142
"data": {"agent_name": agent_name, "content": message.content, "role": role}
52-
}, _current_user_id))
43+
}, user_id))
5344
except Exception as e:
5445
print(f"Error sending WebSocket message: {e}")
5546

56-
print(f"\n🧠 **{agent_name}** [{message_type}] (role: {role})")
57-
print("-" * (len(agent_name) + len(message_type) + 10))
47+
print(f"\n **{agent_name}** (role: {role})")
48+
5849
if message.items[-1].content_type == 'function_call':
5950
print(f"🔧 Function call: {message.items[-1].function_name}, Arguments: {message.items[-1].arguments}")
60-
if metadata:
61-
print(f"📋 Metadata: {metadata}")
51+
6252

6353
# Add this function after your agent_response_callback function
64-
async def streaming_agent_response_callback(streaming_message: StreamingChatMessageContent, is_final: bool) -> None:
54+
async def streaming_agent_response_callback(streaming_message: StreamingChatMessageContent, is_final: bool, user_id: str = None) -> None:
6555
"""Simple streaming callback to show real-time agent responses."""
6656
if streaming_message.name != "CoderAgent":
6757
# Print streaming content as it arrives
6858
if hasattr(streaming_message, 'content') and streaming_message.content:
6959
print(streaming_message.content, end='', flush=False)
60+
61+
# Send to WebSocket
62+
if user_id:
63+
try:
64+
await connection_config.send_status_update_async({
65+
"type": "streaming_message",
66+
"data": {"agent_name": streaming_message.name or "Unknown Agent", "content": streaming_message.content, "is_final": is_final}
67+
}, user_id)
68+
except Exception as e:
69+
print(f"Error sending streaming WebSocket message: {e}")

src/backend/v3/magentic_agents/proxy_agent.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ async def invoke(self, message: str,*, thread: AgentThread | None = None,**kwarg
102102
expected_type=DummyAgentThread,
103103
)
104104
# Replace with websocket call when available
105-
print(f"\n🤔 ProxyAgent: Another agent is asking for clarification about:")
105+
print(f"\nProxyAgent: Another agent is asking for clarification about:")
106106
print(f" Request: {message}")
107107
print("-" * 60)
108108

src/backend/v3/orchestration/orchestration_manager.py

Lines changed: 20 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,9 @@
1414
# Create custom execution settings to fix schema issues
1515
from semantic_kernel.connectors.ai.open_ai import (
1616
AzureChatCompletion, OpenAIChatPromptExecutionSettings)
17+
from semantic_kernel.contents import (ChatMessageContent,
18+
StreamingChatMessageContent)
1719
from v3.callbacks.response_handlers import (agent_response_callback,
18-
set_user_context,
1920
streaming_agent_response_callback)
2021
from v3.config.settings import config, current_user_id, orchestration_config
2122
from v3.magentic_agents.magentic_agent_factory import MagenticAgentFactory
@@ -32,7 +33,7 @@ def __init__(self):
3233
self.user_id: Optional[str] = None
3334

3435
@classmethod
35-
async def init_orchestration(cls, agents: List)-> MagenticOrchestration:
36+
async def init_orchestration(cls, agents: List, user_id: str = None)-> MagenticOrchestration:
3637
"""Main function to run the agents."""
3738

3839
# Custom execution settings that should work with Azure OpenAI
@@ -59,28 +60,39 @@ def get_token():
5960
),
6061
execution_settings=execution_settings
6162
),
62-
agent_response_callback=agent_response_callback,
63-
streaming_agent_response_callback=streaming_agent_response_callback, # Add streaming callback
63+
agent_response_callback=cls._user_aware_agent_callback(user_id),
64+
streaming_agent_response_callback=cls._user_aware_streaming_callback(user_id)
6465
)
6566
return magentic_orchestration
6667

68+
@staticmethod
69+
def _user_aware_agent_callback(user_id: str):
70+
"""Factory method that creates a callback with captured user_id"""
71+
def callback(message: ChatMessageContent):
72+
return agent_response_callback(message, user_id)
73+
return callback
74+
75+
@staticmethod
76+
def _user_aware_streaming_callback(user_id: str):
77+
"""Factory method that creates a streaming callback with captured user_id"""
78+
async def callback(streaming_message: StreamingChatMessageContent, is_final: bool):
79+
return await streaming_agent_response_callback(streaming_message, is_final, user_id)
80+
return callback
81+
6782
@classmethod
6883
async def get_current_or_new_orchestration(self, user_id: str, team_config: TeamConfiguration) -> MagenticOrchestration:
6984
"""get existing orchestration instance."""
7085
current_orchestration = orchestration_config.get_current_orchestration(user_id)
7186
if current_orchestration is None:
7287
factory = MagenticAgentFactory()
73-
# to do: change to parsing teams from cosmos db
7488
agents = await factory.get_agents(team_config_input=team_config)
75-
orchestration_config.orchestrations[user_id] = await self.init_orchestration(agents)
89+
orchestration_config.orchestrations[user_id] = await self.init_orchestration(agents, user_id)
7690
return orchestration_config.get_current_orchestration(user_id)
7791

7892
async def run_orchestration(self, user_id, input_task) -> None:
7993
""" Run the orchestration with user input loop."""
8094
token = current_user_id.set(user_id)
8195

82-
set_user_context(user_id)
83-
8496
job_id = str(uuid.uuid4())
8597
orchestration_config.approvals[job_id] = None
8698

@@ -90,9 +102,7 @@ async def run_orchestration(self, user_id, input_task) -> None:
90102
raise ValueError("Orchestration not initialized for user.")
91103

92104
try:
93-
# ADD THIS: Set user_id on the approval manager before invoke
94105
if hasattr(magentic_orchestration, '_manager') and hasattr(magentic_orchestration._manager, 'current_user_id'):
95-
#object.__setattr__(magentic_orchestration._manager, 'current_user_id', user_id)
96106
magentic_orchestration._manager.current_user_id = user_id
97107
print(f"🔍 DEBUG: Set user_id on manager = {user_id}")
98108
except Exception as e:

0 commit comments

Comments
 (0)