Skip to content

Commit 585dc06

Browse files
committed
introduce onsessioninitialized to avoid race conditions
1 parent 31f44e0 commit 585dc06

File tree

6 files changed

+45
-46
lines changed

6 files changed

+45
-46
lines changed

src/client/streamableHttp.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { Transport } from "../shared/transport.js";
2-
import { isInitializedNotification, isJSONRPCNotification, isJSONRPCRequest, isJSONRPCResponse, JSONRPCMessage, JSONRPCMessageSchema } from "../types.js";
2+
import { isInitializedNotification, isJSONRPCRequest, isJSONRPCResponse, JSONRPCMessage, JSONRPCMessageSchema } from "../types.js";
33
import { auth, AuthResult, OAuthClientProvider, UnauthorizedError } from "./auth.js";
44
import { EventSourceParserStream } from "eventsource-parser/stream";
55

src/examples/server/jsonResponseStreamableHttp.ts

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -95,18 +95,17 @@ app.post('/mcp', async (req: Request, res: Response) => {
9595
transport = new StreamableHTTPServerTransport({
9696
sessionIdGenerator: () => randomUUID(),
9797
enableJsonResponse: true, // Enable JSON response mode
98+
onsessioninitialized: (sessionId) => {
99+
// Store the transport by session ID when session is initialized
100+
// This avoids race conditions where requests might come in before the session is stored
101+
console.log(`Session initialized with ID: ${sessionId}`);
102+
transports[sessionId] = transport;
103+
}
98104
});
99105

100106
// Connect the transport to the MCP server BEFORE handling the request
101107
await server.connect(transport);
102-
103-
// After handling the request, if we get a session ID back, store the transport
104108
await transport.handleRequest(req, res, req.body);
105-
106-
// Store the transport by session ID for future requests
107-
if (transport.sessionId) {
108-
transports[transport.sessionId] = transport;
109-
}
110109
return; // Already handled
111110
} else {
112111
// Invalid request - no session ID or not initialization request

src/examples/server/simpleStreamableHttp.ts

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -247,19 +247,19 @@ app.post('/mcp', async (req: Request, res: Response) => {
247247
transport = new StreamableHTTPServerTransport({
248248
sessionIdGenerator: () => randomUUID(),
249249
eventStore, // Enable resumability
250+
onsessioninitialized: (sessionId) => {
251+
// Store the transport by session ID when session is initialized
252+
// This avoids race conditions where requests might come in before the session is stored
253+
console.log(`Session initialized with ID: ${sessionId}`);
254+
transports[sessionId] = transport;
255+
}
250256
});
251257

252258
// Connect the transport to the MCP server BEFORE handling the request
253259
// so responses can flow back through the same transport
254260
await server.connect(transport);
255261

256-
// After handling the request, if we get a session ID back, store the transport
257262
await transport.handleRequest(req, res, req.body);
258-
259-
// Store the transport by session ID for future requests
260-
if (transport.sessionId) {
261-
transports[transport.sessionId] = transport;
262-
}
263263
return; // Already handled
264264
} else {
265265
// Invalid request - no session ID or not initialization request

src/examples/server/standaloneSseWithGetStreamableHttp.ts

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -52,17 +52,19 @@ app.post('/mcp', async (req: Request, res: Response) => {
5252
// New initialization request
5353
transport = new StreamableHTTPServerTransport({
5454
sessionIdGenerator: () => randomUUID(),
55+
onsessioninitialized: (sessionId) => {
56+
// Store the transport by session ID when session is initialized
57+
// This avoids race conditions where requests might come in before the session is stored
58+
console.log(`Session initialized with ID: ${sessionId}`);
59+
transports[sessionId] = transport;
60+
}
5561
});
5662

5763
// Connect the transport to the MCP server
5864
await server.connect(transport);
5965

66+
// Handle the request - the onsessioninitialized callback will store the transport
6067
await transport.handleRequest(req, res, req.body);
61-
62-
// Store the transport by session ID for future requests
63-
if (transport.sessionId) {
64-
transports[transport.sessionId] = transport;
65-
}
6668
return; // Already handled
6769
} else {
6870
// Invalid request - no session ID or not initialization request

src/server/streamableHttp.test.ts

Lines changed: 6 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -187,15 +187,11 @@ describe("StreamableHTTPServerTransport", () => {
187187
expect(sessionId).toBeDefined();
188188

189189
// Try second initialize
190-
const secondInitMessage: JSONRPCMessage = {
191-
jsonrpc: "2.0",
192-
method: "initialize",
193-
params: {
194-
clientInfo: { name: "test-client-2", version: "1.0" },
195-
protocolVersion: "2025-03-26",
196-
},
197-
id: "init-2",
190+
const secondInitMessage = {
191+
...TEST_MESSAGES.initialize,
192+
id: "second-init"
198193
};
194+
199195
const response = await sendPostRequest(baseUrl, secondInitMessage);
200196

201197
expect(response.status).toBe(400);
@@ -1092,14 +1088,7 @@ describe("StreamableHTTPServerTransport in stateless mode", () => {
10921088
});
10931089

10941090
it("should handle POST requests with various session IDs in stateless mode", async () => {
1095-
// Initialize the server first
1096-
await fetch(baseUrl, {
1097-
method: "POST",
1098-
headers: { "Content-Type": "application/json", Accept: "application/json, text/event-stream" },
1099-
body: JSON.stringify({
1100-
jsonrpc: "2.0", method: "initialize", params: { clientInfo: { name: "test-client", version: "1.0" }, protocolVersion: "2025-03-26" }, id: "init-1"
1101-
}),
1102-
});
1091+
await sendPostRequest(baseUrl, TEST_MESSAGES.initialize);
11031092

11041093
// Try with a random session ID - should be accepted
11051094
const response1 = await fetch(baseUrl, {
@@ -1131,13 +1120,7 @@ describe("StreamableHTTPServerTransport in stateless mode", () => {
11311120
// one standalone SSE stream at a time
11321121

11331122
// Initialize the server first
1134-
await fetch(baseUrl, {
1135-
method: "POST",
1136-
headers: { "Content-Type": "application/json", Accept: "application/json, text/event-stream" },
1137-
body: JSON.stringify({
1138-
jsonrpc: "2.0", method: "initialize", params: { clientInfo: { name: "test-client", version: "1.0" }, protocolVersion: "2025-03-26" }, id: "init-1"
1139-
}),
1140-
});
1123+
await sendPostRequest(baseUrl, TEST_MESSAGES.initialize);
11411124

11421125
// Open first SSE stream
11431126
const stream1 = await fetch(baseUrl, {

src/server/streamableHttp.ts

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import { IncomingMessage, ServerResponse } from "node:http";
22
import { Transport } from "../shared/transport.js";
3-
import { isJSONRPCRequest, isJSONRPCResponse, JSONRPCMessage, JSONRPCMessageSchema, RequestId } from "../types.js";
3+
import { isInitializeRequest, isJSONRPCRequest, isJSONRPCResponse, JSONRPCMessage, JSONRPCMessageSchema, RequestId } from "../types.js";
44
import getRawBody from "raw-body";
55
import contentType from "content-type";
66
import { randomUUID } from "node:crypto";
@@ -39,6 +39,15 @@ export interface StreamableHTTPServerTransportOptions {
3939
*/
4040
sessionIdGenerator: () => string | undefined;
4141

42+
/**
43+
* A callback for session initialization events
44+
* This is called when the server initializes a new session.
45+
* Usefult in cases when you need to register multiple mcp sessions
46+
* and need to keep track of them.
47+
* @param sessionId The generated session ID
48+
*/
49+
onsessioninitialized?: (sessionId: string) => void;
50+
4251
/**
4352
* If true, the server will return JSON responses instead of starting an SSE stream.
4453
* This can be useful for simple request/response scenarios without streaming.
@@ -98,6 +107,7 @@ export class StreamableHTTPServerTransport implements Transport {
98107
private _enableJsonResponse: boolean = false;
99108
private _standaloneSseStreamId: string = '_GET_stream';
100109
private _eventStore?: EventStore;
110+
private _onsessioninitialized?: (sessionId: string) => void;
101111

102112
sessionId?: string | undefined;
103113
onclose?: () => void;
@@ -108,6 +118,7 @@ export class StreamableHTTPServerTransport implements Transport {
108118
this.sessionIdGenerator = options.sessionIdGenerator;
109119
this._enableJsonResponse = options.enableJsonResponse ?? false;
110120
this._eventStore = options.eventStore;
121+
this._onsessioninitialized = options.onsessioninitialized;
111122
}
112123

113124
/**
@@ -328,9 +339,7 @@ export class StreamableHTTPServerTransport implements Transport {
328339

329340
// Check if this is an initialization request
330341
// https://spec.modelcontextprotocol.io/specification/2025-03-26/basic/lifecycle/
331-
const isInitializationRequest = messages.some(
332-
msg => 'method' in msg && msg.method === 'initialize'
333-
);
342+
const isInitializationRequest = messages.some(isInitializeRequest);
334343
if (isInitializationRequest) {
335344
// If it's a server with session management and the session ID is already set we should reject the request
336345
// to avoid re-initialization.
@@ -359,6 +368,12 @@ export class StreamableHTTPServerTransport implements Transport {
359368
this.sessionId = this.sessionIdGenerator();
360369
this._initialized = true;
361370

371+
// If we have a session ID and an onsessioninitialized handler, call it immediately
372+
// This is needed in cases where the server needs to keep track of multiple sessions
373+
if (this.sessionId && this._onsessioninitialized) {
374+
this._onsessioninitialized(this.sessionId);
375+
}
376+
362377
}
363378
// If an Mcp-Session-Id is returned by the server during initialization,
364379
// clients using the Streamable HTTP transport MUST include it

0 commit comments

Comments
 (0)