Skip to content

Commit 7c07f38

Browse files
Merge branch 'main' into feat/tasks
2 parents 4782f9d + 3c50d07 commit 7c07f38

File tree

7 files changed

+916
-30
lines changed

7 files changed

+916
-30
lines changed

src/client/streamableHttp.test.ts

Lines changed: 206 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -799,6 +799,70 @@ describe('StreamableHTTPClientTransport', () => {
799799
expect(fetchMock).toHaveBeenCalledTimes(1);
800800
expect(fetchMock.mock.calls[0][1]?.method).toBe('POST');
801801
});
802+
803+
it('should reconnect a POST-initiated stream after receiving a priming event', async () => {
804+
// ARRANGE
805+
transport = new StreamableHTTPClientTransport(new URL('http://localhost:1234/mcp'), {
806+
reconnectionOptions: {
807+
initialReconnectionDelay: 10,
808+
maxRetries: 1,
809+
maxReconnectionDelay: 1000,
810+
reconnectionDelayGrowFactor: 1
811+
}
812+
});
813+
814+
const errorSpy = vi.fn();
815+
transport.onerror = errorSpy;
816+
817+
// Create a stream that sends a priming event (with ID) then closes
818+
const streamWithPrimingEvent = new ReadableStream({
819+
start(controller) {
820+
// Send a priming event with an ID - this enables reconnection
821+
controller.enqueue(
822+
new TextEncoder().encode('id: event-123\ndata: {"jsonrpc":"2.0","method":"notifications/message","params":{}}\n\n')
823+
);
824+
// Then close the stream (simulating server disconnect)
825+
controller.close();
826+
}
827+
});
828+
829+
const fetchMock = global.fetch as Mock;
830+
// First call: POST returns streaming response with priming event
831+
fetchMock.mockResolvedValueOnce({
832+
ok: true,
833+
status: 200,
834+
headers: new Headers({ 'content-type': 'text/event-stream' }),
835+
body: streamWithPrimingEvent
836+
});
837+
// Second call: GET reconnection - return 405 to stop further reconnection
838+
fetchMock.mockResolvedValueOnce({
839+
ok: false,
840+
status: 405,
841+
headers: new Headers()
842+
});
843+
844+
const requestMessage: JSONRPCRequest = {
845+
jsonrpc: '2.0',
846+
method: 'long_running_tool',
847+
id: 'request-1',
848+
params: {}
849+
};
850+
851+
// ACT
852+
await transport.start();
853+
await transport.send(requestMessage);
854+
// Wait for stream to process and reconnection to be scheduled
855+
await vi.advanceTimersByTimeAsync(50);
856+
857+
// ASSERT
858+
// THE KEY ASSERTION: Fetch was called TWICE - POST then GET reconnection
859+
expect(fetchMock).toHaveBeenCalledTimes(2);
860+
expect(fetchMock.mock.calls[0][1]?.method).toBe('POST');
861+
expect(fetchMock.mock.calls[1][1]?.method).toBe('GET');
862+
// Verify Last-Event-ID header was sent for reconnection
863+
const reconnectHeaders = fetchMock.mock.calls[1][1]?.headers as Headers;
864+
expect(reconnectHeaders.get('last-event-id')).toBe('event-123');
865+
});
802866
});
803867

804868
it('invalidates all credentials on InvalidClientError during auth', async () => {
@@ -1102,6 +1166,148 @@ describe('StreamableHTTPClientTransport', () => {
11021166
});
11031167
});
11041168

1169+
describe('SSE retry field handling', () => {
1170+
beforeEach(() => {
1171+
vi.useFakeTimers();
1172+
(global.fetch as Mock).mockReset();
1173+
});
1174+
afterEach(() => vi.useRealTimers());
1175+
1176+
it('should use server-provided retry value for reconnection delay', async () => {
1177+
transport = new StreamableHTTPClientTransport(new URL('http://localhost:1234/mcp'), {
1178+
reconnectionOptions: {
1179+
initialReconnectionDelay: 100,
1180+
maxReconnectionDelay: 5000,
1181+
reconnectionDelayGrowFactor: 2,
1182+
maxRetries: 3
1183+
}
1184+
});
1185+
1186+
// Create a stream that sends a retry field
1187+
const encoder = new TextEncoder();
1188+
const stream = new ReadableStream({
1189+
start(controller) {
1190+
// Send SSE event with retry field
1191+
const event =
1192+
'retry: 3000\nevent: message\nid: evt-1\ndata: {"jsonrpc": "2.0", "method": "notification", "params": {}}\n\n';
1193+
controller.enqueue(encoder.encode(event));
1194+
// Close stream to trigger reconnection
1195+
controller.close();
1196+
}
1197+
});
1198+
1199+
const fetchMock = global.fetch as Mock;
1200+
fetchMock.mockResolvedValueOnce({
1201+
ok: true,
1202+
status: 200,
1203+
headers: new Headers({ 'content-type': 'text/event-stream' }),
1204+
body: stream
1205+
});
1206+
1207+
// Second request for reconnection
1208+
fetchMock.mockResolvedValueOnce({
1209+
ok: true,
1210+
status: 200,
1211+
headers: new Headers({ 'content-type': 'text/event-stream' }),
1212+
body: new ReadableStream()
1213+
});
1214+
1215+
await transport.start();
1216+
await transport['_startOrAuthSse']({});
1217+
1218+
// Wait for stream to close and reconnection to be scheduled
1219+
await vi.advanceTimersByTimeAsync(100);
1220+
1221+
// Verify the server retry value was captured
1222+
const transportInternal = transport as unknown as { _serverRetryMs?: number };
1223+
expect(transportInternal._serverRetryMs).toBe(3000);
1224+
1225+
// Verify the delay calculation uses server retry value
1226+
const getDelay = transport['_getNextReconnectionDelay'].bind(transport);
1227+
expect(getDelay(0)).toBe(3000); // Should use server value, not 100ms initial
1228+
expect(getDelay(5)).toBe(3000); // Should still use server value for any attempt
1229+
});
1230+
1231+
it('should fall back to exponential backoff when no server retry value', () => {
1232+
transport = new StreamableHTTPClientTransport(new URL('http://localhost:1234/mcp'), {
1233+
reconnectionOptions: {
1234+
initialReconnectionDelay: 100,
1235+
maxReconnectionDelay: 5000,
1236+
reconnectionDelayGrowFactor: 2,
1237+
maxRetries: 3
1238+
}
1239+
});
1240+
1241+
// Without any SSE stream, _serverRetryMs should be undefined
1242+
const transportInternal = transport as unknown as { _serverRetryMs?: number };
1243+
expect(transportInternal._serverRetryMs).toBeUndefined();
1244+
1245+
// Should use exponential backoff
1246+
const getDelay = transport['_getNextReconnectionDelay'].bind(transport);
1247+
expect(getDelay(0)).toBe(100); // 100 * 2^0
1248+
expect(getDelay(1)).toBe(200); // 100 * 2^1
1249+
expect(getDelay(2)).toBe(400); // 100 * 2^2
1250+
expect(getDelay(10)).toBe(5000); // capped at max
1251+
});
1252+
1253+
it('should reconnect on graceful stream close', async () => {
1254+
transport = new StreamableHTTPClientTransport(new URL('http://localhost:1234/mcp'), {
1255+
reconnectionOptions: {
1256+
initialReconnectionDelay: 10,
1257+
maxReconnectionDelay: 1000,
1258+
reconnectionDelayGrowFactor: 1,
1259+
maxRetries: 1
1260+
}
1261+
});
1262+
1263+
// Create a stream that closes gracefully after sending an event with ID
1264+
const encoder = new TextEncoder();
1265+
const stream = new ReadableStream({
1266+
start(controller) {
1267+
// Send priming event with ID and retry field
1268+
const event = 'id: evt-1\nretry: 100\ndata: \n\n';
1269+
controller.enqueue(encoder.encode(event));
1270+
// Graceful close
1271+
controller.close();
1272+
}
1273+
});
1274+
1275+
const fetchMock = global.fetch as Mock;
1276+
fetchMock.mockResolvedValueOnce({
1277+
ok: true,
1278+
status: 200,
1279+
headers: new Headers({ 'content-type': 'text/event-stream' }),
1280+
body: stream
1281+
});
1282+
1283+
// Second request for reconnection
1284+
fetchMock.mockResolvedValueOnce({
1285+
ok: true,
1286+
status: 200,
1287+
headers: new Headers({ 'content-type': 'text/event-stream' }),
1288+
body: new ReadableStream()
1289+
});
1290+
1291+
await transport.start();
1292+
await transport['_startOrAuthSse']({});
1293+
1294+
// Wait for stream to process and close
1295+
await vi.advanceTimersByTimeAsync(50);
1296+
1297+
// Wait for reconnection delay (100ms from retry field)
1298+
await vi.advanceTimersByTimeAsync(150);
1299+
1300+
// Should have attempted reconnection
1301+
expect(fetchMock).toHaveBeenCalledTimes(2);
1302+
expect(fetchMock.mock.calls[0][1]?.method).toBe('GET');
1303+
expect(fetchMock.mock.calls[1][1]?.method).toBe('GET');
1304+
1305+
// Second call should include Last-Event-ID
1306+
const secondCallHeaders = fetchMock.mock.calls[1][1]?.headers;
1307+
expect(secondCallHeaders?.get('last-event-id')).toBe('evt-1');
1308+
});
1309+
});
1310+
11051311
describe('prevent infinite recursion when server returns 401 after successful auth', () => {
11061312
it('should throw error when server returns 401 after successful auth', async () => {
11071313
const message: JSONRPCMessage = {

src/client/streamableHttp.ts

Lines changed: 54 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,7 @@ export class StreamableHTTPClientTransport implements Transport {
135135
private _protocolVersion?: string;
136136
private _hasCompletedAuthFlow = false; // Circuit breaker: detect auth success followed by immediate 401
137137
private _lastUpscopingHeader?: string; // Track last upscoping header to prevent infinite upscoping.
138+
private _serverRetryMs?: number; // Server-provided retry delay from SSE retry field
138139

139140
onclose?: () => void;
140141
onerror?: (error: Error) => void;
@@ -203,6 +204,7 @@ export class StreamableHTTPClientTransport implements Transport {
203204

204205
private async _startOrAuthSse(options: StartSSEOptions): Promise<void> {
205206
const { resumptionToken } = options;
207+
206208
try {
207209
// Try to open an initial SSE stream with GET to listen for server messages
208210
// This is optional according to the spec - server may not support it
@@ -249,7 +251,12 @@ export class StreamableHTTPClientTransport implements Transport {
249251
* @returns Time to wait in milliseconds before next reconnection attempt
250252
*/
251253
private _getNextReconnectionDelay(attempt: number): number {
252-
// Access default values directly, ensuring they're never undefined
254+
// Use server-provided retry value if available
255+
if (this._serverRetryMs !== undefined) {
256+
return this._serverRetryMs;
257+
}
258+
259+
// Fall back to exponential backoff
253260
const initialDelay = this._reconnectionOptions.initialReconnectionDelay;
254261
const growFactor = this._reconnectionOptions.reconnectionDelayGrowFactor;
255262
const maxDelay = this._reconnectionOptions.maxReconnectionDelay;
@@ -259,7 +266,7 @@ export class StreamableHTTPClientTransport implements Transport {
259266
}
260267

261268
/**
262-
* Schedule a reconnection attempt with exponential backoff
269+
* Schedule a reconnection attempt using server-provided retry interval or backoff
263270
*
264271
* @param lastEventId The ID of the last received event for resumability
265272
* @param attemptCount Current reconnection attempt count for this specific stream
@@ -295,14 +302,24 @@ export class StreamableHTTPClientTransport implements Transport {
295302
const { onresumptiontoken, replayMessageId } = options;
296303

297304
let lastEventId: string | undefined;
305+
// Track whether we've received a priming event (event with ID)
306+
// Per spec, server SHOULD send a priming event with ID before closing
307+
let hasPrimingEvent = false;
298308
const processStream = async () => {
299309
// this is the closest we can get to trying to catch network errors
300310
// if something happens reader will throw
301311
try {
302312
// Create a pipeline: binary stream -> text decoder -> SSE parser
303313
const reader = stream
304314
.pipeThrough(new TextDecoderStream() as ReadableWritablePair<string, Uint8Array>)
305-
.pipeThrough(new EventSourceParserStream())
315+
.pipeThrough(
316+
new EventSourceParserStream({
317+
onRetry: (retryMs: number) => {
318+
// Capture server-provided retry value for reconnection timing
319+
this._serverRetryMs = retryMs;
320+
}
321+
})
322+
)
306323
.getReader();
307324

308325
while (true) {
@@ -314,6 +331,8 @@ export class StreamableHTTPClientTransport implements Transport {
314331
// Update last event ID if provided
315332
if (event.id) {
316333
lastEventId = event.id;
334+
// Mark that we've received a priming event - stream is now resumable
335+
hasPrimingEvent = true;
317336
onresumptiontoken?.(event.id);
318337
}
319338

@@ -329,12 +348,29 @@ export class StreamableHTTPClientTransport implements Transport {
329348
}
330349
}
331350
}
351+
352+
// Handle graceful server-side disconnect
353+
// Server may close connection after sending event ID and retry field
354+
// Reconnect if: already reconnectable (GET stream) OR received a priming event (POST stream with event ID)
355+
const canResume = isReconnectable || hasPrimingEvent;
356+
if (canResume && this._abortController && !this._abortController.signal.aborted) {
357+
this._scheduleReconnection(
358+
{
359+
resumptionToken: lastEventId,
360+
onresumptiontoken,
361+
replayMessageId
362+
},
363+
0
364+
);
365+
}
332366
} catch (error) {
333367
// Handle stream errors - likely a network disconnect
334368
this.onerror?.(new Error(`SSE stream disconnected: ${error}`));
335369

336370
// Attempt to reconnect if the stream disconnects unexpectedly and we aren't closing
337-
if (isReconnectable && this._abortController && !this._abortController.signal.aborted) {
371+
// Reconnect if: already reconnectable (GET stream) OR received a priming event (POST stream with event ID)
372+
const canResume = isReconnectable || hasPrimingEvent;
373+
if (canResume && this._abortController && !this._abortController.signal.aborted) {
338374
// Use the exponential backoff reconnection strategy
339375
try {
340376
this._scheduleReconnection(
@@ -593,4 +629,18 @@ export class StreamableHTTPClientTransport implements Transport {
593629
get protocolVersion(): string | undefined {
594630
return this._protocolVersion;
595631
}
632+
633+
/**
634+
* Resume an SSE stream from a previous event ID.
635+
* Opens a GET SSE connection with Last-Event-ID header to replay missed events.
636+
*
637+
* @param lastEventId The event ID to resume from
638+
* @param options Optional callback to receive new resumption tokens
639+
*/
640+
async resumeStream(lastEventId: string, options?: { onresumptiontoken?: (token: string) => void }): Promise<void> {
641+
await this._startOrAuthSse({
642+
resumptionToken: lastEventId,
643+
onresumptiontoken: options?.onresumptiontoken
644+
});
645+
}
596646
}

0 commit comments

Comments
 (0)