Skip to content

Commit dfcc1e7

Browse files
committed
Merge branch 'macae-v3-dev-marktayl' of https://github.com/microsoft/Multi-Agent-Custom-Automation-Engine-Solution-Accelerator into macae-v3-dev-marktayl
2 parents d91a5d0 + af3086d commit dfcc1e7

File tree

1 file changed

+21
-50
lines changed

1 file changed

+21
-50
lines changed

src/frontend/src/services/WebSocketService.tsx

Lines changed: 21 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { getUserId } from '../api/config';
1+
import { headerBuilder } from '../api/config';
22

33
export interface StreamMessage {
44
type: 'plan_update' | 'step_update' | 'agent_message' | 'error' | 'connection_status' | 'plan_approval_request' | 'final_result';
@@ -19,7 +19,6 @@ export interface StreamingPlanUpdate {
1919
timestamp?: number;
2020
}
2121

22-
// Add these new interfaces after StreamingPlanUpdate
2322
export interface PlanApprovalRequestData {
2423
plan_id: string;
2524
session_id: string;
@@ -45,49 +44,25 @@ export interface PlanApprovalResponseData {
4544

4645
class WebSocketService {
4746
private ws: WebSocket | null = null;
48-
private processId: string | null = null; // Add this to store the process ID
4947
private reconnectAttempts = 0;
5048
private maxReconnectAttempts = 5;
51-
private reconnectDelay = 12000; // Changed from 1000ms to 12000ms (12 seconds)
49+
private reconnectDelay = 12000;
5250
private listeners: Map<string, Set<(message: StreamMessage) => void>> = new Map();
5351
private planSubscriptions: Set<string> = new Set();
54-
private reconnectTimer: NodeJS.Timeout | null = null; // Add timer tracking
55-
private isConnecting = false; // Add connection state tracking
52+
private reconnectTimer: NodeJS.Timeout | null = null;
53+
private isConnecting = false;
5654

5755
/**
5856
* Connect to WebSocket server
5957
*/
6058
connect(): Promise<void> {
61-
// If already connected, return resolved promise
62-
if (this.ws && this.ws.readyState === WebSocket.OPEN) {
63-
console.log('WebSocket already connected');
64-
return Promise.resolve();
65-
}
66-
67-
// If already connecting, wait for that connection
68-
if (this.ws && this.ws.readyState === WebSocket.CONNECTING) {
69-
console.log('WebSocket connection already in progress');
70-
return new Promise((resolve, reject) => {
71-
const checkConnection = () => {
72-
if (this.ws?.readyState === WebSocket.OPEN) {
73-
resolve();
74-
} else if (this.ws?.readyState === WebSocket.CLOSED) {
75-
reject(new Error('Connection failed'));
76-
} else {
77-
setTimeout(checkConnection, 100);
78-
}
79-
};
80-
checkConnection();
81-
});
82-
}
8359
return new Promise((resolve, reject) => {
84-
// Prevent multiple simultaneous connection attempts
8560
if (this.isConnecting) {
8661
console.log('Connection attempt already in progress');
62+
resolve();
8763
return;
8864
}
8965

90-
// Clear any existing reconnection timer
9166
if (this.reconnectTimer) {
9267
clearTimeout(this.reconnectTimer);
9368
this.reconnectTimer = null;
@@ -96,19 +71,27 @@ class WebSocketService {
9671
try {
9772
this.isConnecting = true;
9873

99-
// Get WebSocket URL from environment or default to localhost
10074
const wsProtocol = window.location.protocol === 'https:' ? 'wss:' : 'ws:';
10175
const wsHost = process.env.REACT_APP_WS_HOST || '127.0.0.1:8000';
102-
const processId = crypto.randomUUID(); // Generate unique process ID for this session
76+
const processId = crypto.randomUUID();
10377

104-
// Build WebSocket URL with authentication headers as query parameters
105-
const userId = getUserId(); // Import this from config
78+
const authHeaders = headerBuilder();
79+
const userId = authHeaders['x-ms-client-principal-id'];
80+
81+
if (!userId) {
82+
console.error('No user ID available for WebSocket connection');
83+
this.isConnecting = false;
84+
reject(new Error('Authentication required for WebSocket connection'));
85+
return;
86+
}
87+
88+
// Use query parameter for WebSocket authentication (as backend expects)
10689
const wsUrl = `${wsProtocol}//${wsHost}/api/v3/socket/${processId}?user_id=${encodeURIComponent(userId)}`;
10790

10891
console.log('Connecting to WebSocket:', wsUrl);
10992

11093
this.ws = new WebSocket(wsUrl);
111-
94+
11295
this.ws.onopen = () => {
11396
console.log('WebSocket connected successfully');
11497
this.reconnectAttempts = 0;
@@ -132,8 +115,7 @@ class WebSocketService {
132115
this.isConnecting = false;
133116
this.emit('connection_status', { connected: false });
134117

135-
// Only attempt reconnect if it wasn't a manual disconnect
136-
if (event.code !== 1000) { // 1000 = normal closure
118+
if (event.code !== 1000) {
137119
this.attemptReconnect();
138120
}
139121
};
@@ -158,16 +140,15 @@ class WebSocketService {
158140
disconnect(): void {
159141
console.log('Manually disconnecting WebSocket');
160142

161-
// Clear any pending reconnection attempts
162143
if (this.reconnectTimer) {
163144
clearTimeout(this.reconnectTimer);
164145
this.reconnectTimer = null;
165146
}
166147

167-
this.reconnectAttempts = this.maxReconnectAttempts; // Prevent reconnection
148+
this.reconnectAttempts = this.maxReconnectAttempts;
168149

169150
if (this.ws) {
170-
this.ws.close(1000, 'Manual disconnect'); // Use normal closure code
151+
this.ws.close(1000, 'Manual disconnect');
171152
this.ws = null;
172153
}
173154
this.planSubscriptions.clear();
@@ -216,7 +197,6 @@ class WebSocketService {
216197

217198
this.listeners.get(eventType)!.add(callback);
218199

219-
// Return unsubscribe function
220200
return () => {
221201
const eventListeners = this.listeners.get(eventType);
222202
if (eventListeners) {
@@ -269,17 +249,14 @@ class WebSocketService {
269249
private handleMessage(message: StreamMessage): void {
270250
console.log('WebSocket message received:', message);
271251

272-
// Emit to specific event listeners
273252
if (message.type) {
274253
this.emit(message.type, message.data);
275254
}
276255

277-
// Handle plan approval requests specifically
278256
if (message.type === 'plan_approval_request') {
279257
console.log('Plan approval request received via WebSocket:', message.data);
280258
}
281259

282-
// Emit to general message listeners
283260
this.emit('message', message);
284261
}
285262

@@ -293,14 +270,12 @@ class WebSocketService {
293270
return;
294271
}
295272

296-
// Prevent multiple simultaneous reconnection attempts
297273
if (this.isConnecting || this.reconnectTimer) {
298274
console.log('Reconnection attempt already in progress');
299275
return;
300276
}
301277

302278
this.reconnectAttempts++;
303-
// Use exponential backoff: 12s, 24s, 48s, 96s, 192s
304279
const delay = this.reconnectDelay * Math.pow(2, this.reconnectAttempts - 1);
305280

306281
console.log(`Scheduling reconnection attempt ${this.reconnectAttempts}/${this.maxReconnectAttempts} in ${delay/1000}s`);
@@ -312,15 +287,12 @@ class WebSocketService {
312287
this.connect()
313288
.then(() => {
314289
console.log('Reconnection successful - re-subscribing to plans');
315-
// Re-subscribe to all plans
316290
this.planSubscriptions.forEach(planId => {
317291
this.subscribeToPlan(planId);
318292
});
319293
})
320294
.catch((error) => {
321295
console.error('Reconnection failed:', error);
322-
// The connect() method will trigger another reconnection attempt
323-
// through the onclose handler if needed
324296
});
325297
}, delay);
326298
}
@@ -367,6 +339,5 @@ class WebSocketService {
367339
}
368340
}
369341

370-
// Export singleton instance
371342
export const webSocketService = new WebSocketService();
372343
export default webSocketService;

0 commit comments

Comments
 (0)