Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 38 additions & 0 deletions packages/client/src/client/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ import {
} from '@modelcontextprotocol/core';

import { ExperimentalClientTasks } from '../experimental/tasks/client.js';
import { StreamableHTTPClientTransport, StreamableHTTPError } from './streamableHttp.js';

/**
* Elicitation default application helper. Applies defaults to the data based on the schema.
Expand Down Expand Up @@ -537,6 +538,43 @@ export class Client<
}
}

/**
* Override request to handle session recovery.
* When a 404 error is caught on an active session (server returned 404 for expired session),
* this will automatically reconnect and retry the request once.
*/
override request<T extends AnyObjectSchema>(
request: ClientRequest | RequestT,
resultSchema: T,
options?: RequestOptions
): Promise<SchemaOutput<T>> {
return this._requestWithSessionRecovery(request, resultSchema, options);
}

private async _requestWithSessionRecovery<T extends AnyObjectSchema>(
request: ClientRequest | RequestT,
resultSchema: T,
options?: RequestOptions
): Promise<SchemaOutput<T>> {
try {
return await super.request(request, resultSchema, options);
} catch (error) {
const transport = this.transport;
const isSessionTerminated =
error instanceof StreamableHTTPError &&
error.code === 404 &&
transport instanceof StreamableHTTPClientTransport &&
transport.sessionId !== undefined;

if (isSessionTerminated && request.method !== 'initialize') {
await transport.resetSession();
await this.connect(transport, options);
return await super.request(request, resultSchema, options);
}
throw error;
}
}

/**
* After initialization has completed, this will be populated with the server's reported capabilities.
*/
Expand Down
11 changes: 11 additions & 0 deletions packages/client/src/client/streamableHttp.ts
Original file line number Diff line number Diff line change
Expand Up @@ -459,6 +459,17 @@ export class StreamableHTTPClientTransport implements Transport {
this.onclose?.();
}

/**
* Resets the session by closing the current connection and clearing the session ID.
* After calling this, the next request will establish a fresh session.
*/
async resetSession(): Promise<void> {
await this.close();
this._sessionId = undefined;
this._abortController = undefined;
await this.start();
}

async send(
message: JSONRPCMessage | JSONRPCMessage[],
options?: { resumptionToken?: string; onresumptiontoken?: (token: string) => void }
Expand Down
34 changes: 34 additions & 0 deletions packages/client/test/client/streamableHttp.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1626,4 +1626,38 @@ describe('StreamableHTTPClientTransport', () => {
});
});
});

describe('resetSession', () => {
it('should clear session and allow new session to be established', async () => {
const message: JSONRPCMessage = {
jsonrpc: '2.0',
method: 'test',
params: {},
id: 'test-id'
};

(global.fetch as Mock)
.mockResolvedValueOnce({
ok: true,
status: 200,
headers: new Headers({ 'content-type': 'text/event-stream', 'mcp-session-id': 'session-1' }),
body: null
})
.mockResolvedValueOnce({
ok: true,
status: 200,
headers: new Headers({ 'content-type': 'text/event-stream', 'mcp-session-id': 'session-2' }),
body: null
});

await transport.send(message);
expect(transport.sessionId).toBe('session-1');

await transport.resetSession();
expect(transport.sessionId).toBeUndefined();

await transport.send(message);
expect(transport.sessionId).toBe('session-2');
});
});
});
Loading
Loading