Skip to content

Commit 1cde0d6

Browse files
committed
ws testing
1 parent 610cae8 commit 1cde0d6

File tree

10 files changed

+369
-111
lines changed

10 files changed

+369
-111
lines changed

src/backend/app_kernel.py

Lines changed: 136 additions & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,14 @@
2323
from common.utils.websocket_streaming import (websocket_streaming_endpoint,
2424
ws_manager)
2525
# FastAPI imports
26-
from fastapi import FastAPI, HTTPException, Query, Request, WebSocket
26+
from fastapi import (FastAPI, HTTPException, Query, Request, WebSocket,
27+
WebSocketDisconnect)
2728
from fastapi.middleware.cors import CORSMiddleware
2829
from kernel_agents.agent_factory import AgentFactory
2930
# Local imports
3031
from middleware.health_check import HealthCheckMiddleware
3132
from v3.api.router import app_v3
33+
from v3.config.settings import connection_config
3234
# Semantic Kernel imports
3335
from v3.orchestration.orchestration_manager import OrchestrationManager
3436

@@ -69,7 +71,9 @@
6971
app.add_middleware(
7072
CORSMiddleware,
7173
allow_origins=[
72-
frontend_url
74+
"http://localhost:3000", # Add this for local development
75+
"https://localhost:3000", # Add this if using HTTPS locally
76+
"http://127.0.0.1:3000",
7377
], # Allow all origins for development; restrict in production
7478
allow_credentials=True,
7579
allow_methods=["*"],
@@ -84,10 +88,59 @@
8488

8589

8690
# WebSocket streaming endpoint
87-
@app.websocket("/ws/streaming")
88-
async def websocket_endpoint(websocket: WebSocket):
89-
"""WebSocket endpoint for real-time plan execution streaming"""
90-
await websocket_streaming_endpoint(websocket)
91+
# @app.websocket("/ws/streaming")
92+
# async def websocket_endpoint(websocket: WebSocket):
93+
# """WebSocket endpoint for real-time plan execution streaming"""
94+
# await websocket_streaming_endpoint(websocket)
95+
96+
# @app.websocket("/socket/{process_id}")
97+
# async def process_outputs(websocket: WebSocket, process_id: str):
98+
# """ Web-Socket endpoint for real-time process status updates. """
99+
100+
# # Always accept the WebSocket connection first
101+
# await websocket.accept()
102+
# connection_config.add_connection(process_id=process_id, connection=websocket)
103+
104+
@app.websocket("/socket/{process_id}")
105+
async def process_outputs(websocket: WebSocket, process_id: str):
106+
""" Web-Socket endpoint for real-time process status updates. """
107+
108+
# Always accept the WebSocket connection first
109+
await websocket.accept()
110+
111+
# user_id = None
112+
# try:
113+
# # WebSocket headers are different, try to get user info
114+
# headers = dict(websocket.headers)
115+
# authenticated_user = get_authenticated_user_details(request_headers=headers)
116+
# user_id = authenticated_user.get("user_principal_id")
117+
# if not user_id:
118+
# user_id = f"anonymous_{process_id}"
119+
# except Exception as e:
120+
# logging.warning(f"Could not extract user from WebSocket headers: {e}")
121+
# user_id = f"anonymous_{user_id}"
122+
123+
# Add to the connection manager for backend updates
124+
125+
connection_config.add_connection(process_id, websocket)
126+
track_event_if_configured("WebSocketConnectionAccepted", {"process_id": "user_id"})
127+
128+
# Keep the connection open - FastAPI will close the connection if this returns
129+
while True:
130+
# no expectation that we will receive anything from the client but this keeps
131+
# the connection open and does not take cpu cycle
132+
try:
133+
await websocket.receive_text()
134+
except asyncio.TimeoutError:
135+
pass
136+
137+
except WebSocketDisconnect:
138+
track_event_if_configured("WebSocketDisconnect", {"process_id": user_id})
139+
logging.info(f"Client disconnected from batch {user_id}")
140+
await connection_config.close_connection(user_id)
141+
except Exception as e:
142+
logging.error("Error in WebSocket connection", error=str(e))
143+
await connection_config.close_connection(user_id)
91144

92145

93146
@app.post("/api/user_browser_language")
@@ -670,9 +723,11 @@ async def get_plans(
670723
"UserIdNotFound", {"status_code": 400, "detail": "no user"}
671724
)
672725
raise HTTPException(status_code=400, detail="no user")
726+
727+
await connection_config.send_status_update_async("Test message from get_plans", user_id)
673728

674729
# Initialize agent team for this user session
675-
await OrchestrationManager.get_current_orchestration(user_id=user_id)
730+
#await OrchestrationManager.get_current_orchestration(user_id=user_id)
676731

677732
# Replace the following with code to get plan run history from the database
678733

@@ -895,80 +950,80 @@ async def get_agent_tools():
895950
return []
896951

897952

898-
@app.post("/api/test/streaming/{plan_id}")
899-
async def test_streaming_updates(plan_id: str):
900-
"""
901-
Test endpoint to simulate streaming updates for a plan.
902-
This is for testing the WebSocket streaming functionality.
903-
"""
904-
from common.utils.websocket_streaming import (send_agent_message,
905-
send_plan_update,
906-
send_step_update)
907-
908-
try:
909-
# Simulate a series of streaming updates
910-
await send_agent_message(
911-
plan_id=plan_id,
912-
agent_name="Data Analyst",
913-
content="Starting analysis of the data...",
914-
message_type="thinking",
915-
)
916-
917-
await asyncio.sleep(1)
918-
919-
await send_plan_update(
920-
plan_id=plan_id,
921-
step_id="step_1",
922-
agent_name="Data Analyst",
923-
content="Analyzing customer data patterns...",
924-
status="in_progress",
925-
message_type="action",
926-
)
927-
928-
await asyncio.sleep(2)
929-
930-
await send_agent_message(
931-
plan_id=plan_id,
932-
agent_name="Data Analyst",
933-
content="Found 3 key insights in the customer data. Processing recommendations...",
934-
message_type="result",
935-
)
936-
937-
await asyncio.sleep(1)
938-
939-
await send_step_update(
940-
plan_id=plan_id,
941-
step_id="step_1",
942-
status="completed",
943-
content="Data analysis completed successfully!",
944-
)
945-
946-
await send_agent_message(
947-
plan_id=plan_id,
948-
agent_name="Business Advisor",
949-
content="Reviewing the analysis results and preparing strategic recommendations...",
950-
message_type="thinking",
951-
)
952-
953-
await asyncio.sleep(2)
954-
955-
await send_plan_update(
956-
plan_id=plan_id,
957-
step_id="step_2",
958-
agent_name="Business Advisor",
959-
content="Based on the data analysis, I recommend focusing on customer retention strategies for the identified high-value segments.",
960-
status="completed",
961-
message_type="result",
962-
)
963-
964-
return {
965-
"status": "success",
966-
"message": f"Test streaming updates sent for plan {plan_id}",
967-
}
968-
969-
except Exception as e:
970-
logging.error(f"Error sending test streaming updates: {e}")
971-
raise HTTPException(status_code=500, detail=str(e))
953+
# @app.post("/api/test/streaming/{plan_id}")
954+
# async def test_streaming_updates(plan_id: str):
955+
# """
956+
# Test endpoint to simulate streaming updates for a plan.
957+
# This is for testing the WebSocket streaming functionality.
958+
# """
959+
# from common.utils.websocket_streaming import (send_agent_message,
960+
# send_plan_update,
961+
# send_step_update)
962+
963+
# try:
964+
# # Simulate a series of streaming updates
965+
# await send_agent_message(
966+
# plan_id=plan_id,
967+
# agent_name="Data Analyst",
968+
# content="Starting analysis of the data...",
969+
# message_type="thinking",
970+
# )
971+
972+
# await asyncio.sleep(1)
973+
974+
# await send_plan_update(
975+
# plan_id=plan_id,
976+
# step_id="step_1",
977+
# agent_name="Data Analyst",
978+
# content="Analyzing customer data patterns...",
979+
# status="in_progress",
980+
# message_type="action",
981+
# )
982+
983+
# await asyncio.sleep(2)
984+
985+
# await send_agent_message(
986+
# plan_id=plan_id,
987+
# agent_name="Data Analyst",
988+
# content="Found 3 key insights in the customer data. Processing recommendations...",
989+
# message_type="result",
990+
# )
991+
992+
# await asyncio.sleep(1)
993+
994+
# await send_step_update(
995+
# plan_id=plan_id,
996+
# step_id="step_1",
997+
# status="completed",
998+
# content="Data analysis completed successfully!",
999+
# )
1000+
1001+
# await send_agent_message(
1002+
# plan_id=plan_id,
1003+
# agent_name="Business Advisor",
1004+
# content="Reviewing the analysis results and preparing strategic recommendations...",
1005+
# message_type="thinking",
1006+
# )
1007+
1008+
# await asyncio.sleep(2)
1009+
1010+
# await send_plan_update(
1011+
# plan_id=plan_id,
1012+
# step_id="step_2",
1013+
# agent_name="Business Advisor",
1014+
# content="Based on the data analysis, I recommend focusing on customer retention strategies for the identified high-value segments.",
1015+
# status="completed",
1016+
# message_type="result",
1017+
# )
1018+
1019+
# return {
1020+
# "status": "success",
1021+
# "message": f"Test streaming updates sent for plan {plan_id}",
1022+
# }
1023+
1024+
# except Exception as e:
1025+
# logging.error(f"Error sending test streaming updates: {e}")
1026+
# raise HTTPException(status_code=500, detail=str(e))
9721027

9731028

9741029
# Run the app

src/backend/common/utils/utils_date.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
import json
22
import locale
3-
from datetime import datetime
43
import logging
4+
from datetime import datetime
55
from typing import Optional
6+
7+
import regex as re
68
from dateutil import parser
79

810

0 commit comments

Comments
 (0)