Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 73 additions & 20 deletions src/vs/workbench/contrib/mcp/common/mcpGatewayToolBrokerChannel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,22 @@ export class McpGatewayToolBrokerChannel extends Disposable implements IServerCh
private readonly _serverIdMap = new Map<string, number>();
private _nextServerIndex = 0;

/**
* Per-server promise that races server startup against the grace period timeout.
* Once set for a server, subsequent list calls await the already-resolved promise
* and return immediately instead of waiting again.
*
* The `resolved` flag tracks whether the promise has settled. If a server's
* cacheState regresses to Unknown/Outdated after the promise resolved (e.g.
* after a cache reset), `_waitForStartup` discards the stale entry and creates
* a fresh race so the server gets another chance to start.
*/
private readonly _startupGrace = new Map<string, { promise: Promise<boolean>; resolved: boolean }>();

constructor(
private readonly _mcpService: IMcpService,
private readonly _logService: ILogService,
private readonly _startupGracePeriodMs = 5000,
) {
super();
this._logService.debug('[McpGateway][ToolBroker] Initialized');
Expand Down Expand Up @@ -86,6 +99,50 @@ export class McpGatewayToolBrokerChannel extends Disposable implements IServerCh
return undefined;
}

private _waitForStartup(server: IMcpServer): Promise<boolean> {
const id = server.definition.id;
const existing = this._startupGrace.get(id);
// If the previous grace promise already resolved but the server is still
// Unknown/Outdated, the entry is stale (e.g. caches were reset). Discard
// it so we create a fresh race below.
if (existing?.resolved) {
const state = server.cacheState.get();
if (state === McpServerCacheState.Unknown || state === McpServerCacheState.Outdated) {
this._startupGrace.delete(id);
}
}
if (!this._startupGrace.has(id)) {
const entry: { promise: Promise<boolean>; resolved: boolean } = {
promise: Promise.race([
this._ensureServerReady(server),
new Promise<boolean>(resolve => setTimeout(() => resolve(false), this._startupGracePeriodMs)),
]),
resolved: false,
};
entry.promise.then(() => { entry.resolved = true; });
this._startupGrace.set(id, entry);
}
return this._startupGrace.get(id)!.promise;
}

private async _shouldUseCachedData(server: IMcpServer): Promise<boolean> {
const cacheState = server.cacheState.get();
if (cacheState === McpServerCacheState.Unknown || cacheState === McpServerCacheState.Outdated) {
// On first list call: wait up to the grace period for the server to start.
// On subsequent calls: the stored promise is already resolved, returns immediately.
// Outdated servers get the same grace period as Unknown — a prior fast startup
// does not guarantee a fast restart.
await this._waitForStartup(server);
const newState = server.cacheState.get();
return newState === McpServerCacheState.Live
|| newState === McpServerCacheState.Cached
|| newState === McpServerCacheState.RefreshingFromCached;
}
return cacheState === McpServerCacheState.Live
|| cacheState === McpServerCacheState.Cached
|| cacheState === McpServerCacheState.RefreshingFromCached;
}

listen<T>(_ctx: unknown, event: string): Event<T> {
switch (event) {
case 'onDidChangeTools':
Expand Down Expand Up @@ -129,28 +186,20 @@ export class McpGatewayToolBrokerChannel extends Disposable implements IServerCh
}

private async _listTools(): Promise<readonly MCP.Tool[]> {
const mcpTools: MCP.Tool[] = [];
const servers = this._mcpService.servers.get();
this._logService.debug(`[McpGateway][ToolBroker] listTools: ${servers.length} server(s) known`);
await Promise.all(servers.map(server => this._ensureServerReady(server)));

for (const server of servers) {
const cacheState = server.cacheState.get();
if (cacheState !== McpServerCacheState.Live && cacheState !== McpServerCacheState.Cached && cacheState !== McpServerCacheState.RefreshingFromCached) {
this._logService.debug(`[McpGateway][ToolBroker] Skipping server '${server.definition.id}' (cacheState=${cacheState})`);
continue;
const perServer = await Promise.all(servers.map(async server => {
if (!await this._shouldUseCachedData(server)) {
this._logService.debug(`[McpGateway][ToolBroker] Server '${server.definition.id}' not ready, skipping tool listing`);
return [] as MCP.Tool[];
}
return server.tools.get()
.filter(t => t.visibility & McpToolVisibility.Model)
.map(t => t.definition);
}));

for (const tool of server.tools.get()) {
if (!(tool.visibility & McpToolVisibility.Model)) {
continue;
}

mcpTools.push(tool.definition);
}
}

const mcpTools = perServer.flat();
this._logService.debug(`[McpGateway][ToolBroker] listTools result: ${mcpTools.length} tool(s): [${mcpTools.map(t => t.name).join(', ')}]`);

return mcpTools;
}

Expand Down Expand Up @@ -180,7 +229,9 @@ export class McpGatewayToolBrokerChannel extends Disposable implements IServerCh
this._logService.debug(`[McpGateway][ToolBroker] listResources: ${servers.length} server(s) known`);

await Promise.all(servers.map(async server => {
await this._ensureServerReady(server);
if (!await this._shouldUseCachedData(server)) {
return;
}

const capabilities = server.capabilities.get();
if (!capabilities || !(capabilities & McpCapability.Resources)) {
Expand Down Expand Up @@ -220,7 +271,9 @@ export class McpGatewayToolBrokerChannel extends Disposable implements IServerCh
this._logService.debug(`[McpGateway][ToolBroker] listResourceTemplates: ${servers.length} server(s) known`);

await Promise.all(servers.map(async server => {
await this._ensureServerReady(server);
if (!await this._shouldUseCachedData(server)) {
return;
}

const capabilities = server.capabilities.get();
if (!capabilities || !(capabilities & McpCapability.Resources)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import assert from 'assert';
import { CancellationToken } from '../../../../../base/common/cancellation.js';
import { observableValue } from '../../../../../base/common/observable.js';
import { runWithFakedTimers } from '../../../../../base/test/common/timeTravelScheduler.js';
import { ensureNoDisposablesAreLeakedInTestSuite } from '../../../../../base/test/common/utils.js';
import { NullLogService } from '../../../../../platform/log/common/log.js';
import { IGatewayCallToolResult } from '../../../../../platform/mcp/common/mcpGateway.js';
Expand Down Expand Up @@ -133,11 +134,121 @@ suite('McpGatewayToolBrokerChannel', () => {
);

mcpService.servers.set([server], undefined);
await channel.call<readonly MCP.Tool[]>(undefined, 'listTools');
const tools = await channel.call<readonly MCP.Tool[]>(undefined, 'listTools');

// Server started during the grace period; tools are now available.
assert.strictEqual(server.startCalls, 1);
assert.deepStrictEqual(tools.map(t => t.name), ['echo']);
channel.dispose();
});

test('starts server and waits within grace period when cache state is outdated', async () => {
const mcpService = new TestMcpService();
const channel = new McpGatewayToolBrokerChannel(mcpService, new NullLogService());

const server = createServer(
'collectionA',
'serverA',
[createTool('echo', async () => ({ content: [{ type: 'text', text: 'A' }] }))],
McpServerCacheState.Outdated,
);

mcpService.servers.set([server], undefined);
const tools = await channel.call<readonly MCP.Tool[]>(undefined, 'listTools');

// Outdated server gets the same grace period as Unknown — started and tools returned.
assert.strictEqual(server.startCalls, 1);
assert.deepStrictEqual(tools.map(t => t.name), ['echo']);
channel.dispose();
});

test('returns empty tools and does not re-wait if server does not start within grace period', () => {
return runWithFakedTimers({ useFakeTimers: true }, async () => {
const mcpService = new TestMcpService();
const channel = new McpGatewayToolBrokerChannel(mcpService, new NullLogService(), 100);

const server = createNeverStartingServer(
'collectionA',
'serverA',
[createTool('echo', async () => ({ content: [{ type: 'text', text: 'A' }] }))],
);

mcpService.servers.set([server], undefined);

// First call: waits up to the grace period, server never starts → empty result.
const tools = await channel.call<readonly MCP.Tool[]>(undefined, 'listTools');
assert.deepStrictEqual(tools, []);

// Second call: grace-period promise already resolved; returns immediately without re-waiting.
const tools2 = await channel.call<readonly MCP.Tool[]>(undefined, 'listTools');
assert.deepStrictEqual(tools2, []);

channel.dispose();
});
});

test('invalidates stale grace entry when cacheState regresses to Unknown after timeout', () => {
return runWithFakedTimers({ useFakeTimers: true }, async () => {
const mcpService = new TestMcpService();
const channel = new McpGatewayToolBrokerChannel(mcpService, new NullLogService(), 100);

const server = createNeverStartingServer(
'collectionA',
'serverA',
[createTool('echo', async () => ({ content: [{ type: 'text', text: 'A' }] }))],
);

mcpService.servers.set([server], undefined);

// First call: grace period elapses, server never starts → empty.
const tools1 = await channel.call<readonly MCP.Tool[]>(undefined, 'listTools');
assert.deepStrictEqual(tools1, []);
assert.strictEqual(server.startCalls, 1);

// Simulate a cache reset: server goes back to Unknown.
server.cacheStateValue.set(McpServerCacheState.Unknown, undefined);

// Make the server succeed this time.
server.startBehavior = 'succeed';

// Second call: stale grace entry should be discarded, a new grace race starts,
// and the server successfully starts → tools returned.
const tools2 = await channel.call<readonly MCP.Tool[]>(undefined, 'listTools');
assert.deepStrictEqual(tools2.map(t => t.name), ['echo']);
assert.strictEqual(server.startCalls, 2);

channel.dispose();
});
});

test('does not invalidate grace entry when cacheState is not Unknown/Outdated', () => {
return runWithFakedTimers({ useFakeTimers: true }, async () => {
const mcpService = new TestMcpService();
const channel = new McpGatewayToolBrokerChannel(mcpService, new NullLogService(), 100);

const server = createServer(
'collectionA',
'serverA',
[createTool('echo', async () => ({ content: [{ type: 'text', text: 'A' }] }))],
McpServerCacheState.Unknown,
);

mcpService.servers.set([server], undefined);

// First call: server starts successfully during grace period.
const tools1 = await channel.call<readonly MCP.Tool[]>(undefined, 'listTools');
assert.deepStrictEqual(tools1.map(t => t.name), ['echo']);
assert.strictEqual(server.startCalls, 1);

// Second call: cacheState is now Live (server started), grace entry should NOT
// be invalidated, so no additional start call is made.
const tools2 = await channel.call<readonly MCP.Tool[]>(undefined, 'listTools');
assert.deepStrictEqual(tools2.map(t => t.name), ['echo']);
assert.strictEqual(server.startCalls, 1);

channel.dispose();
});
});
});

function createServer(
Expand Down Expand Up @@ -178,6 +289,51 @@ function createServer(
};
}

function createNeverStartingServer(
collectionId: string,
definitionId: string,
initialTools: readonly IMcpTool[],
): IMcpServer & { startCalls: number; startBehavior: 'hang' | 'succeed'; cacheStateValue: ReturnType<typeof observableValue<McpServerCacheState>> } {
const owner = {};
const tools = observableValue<readonly IMcpTool[]>(owner, initialTools);
const connectionState = observableValue<McpConnectionState>(owner, { state: McpConnectionState.Kind.Running });
const cacheState = observableValue<McpServerCacheState>(owner, McpServerCacheState.Unknown);
let startCalls = 0;
let startBehavior: 'hang' | 'succeed' = 'hang';

const result: IMcpServer & { startCalls: number; startBehavior: 'hang' | 'succeed'; cacheStateValue: ReturnType<typeof observableValue<McpServerCacheState>> } = {
collection: { id: collectionId, label: collectionId },
definition: { id: definitionId, label: definitionId },
connection: observableValue(owner, undefined),
connectionState,
serverMetadata: observableValue(owner, undefined),
readDefinitions: () => observableValue(owner, { server: undefined, collection: undefined }),
showOutput: async () => { },
start: async () => {
startCalls++;
if (result.startBehavior === 'succeed') {
cacheState.set(McpServerCacheState.Live, undefined);
return { state: McpConnectionState.Kind.Running };
}
// Never resolves — simulates a server that hangs on startup.
return new Promise<McpConnectionState>(() => { });
},
stop: async () => { },
cacheState,
tools,
prompts: observableValue(owner, []),
capabilities: observableValue(owner, undefined),
resources: () => (async function* () { })(),
resourceTemplates: async () => [],
dispose: () => { },
get startCalls() { return startCalls; },
get startBehavior() { return startBehavior; },
set startBehavior(v) { startBehavior = v; },
cacheStateValue: cacheState,
};
return result;
}

function createTool(name: string, call: (params: Record<string, unknown>) => Promise<MCP.CallToolResult>, visibility: McpToolVisibility = McpToolVisibility.Model): IMcpTool {
const definition: MCP.Tool = {
name,
Expand Down
Loading