Skip to content

Commit 64fadf3

Browse files
committed
resume streams on error - initial implementation
1 parent 055fe61 commit 64fadf3

File tree

4 files changed

+391
-67
lines changed

4 files changed

+391
-67
lines changed

src/client/streamableHttp.test.ts

Lines changed: 23 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -275,41 +275,55 @@ describe("StreamableHTTPClientTransport", () => {
275275
})).toBe(true);
276276
});
277277

278-
it("should include last-event-id header when resuming a broken connection", async () => {
279-
// First make a successful connection that provides an event ID
278+
it("should handle reconnection with last-event-id for resumability", async () => {
279+
// Set up a stream that will send an event with ID then error
280280
const encoder = new TextEncoder();
281281
const stream = new ReadableStream({
282282
start(controller) {
283283
const event = "id: event-123\nevent: message\ndata: {\"jsonrpc\": \"2.0\", \"method\": \"serverNotification\", \"params\": {}}\n\n";
284284
controller.enqueue(encoder.encode(event));
285-
controller.close();
285+
// Simulate network error
286+
setTimeout(() => {
287+
controller.error(new Error("Network error"));
288+
}, 10);
286289
}
287290
});
288291

292+
// Mock the initial connection
289293
(global.fetch as jest.Mock).mockResolvedValueOnce({
290294
ok: true,
291295
status: 200,
292296
headers: new Headers({ "content-type": "text/event-stream" }),
293297
body: stream
294298
});
295299

300+
const errorSpy = jest.fn();
301+
transport.onerror = errorSpy;
302+
296303
await transport.start();
297304
await transport["_startOrAuthStandaloneSSE"]();
305+
306+
// Let the stream process and error
298307
await new Promise(resolve => setTimeout(resolve, 50));
299-
300-
// Now simulate attempting to reconnect
308+
309+
// Verify error was caught
310+
expect(errorSpy).toHaveBeenCalled();
311+
312+
// Mock the reconnection (the transport should try to reconnect after error)
301313
(global.fetch as jest.Mock).mockResolvedValueOnce({
302314
ok: true,
303315
status: 200,
304316
headers: new Headers({ "content-type": "text/event-stream" }),
305317
body: null
306318
});
307-
308-
await transport["_startOrAuthStandaloneSSE"]();
309-
310-
// Check that Last-Event-ID was included
319+
320+
// Allow time for automatic reconnection
321+
await new Promise(resolve => setTimeout(resolve, 1100)); // > 1 second delay
322+
323+
// Verify that the client attempted to reconnect with the last-event-id
311324
const calls = (global.fetch as jest.Mock).mock.calls;
312325
const lastCall = calls[calls.length - 1];
326+
expect(lastCall[1].method).toBe("GET");
313327
expect(lastCall[1].headers.get("last-event-id")).toBe("event-123");
314328
});
315329

src/client/streamableHttp.ts

Lines changed: 52 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,6 @@ export class StreamableHTTPClientTransport implements Transport {
4949
private _requestInit?: RequestInit;
5050
private _authProvider?: OAuthClientProvider;
5151
private _sessionId?: string;
52-
private _lastEventId?: string;
5352

5453
onclose?: () => void;
5554
onerror?: (error: Error) => void;
@@ -102,16 +101,16 @@ export class StreamableHTTPClientTransport implements Transport {
102101
);
103102
}
104103

105-
private async _startOrAuthStandaloneSSE(): Promise<void> {
104+
private async _startOrAuthStandaloneSSE(lastEventId?: string): Promise<void> {
106105
try {
107106
// Try to open an initial SSE stream with GET to listen for server messages
108107
// This is optional according to the spec - server may not support it
109108
const headers = await this._commonHeaders();
110109
headers.set("Accept", "text/event-stream");
111110

112-
// Include Last-Event-ID header for resumable streams
113-
if (this._lastEventId) {
114-
headers.set("last-event-id", this._lastEventId);
111+
// Include Last-Event-ID header for resumable streams if provided
112+
if (lastEventId) {
113+
headers.set("last-event-id", lastEventId);
115114
}
116115

117116
const response = await fetch(this._url, {
@@ -150,31 +149,61 @@ export class StreamableHTTPClientTransport implements Transport {
150149
return;
151150
}
152151

152+
let lastEventId: string | undefined;
153+
153154
const processStream = async () => {
154155
// Create a pipeline: binary stream -> text decoder -> SSE parser
155156
const eventStream = stream
156157
.pipeThrough(new TextDecoderStream())
157158
.pipeThrough(new EventSourceParserStream());
158159

159-
for await (const event of eventStream) {
160-
// Update last event ID if provided
161-
if (event.id) {
162-
this._lastEventId = event.id;
163-
}
164-
// Handle message events (default event type is undefined per docs)
165-
// or explicit 'message' event type
166-
if (!event.event || event.event === "message") {
167-
try {
168-
const message = JSONRPCMessageSchema.parse(JSON.parse(event.data));
169-
this.onmessage?.(message);
170-
} catch (error) {
171-
this.onerror?.(error as Error);
160+
try {
161+
for await (const event of eventStream) {
162+
// Update last event ID if provided
163+
if (event.id) {
164+
lastEventId = event.id;
165+
}
166+
167+
// Handle message events (default event type is undefined per docs)
168+
// or explicit 'message' event type
169+
if (!event.event || event.event === "message") {
170+
try {
171+
const message = JSONRPCMessageSchema.parse(JSON.parse(event.data));
172+
this.onmessage?.(message);
173+
} catch (error) {
174+
this.onerror?.(error as Error);
175+
}
172176
}
173177
}
178+
} catch (error) {
179+
// Handle stream errors - likely a network disconnect
180+
this.onerror?.(new Error(`SSE stream disconnected: ${error instanceof Error ? error.message : String(error)}`));
181+
182+
// Attempt to reconnect if the stream disconnects unexpectedly
183+
// Wait a short time before reconnecting to avoid rapid reconnection loops
184+
if (this._abortController && !this._abortController.signal.aborted) {
185+
setTimeout(() => {
186+
// Use the last event ID to resume where we left off
187+
this._startOrAuthStandaloneSSE(lastEventId).catch(reconnectError => {
188+
this.onerror?.(new Error(`Failed to reconnect SSE stream: ${reconnectError instanceof Error ? reconnectError.message : String(reconnectError)}`));
189+
});
190+
}, 1000); // 1 second delay before reconnection attempt
191+
}
174192
}
175193
};
176194

177-
processStream().catch(err => this.onerror?.(err));
195+
processStream().catch(err => {
196+
this.onerror?.(err);
197+
198+
// Try to reconnect on unexpected errors
199+
if (this._abortController && !this._abortController.signal.aborted) {
200+
setTimeout(() => {
201+
this._startOrAuthStandaloneSSE(lastEventId).catch(reconnectError => {
202+
this.onerror?.(new Error(`Failed to reconnect SSE stream: ${reconnectError instanceof Error ? reconnectError.message : String(reconnectError)}`));
203+
});
204+
}, 1000);
205+
}
206+
});
178207
}
179208

180209
async start() {
@@ -252,7 +281,7 @@ export class StreamableHTTPClientTransport implements Transport {
252281
// if the accepted notification is initialized, we start the SSE stream
253282
// if it's supported by the server
254283
if (isJSONRPCNotification(message) && message.method === "notifications/initialized") {
255-
// We don't need to handle 405 here anymore as it's handled in _startOrAuthStandaloneSSE
284+
// Start without a lastEventId since this is a fresh connection
256285
this._startOrAuthStandaloneSSE().catch(err => this.onerror?.(err));
257286
}
258287
return;
@@ -268,6 +297,9 @@ export class StreamableHTTPClientTransport implements Transport {
268297

269298
if (hasRequests) {
270299
if (contentType?.includes("text/event-stream")) {
300+
// Handle SSE stream responses for requests
301+
// We use the same handler as standalone streams, which now supports
302+
// reconnection with the last event ID
271303
this._handleSseStream(response.body);
272304
} else if (contentType?.includes("application/json")) {
273305
// For non-streaming servers, we might get direct JSON responses

0 commit comments

Comments
 (0)