Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 50 additions & 6 deletions src/mcp/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,24 +4,68 @@ import { StreamableHTTPClientTransport } from '@modelcontextprotocol/sdk/client/

import log from '@apify/log';

import { ACTORIZED_MCP_CONNECTION_TIMEOUT_MSEC } from './const.js';
import { getMCPServerID } from './utils.js';

class TimeoutError extends Error {
override readonly name = 'TimeoutError';
}

/**
* Creates and connects a ModelContextProtocol client.
* First tries streamable HTTP transport, then falls back to SSE transport.
*/
export async function connectMCPClient(
url: string, token: string,
): Promise<Client> {
): Promise<Client | null> {
let client: Client;
try {
return await createMCPStreamableClient(url, token);
} catch {
client = await createMCPStreamableClient(url, token);
return client;
} catch (error) {
// If streamable HTTP transport fails on not timeout error, continue with SSE transport
if (error instanceof TimeoutError) {
log.warning('Connection to MCP server using streamable HTTP transport timed out', { url });
return null;
}

// If streamable HTTP transport fails, fall back to SSE transport
log.debug('Streamable HTTP transport failed, falling back to SSE transport', {
url,
});
return await createMCPSSEClient(url, token);
}

try {
client = await createMCPSSEClient(url, token);
return client;
} catch (error) {
if (error instanceof TimeoutError) {
log.warning('Connection to MCP server using SSE transport timed out', { url });
return null;
}

log.error('Failed to connect to MCP server using SSE transport', { cause: error });
throw error;
Comment on lines +34 to +45
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit/question: Why is the try block here? We are just catching and then re-throwing. I would remove the try block for the SSE.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I throw exception, connection will not be opened.
When TimeoutError occurs, only warning will be logged to us and connection will be opened, but without this MCP server.

}
}

async function withTimeout<T>(millis: number, promise: Promise<T>): Promise<T> {
let timeoutPid: NodeJS.Timeout;
const timeout = new Promise<never>((_resolve, reject) => {
timeoutPid = setTimeout(
() => reject(new TimeoutError(`Timed out after ${millis} ms.`)),
millis,
);
});

return Promise.race([
promise,
timeout,
]).finally(() => {
if (timeoutPid) {
clearTimeout(timeoutPid);
}
});
}

/**
Expand Down Expand Up @@ -56,7 +100,7 @@ async function createMCPSSEClient(
version: '1.0.0',
});

await client.connect(transport);
await withTimeout(ACTORIZED_MCP_CONNECTION_TIMEOUT_MSEC, client.connect(transport));

return client;
}
Expand All @@ -82,7 +126,7 @@ async function createMCPStreamableClient(
version: '1.0.0',
});

await client.connect(transport);
await withTimeout(ACTORIZED_MCP_CONNECTION_TIMEOUT_MSEC, client.connect(transport));

return client;
}
1 change: 1 addition & 0 deletions src/mcp/const.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
export const MAX_TOOL_NAME_LENGTH = 64;
export const SERVER_ID_LENGTH = 8;
export const EXTERNAL_TOOL_CALL_TIMEOUT_MSEC = 120_000; // 2 minutes
export const ACTORIZED_MCP_CONNECTION_TIMEOUT_MSEC = 30_000; // 30 seconds

export const LOG_LEVEL_MAP: Record<string, number> = {
debug: 0,
Expand Down
12 changes: 11 additions & 1 deletion src/mcp/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -548,9 +548,19 @@ export class ActorsMcpServer {

if (tool.type === 'actor-mcp') {
const serverTool = tool.tool as ActorMcpTool;
let client: Client | undefined;
let client: Client | null = null;
try {
client = await connectMCPClient(serverTool.serverUrl, apifyToken);
if (!client) {
const msg = `Failed to connect to MCP server ${serverTool.serverUrl}`;
log.error(msg);
await this.server.sendLoggingMessage({ level: 'error', data: msg });
return {
content: [
{ type: 'text', text: msg },
],
};
}

// Only set up notification handlers if progressToken is provided by the client
if (progressToken) {
Expand Down
25 changes: 17 additions & 8 deletions src/tools/actor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -206,21 +206,22 @@ async function getMCPServersAsTools(
/**
* This is case for the Skyfire request without any Apify token, we do not support
* standby Actors in this case so we can skip MCP servers since they would fail anyway (they are standby Actors).
*/
*/
if (apifyToken === null || apifyToken === undefined) {
return [];
}

const actorsMCPServerTools: ToolEntry[] = [];
for (const actorInfo of actorsInfo) {
// Process all actors in parallel
const actorToolPromises = actorsInfo.map(async (actorInfo) => {
const actorId = actorInfo.actorDefinitionPruned.id;
if (!actorInfo.webServerMcpPath) {
log.warning('Actor does not have a web server MCP path, skipping', {
actorFullName: actorInfo.actorDefinitionPruned.actorFullName,
actorId,
});
continue;
return [];
}

const mcpServerUrl = await getActorMCPServerURL(
actorInfo.actorDefinitionPruned.id, // Real ID of the Actor
actorInfo.webServerMcpPath,
Expand All @@ -231,17 +232,25 @@ async function getMCPServersAsTools(
mcpServerUrl,
});

let client: Client | undefined;
let client: Client | null = null;
try {
client = await connectMCPClient(mcpServerUrl, apifyToken);
if (!client) {
// Skip this Actor, connectMCPClient will log the error
return [];
}
const serverTools = await getMCPServerTools(actorId, client, mcpServerUrl);
actorsMCPServerTools.push(...serverTools);
return serverTools;
} finally {
if (client) await client.close();
}
}
});

// Wait for all actors to be processed in parallel
const actorToolsArrays = await Promise.all(actorToolPromises);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

notice: for the most use cases it should be fine, but we should be aware of rate limits

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't find any rate limiting docs in the public docs on internal Notion. What type of limits do you mean?


return actorsMCPServerTools;
// Flatten the arrays of tools
return actorToolsArrays.flat();
}

export async function getActorsAsTools(
Expand Down
16 changes: 16 additions & 0 deletions tests/integration/suite.ts
Original file line number Diff line number Diff line change
Expand Up @@ -940,5 +940,21 @@ export function createIntegrationTestsSuite(

await client.close();
});

// Skipping tests, MCP actors are not available in staging environment
// TODO: run this tests only for local environment
it.skip.for([
'mcp-servers/slidespeak-mcp-server',
'mcp-servers/brave-search-mcp-server',
'jiri.spilka/weather-mcp-server',
'apify/actors-mcp-server',
'jakub.kopecky/browserbase-mcp-server',
'jakub.kopecky/arxiv-mcp-server',
'jiri.spilka/playwright-mcp-server',
])('should connect to "%s" MCP server and at least one tool is available', async (mcpServer) => {
client = await createClientFn({ tools: [mcpServer] });
const tools = await client.listTools();
expect(tools.tools.length).toBeGreaterThan(0);
});
});
}