diff --git a/server/utils/MCP/hypervisor/index.js b/server/utils/MCP/hypervisor/index.js index 861274f20b7..902a290b2c7 100644 --- a/server/utils/MCP/hypervisor/index.js +++ b/server/utils/MCP/hypervisor/index.js @@ -1,6 +1,9 @@ const { safeJsonParse } = require("../../http"); const path = require("path"); const fs = require("fs"); +const { exec } = require("child_process"); +const { promisify } = require("util"); +const execAsync = promisify(exec); const { Client } = require("@modelcontextprotocol/sdk/client/index.js"); const { StdioClientTransport, @@ -33,6 +36,7 @@ const { */ class MCPHypervisor { static _instance; + static _instanceLock = false; /** * The path to the JSON file containing the MCP server definitions. * @type {string} @@ -52,10 +56,23 @@ class MCPHypervisor { constructor() { if (MCPHypervisor._instance) return MCPHypervisor._instance; + + if (MCPHypervisor._instanceLock) { + const waitForInstance = () => { + if (MCPHypervisor._instance) return MCPHypervisor._instance; + return new Promise((resolve) => + setTimeout(() => resolve(waitForInstance()), 10) + ); + }; + return waitForInstance(); + } + + MCPHypervisor._instanceLock = true; MCPHypervisor._instance = this; this.className = "MCPHypervisor"; this.log("Initializing MCP Hypervisor - subsequent calls will boot faster"); this.#setupConfigFile(); + MCPHypervisor._instanceLock = false; return this; } @@ -194,7 +211,7 @@ class MCPHypervisor { this.log(`Pruning MCP server: ${name}`); const mcp = this.mcps[name]; const childProcess = mcp.transport._process; - if (childProcess) childProcess.kill(1); + if (childProcess) childProcess.kill("SIGTERM"); mcp.transport.close(); delete this.mcps[name]; @@ -218,7 +235,7 @@ class MCPHypervisor { const childProcess = mcp.transport._process; if (childProcess) this.log(`Killing MCP ${name} (PID: ${childProcess.pid})`, { - killed: childProcess.kill(1), + killed: childProcess.kill("SIGTERM"), }); mcp.transport.close(); @@ -228,18 +245,61 @@ class MCPHypervisor { this.mcpLoadingResults = {}; } + /** + * Load shell environment for desktop applications. + * GUI applications on macOS and Linux don't inherit login shell environment. + * @returns {Promise<{[key: string]: string}>} - Environment variables from shell + */ + async #loadShellEnvironment() { + try { + if (process.platform === "win32") { + return process.env; // Windows doesn't have this issue + } + + // Try to load shell environment + const shell = process.env.SHELL || "/bin/bash"; + const command = `${shell} -l -c 'env'`; + const { stdout } = await execAsync(command, { timeout: 5000 }); + + const env = {}; + stdout.split("\n").forEach((line) => { + const match = line.match(/^([^=]+)=(.*)$/); + if (match) { + env[match[1]] = match[2]; + } + }); + + return env; + } catch (error) { + console.warn( + "Failed to load shell environment, using process.env:", + error.message + ); + return process.env; + } + } + /** * Build the MCP server environment variables - ensures proper PATH and NODE_PATH * inheritance across all platforms and deployment scenarios. * @param {Object} server - The server definition * @returns {{env: { [key: string]: string } | {}}} - The environment variables */ - #buildMCPServerENV(server) { - // Start with essential environment variables, inheriting from current process - // This ensures GUI applications on macOS/Linux get proper PATH inheritance + async #buildMCPServerENV(server) { + // Load shell environment for desktop applications + const shellEnv = await this.#loadShellEnvironment(); + + // Start with essential environment variables, inheriting from shell environment let baseEnv = { - PATH: process.env.PATH || "/usr/local/bin:/usr/bin:/bin:/usr/sbin:/sbin", - NODE_PATH: process.env.NODE_PATH || "/usr/local/lib/node_modules", + PATH: + shellEnv.PATH || + process.env.PATH || + "/usr/local/bin:/usr/bin:/bin:/usr/sbin:/sbin", + NODE_PATH: + shellEnv.NODE_PATH || + process.env.NODE_PATH || + "/usr/local/lib/node_modules", + ...shellEnv, // Include all shell environment variables }; // Docker-specific environment setup @@ -273,21 +333,58 @@ class MCPHypervisor { * @returns {MCPServerTypes | null} - The server type */ #parseServerType(server) { - if (server.hasOwnProperty("command")) return "stdio"; - if (server.hasOwnProperty("url")) return "http"; + // Explicit type takes precedence (for SSE/Streamable/HTTP) + if ( + server.type === "sse" || + server.type === "streamable" || + server.type === "http" + ) { + return "http"; // All are HTTP-based transports + } + + // Default to stdio if command is present + if (Object.prototype.hasOwnProperty.call(server, "command")) return "stdio"; + + // Fallback to http if url is present + if (Object.prototype.hasOwnProperty.call(server, "url")) return "http"; return "sse"; } /** * Validate the server definition by type * - Will throw an error if the server definition is invalid + * @param {string} name - The name of the MCP server * @param {Object} server - The server definition * @param {MCPServerTypes} type - The server type * @returns {void} */ - #validateServerDefinitionByType(server, type) { + #validateServerDefinitionByType(name, server, type) { + // For SSE/Streamable/HTTP servers + if ( + server.type === "sse" || + server.type === "streamable" || + server.type === "http" + ) { + if (!server.url) { + throw new Error( + `MCP server "${name}": missing required "url" for ${server.type} transport` + ); + } + + // Validate URL is absolute + try { + new URL(server.url); + } catch (error) { + throw new Error(`MCP server "${name}": invalid URL "${server.url}"`); + } + return; + } + if (type === "stdio") { - if (server.hasOwnProperty("args") && !Array.isArray(server.args)) + if ( + Object.prototype.hasOwnProperty.call(server, "args") && + !Array.isArray(server.args) + ) throw new Error("MCP server args must be an array"); } @@ -304,16 +401,17 @@ class MCPHypervisor { * Setup the server transport by type and server definition * @param {Object} server - The server definition * @param {MCPServerTypes} type - The server type - * @returns {StdioClientTransport | StreamableHTTPClientTransport | SSEClientTransport} - The server transport + * @returns {Promise} - The server transport */ - #setupServerTransport(server, type) { + async #setupServerTransport(server, type) { // if not stdio then it is http or sse if (type !== "stdio") return this.createHttpTransport(server); + const envConfig = await this.#buildMCPServerENV(server); return new StdioClientTransport({ command: server.command, args: server?.args ?? [], - ...this.#buildMCPServerENV(server), + ...envConfig, }); } @@ -328,6 +426,7 @@ class MCPHypervisor { // If the server block has a type property then use that to determine the transport type switch (server.type) { case "streamable": + case "http": // Add explicit support for http type return new StreamableHTTPClientTransport(url, { requestInit: { headers: server.headers, @@ -354,10 +453,10 @@ class MCPHypervisor { const serverType = this.#parseServerType(server); if (!serverType) throw new Error("MCP server command or url is required"); - this.#validateServerDefinitionByType(server, serverType); + this.#validateServerDefinitionByType(name, server, serverType); this.log(`Attempting to start MCP server: ${name}`); const mcp = new Client({ name: name, version: "1.0.0" }); - const transport = this.#setupServerTransport(server, serverType); + const transport = await this.#setupServerTransport(server, serverType); // Add connection event listeners transport.onclose = () => this.log(`${name} - Transport closed`); @@ -369,10 +468,24 @@ class MCPHypervisor { // Connect and await the connection with a timeout this.mcps[name] = mcp; const connectionPromise = mcp.connect(transport); + + let timeoutId; const timeoutPromise = new Promise((_, reject) => { - setTimeout(() => reject(new Error("Connection timeout")), 30_000); // 30 second timeout + timeoutId = setTimeout( + () => reject(new Error("Connection timeout")), + 30_000 + ); }); - await Promise.race([connectionPromise, timeoutPromise]); + + try { + await Promise.race([connectionPromise, timeoutPromise]); + // Clear timeout if connection succeeds to prevent memory leak + if (timeoutId) clearTimeout(timeoutId); + } catch (error) { + // Clear timeout on error as well + if (timeoutId) clearTimeout(timeoutId); + throw error; + } return true; } @@ -390,7 +503,10 @@ class MCPHypervisor { const serverDefinitions = this.mcpServerConfigs; for (const { name, server } of serverDefinitions) { if ( - server.anythingllm?.hasOwnProperty("autoStart") && + Object.prototype.hasOwnProperty.call( + server.anythingllm || {}, + "autoStart" + ) && server.anythingllm.autoStart === false ) { this.log( diff --git a/server/utils/MCP/index.js b/server/utils/MCP/index.js index 03e72a86078..d192c02e471 100644 --- a/server/utils/MCP/index.js +++ b/server/utils/MCP/index.js @@ -2,11 +2,26 @@ const MCPHypervisor = require("./hypervisor"); class MCPCompatibilityLayer extends MCPHypervisor { static _instance; + static _instanceLock = false; constructor() { - super(); if (MCPCompatibilityLayer._instance) return MCPCompatibilityLayer._instance; + + if (MCPCompatibilityLayer._instanceLock) { + const waitForInstance = () => { + if (MCPCompatibilityLayer._instance) + return MCPCompatibilityLayer._instance; + return new Promise((resolve) => + setTimeout(() => resolve(waitForInstance()), 10) + ); + }; + return waitForInstance(); + } + + MCPCompatibilityLayer._instanceLock = true; + super(); MCPCompatibilityLayer._instance = this; + MCPCompatibilityLayer._instanceLock = false; } /** @@ -26,11 +41,20 @@ class MCPCompatibilityLayer extends MCPHypervisor { * @returns {Promise<{name: string, description: string, plugin: Function}[]|null>} Array of plugin configurations or null if not found */ async convertServerToolsToPlugins(name, _aibitat = null) { + // eslint-disable-line no-unused-vars const mcp = this.mcps[name]; if (!mcp) return null; - const tools = (await mcp.listTools()).tools; - if (!tools.length) return null; + let tools; + try { + const response = await mcp.listTools(); + tools = response.tools; + } catch (error) { + console.error(`Failed to list tools for MCP server ${name}:`, error); + return null; + } + + if (!tools || !tools.length) return null; const plugins = []; for (const tool of tools) { @@ -53,6 +77,16 @@ class MCPCompatibilityLayer extends MCPHypervisor { }, handler: async function (args = {}) { try { + // Get fresh MCP reference to avoid stale connections + const mcpLayer = new MCPCompatibilityLayer(); + const currentMcp = mcpLayer.mcps[name]; + + if (!currentMcp) { + throw new Error( + `MCP server ${name} is not currently running` + ); + } + aibitat.handlerProps.log( `Executing MCP server: ${name}:${tool.name} with args:`, args @@ -60,7 +94,8 @@ class MCPCompatibilityLayer extends MCPHypervisor { aibitat.introspect( `Executing MCP server: ${name} with ${JSON.stringify(args, null, 2)}` ); - const result = await mcp.callTool({ + + const result = await currentMcp.callTool({ name: tool.name, arguments: args, }); @@ -71,9 +106,23 @@ class MCPCompatibilityLayer extends MCPHypervisor { aibitat.introspect( `MCP server: ${name}:${tool.name} completed successfully` ); - return typeof result === "object" - ? JSON.stringify(result) - : String(result); + // Safe JSON stringify with circular reference handling + if (typeof result === "object") { + try { + return JSON.stringify(result); + } catch (e) { + // Fallback for circular references + const seen = new WeakSet(); + return JSON.stringify(result, (key, value) => { + if (typeof value === "object" && value !== null) { + if (seen.has(value)) return "[Circular]"; + seen.add(value); + } + return value; + }); + } + } + return String(result); } catch (error) { aibitat.handlerProps.log( `MCP server: ${name}:${tool.name} failed with error:`, @@ -133,8 +182,20 @@ class MCPCompatibilityLayer extends MCPHypervisor { continue; } - const online = !!(await mcp.ping()); - const tools = online ? (await mcp.listTools()).tools : []; + let online = false; + let tools = []; + + try { + online = !!(await mcp.ping()); + if (online) { + const response = await mcp.listTools(); + tools = response.tools || []; + } + } catch (error) { + console.error(`Error checking MCP server ${name} status:`, error); + online = false; + tools = []; + } servers.push({ name, config: config?.server || null, @@ -162,7 +223,17 @@ class MCPCompatibilityLayer extends MCPHypervisor { error: `MCP server ${name} not found in config file.`, }; const mcp = this.mcps[name]; - const online = !!mcp ? !!(await mcp.ping()) : false; // If the server is not in the mcps object, it is not running + let online = false; + + // Safely check if server is online + if (mcp) { + try { + online = !!(await mcp.ping()); + } catch (error) { + console.error(`Error pinging MCP server ${name}:`, error); + online = false; + } + } if (online) { const killed = this.pruneMCPServer(name); @@ -190,8 +261,26 @@ class MCPCompatibilityLayer extends MCPHypervisor { }; const mcp = this.mcps[name]; - const online = !!mcp ? !!(await mcp.ping()) : false; // If the server is not in the mcps object, it is not running - if (online) this.pruneMCPServer(name); + let online = false; + + // Safely check if server is online + if (mcp) { + try { + online = !!(await mcp.ping()); + } catch (error) { + // If ping fails, still try to clean up the server + console.error( + `Error pinging MCP server ${name} during deletion:`, + error + ); + online = true; // Assume it needs cleanup if ping fails + } + } + + // Always attempt cleanup regardless of ping result + if (mcp || online) { + this.pruneMCPServer(name); + } this.removeMCPServerFromConfig(name); delete this.mcps[name];