Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
"name": "Launch Program",
"skipFiles": ["<node_internals>/**"],
"program": "${workspaceFolder}/dist/index.js",
"args": ["--transport", "http", "--loggers", "stderr", "mcp"],
"preLaunchTask": "tsc: build - tsconfig.build.json",
"outFiles": ["${workspaceFolder}/dist/**/*.js"]
}
Expand Down
28 changes: 14 additions & 14 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -299,20 +299,20 @@ The MongoDB MCP Server can be configured using multiple methods, with the follow

### Configuration Options

| Option | Default | Description |
| ------------------ | ---------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| `apiClientId` | <not set> | Atlas API client ID for authentication. Required for running Atlas tools. |
| `apiClientSecret` | <not set> | Atlas API client secret for authentication. Required for running Atlas tools. |
| `connectionString` | <not set> | MongoDB connection string for direct database connections. Optional, if not set, you'll need to call the `connect` tool before interacting with MongoDB data. |
| `loggers` | disk,mcp | Comma separated values, possible values are `mcp`, `disk` and `stderr`. See [Logger Options](#logger-options) for details. |
| `logPath` | see note\* | Folder to store logs. |
| `disabledTools` | <not set> | An array of tool names, operation types, and/or categories of tools that will be disabled. |
| `readOnly` | false | When set to true, only allows read, connect, and metadata operation types, disabling create/update/delete operations. |
| `indexCheck` | false | When set to true, enforces that query operations must use an index, rejecting queries that perform a collection scan. |
| `telemetry` | enabled | When set to disabled, disables telemetry collection. |
| `transport` | stdio | Either 'stdio' or 'http'. |
| `httpPort` | 3000 | Port number. |
| `httpHost` | 127.0.0.1 | Host to bind the http server. |
| CLI Option | Environment Variable | Default | Description |
| ------------------ | --------------------------- | ---------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| `apiClientId` | `MDB_MCP_API_CLIENT_ID` | <not set> | Atlas API client ID for authentication. Required for running Atlas tools. |
| `apiClientSecret` | `MDB_MCP_API_CLIENT_SECRET` | <not set> | Atlas API client secret for authentication. Required for running Atlas tools. |
| `connectionString` | `MDB_MCP_CONNECTION_STRING` | <not set> | MongoDB connection string for direct database connections. Optional, if not set, you'll need to call the `connect` tool before interacting with MongoDB data. |
| `loggers` | `MDB_MCP_LOGGERS` | disk,mcp | Comma separated values, possible values are `mcp`, `disk` and `stderr`. See [Logger Options](#logger-options) for details. |
| `logPath` | `MDB_MCP_LOG_PATH` | see note\* | Folder to store logs. |
| `disabledTools` | `MDB_MCP_DISABLED_TOOLS` | <not set> | An array of tool names, operation types, and/or categories of tools that will be disabled. |
| `readOnly` | `MDB_MCP_READ_ONLY` | false | When set to true, only allows read, connect, and metadata operation types, disabling create/update/delete operations. |
| `indexCheck` | `MDB_MCP_INDEX_CHECK` | false | When set to true, enforces that query operations must use an index, rejecting queries that perform a collection scan. |
| `telemetry` | `MDB_MCP_TELEMETRY` | enabled | When set to disabled, disables telemetry collection. |
| `transport` | `MDB_MCP_TRANSPORT` | stdio | Either 'stdio' or 'http'. |
| `httpPort` | `MDB_MCP_HTTP_PORT` | 3000 | Port number. |
| `httpHost` | `MDB_MCP_HTTP_HOST` | 127.0.0.1 | Host to bind the http server. |

#### Logger Options

Expand Down
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
},
"type": "module",
"scripts": {
"start": "node dist/index.js --transport http",
"prepare": "npm run build",
"build:clean": "rm -rf dist",
"build:compile": "tsc --project tsconfig.build.json",
Expand Down
9 changes: 3 additions & 6 deletions src/common/logger.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,9 @@ export const LogId = {
toolUpdateFailure: mongoLogId(1_005_001),

streamableHttpTransportStarted: mongoLogId(1_006_001),
streamableHttpTransportStartFailure: mongoLogId(1_006_002),
streamableHttpTransportSessionInitialized: mongoLogId(1_006_003),
streamableHttpTransportRequestFailure: mongoLogId(1_006_004),
streamableHttpTransportCloseRequested: mongoLogId(1_006_005),
streamableHttpTransportCloseSuccess: mongoLogId(1_006_006),
streamableHttpTransportCloseFailure: mongoLogId(1_006_007),
streamableHttpTransportSessionCloseFailure: mongoLogId(1_006_002),
streamableHttpTransportRequestFailure: mongoLogId(1_006_003),
streamableHttpTransportCloseFailure: mongoLogId(1_006_004),
} as const;

export abstract class LoggerBase {
Expand Down
48 changes: 48 additions & 0 deletions src/common/sessionStore.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
import { StreamableHTTPServerTransport } from "@modelcontextprotocol/sdk/server/streamableHttp.js";
import logger, { LogId } from "./logger.js";

export class SessionStore {
private sessions: { [sessionId: string]: StreamableHTTPServerTransport | undefined } = {};
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[q] Is there a reason why they value is nullable?
I don't see any location where the value (StreamableHTTPServerTransport) is set to undefined/removed

Copy link
Collaborator Author

@fmenezes fmenezes Jul 18, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for the case where a key is not found


getSession(sessionId: string): StreamableHTTPServerTransport | undefined {
return this.sessions[sessionId];
}

setSession(sessionId: string, transport: StreamableHTTPServerTransport): void {
if (this.sessions[sessionId]) {
throw new Error(`Session ${sessionId} already exists`);
}
this.sessions[sessionId] = transport;
}

async closeSession(sessionId: string, closeTransport: boolean = true): Promise<void> {
if (!this.sessions[sessionId]) {
throw new Error(`Session ${sessionId} not found`);
}
if (closeTransport) {
const transport = this.sessions[sessionId];
if (!transport) {
throw new Error(`Session ${sessionId} not found`);
}
try {
await transport.close();
} catch (error) {
logger.error(
LogId.streamableHttpTransportSessionCloseFailure,
"streamableHttpTransport",
`Error closing transport ${sessionId}: ${error instanceof Error ? error.message : String(error)}`
);
}
}
delete this.sessions[sessionId];
}

async closeAllSessions(): Promise<void> {
await Promise.all(
Object.values(this.sessions)
.filter((transport) => transport !== undefined)
.map((transport) => transport.close())
);
this.sessions = {};
}
}
9 changes: 5 additions & 4 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ async function main() {
transportRunner
.close()
.then(() => {
logger.info(LogId.serverClosed, "server", `Server closed`);
process.exit(0);
})
.catch((error: unknown) => {
Expand All @@ -22,10 +23,10 @@ async function main() {
});
};

process.once("SIGINT", shutdown);
process.once("SIGABRT", shutdown);
process.once("SIGTERM", shutdown);
process.once("SIGQUIT", shutdown);
process.on("SIGINT", shutdown);
process.on("SIGABRT", shutdown);
process.on("SIGTERM", shutdown);
process.on("SIGQUIT", shutdown);

try {
await transportRunner.start();
Expand Down
157 changes: 101 additions & 56 deletions src/transports/streamableHttp.ts
Original file line number Diff line number Diff line change
@@ -1,90 +1,132 @@
import express from "express";
import http from "http";
import { StreamableHTTPServerTransport } from "@modelcontextprotocol/sdk/server/streamableHttp.js";
import { isInitializeRequest } from "@modelcontextprotocol/sdk/types.js";
import { TransportRunnerBase } from "./base.js";
import { config } from "../common/config.js";
import logger, { LogId } from "../common/logger.js";
import { randomUUID } from "crypto";
import { SessionStore } from "../common/sessionStore.js";

const JSON_RPC_ERROR_CODE_PROCESSING_REQUEST_FAILED = -32000;
const JSON_RPC_ERROR_CODE_METHOD_NOT_ALLOWED = -32601;
const JSON_RPC_ERROR_CODE_SESSION_ID_REQUIRED = -32001;
const JSON_RPC_ERROR_CODE_SESSION_NOT_FOUND = -32002;
const JSON_RPC_ERROR_CODE_INVALID_REQUEST = -32003;

function promiseHandler(
fn: (req: express.Request, res: express.Response, next: express.NextFunction) => Promise<void>
) {
return (req: express.Request, res: express.Response, next: express.NextFunction) => {
fn(req, res, next).catch(next);
fn(req, res, next).catch((error) => {
logger.error(
LogId.streamableHttpTransportRequestFailure,
"streamableHttpTransport",
`Error handling request: ${error instanceof Error ? error.message : String(error)}`
);
res.status(400).json({
jsonrpc: "2.0",
error: {
code: JSON_RPC_ERROR_CODE_PROCESSING_REQUEST_FAILED,
message: `failed to handle request`,
data: error instanceof Error ? error.message : String(error),
},
});
});
};
}

export class StreamableHttpRunner extends TransportRunnerBase {
private httpServer: http.Server | undefined;
private sessionStore: SessionStore = new SessionStore();

async start() {
const app = express();
app.enable("trust proxy"); // needed for reverse proxy support
app.use(express.urlencoded({ extended: true }));
app.use(express.json());

const handleRequest = async (req: express.Request, res: express.Response) => {
const sessionId = req.headers["mcp-session-id"] as string;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn't the better way to write this be:

const sessionId = req.headers["mcp-session-id"];
if (typeof sessionId !== 'string' || !sessionId) {

Or at least use as string | undefined, instead of as string, which is incorrect?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good catch, I had an issue on my IDE before I think

if (!sessionId) {
res.status(400).json({
jsonrpc: "2.0",
error: {
code: JSON_RPC_ERROR_CODE_SESSION_ID_REQUIRED,
message: `session id is required`,
},
});
return;
}
const transport = this.sessionStore.getSession(sessionId);
if (!transport) {
res.status(404).json({
jsonrpc: "2.0",
error: {
code: JSON_RPC_ERROR_CODE_SESSION_NOT_FOUND,
message: `session not found`,
},
});
return;
}
await transport.handleRequest(req, res, req.body);
};

app.post(
"/mcp",
promiseHandler(async (req: express.Request, res: express.Response) => {
const transport = new StreamableHTTPServerTransport({
sessionIdGenerator: undefined,
});
const sessionId = req.headers["mcp-session-id"] as string;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as previous remark

if (sessionId) {
await handleRequest(req, res);
return;
}

const server = this.setupServer();
if (!isInitializeRequest(req.body)) {
res.status(400).json({
jsonrpc: "2.0",
error: {
code: JSON_RPC_ERROR_CODE_INVALID_REQUEST,
message: `invalid request`,
},
});
return;
}

await server.connect(transport);
const server = this.setupServer();
const transport = new StreamableHTTPServerTransport({
sessionIdGenerator: () => randomUUID().toString(),
onsessioninitialized: (sessionId) => {
this.sessionStore.setSession(sessionId, transport);
},
onsessionclosed: async (sessionId) => {
try {
await this.sessionStore.closeSession(sessionId, false);
} catch (error) {
logger.error(
LogId.streamableHttpTransportSessionCloseFailure,
"streamableHttpTransport",
`Error closing session: ${error instanceof Error ? error.message : String(error)}`
);
}
},
});

res.on("close", () => {
Promise.all([transport.close(), server.close()]).catch((error: unknown) => {
transport.onclose = () => {
server.close().catch((error) => {
logger.error(
LogId.streamableHttpTransportCloseFailure,
"streamableHttpTransport",
`Error closing server: ${error instanceof Error ? error.message : String(error)}`
);
});
});
};

try {
await transport.handleRequest(req, res, req.body);
} catch (error) {
logger.error(
LogId.streamableHttpTransportRequestFailure,
"streamableHttpTransport",
`Error handling request: ${error instanceof Error ? error.message : String(error)}`
);
res.status(400).json({
jsonrpc: "2.0",
error: {
code: JSON_RPC_ERROR_CODE_PROCESSING_REQUEST_FAILED,
message: `failed to handle request`,
data: error instanceof Error ? error.message : String(error),
},
});
}
await server.connect(transport);

await transport.handleRequest(req, res, req.body);
})
);

app.get("/mcp", (req: express.Request, res: express.Response) => {
res.status(405).json({
jsonrpc: "2.0",
error: {
code: JSON_RPC_ERROR_CODE_METHOD_NOT_ALLOWED,
message: `method not allowed`,
},
});
});

app.delete("/mcp", (req: express.Request, res: express.Response) => {
res.status(405).json({
jsonrpc: "2.0",
error: {
code: JSON_RPC_ERROR_CODE_METHOD_NOT_ALLOWED,
message: `method not allowed`,
},
});
});
app.get("/mcp", promiseHandler(handleRequest));
app.delete("/mcp", promiseHandler(handleRequest));

this.httpServer = await new Promise<http.Server>((resolve, reject) => {
const result = app.listen(config.httpPort, config.httpHost, (err?: Error) => {
Expand All @@ -104,14 +146,17 @@ export class StreamableHttpRunner extends TransportRunnerBase {
}

async close(): Promise<void> {
await new Promise<void>((resolve, reject) => {
this.httpServer?.close((err) => {
if (err) {
reject(err);
return;
}
resolve();
});
});
await Promise.all([
this.sessionStore.closeAllSessions(),
new Promise<void>((resolve, reject) => {
this.httpServer?.close((err) => {
if (err) {
reject(err);
return;
}
resolve();
});
}),
]);
}
}
Loading
Loading