Skip to content

Commit 76a43e8

Browse files
committed
Refactor WebSocket message handling and types
Standardizes WebSocket message types using the WebsocketMessageType enum across multiple modules. Refactors message sending to include explicit message type and data structure, and updates API endpoint for plan retrieval. Fixes and improves consistency in how messages are sent and handled throughout the backend.
1 parent 7884e82 commit 76a43e8

File tree

5 files changed

+27
-30
lines changed

5 files changed

+27
-30
lines changed

src/backend/v3/api/router.py

Lines changed: 7 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1036,8 +1036,8 @@ async def get_plans(request: Request):
10361036

10371037

10381038
# Get plans is called in the initial side rendering of the frontend
1039-
@app_v3.get("/plan/{plan_id}")
1040-
async def get_plan_by_id(request: Request, plan_id: str):
1039+
@app_v3.get("/plan")
1040+
async def get_plan_by_id(request: Request, plan_id: str):
10411041
"""
10421042
Retrieve plans for the current user.
10431043
@@ -1133,21 +1133,8 @@ async def get_plan_by_id(request: Request, plan_id: str):
11331133
)
11341134

11351135
return [plan_with_steps, formatted_messages]
1136-
1137-
current_team = await memory_store.get_current_team(user_id=user_id)
1138-
if not current_team:
1139-
return []
1140-
1141-
all_plans = await memory_store.get_all_plans_by_team_id(team_id=current_team.id)
1142-
# Fetch steps for all plans concurrently
1143-
steps_for_all_plans = await asyncio.gather(
1144-
*[memory_store.get_steps_by_plan(plan_id=plan.id) for plan in all_plans]
1145-
)
1146-
# Create list of PlanWithSteps and update step counts
1147-
list_of_plans_with_steps = []
1148-
for plan, steps in zip(all_plans, steps_for_all_plans):
1149-
plan_with_steps = PlanWithSteps(**plan.model_dump(), steps=steps)
1150-
plan_with_steps.update_step_counts()
1151-
list_of_plans_with_steps.append(plan_with_steps)
1152-
1153-
return []
1136+
else:
1137+
track_event_if_configured(
1138+
"GetPlanId", {"status_code": 400, "detail": "no plan id"}
1139+
)
1140+
raise HTTPException(status_code=400, detail="no plan id")

src/backend/v3/callbacks/response_handlers.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,6 @@ async def streaming_agent_response_callback(streaming_message: StreamingChatMess
5656
if user_id:
5757
try:
5858
message = AgentMessageStreaming(agent_name=streaming_message.name or "Unknown Agent", content=streaming_message.content, is_final=is_final)
59-
await connection_config.send_status_update_async(message, user_id)
59+
await connection_config.send_status_update_async(message, user_id, message_type=WebsocketMessageType.AGENT_MESSAGE_STREAMING)
6060
except Exception as e:
6161
logging.error(f"Response_callback: Error sending streaming WebSocket message: {e}")

src/backend/v3/config/settings.py

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -199,10 +199,20 @@ async def send_status_update_async(
199199
)
200200
return
201201

202+
try:
203+
if hasattr(message, "data") and hasattr(message, "type"):
204+
message = message.data
205+
except Exception as e:
206+
print(f"Error loading message data: {e}")
207+
208+
standard_message = {
209+
"type": message_type,
210+
"data": message
211+
}
202212
connection = self.get_connection(process_id)
203213
if connection:
204214
try:
205-
str_message = json.dumps(message, default=str)
215+
str_message = json.dumps(standard_message, default=str)
206216
await connection.send_text(str_message)
207217
logger.debug(f"Message sent to user {user_id} via process {process_id}")
208218
except Exception as e:

src/backend/v3/magentic_agents/proxy_agent.py

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
from v3.config.settings import (connection_config, current_user_id,
2525
orchestration_config)
2626
from v3.models.messages import (UserClarificationRequest,
27-
UserClarificationResponse)
27+
UserClarificationResponse, WebsocketMessageType)
2828

2929

3030
class DummyAgentThread(AgentThread):
@@ -155,10 +155,10 @@ async def invoke(self, message: str,*, thread: AgentThread | None = None,**kwarg
155155

156156
# Send the approval request to the user's WebSocket
157157
await connection_config.send_status_update_async({
158-
"type": "user_clarification_request",
158+
"type": WebsocketMessageType.USER_CLARIFICATION_REQUEST,
159159
"data": clarification_message
160-
})
161-
160+
}, user_id=current_user_id.get(), message_type=WebsocketMessageType.USER_CLARIFICATION_REQUEST)
161+
162162
# Get human input
163163
human_response = await self._wait_for_user_clarification(clarification_message.request_id)
164164

@@ -203,10 +203,10 @@ async def invoke_stream(self, messages, thread=None, **kwargs) -> AsyncIterator[
203203
# Send the approval request to the user's WebSocket
204204
# The user_id will be automatically retrieved from context
205205
await connection_config.send_status_update_async({
206-
"type": "user_clarification_request",
206+
"type": WebsocketMessageType.USER_CLARIFICATION_REQUEST,
207207
"data": clarification_message
208-
})
209-
208+
}, user_id=current_user_id.get(), message_type=WebsocketMessageType.USER_CLARIFICATION_REQUEST)
209+
210210
# Get human input - replace with websocket call when available
211211
human_response = await self._wait_for_user_clarification(clarification_message.request_id)
212212

src/backend/v3/orchestration/orchestration_manager.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,7 @@ async def run_orchestration(self, user_id, input_task) -> None:
136136

137137
# Send final result via WebSocket
138138
await connection_config.send_status_update_async({
139-
"type": "final_result",
139+
"type": WebsocketMessageType.FINAL_RESULT_MESSAGE,
140140
"data": {
141141
"content": str(value),
142142
"status": "completed",

0 commit comments

Comments
 (0)