Skip to content
Open
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions src/common/config/userConfig.ts
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,13 @@ const ServerConfigSchema = z4.object({
"When true, runs the server in dry mode: dumps configuration and enabled tools, then exits without starting the server."
)
.register(configRegistry, { overrideBehavior: "not-allowed" }),
externallyManagedSessions: z4
.boolean()
.default(false)
.describe(
"When true, the HTTP transport will allow json responses without SSE. Use this in cases where sessions are managed externally, such as by Amazon Bedrock AgentCore."
)
.register(configRegistry, { overrideBehavior: "override" }),
});

export const UserConfigSchema = z4.object({
Expand Down
2 changes: 2 additions & 0 deletions src/common/logger.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ export const LogId = {
streamableHttpTransportKeepAliveFailure: mongoLogId(1_006_007),
streamableHttpTransportKeepAlive: mongoLogId(1_006_008),
streamableHttpTransportHttpHostWarning: mongoLogId(1_006_009),
streamableHttpTransportSessionNotFound: mongoLogId(1_006_010),
streamableHttpTransportIgnoredSessionIdWarning: mongoLogId(1_006_011),

exportCleanupError: mongoLogId(1_007_001),
exportCreationError: mongoLogId(1_007_002),
Expand Down
238 changes: 146 additions & 92 deletions src/transports/streamableHttp.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,118 +49,113 @@ export class StreamableHttpRunner extends TransportRunnerBase {
for (const [key, value] of Object.entries(this.userConfig.httpHeaders)) {
const header = req.headers[key.toLowerCase()];
if (!header || header !== value) {
res.status(403).send({ error: `Invalid value for header "${key}"` });
res.status(403).json({ error: `Invalid value for header "${key}"` });
return;
}
}

next();
});

const reportSessionError = (res: express.Response, errorCode: number): void => {
let message: string;
let statusCode = 400;

switch (errorCode) {
case JSON_RPC_ERROR_CODE_SESSION_ID_REQUIRED:
message = "session id is required";
break;
case JSON_RPC_ERROR_CODE_SESSION_ID_INVALID:
message = "session id is invalid";
break;
case JSON_RPC_ERROR_CODE_INVALID_REQUEST:
message = "invalid request";
break;
case JSON_RPC_ERROR_CODE_SESSION_NOT_FOUND:
message = "session not found";
statusCode = 404;
break;
default:
message = "unknown error";
statusCode = 500;
}
res.status(statusCode).json({
jsonrpc: "2.0",
error: {
code: errorCode,
message,
},
});
};

const handleSessionRequest = async (req: express.Request, res: express.Response): Promise<void> => {
const sessionId = req.headers["mcp-session-id"];
if (!sessionId) {
res.status(400).json({
jsonrpc: "2.0",
error: {
code: JSON_RPC_ERROR_CODE_SESSION_ID_REQUIRED,
message: `session id is required`,
},
});
return;
return reportSessionError(res, JSON_RPC_ERROR_CODE_SESSION_ID_REQUIRED);
}

if (typeof sessionId !== "string") {
res.status(400).json({
jsonrpc: "2.0",
error: {
code: JSON_RPC_ERROR_CODE_SESSION_ID_INVALID,
message: "session id is invalid",
},
});
return;
return reportSessionError(res, JSON_RPC_ERROR_CODE_SESSION_ID_INVALID);
}

const transport = this.sessionStore.getSession(sessionId);
if (!transport) {
res.status(404).json({
jsonrpc: "2.0",
error: {
code: JSON_RPC_ERROR_CODE_SESSION_NOT_FOUND,
message: "session not found",
},
if (this.userConfig.externallyManagedSessions) {
this.logger.debug({
id: LogId.streamableHttpTransportSessionNotFound,
context: "streamableHttpTransport",
message: `Session with ID ${sessionId} not found, initializing new session`,
});

return await initializeServer(req, res, sessionId);
}

this.logger.debug({
id: LogId.streamableHttpTransportSessionNotFound,
context: "streamableHttpTransport",
message: `Session with ID ${sessionId} not found`,
});
return;

return reportSessionError(res, JSON_RPC_ERROR_CODE_SESSION_NOT_FOUND);
}

await transport.handleRequest(req, res, req.body);
};

app.post(
"/mcp",
this.withErrorHandling(async (req: express.Request, res: express.Response) => {
const sessionId = req.headers["mcp-session-id"];
if (sessionId) {
await handleSessionRequest(req, res);
return;
}
const initializeServer = async (
req: express.Request,
res: express.Response,
sessionId?: string
): Promise<void> => {
const request: RequestContext = {
headers: req.headers as Record<string, string | string[] | undefined>,
query: req.query as Record<string, string | string[] | undefined>,
};
const server = await this.setupServer(request);

if (!isInitializeRequest(req.body)) {
res.status(400).json({
jsonrpc: "2.0",
error: {
code: JSON_RPC_ERROR_CODE_INVALID_REQUEST,
message: `invalid request`,
},
let transport: StreamableHTTPServerTransport;
if (this.userConfig.externallyManagedSessions && sessionId) {
transport = new StreamableHTTPServerTransport({
enableJsonResponse: true,
});
server.session.logger.setAttribute("sessionId", sessionId);

this.sessionStore.setSession(sessionId, transport, server.session.logger);
} else {
if (sessionId) {
this.logger.warning({
id: LogId.streamableHttpTransportIgnoredSessionIdWarning,
context: "streamableHttpTransport",
message: `Ignoring provided session ID ${sessionId} as externallyManagedSessions is disabled`,
});
return;
}

const request: RequestContext = {
headers: req.headers as Record<string, string | string[] | undefined>,
query: req.query as Record<string, string | string[] | undefined>,
};
const server = await this.setupServer(request);
let keepAliveLoop: NodeJS.Timeout;

const transport = new StreamableHTTPServerTransport({
transport = new StreamableHTTPServerTransport({
sessionIdGenerator: (): string => getRandomUUID(),
onsessioninitialized: (sessionId): void => {
server.session.logger.setAttribute("sessionId", sessionId);

this.sessionStore.setSession(sessionId, transport, server.session.logger);

let failedPings = 0;
// eslint-disable-next-line @typescript-eslint/no-misused-promises
keepAliveLoop = setInterval(async () => {
try {
server.session.logger.debug({
id: LogId.streamableHttpTransportKeepAlive,
context: "streamableHttpTransport",
message: "Sending ping",
});

await transport.send({
jsonrpc: "2.0",
method: "ping",
});
failedPings = 0;
} catch (err) {
try {
failedPings++;
server.session.logger.warning({
id: LogId.streamableHttpTransportKeepAliveFailure,
context: "streamableHttpTransport",
message: `Error sending ping (attempt #${failedPings}): ${err instanceof Error ? err.message : String(err)}`,
});

if (failedPings > 3) {
clearInterval(keepAliveLoop);
await transport.close();
}
} catch {
// Ignore the error of the transport close as there's nothing else
// we can do at this point.
}
}
}, 30_000);
},
onsessionclosed: async (sessionId): Promise<void> => {
try {
Expand All @@ -174,22 +169,81 @@ export class StreamableHttpRunner extends TransportRunnerBase {
}
},
});
}

transport.onclose = (): void => {
clearInterval(keepAliveLoop);
let failedPings = 0;
// eslint-disable-next-line @typescript-eslint/no-misused-promises
const keepAliveLoop: NodeJS.Timeout = setInterval(async () => {
if (!transport) {
return;
}

server.close().catch((error) => {
this.logger.error({
id: LogId.streamableHttpTransportCloseFailure,
try {
server.session.logger.debug({
id: LogId.streamableHttpTransportKeepAlive,
context: "streamableHttpTransport",
message: "Sending ping",
});

await transport.send({
jsonrpc: "2.0",
method: "ping",
});
failedPings = 0;
} catch (err) {
try {
failedPings++;
server.session.logger.warning({
id: LogId.streamableHttpTransportKeepAliveFailure,
context: "streamableHttpTransport",
message: `Error closing server: ${error instanceof Error ? error.message : String(error)}`,
message: `Error sending ping (attempt #${failedPings}): ${err instanceof Error ? err.message : String(err)}`,
});

if (failedPings > 3) {
clearInterval(keepAliveLoop);
await transport.close();
}
} catch {
// Ignore the error of the transport close as there's nothing else
// we can do at this point.
}
}
}, 30_000);

transport.onclose = (): void => {
clearInterval(keepAliveLoop);

server.close().catch((error) => {
this.logger.error({
id: LogId.streamableHttpTransportCloseFailure,
context: "streamableHttpTransport",
message: `Error closing server: ${error instanceof Error ? error.message : String(error)}`,
});
};
});
};

await server.connect(transport);
await server.connect(transport);

await transport.handleRequest(req, res, req.body);
};

app.post(
"/mcp",
this.withErrorHandling(async (req: express.Request, res: express.Response) => {
const sessionId = req.headers["mcp-session-id"];
if (sessionId && typeof sessionId !== "string") {
return reportSessionError(res, JSON_RPC_ERROR_CODE_SESSION_ID_INVALID);
}

if (isInitializeRequest(req.body)) {
return await initializeServer(req, res, sessionId);
}

if (sessionId) {
return await handleSessionRequest(req, res);
}

await transport.handleRequest(req, res, req.body);
return reportSessionError(res, JSON_RPC_ERROR_CODE_INVALID_REQUEST);
})
);

Expand Down
Loading