From 3a58ea1057c3c6cc050eb5f20e94839b11014d87 Mon Sep 17 00:00:00 2001 From: Felix Weinberger Date: Tue, 25 Nov 2025 11:51:22 +0000 Subject: [PATCH] feat: add closeSSEStream callback to RequestHandlerExtra Address findleyr's feedback to decouple tool code from transport: - Add CloseSSEStreamOptions type for per-invocation retry intervals - Add closeSSEStream callback to MessageExtraInfo and RequestHandlerExtra - Callback only provided when eventStore is configured - Support custom retry interval via options.retryInterval - Update ssePollingExample to use the new callback API Tools can now close SSE streams directly via extra.closeSSEStream() without needing to track or access transports explicitly. --- src/examples/server/ssePollingExample.ts | 16 +- src/server/streamableHttp.test.ts | 184 +++++++++++++++++++++++ src/server/streamableHttp.ts | 21 ++- src/shared/protocol.ts | 14 +- src/types.ts | 17 +++ 5 files changed, 241 insertions(+), 11 deletions(-) diff --git a/src/examples/server/ssePollingExample.ts b/src/examples/server/ssePollingExample.ts index 8bb8cfbc9..42cbfe0f5 100644 --- a/src/examples/server/ssePollingExample.ts +++ b/src/examples/server/ssePollingExample.ts @@ -7,7 +7,7 @@ * Key features: * - Configures `retryInterval` to tell clients how long to wait before reconnecting * - Uses `eventStore` to persist events for replay after reconnection - * - Calls `closeSSEStream()` to gracefully disconnect clients mid-operation + * - Uses `extra.closeSSEStream()` callback to gracefully disconnect clients mid-operation * * Run with: npx tsx src/examples/server/ssePollingExample.ts * Test with: curl or the MCP Inspector @@ -31,9 +31,6 @@ const server = new McpServer( } ); -// Track active transports by session ID for closeSSEStream access -const transports = new Map(); - // Register a long-running tool that demonstrates server-initiated disconnect server.tool( 'long-task', @@ -66,10 +63,10 @@ server.tool( // Server decides to disconnect the client to free resources // Client will reconnect via GET with Last-Event-ID after retryInterval - const transport = transports.get(extra.sessionId!); - if (transport) { + // Use extra.closeSSEStream callback - available when eventStore is configured + if (extra.closeSSEStream) { console.log(`[${extra.sessionId}] Closing SSE stream to trigger client polling...`); - transport.closeSSEStream(extra.requestId); + extra.closeSSEStream({ retryInterval: 2000 }); } // Continue processing while client is disconnected @@ -112,6 +109,9 @@ app.use(cors()); // Create event store for resumability const eventStore = new InMemoryEventStore(); +// Track transports by session ID for session reuse +const transports = new Map(); + // Handle all MCP requests - use express.json() only for this route app.all('/mcp', express.json(), async (req: Request, res: Response) => { const sessionId = req.headers['mcp-session-id'] as string | undefined; @@ -123,7 +123,7 @@ app.all('/mcp', express.json(), async (req: Request, res: Response) => { transport = new StreamableHTTPServerTransport({ sessionIdGenerator: () => randomUUID(), eventStore, - retryInterval: 2000, // Client should reconnect after 2 seconds + retryInterval: 2000, // Default retry interval for priming events onsessioninitialized: id => { console.log(`[${id}] Session initialized`); transports.set(id, transport!); diff --git a/src/server/streamableHttp.test.ts b/src/server/streamableHttp.test.ts index 80ee04d67..f81a346d3 100644 --- a/src/server/streamableHttp.test.ts +++ b/src/server/streamableHttp.test.ts @@ -1802,6 +1802,190 @@ describe.each(zodTestMatrix)('$zodVersionLabel', (entry: ZodMatrixEntry) => { // Clean up - resolve the tool promise toolResolve!(); }); + + it('should provide closeSSEStream callback in extra when eventStore is configured', async () => { + const result = await createTestServer({ + sessionIdGenerator: () => randomUUID(), + eventStore: createEventStore(), + retryInterval: 1000 + }); + server = result.server; + transport = result.transport; + baseUrl = result.baseUrl; + mcpServer = result.mcpServer; + + // Track whether closeSSEStream callback was provided + let receivedCloseSSEStream: ((options?: { retryInterval?: number }) => void) | undefined; + + // Register a tool that captures the extra.closeSSEStream callback + mcpServer.tool('test-callback-tool', 'Test tool', {}, async (_args, extra) => { + receivedCloseSSEStream = extra.closeSSEStream; + return { content: [{ type: 'text', text: 'Done' }] }; + }); + + // Initialize to get session ID + const initResponse = await sendPostRequest(baseUrl, TEST_MESSAGES.initialize); + sessionId = initResponse.headers.get('mcp-session-id') as string; + expect(sessionId).toBeDefined(); + + // Call the tool + const toolCallRequest: JSONRPCMessage = { + jsonrpc: '2.0', + id: 200, + method: 'tools/call', + params: { name: 'test-callback-tool', arguments: {} } + }; + + const postResponse = await fetch(baseUrl, { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + Accept: 'text/event-stream, application/json', + 'mcp-session-id': sessionId, + 'mcp-protocol-version': '2025-03-26' + }, + body: JSON.stringify(toolCallRequest) + }); + + expect(postResponse.status).toBe(200); + + // Read all events to completion + const reader = postResponse.body?.getReader(); + while (true) { + const { done } = await reader!.read(); + if (done) break; + } + + // Verify closeSSEStream callback was provided + expect(receivedCloseSSEStream).toBeDefined(); + expect(typeof receivedCloseSSEStream).toBe('function'); + }); + + it('should NOT provide closeSSEStream callback when eventStore is NOT configured', async () => { + const result = await createTestServer({ + sessionIdGenerator: () => randomUUID() + // No eventStore + }); + server = result.server; + transport = result.transport; + baseUrl = result.baseUrl; + mcpServer = result.mcpServer; + + // Track whether closeSSEStream callback was provided + let receivedCloseSSEStream: ((options?: { retryInterval?: number }) => void) | undefined; + + // Register a tool that captures the extra.closeSSEStream callback + mcpServer.tool('test-no-callback-tool', 'Test tool', {}, async (_args, extra) => { + receivedCloseSSEStream = extra.closeSSEStream; + return { content: [{ type: 'text', text: 'Done' }] }; + }); + + // Initialize to get session ID + const initResponse = await sendPostRequest(baseUrl, TEST_MESSAGES.initialize); + sessionId = initResponse.headers.get('mcp-session-id') as string; + expect(sessionId).toBeDefined(); + + // Call the tool + const toolCallRequest: JSONRPCMessage = { + jsonrpc: '2.0', + id: 201, + method: 'tools/call', + params: { name: 'test-no-callback-tool', arguments: {} } + }; + + const postResponse = await fetch(baseUrl, { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + Accept: 'text/event-stream, application/json', + 'mcp-session-id': sessionId, + 'mcp-protocol-version': '2025-03-26' + }, + body: JSON.stringify(toolCallRequest) + }); + + expect(postResponse.status).toBe(200); + + // Read all events to completion + const reader = postResponse.body?.getReader(); + while (true) { + const { done } = await reader!.read(); + if (done) break; + } + + // Verify closeSSEStream callback was NOT provided + expect(receivedCloseSSEStream).toBeUndefined(); + }); + + it('should send custom retry interval when closeSSEStream is called with retryInterval', async () => { + const result = await createTestServer({ + sessionIdGenerator: () => randomUUID(), + eventStore: createEventStore(), + retryInterval: 1000 // Default + }); + server = result.server; + transport = result.transport; + baseUrl = result.baseUrl; + mcpServer = result.mcpServer; + + // Track tool execution state + let toolResolve: () => void; + const toolPromise = new Promise(resolve => { + toolResolve = resolve; + }); + + // Register a tool that uses closeSSEStream with custom retry interval + mcpServer.tool('custom-retry-tool', 'Test tool', {}, async (_args, extra) => { + // Use closeSSEStream with custom retry interval + extra.closeSSEStream?.({ retryInterval: 5000 }); + await toolPromise; + return { content: [{ type: 'text', text: 'Done' }] }; + }); + + // Initialize to get session ID + const initResponse = await sendPostRequest(baseUrl, TEST_MESSAGES.initialize); + sessionId = initResponse.headers.get('mcp-session-id') as string; + expect(sessionId).toBeDefined(); + + // Call the tool + const toolCallRequest: JSONRPCMessage = { + jsonrpc: '2.0', + id: 202, + method: 'tools/call', + params: { name: 'custom-retry-tool', arguments: {} } + }; + + const postResponse = await fetch(baseUrl, { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + Accept: 'text/event-stream, application/json', + 'mcp-session-id': sessionId, + 'mcp-protocol-version': '2025-03-26' + }, + body: JSON.stringify(toolCallRequest) + }); + + expect(postResponse.status).toBe(200); + + // Collect all SSE data + const reader = postResponse.body?.getReader(); + let allText = ''; + while (true) { + const { done, value } = await reader!.read(); + if (value) { + allText += new TextDecoder().decode(value); + } + if (done) break; + } + + // Verify the custom retry interval was sent + // The stream should contain "retry: 5000" (the custom value) + expect(allText).toContain('retry: 5000'); + + // Clean up + toolResolve!(); + }); }); // Test onsessionclosed callback diff --git a/src/server/streamableHttp.ts b/src/server/streamableHttp.ts index 4514e619c..d0228c36b 100644 --- a/src/server/streamableHttp.ts +++ b/src/server/streamableHttp.ts @@ -1,6 +1,7 @@ import { IncomingMessage, ServerResponse } from 'node:http'; import { Transport } from '../shared/transport.js'; import { + CloseSSEStreamOptions, MessageExtraInfo, RequestInfo, isInitializeRequest, @@ -649,7 +650,15 @@ export class StreamableHTTPServerTransport implements Transport { // handle each message for (const message of messages) { - this.onmessage?.(message, { authInfo, requestInfo }); + // Build closeSSEStream callback for requests when eventStore is configured + let closeSSEStream: ((options?: CloseSSEStreamOptions) => void) | undefined; + if (isJSONRPCRequest(message) && this._eventStore) { + closeSSEStream = (options?: CloseSSEStreamOptions) => { + this.closeSSEStream(message.id, options?.retryInterval); + }; + } + + this.onmessage?.(message, { authInfo, requestInfo, closeSSEStream }); } // The server SHOULD NOT close the SSE stream before sending all JSON-RPC responses // This will be handled by the send() method when responses are ready @@ -794,13 +803,21 @@ export class StreamableHTTPServerTransport implements Transport { * Close an SSE stream for a specific request, triggering client reconnection. * Use this to implement polling behavior during long-running operations - * client will reconnect after the retry interval specified in the priming event. + * + * @param requestId - The request ID whose stream should be closed + * @param retryInterval - Optional retry interval in milliseconds to send before closing. + * If provided, sends a retry field to override the transport default. */ - closeSSEStream(requestId: RequestId): void { + closeSSEStream(requestId: RequestId, retryInterval?: number): void { const streamId = this._requestToStreamMapping.get(requestId); if (!streamId) return; const stream = this._streamMapping.get(streamId); if (stream) { + // If a custom retry interval is provided, send it before closing + if (retryInterval !== undefined) { + stream.write(`retry: ${retryInterval}\n\n`); + } stream.end(); this._streamMapping.delete(streamId); } diff --git a/src/shared/protocol.ts b/src/shared/protocol.ts index add69163c..212dd08db 100644 --- a/src/shared/protocol.ts +++ b/src/shared/protocol.ts @@ -2,6 +2,7 @@ import { AnySchema, AnyObjectSchema, SchemaOutput, safeParse } from '../server/z import { CancelledNotificationSchema, ClientCapabilities, + CloseSSEStreamOptions, ErrorCode, isJSONRPCError, isJSONRPCRequest, @@ -154,6 +155,16 @@ export type RequestHandlerExtra(request: SendRequestT, resultSchema: U, options?: RequestOptions) => Promise>; + + /** + * Closes the SSE stream for this request, triggering client reconnection. + * Only available when using StreamableHTTPServerTransport with eventStore configured. + * Use this to implement polling behavior during long-running operations. + * + * @param options - Optional configuration for the close operation + * @param options.retryInterval - Retry interval in milliseconds to suggest to clients + */ + closeSSEStream?: (options?: CloseSSEStreamOptions) => void; }; /** @@ -369,7 +380,8 @@ export abstract class Protocol this.request(r, resultSchema, { ...options, relatedRequestId: request.id }), authInfo: extra?.authInfo, requestId: request.id, - requestInfo: extra?.requestInfo + requestInfo: extra?.requestInfo, + closeSSEStream: extra?.closeSSEStream }; // Starting with Promise.resolve() puts any synchronous errors into the monad as well. diff --git a/src/types.ts b/src/types.ts index 5f34ed1b1..01a2caa81 100644 --- a/src/types.ts +++ b/src/types.ts @@ -1841,6 +1841,17 @@ export interface RequestInfo { headers: IsomorphicHeaders; } +/** + * Options for closing an SSE stream. + */ +export interface CloseSSEStreamOptions { + /** + * Retry interval in milliseconds to suggest to clients before closing. + * When set, sends an SSE retry field to override the transport's default. + */ + retryInterval?: number; +} + /** * Extra information about a message. */ @@ -1854,6 +1865,12 @@ export interface MessageExtraInfo { * The authentication information. */ authInfo?: AuthInfo; + + /** + * Callback to close the SSE stream for this request, triggering client reconnection. + * Only available when using StreamableHTTPServerTransport with eventStore configured. + */ + closeSSEStream?: (options?: CloseSSEStreamOptions) => void; } /* JSON-RPC types */