Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions src/examples/server/ssePollingExample.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
* Key features:
* - Configures `retryInterval` to tell clients how long to wait before reconnecting
* - Uses `eventStore` to persist events for replay after reconnection
* - Calls `closeSSEStream()` to gracefully disconnect clients mid-operation
* - Uses `extra.closeSSEStream()` callback to gracefully disconnect clients mid-operation
*
* Run with: npx tsx src/examples/server/ssePollingExample.ts
* Test with: curl or the MCP Inspector
Expand All @@ -31,9 +31,6 @@ const server = new McpServer(
}
);

// Track active transports by session ID for closeSSEStream access
const transports = new Map<string, StreamableHTTPServerTransport>();

// Register a long-running tool that demonstrates server-initiated disconnect
server.tool(
'long-task',
Expand Down Expand Up @@ -66,10 +63,10 @@ server.tool(

// Server decides to disconnect the client to free resources
// Client will reconnect via GET with Last-Event-ID after retryInterval
const transport = transports.get(extra.sessionId!);
if (transport) {
// Use extra.closeSSEStream callback - available when eventStore is configured
if (extra.closeSSEStream) {
console.log(`[${extra.sessionId}] Closing SSE stream to trigger client polling...`);
transport.closeSSEStream(extra.requestId);
extra.closeSSEStream({ retryInterval: 2000 });
}

// Continue processing while client is disconnected
Expand Down Expand Up @@ -112,6 +109,9 @@ app.use(cors());
// Create event store for resumability
const eventStore = new InMemoryEventStore();

// Track transports by session ID for session reuse
const transports = new Map<string, StreamableHTTPServerTransport>();

Comment on lines +112 to +114
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// Track transports by session ID for session reuse
const transports = new Map<string, StreamableHTTPServerTransport>();

don't think you need this any more

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought so too at first, but we do still need this at line 120 below:

// Reuse existing transport or create new one
let transport = sessionId ? transports.get(sessionId) : undefined;

// Handle all MCP requests - use express.json() only for this route
app.all('/mcp', express.json(), async (req: Request, res: Response) => {
const sessionId = req.headers['mcp-session-id'] as string | undefined;
Expand All @@ -123,7 +123,7 @@ app.all('/mcp', express.json(), async (req: Request, res: Response) => {
transport = new StreamableHTTPServerTransport({
sessionIdGenerator: () => randomUUID(),
eventStore,
retryInterval: 2000, // Client should reconnect after 2 seconds
retryInterval: 2000, // Default retry interval for priming events
onsessioninitialized: id => {
console.log(`[${id}] Session initialized`);
transports.set(id, transport!);
Expand Down
184 changes: 184 additions & 0 deletions src/server/streamableHttp.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1802,6 +1802,190 @@ describe.each(zodTestMatrix)('$zodVersionLabel', (entry: ZodMatrixEntry) => {
// Clean up - resolve the tool promise
toolResolve!();
});

it('should provide closeSSEStream callback in extra when eventStore is configured', async () => {
const result = await createTestServer({
sessionIdGenerator: () => randomUUID(),
eventStore: createEventStore(),
retryInterval: 1000
});
server = result.server;
transport = result.transport;
baseUrl = result.baseUrl;
mcpServer = result.mcpServer;

// Track whether closeSSEStream callback was provided
let receivedCloseSSEStream: ((options?: { retryInterval?: number }) => void) | undefined;

// Register a tool that captures the extra.closeSSEStream callback
mcpServer.tool('test-callback-tool', 'Test tool', {}, async (_args, extra) => {
receivedCloseSSEStream = extra.closeSSEStream;
return { content: [{ type: 'text', text: 'Done' }] };
});

// Initialize to get session ID
const initResponse = await sendPostRequest(baseUrl, TEST_MESSAGES.initialize);
sessionId = initResponse.headers.get('mcp-session-id') as string;
expect(sessionId).toBeDefined();

// Call the tool
const toolCallRequest: JSONRPCMessage = {
jsonrpc: '2.0',
id: 200,
method: 'tools/call',
params: { name: 'test-callback-tool', arguments: {} }
};

const postResponse = await fetch(baseUrl, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
Accept: 'text/event-stream, application/json',
'mcp-session-id': sessionId,
'mcp-protocol-version': '2025-03-26'
},
body: JSON.stringify(toolCallRequest)
});

expect(postResponse.status).toBe(200);

// Read all events to completion
const reader = postResponse.body?.getReader();
while (true) {
const { done } = await reader!.read();
if (done) break;
}

// Verify closeSSEStream callback was provided
expect(receivedCloseSSEStream).toBeDefined();
expect(typeof receivedCloseSSEStream).toBe('function');
});

it('should NOT provide closeSSEStream callback when eventStore is NOT configured', async () => {
const result = await createTestServer({
sessionIdGenerator: () => randomUUID()
// No eventStore
});
server = result.server;
transport = result.transport;
baseUrl = result.baseUrl;
mcpServer = result.mcpServer;

// Track whether closeSSEStream callback was provided
let receivedCloseSSEStream: ((options?: { retryInterval?: number }) => void) | undefined;

// Register a tool that captures the extra.closeSSEStream callback
mcpServer.tool('test-no-callback-tool', 'Test tool', {}, async (_args, extra) => {
receivedCloseSSEStream = extra.closeSSEStream;
return { content: [{ type: 'text', text: 'Done' }] };
});

// Initialize to get session ID
const initResponse = await sendPostRequest(baseUrl, TEST_MESSAGES.initialize);
sessionId = initResponse.headers.get('mcp-session-id') as string;
expect(sessionId).toBeDefined();

// Call the tool
const toolCallRequest: JSONRPCMessage = {
jsonrpc: '2.0',
id: 201,
method: 'tools/call',
params: { name: 'test-no-callback-tool', arguments: {} }
};

const postResponse = await fetch(baseUrl, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
Accept: 'text/event-stream, application/json',
'mcp-session-id': sessionId,
'mcp-protocol-version': '2025-03-26'
},
body: JSON.stringify(toolCallRequest)
});

expect(postResponse.status).toBe(200);

// Read all events to completion
const reader = postResponse.body?.getReader();
while (true) {
const { done } = await reader!.read();
if (done) break;
}

// Verify closeSSEStream callback was NOT provided
expect(receivedCloseSSEStream).toBeUndefined();
});

it('should send custom retry interval when closeSSEStream is called with retryInterval', async () => {
const result = await createTestServer({
sessionIdGenerator: () => randomUUID(),
eventStore: createEventStore(),
retryInterval: 1000 // Default
});
server = result.server;
transport = result.transport;
baseUrl = result.baseUrl;
mcpServer = result.mcpServer;

// Track tool execution state
let toolResolve: () => void;
const toolPromise = new Promise<void>(resolve => {
toolResolve = resolve;
});

// Register a tool that uses closeSSEStream with custom retry interval
mcpServer.tool('custom-retry-tool', 'Test tool', {}, async (_args, extra) => {
// Use closeSSEStream with custom retry interval
extra.closeSSEStream?.({ retryInterval: 5000 });
await toolPromise;
return { content: [{ type: 'text', text: 'Done' }] };
});

// Initialize to get session ID
const initResponse = await sendPostRequest(baseUrl, TEST_MESSAGES.initialize);
sessionId = initResponse.headers.get('mcp-session-id') as string;
expect(sessionId).toBeDefined();

// Call the tool
const toolCallRequest: JSONRPCMessage = {
jsonrpc: '2.0',
id: 202,
method: 'tools/call',
params: { name: 'custom-retry-tool', arguments: {} }
};

const postResponse = await fetch(baseUrl, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
Accept: 'text/event-stream, application/json',
'mcp-session-id': sessionId,
'mcp-protocol-version': '2025-03-26'
},
body: JSON.stringify(toolCallRequest)
});

expect(postResponse.status).toBe(200);

// Collect all SSE data
const reader = postResponse.body?.getReader();
let allText = '';
while (true) {
const { done, value } = await reader!.read();
if (value) {
allText += new TextDecoder().decode(value);
}
if (done) break;
}

// Verify the custom retry interval was sent
// The stream should contain "retry: 5000" (the custom value)
expect(allText).toContain('retry: 5000');

// Clean up
toolResolve!();
});
});

// Test onsessionclosed callback
Expand Down
21 changes: 19 additions & 2 deletions src/server/streamableHttp.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { IncomingMessage, ServerResponse } from 'node:http';
import { Transport } from '../shared/transport.js';
import {
CloseSSEStreamOptions,
MessageExtraInfo,
RequestInfo,
isInitializeRequest,
Expand Down Expand Up @@ -649,7 +650,15 @@ export class StreamableHTTPServerTransport implements Transport {

// handle each message
for (const message of messages) {
this.onmessage?.(message, { authInfo, requestInfo });
// Build closeSSEStream callback for requests when eventStore is configured
let closeSSEStream: ((options?: CloseSSEStreamOptions) => void) | undefined;
if (isJSONRPCRequest(message) && this._eventStore) {
closeSSEStream = (options?: CloseSSEStreamOptions) => {
this.closeSSEStream(message.id, options?.retryInterval);
};
}

this.onmessage?.(message, { authInfo, requestInfo, closeSSEStream });
}
// The server SHOULD NOT close the SSE stream before sending all JSON-RPC responses
// This will be handled by the send() method when responses are ready
Expand Down Expand Up @@ -794,13 +803,21 @@ export class StreamableHTTPServerTransport implements Transport {
* Close an SSE stream for a specific request, triggering client reconnection.
* Use this to implement polling behavior during long-running operations -
* client will reconnect after the retry interval specified in the priming event.
*
* @param requestId - The request ID whose stream should be closed
* @param retryInterval - Optional retry interval in milliseconds to send before closing.
* If provided, sends a retry field to override the transport default.
*/
closeSSEStream(requestId: RequestId): void {
closeSSEStream(requestId: RequestId, retryInterval?: number): void {
const streamId = this._requestToStreamMapping.get(requestId);
if (!streamId) return;

const stream = this._streamMapping.get(streamId);
if (stream) {
// If a custom retry interval is provided, send it before closing
if (retryInterval !== undefined) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doesn't this cause problems for the client?
If the "event" field is missing, the SSE spec says that it should be treated as a "message" event.

stream.write(`retry: ${retryInterval}\n\n`);
}
stream.end();
this._streamMapping.delete(streamId);
}
Expand Down
14 changes: 13 additions & 1 deletion src/shared/protocol.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { AnySchema, AnyObjectSchema, SchemaOutput, safeParse } from '../server/z
import {
CancelledNotificationSchema,
ClientCapabilities,
CloseSSEStreamOptions,
ErrorCode,
isJSONRPCError,
isJSONRPCRequest,
Expand Down Expand Up @@ -154,6 +155,16 @@ export type RequestHandlerExtra<SendRequestT extends Request, SendNotificationT
* This is used by certain transports to correctly associate related messages.
*/
sendRequest: <U extends AnySchema>(request: SendRequestT, resultSchema: U, options?: RequestOptions) => Promise<SchemaOutput<U>>;

/**
* Closes the SSE stream for this request, triggering client reconnection.
* Only available when using StreamableHTTPServerTransport with eventStore configured.
* Use this to implement polling behavior during long-running operations.
*
* @param options - Optional configuration for the close operation
* @param options.retryInterval - Retry interval in milliseconds to suggest to clients
*/
closeSSEStream?: (options?: CloseSSEStreamOptions) => void;
};

/**
Expand Down Expand Up @@ -369,7 +380,8 @@ export abstract class Protocol<SendRequestT extends Request, SendNotificationT e
sendRequest: (r, resultSchema, options?) => this.request(r, resultSchema, { ...options, relatedRequestId: request.id }),
authInfo: extra?.authInfo,
requestId: request.id,
requestInfo: extra?.requestInfo
requestInfo: extra?.requestInfo,
closeSSEStream: extra?.closeSSEStream
};

// Starting with Promise.resolve() puts any synchronous errors into the monad as well.
Expand Down
17 changes: 17 additions & 0 deletions src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1841,6 +1841,17 @@ export interface RequestInfo {
headers: IsomorphicHeaders;
}

/**
* Options for closing an SSE stream.
*/
export interface CloseSSEStreamOptions {
/**
* Retry interval in milliseconds to suggest to clients before closing.
* When set, sends an SSE retry field to override the transport's default.
*/
retryInterval?: number;
}

/**
* Extra information about a message.
*/
Expand All @@ -1854,6 +1865,12 @@ export interface MessageExtraInfo {
* The authentication information.
*/
authInfo?: AuthInfo;

/**
* Callback to close the SSE stream for this request, triggering client reconnection.
* Only available when using StreamableHTTPServerTransport with eventStore configured.
*/
closeSSEStream?: (options?: CloseSSEStreamOptions) => void;
}

/* JSON-RPC types */
Expand Down
Loading