Skip to content

Commit 9a8a5d6

Browse files
authored
⏱️ fix: Separate MCP GET SSE Stream Timeout from POST and Suppress SDK-Internal Recovery Errors (#11936)
* fix: Separate MCP GET SSE body timeout from POST and suppress SDK-internal stream recovery

- Add a dedicated GET Agent with a configurable `sseReadTimeout` (default 5 min, matching the Python MCP SDK) so idle SSE streams time out independently of POST requests, preventing the reconnect-loop log flood described in Discussion #11230.
- Suppress "SSE stream disconnected" and "Failed to reconnect SSE stream" errors in setupTransportErrorHandlers — these are SDK-internal recovery events, not transport failures. "Maximum reconnection attempts exceeded" still escalates.
- Add optional `sseReadTimeout` to BaseOptionsSchema for per-server configuration.
- Add 6 tests: agent timeout separation, custom sseReadTimeout, SSE disconnect suppression (3 unit), and a real-server integration test proving the GET stream recovers without a full transport rebuild.

* fix: Refactor MCP connection timeouts and error handling

- Updated the `DEFAULT_SSE_READ_TIMEOUT` to use a constant for better readability.
- Introduced internal error message constants for SSE stream disconnection and reconnection failures to improve maintainability.
- Enhanced type safety in tests by ensuring the options symbol is defined before usage.
- Updated the `sseReadTimeout` in `BaseOptionsSchema` to enforce positive values, ensuring valid configurations.

* chore: Update SSE read timeout documentation format in BaseOptionsSchema

- Changed the default timeout value comment in BaseOptionsSchema to use an underscore for better readability, aligning with common formatting practices.
1 parent 44dbbd5 commit 9a8a5d6

File tree

3 files changed

+254
-10
lines changed

3 files changed

+254
-10
lines changed

packages/api/src/mcp/__tests__/MCPConnectionAgentLifecycle.test.ts

Lines changed: 194 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -229,7 +229,8 @@ describe('MCPConnection Agent lifecycle – streamable-http', () => {
229229
await safeDisconnect(conn);
230230

231231
/**
232-
* streamable-http creates one Agent via createFetchFunction.
232+
* streamable-http creates two Agents via createFetchFunction: one for POST
233+
* (normal timeout) and one for GET SSE (long body timeout).
233234
* If agents were per-request (old bug), they would not be stored and close
234235
* would be called 0 times. With our fix, Agents are stored and closed on
235236
* disconnect regardless of request count — confirming reuse.
@@ -300,6 +301,57 @@ describe('MCPConnection Agent lifecycle – streamable-http', () => {
300301
expect(closeSpy.mock.calls.length).toBe(countAfterFirst);
301302
conn = null;
302303
});
304+
305+
it('creates separate Agents for POST (normal timeout) and GET SSE (default sseReadTimeout)', async () => {
306+
conn = new MCPConnection({
307+
serverName: 'test',
308+
serverConfig: { type: 'streamable-http', url: server.url },
309+
useSSRFProtection: false,
310+
});
311+
312+
await conn.connect();
313+
314+
const agents = (conn as unknown as { agents: Agent[] }).agents;
315+
expect(agents.length).toBeGreaterThanOrEqual(2);
316+
317+
const optionsSym = Object.getOwnPropertySymbols(agents[0]).find(
318+
(s) => s.toString() === 'Symbol(options)',
319+
);
320+
expect(optionsSym).toBeDefined();
321+
322+
const bodyTimeouts = agents.map(
323+
(a) => (a as unknown as Record<symbol, { bodyTimeout: number }>)[optionsSym!].bodyTimeout,
324+
);
325+
326+
const hasShortTimeout = bodyTimeouts.some((t) => t <= 120_000);
327+
const hasLongTimeout = bodyTimeouts.some((t) => t === 5 * 60 * 1000);
328+
329+
expect(hasShortTimeout).toBe(true);
330+
expect(hasLongTimeout).toBe(true);
331+
});
332+
333+
it('respects a custom sseReadTimeout from server config', async () => {
334+
const customTimeout = 10 * 60 * 1000;
335+
conn = new MCPConnection({
336+
serverName: 'test',
337+
serverConfig: { type: 'streamable-http', url: server.url, sseReadTimeout: customTimeout },
338+
useSSRFProtection: false,
339+
});
340+
341+
await conn.connect();
342+
343+
const agents = (conn as unknown as { agents: Agent[] }).agents;
344+
const optionsSym = Object.getOwnPropertySymbols(agents[0]).find(
345+
(s) => s.toString() === 'Symbol(options)',
346+
);
347+
expect(optionsSym).toBeDefined();
348+
349+
const bodyTimeouts = agents.map(
350+
(a) => (a as unknown as Record<symbol, { bodyTimeout: number }>)[optionsSym!].bodyTimeout,
351+
);
352+
353+
expect(bodyTimeouts).toContain(customTimeout);
354+
});
303355
});
304356

305357
describe('MCPConnection Agent lifecycle – SSE', () => {
@@ -528,3 +580,144 @@ describe('MCPConnection SSE 404 handling – session-aware', () => {
528580
expect(emitSpy).not.toHaveBeenCalledWith('connectionChange', 'error');
529581
});
530582
});
583+
584+
describe('MCPConnection SSE stream disconnect handling', () => {
585+
function makeTransportStub() {
586+
return {
587+
onerror: undefined as ((e: Error) => void) | undefined,
588+
onclose: undefined as (() => void) | undefined,
589+
onmessage: undefined as ((m: unknown) => void) | undefined,
590+
start: jest.fn(),
591+
close: jest.fn(),
592+
send: jest.fn(),
593+
};
594+
}
595+
596+
function makeConn() {
597+
return new MCPConnection({
598+
serverName: 'test-sse-disconnect',
599+
serverConfig: { url: 'http://127.0.0.1:1/sse' },
600+
useSSRFProtection: false,
601+
});
602+
}
603+
604+
function bindErrorHandler(conn: MCPConnection, transport: ReturnType<typeof makeTransportStub>) {
605+
(
606+
conn as unknown as { setupTransportErrorHandlers: (t: unknown) => void }
607+
).setupTransportErrorHandlers(transport);
608+
}
609+
610+
beforeEach(() => {
611+
mockLogger.debug.mockClear();
612+
mockLogger.error.mockClear();
613+
});
614+
615+
it('suppresses "SSE stream disconnected" errors from escalating to full reconnection', () => {
616+
const conn = makeConn();
617+
const transport = makeTransportStub();
618+
const emitSpy = jest.spyOn(conn, 'emit');
619+
bindErrorHandler(conn, transport);
620+
621+
transport.onerror?.(
622+
new Error('SSE stream disconnected: AbortError: The operation was aborted'),
623+
);
624+
625+
expect(mockLogger.debug).toHaveBeenCalledWith(
626+
expect.stringContaining('SDK SSE stream recovery in progress'),
627+
);
628+
expect(emitSpy).not.toHaveBeenCalledWith('connectionChange', 'error');
629+
});
630+
631+
it('suppresses "Failed to reconnect SSE stream" errors (SDK still has retries left)', () => {
632+
const conn = makeConn();
633+
const transport = makeTransportStub();
634+
const emitSpy = jest.spyOn(conn, 'emit');
635+
bindErrorHandler(conn, transport);
636+
637+
transport.onerror?.(new Error('Failed to reconnect SSE stream: connection refused'));
638+
639+
expect(mockLogger.debug).toHaveBeenCalledWith(
640+
expect.stringContaining('SDK SSE stream recovery in progress'),
641+
);
642+
expect(emitSpy).not.toHaveBeenCalledWith('connectionChange', 'error');
643+
});
644+
645+
it('escalates "Maximum reconnection attempts exceeded" (SDK gave up)', () => {
646+
const conn = makeConn();
647+
const transport = makeTransportStub();
648+
const emitSpy = jest.spyOn(conn, 'emit');
649+
bindErrorHandler(conn, transport);
650+
651+
transport.onerror?.(new Error('Maximum reconnection attempts (2) exceeded.'));
652+
653+
expect(emitSpy).toHaveBeenCalledWith('connectionChange', 'error');
654+
});
655+
656+
it('still escalates non-SSE-stream errors (e.g. POST failures)', () => {
657+
const conn = makeConn();
658+
const transport = makeTransportStub();
659+
const emitSpy = jest.spyOn(conn, 'emit');
660+
bindErrorHandler(conn, transport);
661+
662+
transport.onerror?.(new Error('Streamable HTTP error: Error POSTing to endpoint: 500'));
663+
664+
expect(emitSpy).toHaveBeenCalledWith('connectionChange', 'error');
665+
});
666+
});
667+
668+
describe('MCPConnection SSE GET stream recovery – integration', () => {
669+
let server: TestServer;
670+
let conn: MCPConnection | null;
671+
672+
beforeEach(async () => {
673+
server = await createStreamableServer();
674+
conn = null;
675+
});
676+
677+
afterEach(async () => {
678+
await safeDisconnect(conn);
679+
conn = null;
680+
jest.restoreAllMocks();
681+
await server.close();
682+
});
683+
684+
it('survives a GET SSE body timeout without triggering a full transport rebuild', async () => {
685+
const SHORT_SSE_TIMEOUT = 1500;
686+
687+
conn = new MCPConnection({
688+
serverName: 'test-sse-recovery',
689+
serverConfig: {
690+
type: 'streamable-http',
691+
url: server.url,
692+
sseReadTimeout: SHORT_SSE_TIMEOUT,
693+
},
694+
useSSRFProtection: false,
695+
});
696+
697+
await conn.connect();
698+
699+
await conn.fetchTools();
700+
701+
/**
702+
* Wait for the GET SSE body timeout to fire. The SDK will see a stream
703+
* error and call onerror("SSE stream disconnected: …"), then internally
704+
* schedule a reconnection. Our handler should suppress the escalation.
705+
*/
706+
await new Promise((resolve) => setTimeout(resolve, SHORT_SSE_TIMEOUT + 1000));
707+
708+
expect(mockLogger.debug).toHaveBeenCalledWith(
709+
expect.stringContaining('SDK SSE stream recovery in progress'),
710+
);
711+
expect(mockLogger.error).not.toHaveBeenCalledWith(
712+
expect.stringContaining('Reconnection handler failed'),
713+
expect.anything(),
714+
);
715+
716+
/**
717+
* The connection should still be functional — POST requests use a
718+
* separate Agent with the normal timeout and are unaffected.
719+
*/
720+
const tools = await conn.fetchTools();
721+
expect(tools).toBeDefined();
722+
}, 10_000);
723+
});

packages/api/src/mcp/connection.ts

Lines changed: 57 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,17 @@ const FIVE_MINUTES = 5 * 60 * 1000;
7171
const DEFAULT_TIMEOUT = 60000;
7272
/** SSE connections through proxies may need longer initial handshake time */
7373
const SSE_CONNECT_TIMEOUT = 120000;
74+
/** Default body timeout for Streamable HTTP GET SSE streams that idle between server pushes */
75+
const DEFAULT_SSE_READ_TIMEOUT = FIVE_MINUTES;
76+
77+
/**
78+
* Error message prefixes emitted by the MCP SDK's StreamableHTTPClientTransport
79+
* (client/streamableHttp.ts → _handleSseStream / _scheduleReconnection).
80+
* These are SDK-internal strings, not part of a public API. If the SDK changes
81+
* them, suppression in setupTransportErrorHandlers will silently stop working.
82+
*/
83+
const SDK_SSE_STREAM_DISCONNECTED = 'SSE stream disconnected';
84+
const SDK_SSE_RECONNECT_FAILED = 'Failed to reconnect SSE stream';
7485

7586
/**
7687
* Headers for SSE connections.
@@ -254,6 +265,7 @@ export class MCPConnection extends EventEmitter {
254265
private readonly useSSRFProtection: boolean;
255266
iconPath?: string;
256267
timeout?: number;
268+
sseReadTimeout?: number;
257269
url?: string;
258270

259271
/**
@@ -285,6 +297,7 @@ export class MCPConnection extends EventEmitter {
285297
this.useSSRFProtection = params.useSSRFProtection === true;
286298
this.iconPath = params.serverConfig.iconPath;
287299
this.timeout = params.serverConfig.timeout;
300+
this.sseReadTimeout = params.serverConfig.sseReadTimeout;
288301
this.lastPingTime = Date.now();
289302
this.createdAt = Date.now(); // Record creation timestamp for staleness detection
290303
if (params.oauthTokens) {
@@ -313,30 +326,45 @@ export class MCPConnection extends EventEmitter {
313326
* Factory function to create fetch functions without capturing the entire `this` context.
314327
* This helps prevent memory leaks by only passing necessary dependencies.
315328
*
316-
* @param getHeaders Function to retrieve request headers
317-
* @param timeout Timeout value for the agent (in milliseconds)
318-
* @returns A fetch function that merges headers appropriately
329+
* When `sseBodyTimeout` is provided, a second Agent is created with a much longer
330+
* body timeout for GET requests (the Streamable HTTP SSE stream). POST requests
331+
* continue using the normal timeout so they fail fast on real errors.
319332
*/
320333
private createFetchFunction(
321334
getHeaders: () => Record<string, string> | null | undefined,
322335
timeout?: number,
336+
sseBodyTimeout?: number,
323337
): (input: UndiciRequestInfo, init?: UndiciRequestInit) => Promise<UndiciResponse> {
324338
const ssrfConnect = this.useSSRFProtection ? createSSRFSafeUndiciConnect() : undefined;
339+
const connectOpts = ssrfConnect != null ? { connect: ssrfConnect } : {};
325340
const effectiveTimeout = timeout || DEFAULT_TIMEOUT;
326-
const agent = new Agent({
341+
const postAgent = new Agent({
327342
bodyTimeout: effectiveTimeout,
328343
headersTimeout: effectiveTimeout,
329-
...(ssrfConnect != null ? { connect: ssrfConnect } : {}),
344+
...connectOpts,
330345
});
331-
this.agents.push(agent);
346+
this.agents.push(postAgent);
347+
348+
let getAgent: Agent | undefined;
349+
if (sseBodyTimeout != null) {
350+
getAgent = new Agent({
351+
bodyTimeout: sseBodyTimeout,
352+
headersTimeout: effectiveTimeout,
353+
...connectOpts,
354+
});
355+
this.agents.push(getAgent);
356+
}
332357

333358
return function customFetch(
334359
input: UndiciRequestInfo,
335360
init?: UndiciRequestInit,
336361
): Promise<UndiciResponse> {
362+
const isGet = (init?.method ?? 'GET').toUpperCase() === 'GET';
363+
const dispatcher = isGet && getAgent ? getAgent : postAgent;
364+
337365
const requestHeaders = getHeaders();
338366
if (!requestHeaders) {
339-
return undiciFetch(input, { ...init, dispatcher: agent });
367+
return undiciFetch(input, { ...init, dispatcher });
340368
}
341369

342370
let initHeaders: Record<string, string> = {};
@@ -356,7 +384,7 @@ export class MCPConnection extends EventEmitter {
356384
...initHeaders,
357385
...requestHeaders,
358386
},
359-
dispatcher: agent,
387+
dispatcher,
360388
});
361389
};
362390
}
@@ -507,6 +535,7 @@ export class MCPConnection extends EventEmitter {
507535
fetch: this.createFetchFunction(
508536
this.getRequestHeaders.bind(this),
509537
this.timeout,
538+
this.sseReadTimeout || DEFAULT_SSE_READ_TIMEOUT,
510539
) as unknown as FetchLike,
511540
});
512541

@@ -829,7 +858,26 @@ export class MCPConnection extends EventEmitter {
829858

830859
private setupTransportErrorHandlers(transport: Transport): void {
831860
transport.onerror = (error) => {
832-
// Extract meaningful error information (handles "SSE error: undefined" cases)
861+
const rawMessage =
862+
error && typeof error === 'object' ? ((error as { message?: string }).message ?? '') : '';
863+
864+
/**
865+
* The MCP SDK's StreamableHTTPClientTransport fires onerror for SSE GET stream
866+
* disconnects but also handles reconnection internally via _scheduleReconnection.
867+
* Escalating these to a full transport rebuild creates a redundant reconnection
868+
* loop. Log at debug level and let the SDK recover the GET stream on its own.
869+
*
870+
* "Maximum reconnection attempts … exceeded" means the SDK gave up — that one
871+
* must fall through so our higher-level reconnection takes over.
872+
*/
873+
if (
874+
rawMessage.startsWith(SDK_SSE_STREAM_DISCONNECTED) ||
875+
rawMessage.startsWith(SDK_SSE_RECONNECT_FAILED)
876+
) {
877+
logger.debug(`${this.getLogPrefix()} SDK SSE stream recovery in progress: ${rawMessage}`);
878+
return;
879+
}
880+
833881
const {
834882
message: errorMessage,
835883
code: errorCode,

packages/data-provider/src/mcp.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@ const BaseOptionsSchema = z.object({
1919
startup: z.boolean().optional(),
2020
iconPath: z.string().optional(),
2121
timeout: z.number().optional(),
22+
/** Timeout (ms) for the long-lived SSE GET stream body before undici aborts it. Default: 300_000 (5 min). */
23+
sseReadTimeout: z.number().positive().optional(),
2224
initTimeout: z.number().optional(),
2325
/** Controls visibility in chat dropdown menu (MCPSelect) */
2426
chatMenu: z.boolean().optional(),
@@ -212,6 +214,7 @@ const omitServerManagedFields = <T extends z.ZodObject<z.ZodRawShape>>(schema: T
212214
schema.omit({
213215
startup: true,
214216
timeout: true,
217+
sseReadTimeout: true,
215218
initTimeout: true,
216219
chatMenu: true,
217220
serverInstructions: true,

0 commit comments

Comments (0)