diff --git a/examples/clients/typescript/sse-retry-test.ts b/examples/clients/typescript/sse-retry-test.ts index 8595668..8c16ca2 100644 --- a/examples/clients/typescript/sse-retry-test.ts +++ b/examples/clients/typescript/sse-retry-test.ts @@ -4,12 +4,13 @@ * SSE Retry Test Client * * Tests that the MCP client respects the SSE retry field when reconnecting. - * This client connects to a test server that sends retry: field and closes - * the connection, then validates that the client waits the appropriate time. + * This client connects to a test server that closes the SSE stream mid-tool-call, + * then waits for the client to reconnect and sends the tool result. */ import { Client } from '@modelcontextprotocol/sdk/client/index.js'; import { StreamableHTTPClientTransport } from '@modelcontextprotocol/sdk/client/streamableHttp.js'; +import { CallToolResultSchema } from '@modelcontextprotocol/sdk/types.js'; async function main(): Promise { const serverUrl = process.argv[2]; @@ -42,7 +43,6 @@ async function main(): Promise { } }); - // Track reconnection events transport.onerror = (error) => { console.log(`Transport error: ${error.message}`); }; @@ -55,24 +55,23 @@ async function main(): Promise { await client.connect(transport); console.log('Connected to MCP server'); - // Keep connection alive to observe reconnection behavior - // The server will close the POST SSE stream and the client should reconnect via GET - console.log('Waiting for reconnection cycle...'); + console.log('Calling test_reconnection tool...'); console.log( - 'Server will send priming event with retry field, then close POST SSE stream' - ); - console.log( - 'Client should wait for retry period (2000ms) then reconnect via GET with Last-Event-ID' + 'Server will close SSE stream mid-call and send result after reconnection' ); - // Wait long enough for: - // 1. Server to send priming event with retry field on POST SSE stream (100ms) - // 2. Server closes POST stream to trigger reconnection - // 3. Client waits for retry period (2000ms expected) - // 4. Client reconnects via GET with Last-Event-ID header - await new Promise((resolve) => setTimeout(resolve, 6000)); + const result = await client.request( + { + method: 'tools/call', + params: { + name: 'test_reconnection', + arguments: {} + } + }, + CallToolResultSchema + ); - console.log('Test duration complete'); + console.log('Tool call completed:', JSON.stringify(result, null, 2)); await transport.close(); console.log('Connection closed successfully'); diff --git a/examples/servers/typescript/everything-server.ts b/examples/servers/typescript/everything-server.ts index a9c7f77..9dd382a 100644 --- a/examples/servers/typescript/everything-server.ts +++ b/examples/servers/typescript/everything-server.ts @@ -349,6 +349,46 @@ function createMcpServer() { } ); + // SEP-1699: Reconnection test tool - closes SSE stream mid-call to test client reconnection + mcpServer.registerTool( + 'test_reconnection', + { + description: + 'Tests SSE stream disconnection and client reconnection (SEP-1699). Server will close the stream mid-call and send the result after client reconnects.', + inputSchema: {} + }, + async (_args, { sessionId, requestId }) => { + const sleep = (ms: number) => + new Promise((resolve) => setTimeout(resolve, ms)); + + console.log(`[${sessionId}] Starting test_reconnection tool...`); + + // Get the transport for this session + const transport = sessionId ? transports[sessionId] : undefined; + if (transport && requestId) { + // Close the SSE stream to trigger client reconnection + console.log( + `[${sessionId}] Closing SSE stream to trigger client polling...` + ); + transport.closeSSEStream(requestId); + } + + // Wait for client to reconnect (should respect retry field) + await sleep(100); + + console.log(`[${sessionId}] test_reconnection tool complete`); + + return { + content: [ + { + type: 'text', + text: 'Reconnection test completed successfully. If you received this, the client properly reconnected after stream closure.' + } + ] + }; + } + ); + // Sampling tool - requests LLM completion from client mcpServer.registerTool( 'test_sampling', diff --git a/src/index.ts b/src/index.ts index a65e79f..6128a16 100644 --- a/src/index.ts +++ b/src/index.ts @@ -204,11 +204,14 @@ program 'Suite to run: "active" (default, excludes pending), "all", or "pending"', 'active' ) + .option('--verbose', 'Show verbose output (JSON instead of pretty print)') .action(async (options) => { try { // Validate options with Zod const validated = ServerOptionsSchema.parse(options); + const verbose = options.verbose ?? false; + // If a single scenario is specified, run just that one if (validated.scenario) { const result = await runServerConformanceTest( @@ -218,7 +221,8 @@ program const { failed } = printServerResults( result.checks, - result.scenarioDescription + result.scenarioDescription, + verbose ); process.exit(failed > 0 ? 1 : 0); } else { diff --git a/src/runner/server.ts b/src/runner/server.ts index 46be558..18c8254 100644 --- a/src/runner/server.ts +++ b/src/runner/server.ts @@ -2,7 +2,7 @@ import { promises as fs } from 'fs'; import path from 'path'; import { ConformanceCheck } from '../types'; import { getClientScenario } from '../scenarios'; -import { ensureResultsDir, createResultDir } from './utils'; +import { ensureResultsDir, createResultDir, formatPrettyChecks } from './utils'; /** * Format markdown-style text for terminal output using ANSI codes @@ -54,7 +54,8 @@ export async function runServerConformanceTest( export function printServerResults( checks: ConformanceCheck[], - scenarioDescription: string + scenarioDescription: string, + verbose: boolean = false ): { passed: number; failed: number; @@ -68,7 +69,11 @@ export function printServerResults( const failed = checks.filter((c) => c.status === 'FAILURE').length; const warnings = checks.filter((c) => c.status === 'WARNING').length; - console.log(`Checks:\n${JSON.stringify(checks, null, 2)}`); + if (verbose) { + console.log(JSON.stringify(checks, null, 2)); + } else { + console.log(`Checks:\n${formatPrettyChecks(checks)}`); + } console.log(`\nTest Results:`); console.log( diff --git a/src/scenarios/client/sse-retry.ts b/src/scenarios/client/sse-retry.ts index a982678..6e44a06 100644 --- a/src/scenarios/client/sse-retry.ts +++ b/src/scenarios/client/sse-retry.ts @@ -20,14 +20,17 @@ export class SSERetryScenario implements Scenario { private port: number = 0; // Timing tracking - private postStreamCloseTime: number | null = null; + private toolStreamCloseTime: number | null = null; private getReconnectionTime: number | null = null; private getConnectionCount: number = 0; private lastEventIds: (string | undefined)[] = []; - private retryValue: number = 2000; // 2 seconds + private retryValue: number = 500; // 500ms private eventIdCounter: number = 0; private sessionId: string = `session-${Date.now()}`; - private primingEventId: string | null = null; + + // Pending tool call to respond to after reconnection + private pendingToolCallId: number | string | null = null; + private getResponseStream: http.ServerResponse | null = null; // Tolerances for timing validation private readonly EARLY_TOLERANCE = 50; // Allow 50ms early for scheduler variance @@ -89,7 +92,26 @@ export class SSERetryScenario implements Scenario { this.getReconnectionTime = performance.now(); const lastEventId = req.headers['last-event-id'] as string | undefined; - this.lastEventIds.push(lastEventId); + const description = lastEventId + ? `Received GET request for ${req.url} (Last-Event-ID: ${lastEventId})` + : `Received GET request for ${req.url}`; + this.checks.push({ + id: 'incoming-request', + name: 'IncomingRequest', + description, + status: 'INFO', + timestamp: new Date().toISOString(), + details: { + method: 'GET', + url: req.url, + headers: req.headers, + connectionCount: this.getConnectionCount + } + }); + + if (lastEventId) { + this.lastEventIds.push(lastEventId); + } // Handle GET SSE stream request (reconnection) this.handleGetSSEStream(req, res); @@ -119,12 +141,62 @@ export class SSERetryScenario implements Scenario { const eventId = `event-${this.eventIdCounter}`; // Send priming event with ID and retry field - res.write(`id: ${eventId}\n`); - res.write(`retry: ${this.retryValue}\n`); - res.write(`data: \n\n`); + const primingContent = `id: ${eventId}\nretry: ${this.retryValue}\ndata: \n\n`; + res.write(primingContent); + + this.checks.push({ + id: 'outgoing-sse-event', + name: 'OutgoingSseEvent', + description: `Sent SSE priming event on GET stream (id: ${eventId}, retry: ${this.retryValue}ms)`, + status: 'INFO', + timestamp: new Date().toISOString(), + details: { + eventId, + retryMs: this.retryValue, + eventType: 'priming', + raw: primingContent + } + }); + + // Store the GET stream to send pending tool response + this.getResponseStream = res; + + // If we have a pending tool call, send the response now + if (this.pendingToolCallId !== null) { + const toolResponse = { + jsonrpc: '2.0', + id: this.pendingToolCallId, + result: { + content: [ + { + type: 'text', + text: 'Reconnection test completed successfully' + } + ] + } + }; + + const responseEventId = `event-${++this.eventIdCounter}`; + const responseContent = `event: message\nid: ${responseEventId}\ndata: ${JSON.stringify(toolResponse)}\n\n`; + res.write(responseContent); + + this.checks.push({ + id: 'outgoing-sse-event', + name: 'OutgoingSseEvent', + description: `Sent tool response on GET stream after reconnection (id: ${responseEventId})`, + status: 'INFO', + timestamp: new Date().toISOString(), + details: { + eventId: responseEventId, + eventType: 'message', + jsonrpcId: this.pendingToolCallId, + body: toolResponse, + raw: responseContent + } + }); - // Keep connection open for now (don't close immediately to avoid infinite reconnection loop) - // The test will stop the server when done + this.pendingToolCallId = null; + } } private handlePostRequest( @@ -141,48 +213,26 @@ export class SSERetryScenario implements Scenario { try { const request = JSON.parse(body); - if (request.method === 'initialize') { - // Respond to initialize request with SSE stream containing priming event - res.writeHead(200, { - 'Content-Type': 'text/event-stream', - 'Cache-Control': 'no-cache', - Connection: 'keep-alive', - 'mcp-session-id': this.sessionId - }); - - // Generate priming event ID - this.eventIdCounter++; - this.primingEventId = `event-${this.eventIdCounter}`; - - // Send priming event with retry field - res.write(`id: ${this.primingEventId}\n`); - res.write(`retry: ${this.retryValue}\n`); - res.write(`data: \n\n`); + this.checks.push({ + id: 'incoming-request', + name: 'IncomingRequest', + description: `Received POST request for ${req.url} (method: ${request.method})`, + status: 'INFO', + timestamp: new Date().toISOString(), + details: { + method: 'POST', + url: req.url, + jsonrpcMethod: request.method, + jsonrpcId: request.id + } + }); - // Send initialize response - const response = { - jsonrpc: '2.0', - id: request.id, - result: { - protocolVersion: '2025-03-26', - serverInfo: { - name: 'sse-retry-test-server', - version: '1.0.0' - }, - capabilities: {} - } - }; - - res.write(`event: message\n`); - res.write(`id: event-${++this.eventIdCounter}\n`); - res.write(`data: ${JSON.stringify(response)}\n\n`); - - // Close connection after sending response to trigger reconnection - // Record the time when we close the stream - setTimeout(() => { - this.postStreamCloseTime = performance.now(); - res.end(); - }, 100); + if (request.method === 'initialize') { + this.handleInitialize(req, res, request); + } else if (request.method === 'tools/list') { + this.handleToolsList(res, request); + } else if (request.method === 'tools/call') { + this.handleToolsCall(res, request); } else if (request.id === undefined) { // Notifications (no id) - return 202 Accepted res.writeHead(202); @@ -216,17 +266,148 @@ export class SSERetryScenario implements Scenario { }); } + private handleInitialize( + _req: http.IncomingMessage, + res: http.ServerResponse, + request: any + ): void { + res.writeHead(200, { + 'Content-Type': 'application/json', + 'mcp-session-id': this.sessionId + }); + + const response = { + jsonrpc: '2.0', + id: request.id, + result: { + protocolVersion: '2025-03-26', + serverInfo: { + name: 'sse-retry-test-server', + version: '1.0.0' + }, + capabilities: { + tools: {} + } + } + }; + + res.end(JSON.stringify(response)); + + this.checks.push({ + id: 'outgoing-response', + name: 'OutgoingResponse', + description: `Sent initialize response`, + status: 'INFO', + timestamp: new Date().toISOString(), + details: { + jsonrpcId: request.id, + body: response + } + }); + } + + private handleToolsList(res: http.ServerResponse, request: any): void { + res.writeHead(200, { + 'Content-Type': 'application/json', + 'mcp-session-id': this.sessionId + }); + + const response = { + jsonrpc: '2.0', + id: request.id, + result: { + tools: [ + { + name: 'test_reconnection', + description: + 'A tool that triggers SSE stream closure to test client reconnection behavior', + inputSchema: { + type: 'object', + properties: {}, + required: [] + } + } + ] + } + }; + + res.end(JSON.stringify(response)); + + this.checks.push({ + id: 'outgoing-response', + name: 'OutgoingResponse', + description: `Sent tools/list response`, + status: 'INFO', + timestamp: new Date().toISOString(), + details: { + jsonrpcId: request.id, + body: response + } + }); + } + + private handleToolsCall(res: http.ServerResponse, request: any): void { + // Store the request ID so we can respond after reconnection + this.pendingToolCallId = request.id; + + // Start SSE stream + res.writeHead(200, { + 'Content-Type': 'text/event-stream', + 'Cache-Control': 'no-cache', + Connection: 'keep-alive', + 'mcp-session-id': this.sessionId + }); + + // Send priming event with retry field + this.eventIdCounter++; + const primingEventId = `event-${this.eventIdCounter}`; + const primingContent = `id: ${primingEventId}\nretry: ${this.retryValue}\ndata: \n\n`; + res.write(primingContent); + + this.checks.push({ + id: 'outgoing-sse-event', + name: 'OutgoingSseEvent', + description: `Sent SSE priming event for tools/call (id: ${primingEventId}, retry: ${this.retryValue}ms)`, + status: 'INFO', + timestamp: new Date().toISOString(), + details: { + eventId: primingEventId, + retryMs: this.retryValue, + eventType: 'priming', + raw: primingContent + } + }); + + // Close the stream after a short delay to trigger reconnection + setTimeout(() => { + this.toolStreamCloseTime = performance.now(); + this.checks.push({ + id: 'outgoing-stream-close', + name: 'OutgoingStreamClose', + description: + 'Closed tools/call SSE stream to trigger client reconnection', + status: 'INFO', + timestamp: new Date().toISOString(), + details: { + retryMs: this.retryValue, + pendingToolCallId: this.pendingToolCallId + } + }); + res.end(); + }, 50); + } + private generateChecks(): void { - // Check 1: Client should have reconnected via GET after POST stream close + // Check 1: Client should have reconnected via GET after tool call stream close if (this.getConnectionCount < 1) { this.checks.push({ id: 'client-sse-graceful-reconnect', name: 'ClientGracefulReconnect', description: - 'Client reconnects via GET after POST SSE stream is closed gracefully', + 'Client reconnects via GET after SSE stream is closed gracefully', status: 'FAILURE', timestamp: new Date().toISOString(), - errorMessage: `Client did not attempt GET reconnection after POST stream closure. Client should treat graceful stream close as reconnectable.`, + errorMessage: `Client did not attempt GET reconnection after stream closure. Client should treat graceful stream close as reconnectable.`, specReferences: [ { id: 'SEP-1699', @@ -235,7 +416,7 @@ export class SSERetryScenario implements Scenario { ], details: { getConnectionCount: this.getConnectionCount, - postStreamCloseTime: this.postStreamCloseTime, + toolStreamCloseTime: this.toolStreamCloseTime, retryValue: this.retryValue } }); @@ -247,7 +428,7 @@ export class SSERetryScenario implements Scenario { id: 'client-sse-graceful-reconnect', name: 'ClientGracefulReconnect', description: - 'Client reconnects via GET after POST SSE stream is closed gracefully', + 'Client reconnects via GET after SSE stream is closed gracefully', status: 'SUCCESS', timestamp: new Date().toISOString(), specReferences: [ @@ -263,10 +444,10 @@ export class SSERetryScenario implements Scenario { // Check 2: Client MUST respect retry field timing if ( - this.postStreamCloseTime !== null && + this.toolStreamCloseTime !== null && this.getReconnectionTime !== null ) { - const actualDelay = this.getReconnectionTime - this.postStreamCloseTime; + const actualDelay = this.getReconnectionTime - this.toolStreamCloseTime; const minExpected = this.retryValue - this.EARLY_TOLERANCE; const maxExpected = this.retryValue + this.LATE_TOLERANCE; @@ -330,7 +511,7 @@ export class SSERetryScenario implements Scenario { status: 'WARNING', timestamp: new Date().toISOString(), errorMessage: - 'Could not measure timing - POST stream close time or GET reconnection time not recorded', + 'Could not measure timing - tool stream close time or GET reconnection time not recorded', specReferences: [ { id: 'SEP-1699', @@ -338,7 +519,7 @@ export class SSERetryScenario implements Scenario { } ], details: { - postStreamCloseTime: this.postStreamCloseTime, + toolStreamCloseTime: this.toolStreamCloseTime, getReconnectionTime: this.getReconnectionTime } }); @@ -364,7 +545,6 @@ export class SSERetryScenario implements Scenario { details: { hasLastEventId, lastEventIds: this.lastEventIds, - primingEventId: this.primingEventId, getConnectionCount: this.getConnectionCount }, errorMessage: !hasLastEventId diff --git a/src/scenarios/server/sse-polling.ts b/src/scenarios/server/sse-polling.ts index 0b1194d..9735bb2 100644 --- a/src/scenarios/server/sse-polling.ts +++ b/src/scenarios/server/sse-polling.ts @@ -4,6 +4,7 @@ * Tests that servers properly implement SSE polling behavior including: * - Sending priming events with event ID and empty data on POST SSE streams * - Sending retry field in priming events when configured + * - Closing SSE stream mid-operation and resuming after client reconnects * - Replaying events when client reconnects with Last-Event-ID */ @@ -12,10 +13,62 @@ import { EventSourceParserStream } from 'eventsource-parser/stream'; import { Client } from '@modelcontextprotocol/sdk/client/index.js'; import { StreamableHTTPClientTransport } from '@modelcontextprotocol/sdk/client/streamableHttp.js'; +function createLoggingFetch(checks: ConformanceCheck[]) { + return async (url: string, options: RequestInit): Promise => { + const method = options.method || 'GET'; + let description = `Sending ${method} request`; + if (options.body) { + try { + const body = JSON.parse(options.body as string); + if (body.method) { + description = `Sending ${method} ${body.method}`; + } + } catch { + // Not JSON + } + } + + checks.push({ + id: 'outgoing-request', + name: 'OutgoingRequest', + description, + status: 'INFO', + timestamp: new Date().toISOString(), + details: { + method, + url, + headers: options.headers, + body: options.body ? JSON.parse(options.body as string) : undefined + } + }); + + const response = await fetch(url, options); + + const responseHeaders: Record = {}; + response.headers.forEach((value, key) => { + responseHeaders[key] = value; + }); + + checks.push({ + id: 'incoming-response', + name: 'IncomingResponse', + description: `Received ${response.status} response for ${method}`, + status: 'INFO', + timestamp: new Date().toISOString(), + details: { + statusCode: response.status, + headers: responseHeaders + } + }); + + return response; + }; +} + export class ServerSSEPollingScenario implements ClientScenario { name = 'server-sse-polling'; description = - 'Test server sends SSE priming events on POST streams and supports event replay (SEP-1699)'; + 'Test server SSE polling via test_reconnection tool that closes stream mid-call (SEP-1699)'; async run(serverUrl: string): Promise { const checks: ConformanceCheck[] = []; @@ -65,9 +118,11 @@ export class ServerSSEPollingScenario implements ClientScenario { }); } - // Step 2: Make a POST request that returns SSE stream - // We need to use raw fetch to observe the priming event - const postResponse = await fetch(serverUrl, { + // Step 2: Call test_reconnection tool via raw fetch to observe SSE behavior + // This tool should close the stream mid-call, requiring reconnection + const loggingFetch = createLoggingFetch(checks); + + const postResponse = await loggingFetch(serverUrl, { method: 'POST', headers: { 'Content-Type': 'application/json', @@ -78,12 +133,35 @@ export class ServerSSEPollingScenario implements ClientScenario { body: JSON.stringify({ jsonrpc: '2.0', id: 1, - method: 'tools/list', - params: {} + method: 'tools/call', + params: { + name: 'test_reconnection', + arguments: {} + } }) }); if (!postResponse.ok) { + // Check if tool doesn't exist (method not found or similar) + if (postResponse.status === 400 || postResponse.status === 404) { + checks.push({ + id: 'server-sse-test-reconnection-tool', + name: 'ServerTestReconnectionTool', + description: + 'Server implements test_reconnection tool for SSE polling tests', + status: 'WARNING', + timestamp: new Date().toISOString(), + errorMessage: `Server does not implement test_reconnection tool (HTTP ${postResponse.status}). This tool is recommended for testing SSE polling behavior.`, + specReferences: [ + { + id: 'SEP-1699', + url: 'https://github.com/modelcontextprotocol/modelcontextprotocol/issues/1699' + } + ] + }); + return checks; + } + checks.push({ id: 'server-sse-post-request', name: 'ServerSSEPostRequest', @@ -125,14 +203,15 @@ export class ServerSSEPollingScenario implements ClientScenario { return checks; } - // Step 3: Parse SSE stream for priming event + // Step 3: Parse SSE stream for priming event and tool response let hasEventId = false; let hasPrimingEvent = false; let primingEventIsFirst = false; let hasRetryField = false; let retryValue: number | undefined; - let primingEventId: string | undefined; + let lastEventId: string | undefined; let eventCount = 0; + let receivedToolResponse = false; if (!postResponse.body) { checks.push({ @@ -164,10 +243,10 @@ export class ServerSSEPollingScenario implements ClientScenario { ) .getReader(); - // Read events with timeout + // Read events with timeout - expect stream to close before we get the response const timeout = setTimeout(() => { reader.cancel(); - }, 5000); + }, 10000); try { while (true) { @@ -179,31 +258,85 @@ export class ServerSSEPollingScenario implements ClientScenario { eventCount++; - // Check for event ID + // Track the last event ID for reconnection if (event.id) { hasEventId = true; - if (!primingEventId) { - primingEventId = event.id; - } + lastEventId = event.id; // Check if this is a priming event (empty or minimal data) - if ( + const isPriming = event.data === '' || event.data === '{}' || - event.data.trim() === '' - ) { + event.data.trim() === ''; + if (isPriming) { hasPrimingEvent = true; // Check if priming event is the first event if (eventCount === 1) { primingEventIsFirst = true; } } + + // Log the SSE event + checks.push({ + id: 'incoming-sse-event', + name: 'IncomingSseEvent', + description: isPriming + ? `Received SSE priming event (id: ${event.id})` + : `Received SSE event (id: ${event.id})`, + status: 'INFO', + timestamp: new Date().toISOString(), + details: { + eventId: event.id, + eventType: event.event || 'message', + isPriming, + hasRetryField, + retryValue, + data: event.data + } + }); + } + + // Check if this is the tool response + if (event.data) { + try { + const parsed = JSON.parse(event.data); + if (parsed.id === 1 && parsed.result) { + receivedToolResponse = true; + checks.push({ + id: 'incoming-sse-event', + name: 'IncomingSseEvent', + description: `Received tool response on POST stream`, + status: 'INFO', + timestamp: new Date().toISOString(), + details: { + eventId: event.id, + body: parsed + } + }); + } + } catch { + // Not JSON, ignore + } } } } finally { clearTimeout(timeout); } + // Log stream closure + checks.push({ + id: 'stream-closed', + name: 'StreamClosed', + description: `POST SSE stream closed after ${eventCount} event(s)`, + status: 'INFO', + timestamp: new Date().toISOString(), + details: { + eventCount, + lastEventId, + receivedToolResponse + } + }); + // Check 1: Server SHOULD send priming event with ID on POST SSE stream let primingStatus: 'SUCCESS' | 'WARNING' = 'SUCCESS'; let primingErrorMessage: string | undefined; @@ -235,7 +368,7 @@ export class ServerSSEPollingScenario implements ClientScenario { hasPrimingEvent, primingEventIsFirst, hasEventId, - primingEventId, + lastEventId, eventCount }, errorMessage: primingErrorMessage @@ -264,50 +397,71 @@ export class ServerSSEPollingScenario implements ClientScenario { : undefined }); - // Step 4: Test event replay by reconnecting with Last-Event-ID - if (primingEventId && sessionId) { - // Make a GET request with Last-Event-ID to test replay - const getResponse = await fetch(serverUrl, { + // Step 4: If tool response wasn't received, reconnect with Last-Event-ID + if (!receivedToolResponse && lastEventId && sessionId) { + // Make a GET request with Last-Event-ID to get the tool response + const getResponse = await loggingFetch(serverUrl, { method: 'GET', headers: { Accept: 'text/event-stream', 'mcp-session-id': sessionId, 'mcp-protocol-version': '2025-03-26', - 'last-event-id': primingEventId + 'last-event-id': lastEventId } }); - if (getResponse.ok) { - // Server accepted reconnection with Last-Event-ID - let replayedEvents = 0; + if (getResponse.ok && getResponse.body) { + const reconnectReader = getResponse.body + .pipeThrough(new TextDecoderStream()) + .pipeThrough(new EventSourceParserStream()) + .getReader(); - if (getResponse.body) { - const replayReader = getResponse.body - .pipeThrough(new TextDecoderStream()) - .pipeThrough(new EventSourceParserStream()) - .getReader(); + const reconnectTimeout = setTimeout(() => { + reconnectReader.cancel(); + }, 5000); - const replayTimeout = setTimeout(() => { - replayReader.cancel(); - }, 2000); + try { + while (true) { + const { value: event, done } = await reconnectReader.read(); + if (done) break; - try { - while (true) { - const { done } = await replayReader.read(); - if (done) break; - replayedEvents++; + // Log each event received on GET stream + checks.push({ + id: 'incoming-sse-event', + name: 'IncomingSseEvent', + description: `Received SSE event on GET reconnection stream (id: ${event.id || 'none'})`, + status: 'INFO', + timestamp: new Date().toISOString(), + details: { + eventId: event.id, + eventType: event.event || 'message', + data: event.data + } + }); + + // Check if this is the tool response + if (event.data) { + try { + const parsed = JSON.parse(event.data); + if (parsed.id === 1 && parsed.result) { + receivedToolResponse = true; + break; + } + } catch { + // Not JSON, ignore + } } - } finally { - clearTimeout(replayTimeout); } + } finally { + clearTimeout(reconnectTimeout); } checks.push({ - id: 'server-sse-event-replay', - name: 'ServerReplaysEvents', + id: 'server-sse-disconnect-resume', + name: 'ServerDisconnectResume', description: - 'Server replays events after Last-Event-ID on reconnection', - status: 'SUCCESS', + 'Server closes SSE stream mid-call and resumes after client reconnects with Last-Event-ID', + status: receivedToolResponse ? 'SUCCESS' : 'WARNING', timestamp: new Date().toISOString(), specReferences: [ { @@ -316,17 +470,22 @@ export class ServerSSEPollingScenario implements ClientScenario { } ], details: { - lastEventIdUsed: primingEventId, - replayedEvents, - message: 'Server accepted GET request with Last-Event-ID header' - } + lastEventIdUsed: lastEventId, + receivedToolResponse, + message: receivedToolResponse + ? 'Successfully received tool response after reconnection' + : 'Tool response not received after reconnection' + }, + errorMessage: !receivedToolResponse + ? 'Server did not send tool response after client reconnected with Last-Event-ID' + : undefined }); } else { // Check if server doesn't support standalone GET streams if (getResponse.status === 405) { checks.push({ - id: 'server-sse-event-replay', - name: 'ServerReplaysEvents', + id: 'server-sse-disconnect-resume', + name: 'ServerDisconnectResume', description: 'Server supports GET reconnection with Last-Event-ID', status: 'INFO', @@ -345,10 +504,10 @@ export class ServerSSEPollingScenario implements ClientScenario { }); } else { checks.push({ - id: 'server-sse-event-replay', - name: 'ServerReplaysEvents', + id: 'server-sse-disconnect-resume', + name: 'ServerDisconnectResume', description: - 'Server replays events after Last-Event-ID on reconnection', + 'Server supports GET reconnection with Last-Event-ID', status: 'WARNING', timestamp: new Date().toISOString(), specReferences: [ @@ -359,19 +518,40 @@ export class ServerSSEPollingScenario implements ClientScenario { ], details: { statusCode: getResponse.status, - lastEventIdUsed: primingEventId, + lastEventIdUsed: lastEventId, message: `Server returned ${getResponse.status} for GET request with Last-Event-ID` }, errorMessage: `Server did not accept reconnection with Last-Event-ID (HTTP ${getResponse.status})` }); } } + } else if (receivedToolResponse) { + // Tool response was received on the initial POST stream (server didn't disconnect) + checks.push({ + id: 'server-sse-disconnect-resume', + name: 'ServerDisconnectResume', + description: + 'Server closes SSE stream mid-call and resumes after reconnection', + status: 'INFO', + timestamp: new Date().toISOString(), + specReferences: [ + { + id: 'SEP-1699', + url: 'https://github.com/modelcontextprotocol/modelcontextprotocol/issues/1699' + } + ], + details: { + receivedToolResponse: true, + message: + 'Tool response received on initial POST stream - server did not disconnect mid-call. The test_reconnection tool should close the stream before sending the result.' + } + }); } else { checks.push({ - id: 'server-sse-event-replay', - name: 'ServerReplaysEvents', + id: 'server-sse-disconnect-resume', + name: 'ServerDisconnectResume', description: - 'Server replays events after Last-Event-ID on reconnection', + 'Server closes SSE stream mid-call and resumes after reconnection', status: 'INFO', timestamp: new Date().toISOString(), specReferences: [ @@ -381,10 +561,10 @@ export class ServerSSEPollingScenario implements ClientScenario { } ], details: { - primingEventId, + lastEventId, sessionId, message: - 'Could not test event replay - no priming event ID or session ID available' + 'Could not test disconnect/resume - no last event ID or session ID available' } }); }