Skip to content

Commit 7098537

Browse files
committed
parse strings, and create error queue
1 parent 685fc19 commit 7098537

File tree

2 files changed

+112
-30
lines changed

2 files changed

+112
-30
lines changed

packages/ai/src/platform/browser/websocket.ts

Lines changed: 55 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -75,28 +75,57 @@ export class BrowserWebSocketHandler implements WebSocketHandler {
7575
}
7676

7777
const messageQueue: unknown[] = [];
78+
const errorQueue: Error[] = [];
7879
let resolvePromise: (() => void) | null = null;
7980
let isClosed = false;
8081

8182
const messageListener = async (event: MessageEvent): Promise<void> => {
83+
let data: string;
8284
if (event.data instanceof Blob) {
83-
try {
84-
const obj = JSON.parse(await event.data.text()) as unknown;
85-
messageQueue.push(obj);
86-
if (resolvePromise) {
87-
resolvePromise();
88-
resolvePromise = null;
89-
}
90-
} catch (e) {
91-
console.warn('Failed to parse WebSocket message to JSON:', e);
92-
}
85+
data = await event.data.text();
86+
} else if (typeof event.data === 'string') {
87+
data = event.data;
9388
} else {
94-
throw new AIError(
95-
AIErrorCode.PARSE_FAILED,
96-
`Failed to parse WebSocket response to JSON. ` +
97-
`Expected data to be a Blob, but was ${typeof event.data}.`
89+
errorQueue.push(
90+
new AIError(
91+
AIErrorCode.PARSE_FAILED,
92+
`Failed to parse WebSocket response. Expected data to be a Blob or string, but was ${typeof event.data}.`
93+
)
94+
);
95+
if (resolvePromise) {
96+
resolvePromise();
97+
resolvePromise = null;
98+
}
99+
return;
100+
}
101+
102+
try {
103+
const obj = JSON.parse(data) as unknown;
104+
messageQueue.push(obj);
105+
} catch (e) {
106+
const err = e as Error;
107+
errorQueue.push(
108+
new AIError(
109+
AIErrorCode.PARSE_FAILED,
110+
`Error parsing WebSocket message to JSON: ${err.message}`
111+
)
98112
);
99113
}
114+
115+
if (resolvePromise) {
116+
resolvePromise();
117+
resolvePromise = null;
118+
}
119+
};
120+
121+
const errorListener = (): void => {
122+
errorQueue.push(
123+
new AIError(AIErrorCode.FETCH_ERROR, 'WebSocket connection error.')
124+
);
125+
if (resolvePromise) {
126+
resolvePromise();
127+
resolvePromise = null;
128+
}
100129
};
101130

102131
const closeListener = (): void => {
@@ -108,12 +137,18 @@ export class BrowserWebSocketHandler implements WebSocketHandler {
108137
// Clean up listeners to prevent memory leaks
109138
this.ws?.removeEventListener('message', messageListener);
110139
this.ws?.removeEventListener('close', closeListener);
140+
this.ws?.removeEventListener('error', errorListener);
111141
};
112142

113143
this.ws.addEventListener('message', messageListener);
114144
this.ws.addEventListener('close', closeListener);
145+
this.ws.addEventListener('error', errorListener);
115146

116147
while (!isClosed) {
148+
if (errorQueue.length > 0) {
149+
const error = errorQueue.shift()!;
150+
throw error;
151+
}
117152
if (messageQueue.length > 0) {
118153
yield messageQueue.shift()!;
119154
} else {
@@ -122,6 +157,12 @@ export class BrowserWebSocketHandler implements WebSocketHandler {
122157
});
123158
}
124159
}
160+
161+
// If the loop terminated because isClosed is true, check for any final errors
162+
if (errorQueue.length > 0) {
163+
const error = errorQueue.shift()!;
164+
throw error;
165+
}
125166
}
126167

127168
close(code?: number, reason?: string): Promise<void> {

packages/ai/src/platform/node/websocket.ts

Lines changed: 57 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -92,28 +92,57 @@ export class NodeWebSocketHandler implements WebSocketHandler {
9292
}
9393

9494
const messageQueue: unknown[] = [];
95+
const errorQueue: Error[] = [];
9596
let resolvePromise: (() => void) | null = null;
9697
let isClosed = false;
9798

9899
const messageListener = async (event: MessageEvent): Promise<void> => {
100+
let data: string;
99101
if (event.data instanceof Blob) {
100-
try {
101-
const obj = JSON.parse(await event.data.text()) as unknown;
102-
messageQueue.push(obj);
103-
if (resolvePromise) {
104-
resolvePromise();
105-
resolvePromise = null;
106-
}
107-
} catch (e) {
108-
console.warn('Failed to parse WebSocket message to JSON:', e);
109-
}
102+
data = await event.data.text();
103+
} else if (typeof event.data === 'string') {
104+
data = event.data;
110105
} else {
111-
throw new AIError(
112-
AIErrorCode.PARSE_FAILED,
113-
`Failed to parse WebSocket response to JSON. ` +
114-
`Expected data to be a Blob, but was ${typeof event.data}.`
106+
errorQueue.push(
107+
new AIError(
108+
AIErrorCode.PARSE_FAILED,
109+
`Failed to parse WebSocket response. Expected data to be a Blob or string, but was ${typeof event.data}.`
110+
)
111+
);
112+
if (resolvePromise) {
113+
resolvePromise();
114+
resolvePromise = null;
115+
}
116+
return;
117+
}
118+
119+
try {
120+
const obj = JSON.parse(data) as unknown;
121+
messageQueue.push(obj);
122+
} catch (e) {
123+
const err = e as Error;
124+
errorQueue.push(
125+
new AIError(
126+
AIErrorCode.PARSE_FAILED,
127+
`Error parsing WebSocket message to JSON: ${err.message}`
128+
)
115129
);
116130
}
131+
132+
if (resolvePromise) {
133+
resolvePromise();
134+
resolvePromise = null;
135+
}
136+
};
137+
138+
const errorListener = (): void => {
139+
errorQueue.push(
140+
new AIError(AIErrorCode.FETCH_ERROR, 'WebSocket connection error.')
141+
);
142+
if (resolvePromise) {
143+
resolvePromise();
144+
resolvePromise = null;
145+
}
117146
};
118147

119148
const closeListener = (): void => {
@@ -122,15 +151,21 @@ export class NodeWebSocketHandler implements WebSocketHandler {
122151
resolvePromise();
123152
resolvePromise = null;
124153
}
125-
// Clean up listeners to prevent memory leaks
154+
// Clean up listeners to prevent memory leaks.
126155
this.ws?.removeEventListener('message', messageListener);
127156
this.ws?.removeEventListener('close', closeListener);
157+
this.ws?.removeEventListener('error', errorListener);
128158
};
129159

130160
this.ws.addEventListener('message', messageListener);
131161
this.ws.addEventListener('close', closeListener);
162+
this.ws.addEventListener('error', errorListener);
132163

133164
while (!isClosed) {
165+
if (errorQueue.length > 0) {
166+
const error = errorQueue.shift()!;
167+
throw error;
168+
}
134169
if (messageQueue.length > 0) {
135170
yield messageQueue.shift()!;
136171
} else {
@@ -139,6 +174,12 @@ export class NodeWebSocketHandler implements WebSocketHandler {
139174
});
140175
}
141176
}
177+
178+
// If the loop terminated because isClosed is true, check for any final errors
179+
if (errorQueue.length > 0) {
180+
const error = errorQueue.shift()!;
181+
throw error;
182+
}
142183
}
143184

144185
close(code?: number, reason?: string): Promise<void> {
@@ -148,7 +189,7 @@ export class NodeWebSocketHandler implements WebSocketHandler {
148189
}
149190

150191
this.ws.addEventListener('close', () => resolve(), { once: true });
151-
// Calling 'close' during these states results in an error.
192+
// Calling 'close' during these states results in an error
152193
if (
153194
this.ws.readyState === WebSocket.CLOSED ||
154195
this.ws.readyState === WebSocket.CONNECTING

0 commit comments

Comments
 (0)