Skip to content

chore: refactor connections to use the new ConnectionManager to isolate long running processes like OIDC connections MCP-81 #423

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 27 commits into from
Aug 6, 2025
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
3977aa7
chore: refactor to include the new ConnectionManager
kmruiz Aug 4, 2025
7a1e217
chore: add tests to connection manager
kmruiz Aug 5, 2025
ece4d6c
chore: fix typo
kmruiz Aug 5, 2025
06a7ea8
chore: js prefers strict equality
kmruiz Aug 5, 2025
806382f
chore: fix linter errors
kmruiz Aug 5, 2025
4c357f9
chore: connection requested is not necessary at the end
kmruiz Aug 5, 2025
3f52815
chore: add test to connection-requested
kmruiz Aug 5, 2025
28d8b69
chore: add tests for the actual connection status
kmruiz Aug 6, 2025
f7ec158
chore: Fix typing issues and few PR suggestions
kmruiz Aug 6, 2025
f49e2b0
chore: style changes, use a getter for isConnectedToMongoDB
kmruiz Aug 6, 2025
e03b7a6
chore: move AtlasConnectionInfo to the connection manager
kmruiz Aug 6, 2025
43b54a0
chore: fixed linting issues
kmruiz Aug 6, 2025
89ba76d
chore: emit the close event
kmruiz Aug 6, 2025
61d7b1e
chore: add resource subscriptions and improve the resource prompt
kmruiz Aug 6, 2025
a33abb3
chore: Do not use anonymous tuples
kmruiz Aug 6, 2025
cc92766
chore: change the break to a return to make it easier to follow
kmruiz Aug 6, 2025
0a77b97
chore: small refactor
kmruiz Aug 6, 2025
6b5f49b
chore: minor clean up of redundant params
kmruiz Aug 6, 2025
6cbb1ab
Merge branch 'main' into chore/mcp-81
kmruiz Aug 6, 2025
1122e09
chore: ensure that we return connecting when not connected yet
kmruiz Aug 6, 2025
da872b4
chore: allow having the atlas cluster info independently of the conne…
kmruiz Aug 6, 2025
fe16acd
chore: simplify query connection logic
kmruiz Aug 6, 2025
17494ac
chore: This is clearer on how the behavior should look like
kmruiz Aug 6, 2025
693c755
chore: finish the refactor and clean up
kmruiz Aug 6, 2025
a6efc47
chore: propagate connected atlas cluster when disconnecting
kmruiz Aug 6, 2025
71dbf56
chore: fix linter issues and some status mismatch
kmruiz Aug 6, 2025
76e8ab5
chore: clean up atlas resource handling
kmruiz Aug 6, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
184 changes: 184 additions & 0 deletions src/common/connectionManager.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
import { ConnectOptions } from "./config.js";
import { NodeDriverServiceProvider } from "@mongosh/service-provider-node-driver";
import EventEmitter from "events";
import { setAppNameParamIfMissing } from "../helpers/connectionOptions.js";
import { packageInfo } from "./packageInfo.js";
import ConnectionString from "mongodb-connection-string-url";
import { MongoClientOptions } from "mongodb";
import { ErrorCodes, MongoDBError } from "./errors.js";

export interface AtlasClusterConnectionInfo {
username: string;
projectId: string;
clusterName: string;
expiryDate: Date;
}

export interface ConnectionSettings extends ConnectOptions {
connectionString: string;
atlas?: AtlasClusterConnectionInfo;
}

type ConnectionTag = "connected" | "connecting" | "disconnected" | "errored";
type OIDCConnectionAuthType = "oidc-auth-flow" | "oidc-device-flow";
export type ConnectionStringAuthType = "scram" | "ldap" | "kerberos" | OIDCConnectionAuthType | "x.509";

export interface ConnectionState {
tag: ConnectionTag;
connectionStringAuthType?: ConnectionStringAuthType;
}

export interface ConnectionStateConnected extends ConnectionState {
tag: "connected";
serviceProvider: NodeDriverServiceProvider;
connectedAtlasCluster?: AtlasClusterConnectionInfo;
}

export interface ConnectionStateConnecting extends ConnectionState {
tag: "connecting";
serviceProvider: NodeDriverServiceProvider;
oidcConnectionType: OIDCConnectionAuthType;
oidcLoginUrl?: string;
oidcUserCode?: string;
}

export interface ConnectionStateDisconnected extends ConnectionState {
tag: "disconnected";
}

export interface ConnectionStateErrored extends ConnectionState {
tag: "errored";
errorReason: string;
}

export type AnyConnectionState =
| ConnectionState
| ConnectionStateConnected
| ConnectionStateConnecting
| ConnectionStateDisconnected
| ConnectionStateErrored;

export interface ConnectionManagerEvents {
"connection-requested": [AnyConnectionState];
"connection-succeeded": [ConnectionStateConnected];
"connection-timed-out": [ConnectionStateErrored];
"connection-closed": [ConnectionStateDisconnected];
"connection-errored": [ConnectionStateErrored];
}

export class ConnectionManager extends EventEmitter<ConnectionManagerEvents> {
private state: AnyConnectionState;

constructor() {
super();
this.state = { tag: "disconnected" };
}

async connect(settings: ConnectionSettings): Promise<AnyConnectionState> {
this.emit("connection-requested", this.state);

if (this.state.tag == "connected" || this.state.tag == "connecting") {
await this.disconnect();
}

let serviceProvider: NodeDriverServiceProvider;
try {
settings = { ...settings };
settings.connectionString = setAppNameParamIfMissing({
connectionString: settings.connectionString,
defaultAppName: `${packageInfo.mcpServerName} ${packageInfo.version}`,
});

serviceProvider = await NodeDriverServiceProvider.connect(settings.connectionString, {
productDocsLink: "https://github.com/mongodb-js/mongodb-mcp-server/",
productName: "MongoDB MCP",
readConcern: {
level: settings.readConcern,
},
readPreference: settings.readPreference,
writeConcern: {
w: settings.writeConcern,
},
timeoutMS: settings.timeoutMS,
proxy: { useEnvironmentVariableProxies: true },
applyProxyToOIDC: true,
});
} catch (error: unknown) {
const errorReason = error instanceof Error ? error.message : `${error as string}`;
this.changeState("connection-errored", { tag: "errored", errorReason });
throw new MongoDBError(ErrorCodes.MisconfiguredConnectionString, errorReason);
}

try {
await serviceProvider?.runCommand?.("admin", { hello: 1 });

return this.changeState("connection-succeeded", {
tag: "connected",
connectedAtlasCluster: settings.atlas,
serviceProvider,
connectionStringAuthType: ConnectionManager.inferConnectionTypeFromSettings(settings),
});
} catch (error: unknown) {
const errorReason = error instanceof Error ? error.message : `${error as string}`;
this.changeState("connection-errored", { tag: "errored", errorReason });
throw new MongoDBError(ErrorCodes.NotConnectedToMongoDB, errorReason);
}
}

async disconnect(): Promise<ConnectionStateDisconnected | ConnectionStateErrored> {
if (this.state.tag == "disconnected") {
return this.state as ConnectionStateDisconnected;
}

if (this.state.tag == "errored") {
return this.state as ConnectionStateErrored;
}

if (this.state.tag == "connected" || this.state.tag == "connecting") {
const state = this.state as ConnectionStateConnecting | ConnectionStateConnected;
try {
await state.serviceProvider?.close(true);
} finally {
this.changeState("connection-closed", { tag: "disconnected" });
}
}

return { tag: "disconnected" };
}

get currentConnectionState(): AnyConnectionState {
return this.state;
}

changeState<State extends AnyConnectionState>(event: keyof ConnectionManagerEvents, newState: State): State {
this.state = newState;
this.emit(event, newState);
return newState;
}

static inferConnectionTypeFromSettings(settings: ConnectionSettings): ConnectionStringAuthType {
const connString = new ConnectionString(settings.connectionString);
const searchParams = connString.typedSearchParams<MongoClientOptions>();

switch (searchParams.get("authMechanism")) {
case "MONGODB-OIDC": {
return "oidc-auth-flow"; // TODO: depending on if we don't have a --browser later it can be oidc-device-flow
}
case "MONGODB-X509":
return "x.509";
case "GSSAPI":
return "kerberos";
case "PLAIN":
if (searchParams.get("authSource") == "$external") {
return "ldap";
}
break;
// default should catch also null, but eslint complains
// about it.
case null:
default:
return "scram";
}
return "scram";
}
}
83 changes: 39 additions & 44 deletions src/common/session.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,21 @@
import { NodeDriverServiceProvider } from "@mongosh/service-provider-node-driver";
import { ApiClient, ApiClientCredentials } from "./atlas/apiClient.js";
import { Implementation } from "@modelcontextprotocol/sdk/types.js";
import logger, { LogId } from "./logger.js";
import EventEmitter from "events";
import { ConnectOptions } from "./config.js";
import { setAppNameParamIfMissing } from "../helpers/connectionOptions.js";
import { packageInfo } from "./packageInfo.js";
import {
AnyConnectionState,
ConnectionManager,
ConnectionSettings,
ConnectionStateConnected,
} from "./connectionManager.js";
import { NodeDriverServiceProvider } from "@mongosh/service-provider-node-driver";
import { ErrorCodes, MongoDBError } from "./errors.js";

export interface SessionOptions {
apiBaseUrl: string;
apiClientId?: string;
apiClientSecret?: string;
connectionManager?: ConnectionManager;
}

export type SessionEvents = {
Expand All @@ -22,7 +27,7 @@ export type SessionEvents = {

export class Session extends EventEmitter<SessionEvents> {
sessionId?: string;
serviceProvider?: NodeDriverServiceProvider;
connectionManager: ConnectionManager;
apiClient: ApiClient;
agentRunner?: {
name: string;
Expand All @@ -35,7 +40,7 @@ export class Session extends EventEmitter<SessionEvents> {
expiryDate: Date;
};

constructor({ apiBaseUrl, apiClientId, apiClientSecret }: SessionOptions) {
constructor({ apiBaseUrl, apiClientId, apiClientSecret, connectionManager }: SessionOptions) {
super();

const credentials: ApiClientCredentials | undefined =
Expand All @@ -46,10 +51,13 @@ export class Session extends EventEmitter<SessionEvents> {
}
: undefined;

this.apiClient = new ApiClient({
baseUrl: apiBaseUrl,
credentials,
});
this.apiClient = new ApiClient({ baseUrl: apiBaseUrl, credentials });

this.connectionManager = connectionManager ?? new ConnectionManager();
this.connectionManager.on("connection-succeeded", () => this.emit("connect"));
this.connectionManager.on("connection-timed-out", (error) => this.emit("connection-error", error.errorReason));
this.connectionManager.on("connection-closed", () => this.emit("disconnect"));
this.connectionManager.on("connection-errored", (error) => this.emit("connection-error", error.errorReason));
}

setAgentRunner(agentRunner: Implementation | undefined) {
Expand All @@ -62,15 +70,13 @@ export class Session extends EventEmitter<SessionEvents> {
}

async disconnect(): Promise<void> {
if (this.serviceProvider) {
try {
await this.serviceProvider.close(true);
} catch (err: unknown) {
const error = err instanceof Error ? err : new Error(String(err));
logger.error(LogId.mongodbDisconnectFailure, "Error closing service provider:", error.message);
}
this.serviceProvider = undefined;
try {
await this.connectionManager.disconnect();
} catch (err: unknown) {
const error = err instanceof Error ? err : new Error(String(err));
logger.error(LogId.mongodbDisconnectFailure, "Error closing service provider:", error.message);
}

if (this.connectedAtlasCluster?.username && this.connectedAtlasCluster?.projectId) {
void this.apiClient
.deleteDatabaseUser({
Expand All @@ -92,44 +98,33 @@ export class Session extends EventEmitter<SessionEvents> {
});
this.connectedAtlasCluster = undefined;
}
this.emit("disconnect");
}

async close(): Promise<void> {
await this.disconnect();
await this.apiClient.close();
this.emit("close");
}

async connectToMongoDB(connectionString: string, connectOptions: ConnectOptions): Promise<void> {
connectionString = setAppNameParamIfMissing({
connectionString,
defaultAppName: `${packageInfo.mcpServerName} ${packageInfo.version}`,
});

async connectToMongoDB(settings: ConnectionSettings): Promise<AnyConnectionState> {
try {
this.serviceProvider = await NodeDriverServiceProvider.connect(connectionString, {
productDocsLink: "https://github.com/mongodb-js/mongodb-mcp-server/",
productName: "MongoDB MCP",
readConcern: {
level: connectOptions.readConcern,
},
readPreference: connectOptions.readPreference,
writeConcern: {
w: connectOptions.writeConcern,
},
timeoutMS: connectOptions.timeoutMS,
proxy: { useEnvironmentVariableProxies: true },
applyProxyToOIDC: true,
});

await this.serviceProvider?.runCommand?.("admin", { hello: 1 });
return await this.connectionManager.connect({ ...settings });
} catch (error: unknown) {
const message = error instanceof Error ? error.message : `${error as string}`;
const message = error instanceof Error ? error.message : (error as string);
this.emit("connection-error", message);
throw error;
}
}

isConnectedToMongoDB(): boolean {
return this.connectionManager.currentConnectionState.tag === "connected";
}

get serviceProvider(): NodeDriverServiceProvider {
if (this.isConnectedToMongoDB()) {
const state = this.connectionManager.currentConnectionState as ConnectionStateConnected;
return state.serviceProvider;
}

this.emit("connect");
throw new MongoDBError(ErrorCodes.NotConnectedToMongoDB, "Not connected to MongoDB");
}
}
6 changes: 3 additions & 3 deletions src/resources/common/debug.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@ type ConnectionStateDebuggingInformation = {

export class DebugResource extends ReactiveResource(
{
name: "debug-mongodb-connectivity",
uri: "debug://mongodb-connectivity",
name: "debug-mongodb",
uri: "debug://mongodb",
config: {
description: "Debugging information for connectivity issues.",
description: "Debugging information for MongoDB connectivity issues.",
},
},
{
Expand Down
10 changes: 8 additions & 2 deletions src/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,15 @@ export class Server {
}

async connect(transport: Transport): Promise<void> {
// Resources are now reactive, so we register them ASAP so they can listen to events like
// connection events.
this.registerResources();
await this.validateConfig();

this.mcpServer.server.registerCapabilities({ logging: {} });

// TODO: Eventually we might want to make tools reactive too instead of relying on custom logic.
this.registerTools();
this.registerResources();

// This is a workaround for an issue we've seen with some models, where they'll see that everything in the `arguments`
// object is optional, and then not pass it at all. However, the MCP server expects the `arguments` object to be if
Expand Down Expand Up @@ -194,7 +197,10 @@ export class Server {

if (this.userConfig.connectionString) {
try {
await this.session.connectToMongoDB(this.userConfig.connectionString, this.userConfig.connectOptions);
await this.session.connectToMongoDB({
connectionString: this.userConfig.connectionString,
...this.userConfig.connectOptions,
});
} catch (error) {
console.error(
"Failed to connect to MongoDB instance using the connection string from the config: ",
Expand Down
6 changes: 3 additions & 3 deletions src/tools/atlas/connect/connectCluster.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ export class ConnectClusterTool extends AtlasToolBase {
clusterName: string
): Promise<"connected" | "disconnected" | "connecting" | "connected-to-other-cluster" | "unknown"> {
if (!this.session.connectedAtlasCluster) {
if (this.session.serviceProvider) {
if (this.session.isConnectedToMongoDB()) {
return "connected-to-other-cluster";
}
return "disconnected";
Expand All @@ -40,7 +40,7 @@ export class ConnectClusterTool extends AtlasToolBase {
return "connected-to-other-cluster";
}

if (!this.session.serviceProvider) {
if (!this.session.isConnectedToMongoDB()) {
return "connecting";
}

Expand Down Expand Up @@ -145,7 +145,7 @@ export class ConnectClusterTool extends AtlasToolBase {
try {
lastError = undefined;

await this.session.connectToMongoDB(connectionString, this.config.connectOptions);
await this.session.connectToMongoDB({ connectionString, ...this.config.connectOptions });
break;
} catch (err: unknown) {
const error = err instanceof Error ? err : new Error(String(err));
Expand Down
Loading
Loading