-
Notifications
You must be signed in to change notification settings - Fork 2.6k
fix(mcp): align MCP client with mcp-go server protocol #4406
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -475,139 +475,144 @@ export class McpHub { | |
| let transport: StdioClientTransport | SSEClientTransport | StreamableHTTPClientTransport | ||
|
|
||
| // Inject environment variables to the config | ||
| const configInjected = (await injectEnv(config)) as typeof config | ||
|
|
||
| if (configInjected.type === "stdio") { | ||
| transport = new StdioClientTransport({ | ||
| command: configInjected.command, | ||
| args: configInjected.args, | ||
| cwd: configInjected.cwd, | ||
| env: { | ||
| ...getDefaultEnvironment(), | ||
| ...(configInjected.env || {}), | ||
| }, | ||
| stderr: "pipe", | ||
| }) | ||
| const configInjected = (await injectEnv(config)) as z.infer<typeof ServerConfigSchema> | ||
|
|
||
| switch (configInjected.type) { | ||
| case "stdio": { | ||
| transport = new StdioClientTransport({ | ||
| command: configInjected.command, | ||
| args: configInjected.args, | ||
| cwd: configInjected.cwd, | ||
| env: { | ||
| ...getDefaultEnvironment(), | ||
| ...(configInjected.env || {}), | ||
| }, | ||
| stderr: "pipe", | ||
| }) | ||
|
|
||
| // Set up stdio specific error handling | ||
| transport.onerror = async (error) => { | ||
| console.error(`Transport error for "${name}":`, error) | ||
| const connection = this.findConnection(name, source) | ||
| if (connection) { | ||
| connection.server.status = "disconnected" | ||
| this.appendErrorMessage(connection, error instanceof Error ? error.message : `${error}`) | ||
| // Set up stdio specific error handling | ||
| transport.onerror = async (error) => { | ||
| console.error(`Transport error for "${name}":`, error) | ||
| const connection = this.findConnection(name, source) | ||
| if (connection) { | ||
| connection.server.status = "disconnected" | ||
| this.appendErrorMessage(connection, error instanceof Error ? error.message : `${error}`) | ||
| } | ||
| await this.notifyWebviewOfServerChanges() | ||
| } | ||
| await this.notifyWebviewOfServerChanges() | ||
| } | ||
|
|
||
| transport.onclose = async () => { | ||
| const connection = this.findConnection(name, source) | ||
| if (connection) { | ||
| connection.server.status = "disconnected" | ||
| transport.onclose = async () => { | ||
| const connection = this.findConnection(name, source) | ||
| if (connection) { | ||
| connection.server.status = "disconnected" | ||
| } | ||
| await this.notifyWebviewOfServerChanges() | ||
| } | ||
| await this.notifyWebviewOfServerChanges() | ||
| } | ||
|
|
||
| // transport.stderr is only available after the process has been started. However we can't start it separately from the .connect() call because it also starts the transport. And we can't place this after the connect call since we need to capture the stderr stream before the connection is established, in order to capture errors during the connection process. | ||
| // As a workaround, we start the transport ourselves, and then monkey-patch the start method to no-op so that .connect() doesn't try to start it again. | ||
| await transport.start() | ||
| const stderrStream = transport.stderr | ||
| if (stderrStream) { | ||
| stderrStream.on("data", async (data: Buffer) => { | ||
| const output = data.toString() | ||
| // Check if output contains INFO level log | ||
| const isInfoLog = /INFO/i.test(output) | ||
|
|
||
| if (isInfoLog) { | ||
| // Log normal informational messages | ||
| console.log(`Server "${name}" info:`, output) | ||
| } else { | ||
| // Treat as error log | ||
| console.error(`Server "${name}" stderr:`, output) | ||
| const connection = this.findConnection(name, source) | ||
| if (connection) { | ||
| this.appendErrorMessage(connection, output) | ||
| if (connection.server.status === "disconnected") { | ||
| await this.notifyWebviewOfServerChanges() | ||
| // transport.stderr is only available after the process has been started. However we can't start it separately from the .connect() call because it also starts the transport. And we can't place this after the connect call since we need to capture the stderr stream before the connection is established, in order to capture errors during the connection process. | ||
| // As a workaround, we start the transport ourselves, and then monkey-patch the start method to no-op so that .connect() doesn't try to start it again. | ||
| await transport.start() | ||
| const stderrStream = transport.stderr | ||
| if (stderrStream) { | ||
| stderrStream.on("data", async (data: Buffer) => { | ||
| const output = data.toString() | ||
| // Check if output contains INFO level log | ||
| const isInfoLog = /INFO/i.test(output) | ||
|
|
||
| if (isInfoLog) { | ||
| // Log normal informational messages | ||
| console.log(`Server "${name}" info:`, output) | ||
| } else { | ||
| // Treat as error log | ||
| console.error(`Server "${name}" stderr:`, output) | ||
| const connection = this.findConnection(name, source) | ||
| if (connection) { | ||
| this.appendErrorMessage(connection, output) | ||
| if (connection.server.status === "disconnected") { | ||
| await this.notifyWebviewOfServerChanges() | ||
| } | ||
| } | ||
| } | ||
| } | ||
| }) | ||
| } else { | ||
| console.error(`No stderr stream for ${name}`) | ||
| } | ||
| } else if (configInjected.type === "streamable-http") { | ||
| // Streamable HTTP connection | ||
| transport = new StreamableHTTPClientTransport(new URL(configInjected.url), { | ||
| requestInit: { | ||
| headers: configInjected.headers, | ||
| }, | ||
| }) | ||
|
|
||
| // Set up Streamable HTTP specific error handling | ||
| transport.onerror = async (error) => { | ||
| console.error(`Transport error for "${name}" (streamable-http):`, error) | ||
| const connection = this.findConnection(name, source) | ||
| if (connection) { | ||
| connection.server.status = "disconnected" | ||
| this.appendErrorMessage(connection, error instanceof Error ? error.message : `${error}`) | ||
| } | ||
| await this.notifyWebviewOfServerChanges() | ||
| } | ||
|
|
||
| transport.onclose = async () => { | ||
| const connection = this.findConnection(name, source) | ||
| if (connection) { | ||
| connection.server.status = "disconnected" | ||
| }) | ||
| } else { | ||
| console.error(`No stderr stream for ${name}`) | ||
| } | ||
| await this.notifyWebviewOfServerChanges() | ||
| break | ||
| } | ||
| } else if (configInjected.type === "sse") { | ||
| // SSE connection | ||
| const sseOptions = { | ||
| requestInit: { | ||
| headers: configInjected.headers, | ||
| }, | ||
| } | ||
| // Configure ReconnectingEventSource options | ||
| const reconnectingEventSourceOptions = { | ||
| max_retry_time: 5000, // Maximum retry time in milliseconds | ||
| withCredentials: configInjected.headers?.["Authorization"] ? true : false, // Enable credentials if Authorization header exists | ||
| fetch: (url: string | URL, init: RequestInit) => { | ||
| const headers = new Headers({ ...(init?.headers || {}), ...(configInjected.headers || {}) }) | ||
| return fetch(url, { | ||
| ...init, | ||
| headers, | ||
| case "streamable-http": | ||
| case "sse": { | ||
| // For both sse and streamable-http, we try streamable-http first and fallback to sse. | ||
| try { | ||
| transport = new StreamableHTTPClientTransport(new URL(configInjected.url), { | ||
| requestInit: { | ||
| headers: configInjected.headers, | ||
| }, | ||
| }) | ||
| }, | ||
| } | ||
| global.EventSource = ReconnectingEventSource | ||
| transport = new SSEClientTransport(new URL(configInjected.url), { | ||
| ...sseOptions, | ||
| eventSourceInit: reconnectingEventSourceOptions, | ||
| }) | ||
| // We don't await client.connect here because it will be called later. | ||
| // This is just to see if the transport can be created. | ||
| console.log(`Attempting to connect to "${name}" using Streamable HTTP transport.`) | ||
|
||
| } catch (streamableError) { | ||
| if (configInjected.type === "sse") { | ||
| console.warn( | ||
| `Streamable HTTP connection failed for "${name}", falling back to SSE transport. Error: ${streamableError}`, | ||
| ) | ||
| const sseOptions = { | ||
| requestInit: { | ||
| headers: configInjected.headers, | ||
| }, | ||
| } | ||
| const reconnectingEventSourceOptions = { | ||
| max_retry_time: 5000, | ||
| withCredentials: configInjected.headers?.["Authorization"] ? true : false, | ||
| fetch: (url: string | URL, init: RequestInit) => { | ||
| const headers = new Headers({ | ||
| ...(init?.headers || {}), | ||
| ...(configInjected.headers || {}), | ||
| }) | ||
| return fetch(url, { | ||
| ...init, | ||
| headers, | ||
| }) | ||
| }, | ||
| } | ||
| global.EventSource = ReconnectingEventSource | ||
| transport = new SSEClientTransport(new URL(configInjected.url), { | ||
| ...sseOptions, | ||
| eventSourceInit: reconnectingEventSourceOptions, | ||
| }) | ||
| console.log(`Falling back to "${name}" using SSE transport.`) | ||
| } else { | ||
| // If it was explicitly streamable-http and failed, re-throw the error | ||
| throw streamableError | ||
| } | ||
| } | ||
|
|
||
| // Set up SSE specific error handling | ||
| transport.onerror = async (error) => { | ||
| console.error(`Transport error for "${name}":`, error) | ||
| const connection = this.findConnection(name, source) | ||
| if (connection) { | ||
| connection.server.status = "disconnected" | ||
| this.appendErrorMessage(connection, error instanceof Error ? error.message : `${error}`) | ||
| // Set up common error and close handling for both SSE and Streamable HTTP | ||
| transport.onerror = async (error) => { | ||
| console.error(`Transport error for "${name}":`, error) | ||
| const connection = this.findConnection(name, source) | ||
| if (connection) { | ||
| connection.server.status = "disconnected" | ||
| this.appendErrorMessage(connection, error instanceof Error ? error.message : `${error}`) | ||
| } | ||
| await this.notifyWebviewOfServerChanges() | ||
| } | ||
| await this.notifyWebviewOfServerChanges() | ||
| } | ||
|
|
||
| transport.onclose = async () => { | ||
| const connection = this.findConnection(name, source) | ||
| if (connection) { | ||
| connection.server.status = "disconnected" | ||
| transport.onclose = async () => { | ||
| const connection = this.findConnection(name, source) | ||
| if (connection) { | ||
| connection.server.status = "disconnected" | ||
| } | ||
| await this.notifyWebviewOfServerChanges() | ||
| } | ||
| await this.notifyWebviewOfServerChanges() | ||
| break | ||
| } | ||
| default: { | ||
| // This should be unreachable if the config is validated correctly. | ||
| // The `never` type helps enforce this at compile time. | ||
| const exhaustiveCheck: never = configInjected | ||
| throw new Error(`Unsupported MCP server type: ${exhaustiveCheck}`) | ||
| } | ||
| } else { | ||
| // Should not happen if validateServerConfig is correct | ||
| throw new Error(`Unsupported MCP server type: ${(configInjected as any).type}`) | ||
| } | ||
|
|
||
| // Only override transport.start for stdio transports that have already been started | ||
|
|
@@ -619,7 +624,7 @@ export class McpHub { | |
| server: { | ||
| name, | ||
| config: JSON.stringify(configInjected), | ||
| status: "connecting", | ||
| status: "connected", // Set to connected here as connect() is awaited above | ||
| disabled: configInjected.disabled, | ||
| source, | ||
| projectPath: source === "project" ? vscode.workspace.workspaceFolders?.[0]?.uri.fsPath : undefined, | ||
|
|
@@ -630,9 +635,6 @@ export class McpHub { | |
| } | ||
| this.connections.push(connection) | ||
|
|
||
| // Connect (this will automatically start the transport) | ||
| await client.connect(transport) | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The removal of The current code creates transport objects and immediately marks connections as "connected" without actually establishing the protocol handshake. This will likely cause failures when attempting to use MCP server capabilities. Am I missing something? |
||
| connection.server.status = "connected" | ||
| connection.server.error = "" | ||
| connection.server.instructions = client.getInstructions() | ||
|
|
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The fallback mechanism only handles transport creation errors, not connection failures. If
StreamableHTTPClientTransportis created successfully but fails during the actual connection attempt, there's no fallback to SSE.Is this intentional or should we consider implementing connection-level fallback:
This provides more robust error handling for actual connection failures.