Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions src/common/config/userConfig.ts
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,20 @@ const ServerConfigSchema = z4.object({
"When true, runs the server in dry mode: dumps configuration and enabled tools, then exits without starting the server."
)
.register(configRegistry, { overrideBehavior: "not-allowed" }),
externallyManagedSessions: z4
.boolean()
.default(false)
.describe(
"When true, the HTTP transport allows requests with a session ID supplied externally through the 'mcp-session-id' header. When an external ID is supplied, the initialization request is optional."
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nit] Do we have way to instruct end users how this external ID's can be supplied (header)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It already mentions that this is defined in the header, though I'm not sure if we should be more descriptive beyond that. We could be more descriptive in the docs page, though I doubt we'll be able to cover all the configuration methods.

)
.register(configRegistry, { overrideBehavior: "not-allowed" }),
httpResponseType: z4
.enum(["sse", "json"])
.default("sse")
.describe(
"The HTTP response type for tool responses: 'sse' for Server-Sent Events, 'json' for standard JSON responses."
)
.register(configRegistry, { overrideBehavior: "not-allowed" }),
});

export const UserConfigSchema = z4.object({
Expand Down
2 changes: 2 additions & 0 deletions src/common/logger.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ export const LogId = {
streamableHttpTransportKeepAliveFailure: mongoLogId(1_006_007),
streamableHttpTransportKeepAlive: mongoLogId(1_006_008),
streamableHttpTransportHttpHostWarning: mongoLogId(1_006_009),
streamableHttpTransportSessionNotFound: mongoLogId(1_006_010),
streamableHttpTransportDisallowedExternalSessionError: mongoLogId(1_006_011),

exportCleanupError: mongoLogId(1_007_001),
exportCreationError: mongoLogId(1_007_002),
Expand Down
294 changes: 185 additions & 109 deletions src/transports/streamableHttp.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,14 @@ import { LogId } from "../common/logger.js";
import { SessionStore } from "../common/sessionStore.js";
import { TransportRunnerBase, type TransportRunnerConfig, type RequestContext } from "./base.js";
import { getRandomUUID } from "../helpers/getRandomUUID.js";
import type { WebStandardStreamableHTTPServerTransportOptions } from "@modelcontextprotocol/sdk/server/webStandardStreamableHttp.js";

const JSON_RPC_ERROR_CODE_PROCESSING_REQUEST_FAILED = -32000;
const JSON_RPC_ERROR_CODE_SESSION_ID_REQUIRED = -32001;
const JSON_RPC_ERROR_CODE_SESSION_ID_INVALID = -32002;
const JSON_RPC_ERROR_CODE_SESSION_NOT_FOUND = -32003;
const JSON_RPC_ERROR_CODE_INVALID_REQUEST = -32004;
const JSON_RPC_ERROR_CODE_DISALLOWED_EXTERNAL_SESSION = -32005;

export class StreamableHttpRunner extends TransportRunnerBase {
private httpServer: http.Server | undefined;
Expand Down Expand Up @@ -49,147 +51,221 @@ export class StreamableHttpRunner extends TransportRunnerBase {
for (const [key, value] of Object.entries(this.userConfig.httpHeaders)) {
const header = req.headers[key.toLowerCase()];
if (!header || header !== value) {
res.status(403).send({ error: `Invalid value for header "${key}"` });
res.status(403).json({ error: `Invalid value for header "${key}"` });
return;
}
}

next();
});

const reportSessionError = (res: express.Response, errorCode: number): void => {
let message: string;
let statusCode = 400;

switch (errorCode) {
case JSON_RPC_ERROR_CODE_SESSION_ID_REQUIRED:
message = "session id is required";
break;
case JSON_RPC_ERROR_CODE_SESSION_ID_INVALID:
message = "session id is invalid";
break;
case JSON_RPC_ERROR_CODE_INVALID_REQUEST:
message = "invalid request";
break;
case JSON_RPC_ERROR_CODE_SESSION_NOT_FOUND:
message = "session not found";
statusCode = 404;
break;
case JSON_RPC_ERROR_CODE_DISALLOWED_EXTERNAL_SESSION:
message = "cannot provide sessionId when externally managed sessions are disabled";
break;
default:
message = "unknown error";
statusCode = 500;
}
res.status(statusCode).json({
jsonrpc: "2.0",
error: {
code: errorCode,
message,
},
});
};

const handleSessionRequest = async (req: express.Request, res: express.Response): Promise<void> => {
const sessionId = req.headers["mcp-session-id"];
if (!sessionId) {
res.status(400).json({
jsonrpc: "2.0",
error: {
code: JSON_RPC_ERROR_CODE_SESSION_ID_REQUIRED,
message: `session id is required`,
},
});
return;
return reportSessionError(res, JSON_RPC_ERROR_CODE_SESSION_ID_REQUIRED);
}

if (typeof sessionId !== "string") {
res.status(400).json({
jsonrpc: "2.0",
error: {
code: JSON_RPC_ERROR_CODE_SESSION_ID_INVALID,
message: "session id is invalid",
},
});
return;
return reportSessionError(res, JSON_RPC_ERROR_CODE_SESSION_ID_INVALID);
}

const transport = this.sessionStore.getSession(sessionId);
if (!transport) {
res.status(404).json({
jsonrpc: "2.0",
error: {
code: JSON_RPC_ERROR_CODE_SESSION_NOT_FOUND,
message: "session not found",
},
if (this.userConfig.externallyManagedSessions) {
this.logger.debug({
id: LogId.streamableHttpTransportSessionNotFound,
context: "streamableHttpTransport",
message: `Session with ID ${sessionId} not found, initializing new session`,
});

return await initializeServer(req, res, { sessionId, isImplicitInitialization: true });
}

this.logger.debug({
id: LogId.streamableHttpTransportSessionNotFound,
context: "streamableHttpTransport",
message: `Session with ID ${sessionId} not found`,
});
return;

return reportSessionError(res, JSON_RPC_ERROR_CODE_SESSION_NOT_FOUND);
}

await transport.handleRequest(req, res, req.body);
};

app.post(
"/mcp",
this.withErrorHandling(async (req: express.Request, res: express.Response) => {
const sessionId = req.headers["mcp-session-id"];
if (sessionId) {
await handleSessionRequest(req, res);
return;
}
/**
* Initializes a new server and session. This can be done either explicitly via an initialize request
* or implicitly when externally managed sessions are enabled and a request is received for a session
* that does not exist.
*/
const initializeServer = async (
req: express.Request,
res: express.Response,
{
sessionId,
isImplicitInitialization,
}:
| { sessionId?: string; isImplicitInitialization?: false }
| { sessionId: string; isImplicitInitialization: true }
): Promise<void> => {
if (isImplicitInitialization && !sessionId) {
throw new Error("Implicit initialization requires externally-passed sessionId");
}

const request: RequestContext = {
headers: req.headers as Record<string, string | string[] | undefined>,
query: req.query as Record<string, string | string[] | undefined>,
};
const server = await this.setupServer(request);

if (!isInitializeRequest(req.body)) {
res.status(400).json({
const options: WebStandardStreamableHTTPServerTransportOptions = {
enableJsonResponse: this.userConfig.httpResponseType === "json",
};

const sessionInitialized = (sessionId: string): void => {
server.session.logger.setAttribute("sessionId", sessionId);

this.sessionStore.setSession(sessionId, transport, server.session.logger);
};

// When we're implicitly initializing a session, the client is not going through the initialization
// flow. This means that it won't do proper session lifecycle management, so we should not add hooks for
// onsessioninitialized and onsessionclosed.
if (!isImplicitInitialization) {
options.sessionIdGenerator = (): string => sessionId ?? getRandomUUID();
options.onsessioninitialized = sessionInitialized.bind(this);
options.onsessionclosed = async (sessionId): Promise<void> => {
try {
await this.sessionStore.closeSession(sessionId, false);
} catch (error) {
this.logger.error({
id: LogId.streamableHttpTransportSessionCloseFailure,
context: "streamableHttpTransport",
message: `Error closing session ${sessionId}: ${error instanceof Error ? error.message : String(error)}`,
});
}
};
}

const transport = new StreamableHTTPServerTransport(options);

if (isImplicitInitialization) {
sessionInitialized(sessionId);
}

let failedPings = 0;
// eslint-disable-next-line @typescript-eslint/no-misused-promises
const keepAliveLoop: NodeJS.Timeout = setInterval(async () => {
try {
server.session.logger.debug({
id: LogId.streamableHttpTransportKeepAlive,
context: "streamableHttpTransport",
message: "Sending ping",
});

await transport.send({
jsonrpc: "2.0",
error: {
code: JSON_RPC_ERROR_CODE_INVALID_REQUEST,
message: `invalid request`,
},
method: "ping",
});
return;
}
failedPings = 0;
} catch (err) {
try {
failedPings++;
server.session.logger.warning({
id: LogId.streamableHttpTransportKeepAliveFailure,
context: "streamableHttpTransport",
message: `Error sending ping (attempt #${failedPings}): ${err instanceof Error ? err.message : String(err)}`,
});

const request: RequestContext = {
headers: req.headers as Record<string, string | string[] | undefined>,
query: req.query as Record<string, string | string[] | undefined>,
};
const server = await this.setupServer(request);
let keepAliveLoop: NodeJS.Timeout;

const transport = new StreamableHTTPServerTransport({
sessionIdGenerator: (): string => getRandomUUID(),
onsessioninitialized: (sessionId): void => {
server.session.logger.setAttribute("sessionId", sessionId);

this.sessionStore.setSession(sessionId, transport, server.session.logger);

let failedPings = 0;
// eslint-disable-next-line @typescript-eslint/no-misused-promises
keepAliveLoop = setInterval(async () => {
try {
server.session.logger.debug({
id: LogId.streamableHttpTransportKeepAlive,
context: "streamableHttpTransport",
message: "Sending ping",
});

await transport.send({
jsonrpc: "2.0",
method: "ping",
});
failedPings = 0;
} catch (err) {
try {
failedPings++;
server.session.logger.warning({
id: LogId.streamableHttpTransportKeepAliveFailure,
context: "streamableHttpTransport",
message: `Error sending ping (attempt #${failedPings}): ${err instanceof Error ? err.message : String(err)}`,
});

if (failedPings > 3) {
clearInterval(keepAliveLoop);
await transport.close();
}
} catch {
// Ignore the error of the transport close as there's nothing else
// we can do at this point.
}
}
}, 30_000);
},
onsessionclosed: async (sessionId): Promise<void> => {
try {
await this.sessionStore.closeSession(sessionId, false);
} catch (error) {
this.logger.error({
id: LogId.streamableHttpTransportSessionCloseFailure,
context: "streamableHttpTransport",
message: `Error closing session ${sessionId}: ${error instanceof Error ? error.message : String(error)}`,
});
if (failedPings > 3) {
clearInterval(keepAliveLoop);
await transport.close();
}
},
} catch {
// Ignore the error of the transport close as there's nothing else
// we can do at this point.
}
}
}, 30_000);

transport.onclose = (): void => {
clearInterval(keepAliveLoop);

server.close().catch((error) => {
this.logger.error({
id: LogId.streamableHttpTransportCloseFailure,
context: "streamableHttpTransport",
message: `Error closing server: ${error instanceof Error ? error.message : String(error)}`,
});
});
};

transport.onclose = (): void => {
clearInterval(keepAliveLoop);
await server.connect(transport);

server.close().catch((error) => {
this.logger.error({
id: LogId.streamableHttpTransportCloseFailure,
await transport.handleRequest(req, res, req.body);
};

app.post(
"/mcp",
this.withErrorHandling(async (req: express.Request, res: express.Response) => {
const sessionId = req.headers["mcp-session-id"];
if (sessionId && typeof sessionId !== "string") {
return reportSessionError(res, JSON_RPC_ERROR_CODE_SESSION_ID_INVALID);
}

if (isInitializeRequest(req.body)) {
if (sessionId && !this.userConfig.externallyManagedSessions) {
this.logger.debug({
id: LogId.streamableHttpTransportDisallowedExternalSessionError,
context: "streamableHttpTransport",
message: `Error closing server: ${error instanceof Error ? error.message : String(error)}`,
message: `Client provided session ID ${sessionId}, but externallyManagedSessions is disabled`,
});
});
};

await server.connect(transport);
return reportSessionError(res, JSON_RPC_ERROR_CODE_DISALLOWED_EXTERNAL_SESSION);
}

return await initializeServer(req, res, { sessionId });
}

if (sessionId) {
return await handleSessionRequest(req, res);
}

await transport.handleRequest(req, res, req.body);
return reportSessionError(res, JSON_RPC_ERROR_CODE_INVALID_REQUEST);
})
);

Expand Down
Loading
Loading