Skip to content

Commit da2e98c

Browse files
feat: add closeSSEStream callback to RequestHandlerExtra
Address findleyr's feedback to decouple tool code from transport: - Add CloseSSEStreamOptions type for per-invocation retry intervals - Add closeSSEStream callback to MessageExtraInfo and RequestHandlerExtra - Callback only provided when eventStore is configured - Support custom retry interval via options.retryInterval - Update ssePollingExample to use the new callback API Tools can now close SSE streams directly via extra.closeSSEStream() without needing to track or access transports explicitly.
1 parent b81bfb9 commit da2e98c

File tree

5 files changed

+241
-11
lines changed

5 files changed

+241
-11
lines changed

src/examples/server/ssePollingExample.ts

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
* Key features:
88
* - Configures `retryInterval` to tell clients how long to wait before reconnecting
99
* - Uses `eventStore` to persist events for replay after reconnection
10-
* - Calls `closeSSEStream()` to gracefully disconnect clients mid-operation
10+
* - Uses `extra.closeSSEStream()` callback to gracefully disconnect clients mid-operation
1111
*
1212
* Run with: npx tsx src/examples/server/ssePollingExample.ts
1313
* Test with: curl or the MCP Inspector
@@ -31,9 +31,6 @@ const server = new McpServer(
3131
}
3232
);
3333

34-
// Track active transports by session ID for closeSSEStream access
35-
const transports = new Map<string, StreamableHTTPServerTransport>();
36-
3734
// Register a long-running tool that demonstrates server-initiated disconnect
3835
server.tool(
3936
'long-task',
@@ -66,10 +63,10 @@ server.tool(
6663

6764
// Server decides to disconnect the client to free resources
6865
// Client will reconnect via GET with Last-Event-ID after retryInterval
69-
const transport = transports.get(extra.sessionId!);
70-
if (transport) {
66+
// Use extra.closeSSEStream callback - available when eventStore is configured
67+
if (extra.closeSSEStream) {
7168
console.log(`[${extra.sessionId}] Closing SSE stream to trigger client polling...`);
72-
transport.closeSSEStream(extra.requestId);
69+
extra.closeSSEStream({ retryInterval: 2000 });
7370
}
7471

7572
// Continue processing while client is disconnected
@@ -112,6 +109,9 @@ app.use(cors());
112109
// Create event store for resumability
113110
const eventStore = new InMemoryEventStore();
114111

112+
// Track transports by session ID for session reuse
113+
const transports = new Map<string, StreamableHTTPServerTransport>();
114+
115115
// Handle all MCP requests - use express.json() only for this route
116116
app.all('/mcp', express.json(), async (req: Request, res: Response) => {
117117
const sessionId = req.headers['mcp-session-id'] as string | undefined;
@@ -123,7 +123,7 @@ app.all('/mcp', express.json(), async (req: Request, res: Response) => {
123123
transport = new StreamableHTTPServerTransport({
124124
sessionIdGenerator: () => randomUUID(),
125125
eventStore,
126-
retryInterval: 2000, // Client should reconnect after 2 seconds
126+
retryInterval: 2000, // Default retry interval for priming events
127127
onsessioninitialized: id => {
128128
console.log(`[${id}] Session initialized`);
129129
transports.set(id, transport!);

src/server/streamableHttp.test.ts

Lines changed: 184 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1802,6 +1802,190 @@ describe.each(zodTestMatrix)('$zodVersionLabel', (entry: ZodMatrixEntry) => {
18021802
// Clean up - resolve the tool promise
18031803
toolResolve!();
18041804
});
1805+
1806+
it('should provide closeSSEStream callback in extra when eventStore is configured', async () => {
1807+
const result = await createTestServer({
1808+
sessionIdGenerator: () => randomUUID(),
1809+
eventStore: createEventStore(),
1810+
retryInterval: 1000
1811+
});
1812+
server = result.server;
1813+
transport = result.transport;
1814+
baseUrl = result.baseUrl;
1815+
mcpServer = result.mcpServer;
1816+
1817+
// Track whether closeSSEStream callback was provided
1818+
let receivedCloseSSEStream: ((options?: { retryInterval?: number }) => void) | undefined;
1819+
1820+
// Register a tool that captures the extra.closeSSEStream callback
1821+
mcpServer.tool('test-callback-tool', 'Test tool', {}, async (_args, extra) => {
1822+
receivedCloseSSEStream = extra.closeSSEStream;
1823+
return { content: [{ type: 'text', text: 'Done' }] };
1824+
});
1825+
1826+
// Initialize to get session ID
1827+
const initResponse = await sendPostRequest(baseUrl, TEST_MESSAGES.initialize);
1828+
sessionId = initResponse.headers.get('mcp-session-id') as string;
1829+
expect(sessionId).toBeDefined();
1830+
1831+
// Call the tool
1832+
const toolCallRequest: JSONRPCMessage = {
1833+
jsonrpc: '2.0',
1834+
id: 200,
1835+
method: 'tools/call',
1836+
params: { name: 'test-callback-tool', arguments: {} }
1837+
};
1838+
1839+
const postResponse = await fetch(baseUrl, {
1840+
method: 'POST',
1841+
headers: {
1842+
'Content-Type': 'application/json',
1843+
Accept: 'text/event-stream, application/json',
1844+
'mcp-session-id': sessionId,
1845+
'mcp-protocol-version': '2025-03-26'
1846+
},
1847+
body: JSON.stringify(toolCallRequest)
1848+
});
1849+
1850+
expect(postResponse.status).toBe(200);
1851+
1852+
// Read all events to completion
1853+
const reader = postResponse.body?.getReader();
1854+
while (true) {
1855+
const { done } = await reader!.read();
1856+
if (done) break;
1857+
}
1858+
1859+
// Verify closeSSEStream callback was provided
1860+
expect(receivedCloseSSEStream).toBeDefined();
1861+
expect(typeof receivedCloseSSEStream).toBe('function');
1862+
});
1863+
1864+
it('should NOT provide closeSSEStream callback when eventStore is NOT configured', async () => {
1865+
const result = await createTestServer({
1866+
sessionIdGenerator: () => randomUUID()
1867+
// No eventStore
1868+
});
1869+
server = result.server;
1870+
transport = result.transport;
1871+
baseUrl = result.baseUrl;
1872+
mcpServer = result.mcpServer;
1873+
1874+
// Track whether closeSSEStream callback was provided
1875+
let receivedCloseSSEStream: ((options?: { retryInterval?: number }) => void) | undefined;
1876+
1877+
// Register a tool that captures the extra.closeSSEStream callback
1878+
mcpServer.tool('test-no-callback-tool', 'Test tool', {}, async (_args, extra) => {
1879+
receivedCloseSSEStream = extra.closeSSEStream;
1880+
return { content: [{ type: 'text', text: 'Done' }] };
1881+
});
1882+
1883+
// Initialize to get session ID
1884+
const initResponse = await sendPostRequest(baseUrl, TEST_MESSAGES.initialize);
1885+
sessionId = initResponse.headers.get('mcp-session-id') as string;
1886+
expect(sessionId).toBeDefined();
1887+
1888+
// Call the tool
1889+
const toolCallRequest: JSONRPCMessage = {
1890+
jsonrpc: '2.0',
1891+
id: 201,
1892+
method: 'tools/call',
1893+
params: { name: 'test-no-callback-tool', arguments: {} }
1894+
};
1895+
1896+
const postResponse = await fetch(baseUrl, {
1897+
method: 'POST',
1898+
headers: {
1899+
'Content-Type': 'application/json',
1900+
Accept: 'text/event-stream, application/json',
1901+
'mcp-session-id': sessionId,
1902+
'mcp-protocol-version': '2025-03-26'
1903+
},
1904+
body: JSON.stringify(toolCallRequest)
1905+
});
1906+
1907+
expect(postResponse.status).toBe(200);
1908+
1909+
// Read all events to completion
1910+
const reader = postResponse.body?.getReader();
1911+
while (true) {
1912+
const { done } = await reader!.read();
1913+
if (done) break;
1914+
}
1915+
1916+
// Verify closeSSEStream callback was NOT provided
1917+
expect(receivedCloseSSEStream).toBeUndefined();
1918+
});
1919+
1920+
it('should send custom retry interval when closeSSEStream is called with retryInterval', async () => {
1921+
const result = await createTestServer({
1922+
sessionIdGenerator: () => randomUUID(),
1923+
eventStore: createEventStore(),
1924+
retryInterval: 1000 // Default
1925+
});
1926+
server = result.server;
1927+
transport = result.transport;
1928+
baseUrl = result.baseUrl;
1929+
mcpServer = result.mcpServer;
1930+
1931+
// Track tool execution state
1932+
let toolResolve: () => void;
1933+
const toolPromise = new Promise<void>(resolve => {
1934+
toolResolve = resolve;
1935+
});
1936+
1937+
// Register a tool that uses closeSSEStream with custom retry interval
1938+
mcpServer.tool('custom-retry-tool', 'Test tool', {}, async (_args, extra) => {
1939+
// Use closeSSEStream with custom retry interval
1940+
extra.closeSSEStream?.({ retryInterval: 5000 });
1941+
await toolPromise;
1942+
return { content: [{ type: 'text', text: 'Done' }] };
1943+
});
1944+
1945+
// Initialize to get session ID
1946+
const initResponse = await sendPostRequest(baseUrl, TEST_MESSAGES.initialize);
1947+
sessionId = initResponse.headers.get('mcp-session-id') as string;
1948+
expect(sessionId).toBeDefined();
1949+
1950+
// Call the tool
1951+
const toolCallRequest: JSONRPCMessage = {
1952+
jsonrpc: '2.0',
1953+
id: 202,
1954+
method: 'tools/call',
1955+
params: { name: 'custom-retry-tool', arguments: {} }
1956+
};
1957+
1958+
const postResponse = await fetch(baseUrl, {
1959+
method: 'POST',
1960+
headers: {
1961+
'Content-Type': 'application/json',
1962+
Accept: 'text/event-stream, application/json',
1963+
'mcp-session-id': sessionId,
1964+
'mcp-protocol-version': '2025-03-26'
1965+
},
1966+
body: JSON.stringify(toolCallRequest)
1967+
});
1968+
1969+
expect(postResponse.status).toBe(200);
1970+
1971+
// Collect all SSE data
1972+
const reader = postResponse.body?.getReader();
1973+
let allText = '';
1974+
while (true) {
1975+
const { done, value } = await reader!.read();
1976+
if (value) {
1977+
allText += new TextDecoder().decode(value);
1978+
}
1979+
if (done) break;
1980+
}
1981+
1982+
// Verify the custom retry interval was sent
1983+
// The stream should contain "retry: 5000" (the custom value)
1984+
expect(allText).toContain('retry: 5000');
1985+
1986+
// Clean up
1987+
toolResolve!();
1988+
});
18051989
});
18061990

18071991
// Test onsessionclosed callback

src/server/streamableHttp.ts

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import { IncomingMessage, ServerResponse } from 'node:http';
22
import { Transport } from '../shared/transport.js';
33
import {
4+
CloseSSEStreamOptions,
45
MessageExtraInfo,
56
RequestInfo,
67
isInitializeRequest,
@@ -649,7 +650,15 @@ export class StreamableHTTPServerTransport implements Transport {
649650

650651
// handle each message
651652
for (const message of messages) {
652-
this.onmessage?.(message, { authInfo, requestInfo });
653+
// Build closeSSEStream callback for requests when eventStore is configured
654+
let closeSSEStream: ((options?: CloseSSEStreamOptions) => void) | undefined;
655+
if (isJSONRPCRequest(message) && this._eventStore) {
656+
closeSSEStream = (options?: CloseSSEStreamOptions) => {
657+
this.closeSSEStream(message.id, options?.retryInterval);
658+
};
659+
}
660+
661+
this.onmessage?.(message, { authInfo, requestInfo, closeSSEStream });
653662
}
654663
// The server SHOULD NOT close the SSE stream before sending all JSON-RPC responses
655664
// This will be handled by the send() method when responses are ready
@@ -794,13 +803,21 @@ export class StreamableHTTPServerTransport implements Transport {
794803
* Close an SSE stream for a specific request, triggering client reconnection.
795804
* Use this to implement polling behavior during long-running operations -
796805
* client will reconnect after the retry interval specified in the priming event.
806+
*
807+
* @param requestId - The request ID whose stream should be closed
808+
* @param retryInterval - Optional retry interval in milliseconds to send before closing.
809+
* If provided, sends a retry field to override the transport default.
797810
*/
798-
closeSSEStream(requestId: RequestId): void {
811+
closeSSEStream(requestId: RequestId, retryInterval?: number): void {
799812
const streamId = this._requestToStreamMapping.get(requestId);
800813
if (!streamId) return;
801814

802815
const stream = this._streamMapping.get(streamId);
803816
if (stream) {
817+
// If a custom retry interval is provided, send it before closing
818+
if (retryInterval !== undefined) {
819+
stream.write(`retry: ${retryInterval}\n\n`);
820+
}
804821
stream.end();
805822
this._streamMapping.delete(streamId);
806823
}

src/shared/protocol.ts

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import { AnySchema, AnyObjectSchema, SchemaOutput, safeParse } from '../server/z
22
import {
33
CancelledNotificationSchema,
44
ClientCapabilities,
5+
CloseSSEStreamOptions,
56
ErrorCode,
67
isJSONRPCError,
78
isJSONRPCRequest,
@@ -154,6 +155,16 @@ export type RequestHandlerExtra<SendRequestT extends Request, SendNotificationT
154155
* This is used by certain transports to correctly associate related messages.
155156
*/
156157
sendRequest: <U extends AnySchema>(request: SendRequestT, resultSchema: U, options?: RequestOptions) => Promise<SchemaOutput<U>>;
158+
159+
/**
160+
* Closes the SSE stream for this request, triggering client reconnection.
161+
* Only available when using StreamableHTTPServerTransport with eventStore configured.
162+
* Use this to implement polling behavior during long-running operations.
163+
*
164+
* @param options - Optional configuration for the close operation
165+
* @param options.retryInterval - Retry interval in milliseconds to suggest to clients
166+
*/
167+
closeSSEStream?: (options?: CloseSSEStreamOptions) => void;
157168
};
158169

159170
/**
@@ -369,7 +380,8 @@ export abstract class Protocol<SendRequestT extends Request, SendNotificationT e
369380
sendRequest: (r, resultSchema, options?) => this.request(r, resultSchema, { ...options, relatedRequestId: request.id }),
370381
authInfo: extra?.authInfo,
371382
requestId: request.id,
372-
requestInfo: extra?.requestInfo
383+
requestInfo: extra?.requestInfo,
384+
closeSSEStream: extra?.closeSSEStream
373385
};
374386

375387
// Starting with Promise.resolve() puts any synchronous errors into the monad as well.

src/types.ts

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1841,6 +1841,17 @@ export interface RequestInfo {
18411841
headers: IsomorphicHeaders;
18421842
}
18431843

1844+
/**
1845+
* Options for closing an SSE stream.
1846+
*/
1847+
export interface CloseSSEStreamOptions {
1848+
/**
1849+
* Retry interval in milliseconds to suggest to clients before closing.
1850+
* When set, sends an SSE retry field to override the transport's default.
1851+
*/
1852+
retryInterval?: number;
1853+
}
1854+
18441855
/**
18451856
* Extra information about a message.
18461857
*/
@@ -1854,6 +1865,12 @@ export interface MessageExtraInfo {
18541865
* The authentication information.
18551866
*/
18561867
authInfo?: AuthInfo;
1868+
1869+
/**
1870+
* Callback to close the SSE stream for this request, triggering client reconnection.
1871+
* Only available when using StreamableHTTPServerTransport with eventStore configured.
1872+
*/
1873+
closeSSEStream?: (options?: CloseSSEStreamOptions) => void;
18571874
}
18581875

18591876
/* JSON-RPC types */

0 commit comments

Comments
 (0)