Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
"request": "launch",
"name": "Launch Program",
"skipFiles": ["<node_internals>/**"],
"program": "${workspaceFolder}/dist/index.js",
"args": ["--transport", "http", "--loggers", "stderr", "mcp"],
"runtimeExecutable": "npm",
"runtimeArgs": ["start"],
"preLaunchTask": "tsc: build - tsconfig.build.json",
"outFiles": ["${workspaceFolder}/dist/**/*.js"]
}
Expand Down
30 changes: 16 additions & 14 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -302,20 +302,22 @@ The MongoDB MCP Server can be configured using multiple methods, with the follow

### Configuration Options

| CLI Option | Environment Variable | Default | Description |
| ------------------ | --------------------------- | ---------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| `apiClientId` | `MDB_MCP_API_CLIENT_ID` | <not set> | Atlas API client ID for authentication. Required for running Atlas tools. |
| `apiClientSecret` | `MDB_MCP_API_CLIENT_SECRET` | <not set> | Atlas API client secret for authentication. Required for running Atlas tools. |
| `connectionString` | `MDB_MCP_CONNECTION_STRING` | <not set> | MongoDB connection string for direct database connections. Optional, if not set, you'll need to call the `connect` tool before interacting with MongoDB data. |
| `loggers` | `MDB_MCP_LOGGERS` | disk,mcp | Comma separated values, possible values are `mcp`, `disk` and `stderr`. See [Logger Options](#logger-options) for details. |
| `logPath` | `MDB_MCP_LOG_PATH` | see note\* | Folder to store logs. |
| `disabledTools` | `MDB_MCP_DISABLED_TOOLS` | <not set> | An array of tool names, operation types, and/or categories of tools that will be disabled. |
| `readOnly` | `MDB_MCP_READ_ONLY` | false | When set to true, only allows read, connect, and metadata operation types, disabling create/update/delete operations. |
| `indexCheck` | `MDB_MCP_INDEX_CHECK` | false | When set to true, enforces that query operations must use an index, rejecting queries that perform a collection scan. |
| `telemetry` | `MDB_MCP_TELEMETRY` | enabled | When set to disabled, disables telemetry collection. |
| `transport` | `MDB_MCP_TRANSPORT` | stdio | Either 'stdio' or 'http'. |
| `httpPort` | `MDB_MCP_HTTP_PORT` | 3000 | Port number. |
| `httpHost` | `MDB_MCP_HTTP_HOST` | 127.0.0.1 | Host to bind the http server. |
| CLI Option | Environment Variable | Default | Description |
| ----------------------- | --------------------------------- | ---------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| `apiClientId` | `MDB_MCP_API_CLIENT_ID` | <not set> | Atlas API client ID for authentication. Required for running Atlas tools. |
| `apiClientSecret` | `MDB_MCP_API_CLIENT_SECRET` | <not set> | Atlas API client secret for authentication. Required for running Atlas tools. |
| `connectionString` | `MDB_MCP_CONNECTION_STRING` | <not set> | MongoDB connection string for direct database connections. Optional, if not set, you'll need to call the `connect` tool before interacting with MongoDB data. |
| `loggers` | `MDB_MCP_LOGGERS` | disk,mcp | Comma separated values, possible values are `mcp`, `disk` and `stderr`. See [Logger Options](#logger-options) for details. |
| `logPath` | `MDB_MCP_LOG_PATH` | see note\* | Folder to store logs. |
| `disabledTools` | `MDB_MCP_DISABLED_TOOLS` | <not set> | An array of tool names, operation types, and/or categories of tools that will be disabled. |
| `readOnly` | `MDB_MCP_READ_ONLY` | false | When set to true, only allows read, connect, and metadata operation types, disabling create/update/delete operations. |
| `indexCheck` | `MDB_MCP_INDEX_CHECK` | false | When set to true, enforces that query operations must use an index, rejecting queries that perform a collection scan. |
| `telemetry` | `MDB_MCP_TELEMETRY` | enabled | When set to disabled, disables telemetry collection. |
| `transport` | `MDB_MCP_TRANSPORT` | stdio | Either 'stdio' or 'http'. |
| `httpPort` | `MDB_MCP_HTTP_PORT` | 3000 | Port number. |
| `httpHost` | `MDB_MCP_HTTP_HOST` | 127.0.0.1 | Host to bind the http server. |
| `idleTimeoutMs` | `MDB_MCP_IDLE_TIMEOUT_MS` | 600000 | Idle timeout for a client to disconnect (only applies to http transport). |
| `notificationTimeoutMs` | `MDB_MCP_NOTIFICATION_TIMEOUT_MS` | 540000 | Notification timeout for a client to be aware of diconnect (only applies to http transport). |

#### Logger Options

Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
},
"type": "module",
"scripts": {
"start": "node dist/index.js --transport http",
"start": "node dist/index.js --transport http --loggers stderr mcp",
"prepare": "npm run build",
"build:clean": "rm -rf dist",
"build:compile": "tsc --project tsconfig.build.json",
Expand Down
4 changes: 4 additions & 0 deletions src/common/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ export interface UserConfig {
httpPort: number;
httpHost: string;
loggers: Array<"stderr" | "disk" | "mcp">;
idleTimeoutMs: number;
notificationTimeoutMs: number;
}

const defaults: UserConfig = {
Expand All @@ -47,6 +49,8 @@ const defaults: UserConfig = {
httpPort: 3000,
httpHost: "127.0.0.1",
loggers: ["disk", "mcp"],
idleTimeoutMs: 600000, // 10 minutes
notificationTimeoutMs: 540000, // 9 minutes
};

export const config = {
Expand Down
5 changes: 3 additions & 2 deletions src/common/logger.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,9 @@ export const LogId = {

streamableHttpTransportStarted: mongoLogId(1_006_001),
streamableHttpTransportSessionCloseFailure: mongoLogId(1_006_002),
streamableHttpTransportRequestFailure: mongoLogId(1_006_003),
streamableHttpTransportCloseFailure: mongoLogId(1_006_004),
streamableHttpTransportSessionCloseNotification: mongoLogId(1_006_003),
streamableHttpTransportRequestFailure: mongoLogId(1_006_004),
streamableHttpTransportCloseFailure: mongoLogId(1_006_005),
} as const;

export abstract class LoggerBase {
Expand Down
128 changes: 112 additions & 16 deletions src/common/sessionStore.ts
Original file line number Diff line number Diff line change
@@ -1,31 +1,132 @@
import { StreamableHTTPServerTransport } from "@modelcontextprotocol/sdk/server/streamableHttp.js";
import logger, { LogId } from "./logger.js";
import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
import logger, { LogId, McpLogger } from "./logger.js";

class TimeoutManager {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: can we add unit tests for this?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[q] I know is tiny but should we move this into it's own separate file?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't because this is not reused and it is private to the file, no strong opinion, can be moved

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: can we add JS docs to the class and some of the methods below?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

almost none of our code has JS docs, we should add it across the board or not add any I think

private timeoutId?: NodeJS.Timeout;
public onerror?: (error: unknown) => void;

constructor(
private readonly callback: () => Promise<void> | void,
private readonly timeoutMS: number
) {
if (timeoutMS <= 0) {
throw new Error("timeoutMS must be greater than 0");
}
this.reset();
}

clear() {
if (this.timeoutId) {
clearTimeout(this.timeoutId);
this.timeoutId = undefined;
}
}

private async runCallback() {
if (this.callback) {
try {
await this.callback();
} catch (error: unknown) {
this.onerror?.(error);
}
}
}

reset() {
this.clear();
this.timeoutId = setTimeout(() => {
void this.runCallback().finally(() => {
this.timeoutId = undefined;
});
}, this.timeoutMS);
}
}

export class SessionStore {
private sessions: { [sessionId: string]: StreamableHTTPServerTransport } = {};
private sessions: {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[q] didn't check our telemetry PR but should we consider adding some http metrics? we could open a ticket for this, just checking about what you think. example: number of active sessions, timed out sessions

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does that add value? number of active sessions changes over time not sure how we would track it, we can add a time out session but bear in mind all of our telemetry is session based not server based, as it it lives inside of the MCP server not outside (http server).

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fair point - we can always check w/ analytics first hence the idea of a ticket

[sessionId: string]: {
mcpServer: McpServer;
transport: StreamableHTTPServerTransport;
abortTimeout: TimeoutManager;
notificationTimeout: TimeoutManager;
};
} = {};

constructor(
private readonly idleTimeoutMS: number,
private readonly notificationTimeoutMS: number
) {
if (idleTimeoutMS <= 0) {
throw new Error("idleTimeoutMS must be greater than 0");
}
if (notificationTimeoutMS <= 0) {
throw new Error("notificationTimeoutMS must be greater than 0");
}
if (idleTimeoutMS <= notificationTimeoutMS) {
throw new Error("idleTimeoutMS must be greater than notificationTimeoutMS");
}
}

getSession(sessionId: string): StreamableHTTPServerTransport | undefined {
return this.sessions[sessionId];
this.resetTimeout(sessionId);
return this.sessions[sessionId]?.transport;
}

private resetTimeout(sessionId: string): void {
const session = this.sessions[sessionId];
if (!session) {
return;
}

session.abortTimeout.reset();

session.notificationTimeout.reset();
}

setSession(sessionId: string, transport: StreamableHTTPServerTransport): void {
private sendNotification(sessionId: string): void {
const session = this.sessions[sessionId];
if (!session) {
return;
}
const logger = new McpLogger(session.mcpServer);
logger.info(
LogId.streamableHttpTransportSessionCloseNotification,
"sessionStore",
"Session is about to be closed due to inactivity"
);
}

setSession(sessionId: string, transport: StreamableHTTPServerTransport, mcpServer: McpServer): void {
if (this.sessions[sessionId]) {
throw new Error(`Session ${sessionId} already exists`);
}
this.sessions[sessionId] = transport;
const abortTimeout = new TimeoutManager(async () => {
const logger = new McpLogger(mcpServer);
logger.info(
LogId.streamableHttpTransportSessionCloseNotification,
"sessionStore",
"Session closed due to inactivity"
);

await this.closeSession(sessionId);
}, this.idleTimeoutMS);
const notificationTimeout = new TimeoutManager(
() => this.sendNotification(sessionId),
this.notificationTimeoutMS
);
this.sessions[sessionId] = { mcpServer, transport, abortTimeout, notificationTimeout };
}

async closeSession(sessionId: string, closeTransport: boolean = true): Promise<void> {
if (!this.sessions[sessionId]) {
throw new Error(`Session ${sessionId} not found`);
}
this.sessions[sessionId].abortTimeout.clear();
this.sessions[sessionId].notificationTimeout.clear();
if (closeTransport) {
const transport = this.sessions[sessionId];
if (!transport) {
throw new Error(`Session ${sessionId} not found`);
}
try {
await transport.close();
await this.sessions[sessionId].transport.close();
} catch (error) {
logger.error(
LogId.streamableHttpTransportSessionCloseFailure,
Expand All @@ -38,11 +139,6 @@ export class SessionStore {
}

async closeAllSessions(): Promise<void> {
await Promise.all(
Object.values(this.sessions)
.filter((transport) => transport !== undefined)
.map((transport) => transport.close())
);
this.sessions = {};
await Promise.all(Object.keys(this.sessions).map((sessionId) => this.closeSession(sessionId)));
}
}
2 changes: 1 addition & 1 deletion src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import { StdioRunner } from "./transports/stdio.js";
import { StreamableHttpRunner } from "./transports/streamableHttp.js";

async function main() {
const transportRunner = config.transport === "stdio" ? new StdioRunner() : new StreamableHttpRunner();
const transportRunner = config.transport === "stdio" ? new StdioRunner(config) : new StreamableHttpRunner(config);

const shutdown = () => {
logger.info(LogId.serverCloseRequested, "server", `Server close requested`);
Expand Down
14 changes: 7 additions & 7 deletions src/transports/base.ts
Original file line number Diff line number Diff line change
@@ -1,19 +1,19 @@
import { config } from "../common/config.js";
import { UserConfig } from "../common/config.js";
import { packageInfo } from "../common/packageInfo.js";
import { Server } from "../server.js";
import { Session } from "../common/session.js";
import { Telemetry } from "../telemetry/telemetry.js";
import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";

export abstract class TransportRunnerBase {
protected setupServer(): Server {
protected setupServer(userConfig: UserConfig): Server {
const session = new Session({
apiBaseUrl: config.apiBaseUrl,
apiClientId: config.apiClientId,
apiClientSecret: config.apiClientSecret,
apiBaseUrl: userConfig.apiBaseUrl,
apiClientId: userConfig.apiClientId,
apiClientSecret: userConfig.apiClientSecret,
});

const telemetry = Telemetry.create(session, config);
const telemetry = Telemetry.create(session, userConfig);

const mcpServer = new McpServer({
name: packageInfo.mcpServerName,
Expand All @@ -24,7 +24,7 @@ export abstract class TransportRunnerBase {
mcpServer,
session,
telemetry,
userConfig: config,
userConfig,
});
}

Expand Down
7 changes: 6 additions & 1 deletion src/transports/stdio.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { TransportRunnerBase } from "./base.js";
import { JSONRPCMessage, JSONRPCMessageSchema } from "@modelcontextprotocol/sdk/types.js";
import { EJSON } from "bson";
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
import { UserConfig } from "../common/config.js";

// This is almost a copy of ReadBuffer from @modelcontextprotocol/sdk
// but it uses EJSON.parse instead of JSON.parse to handle BSON types
Expand Down Expand Up @@ -52,9 +53,13 @@ export function createStdioTransport(): StdioServerTransport {
export class StdioRunner extends TransportRunnerBase {
private server: Server | undefined;

constructor(private userConfig: UserConfig) {
super();
}

async start() {
try {
this.server = this.setupServer();
this.server = this.setupServer(this.userConfig);

const transport = createStdioTransport();

Expand Down
17 changes: 11 additions & 6 deletions src/transports/streamableHttp.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import http from "http";
import { StreamableHTTPServerTransport } from "@modelcontextprotocol/sdk/server/streamableHttp.js";
import { isInitializeRequest } from "@modelcontextprotocol/sdk/types.js";
import { TransportRunnerBase } from "./base.js";
import { config } from "../common/config.js";
import { UserConfig } from "../common/config.js";
import logger, { LogId } from "../common/logger.js";
import { randomUUID } from "crypto";
import { SessionStore } from "../common/sessionStore.js";
Expand Down Expand Up @@ -38,7 +38,12 @@ function promiseHandler(

export class StreamableHttpRunner extends TransportRunnerBase {
private httpServer: http.Server | undefined;
private sessionStore: SessionStore = new SessionStore();
private sessionStore: SessionStore;

constructor(private userConfig: UserConfig) {
super();
this.sessionStore = new SessionStore(this.userConfig.idleTimeoutMs, this.userConfig.notificationTimeoutMs);
}

async start() {
const app = express();
Expand Down Expand Up @@ -101,11 +106,11 @@ export class StreamableHttpRunner extends TransportRunnerBase {
return;
}

const server = this.setupServer();
const server = this.setupServer(this.userConfig);
const transport = new StreamableHTTPServerTransport({
sessionIdGenerator: () => randomUUID().toString(),
onsessioninitialized: (sessionId) => {
this.sessionStore.setSession(sessionId, transport);
this.sessionStore.setSession(sessionId, transport, server.mcpServer);
},
onsessionclosed: async (sessionId) => {
try {
Expand Down Expand Up @@ -140,7 +145,7 @@ export class StreamableHttpRunner extends TransportRunnerBase {
app.delete("/mcp", promiseHandler(handleRequest));

this.httpServer = await new Promise<http.Server>((resolve, reject) => {
const result = app.listen(config.httpPort, config.httpHost, (err?: Error) => {
const result = app.listen(this.userConfig.httpPort, this.userConfig.httpHost, (err?: Error) => {
if (err) {
reject(err);
return;
Expand All @@ -152,7 +157,7 @@ export class StreamableHttpRunner extends TransportRunnerBase {
logger.info(
LogId.streamableHttpTransportStarted,
"streamableHttpTransport",
`Server started on http://${config.httpHost}:${config.httpPort}`
`Server started on http://${this.userConfig.httpHost}:${this.userConfig.httpPort}`
);
}

Expand Down
2 changes: 1 addition & 1 deletion tests/integration/transports/streamableHttp.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ describe("StreamableHttpRunner", () => {
oldLoggers = config.loggers;
config.telemetry = "disabled";
config.loggers = ["stderr"];
runner = new StreamableHttpRunner();
runner = new StreamableHttpRunner(config);
await runner.start();
});

Expand Down
Loading