Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 46 additions & 2 deletions src/mcp/actors.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { log } from 'apify';
import type { ActorDefinition } from 'apify-client';

import { ApifyClient, getApifyAPIBaseUrl } from '../apify-client.js';
Expand All @@ -8,7 +9,7 @@ export async function isActorMCPServer(actorID: string, apifyToken: string): Pro
}

export async function getActorsMCPServerPath(actorID: string, apifyToken: string): Promise<string | undefined> {
const actorDefinition = await getActorDefinition(actorID, apifyToken);
const actorDefinition = await getFullActorDefinition(actorID, apifyToken);

if ('webServerMcpPath' in actorDefinition && typeof actorDefinition.webServerMcpPath === 'string') {
return actorDefinition.webServerMcpPath;
Expand All @@ -26,20 +27,32 @@ export async function getActorsMCPServerURL(actorID: string, apifyToken: string)
return `${standbyUrl}${mcpPath}`;
}

// ID does not change, so no TTL
export const actorIDCache: Record<string, string> = {};
/**
* Gets Actor ID from the Actor object.
*
* @param actorID
* @param apifyToken
*/
export async function getRealActorID(actorID: string, apifyToken: string): Promise<string> {
if (actorIDCache[actorID]) {
log.debug(`Actor ${actorID} ID cache hit`);
return actorIDCache[actorID];
}
log.debug(`Actor ${actorID} ID cache miss`);

const apifyClient = new ApifyClient({ token: apifyToken });

const actor = apifyClient.actor(actorID);
const info = await actor.get();
if (!info) {
throw new Error(`Actor ${actorID} not found`);
}

if (!actorIDCache[actorID]) {
actorIDCache[actorID] = info.id;
}
return info.id;
}

Expand All @@ -56,7 +69,29 @@ export async function getActorStandbyURL(actorID: string, apifyToken: string, st
return `https://${actorRealID}.${standbyBaseUrl}`;
}

export async function getActorDefinition(actorID: string, apifyToken: string): Promise<ActorDefinition> {
export const actorDefinitionCache: Record<string, {
timestamp: number;
definition: ActorDefinition;
}> = {};
export const ACTOR_DEFINITION_CACHE_TTL_MS = 1000 * 60 * 60; // 1 hour
/**
* Gets full Actor definition from the Apify API.
*/
export async function getFullActorDefinition(actorID: string, apifyToken: string): Promise<ActorDefinition> {
const cacheInTTL = Date.now() - (actorDefinitionCache[actorID]?.timestamp || 0) < ACTOR_DEFINITION_CACHE_TTL_MS;
// Hit the cache
if (actorDefinitionCache[actorID]
&& cacheInTTL) {
log.debug(`Actor ${actorID} definition cache hit`);
return actorDefinitionCache[actorID].definition;
}
// Refresh the cache after TTL expired
if (actorDefinitionCache[actorID] && !cacheInTTL) {
log.debug(`Actor ${actorID} definition cache TTL expired, re-fetching`);
} else {
log.debug(`Actor ${actorID} definition cache miss`);
}

const apifyClient = new ApifyClient({ token: apifyToken });
const actor = apifyClient.actor(actorID);
const info = await actor.get();
Expand Down Expand Up @@ -84,5 +119,14 @@ export async function getActorDefinition(actorID: string, apifyToken: string): P
throw new Error(`Actor default build ${actorID} does not have Actor definition`);
}

// If the Actor is public, we cache the definition
// This code branch is executed only on cache miss, so we know the cache entry is empty
if (info.isPublic) {
actorDefinitionCache[actorID] = {
timestamp: Date.now(),
definition: actorDefinition,
};
}

return actorDefinition;
}
59 changes: 45 additions & 14 deletions src/mcp/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import type { Client } from '@modelcontextprotocol/sdk/client/index.js';
import { Server } from '@modelcontextprotocol/sdk/server/index.js';
import type { Transport } from '@modelcontextprotocol/sdk/shared/transport.js';
import { CallToolRequestSchema, ListToolsRequestSchema } from '@modelcontextprotocol/sdk/types.js';
import { CallToolRequestSchema, ErrorCode, ListToolsRequestSchema, McpError } from '@modelcontextprotocol/sdk/types.js';
import type { ActorCallOptions } from 'apify-client';

import log from '@apify/log';
Expand Down Expand Up @@ -116,15 +116,14 @@ export class ActorsMcpServer {
* @returns {object} - The response object containing the tools.
*/
this.server.setRequestHandler(ListToolsRequestSchema, async () => {
// TODO if there is actor-mcp as a tool, also list the tools from that Actor
const tools = Array.from(this.tools.values()).map((tool) => (tool.tool));
return { tools };
});

/**
* Handles the request to call a tool.
* @param {object} request - The request object containing tool name and arguments.
* @throws {Error} - Throws an error if the tool is unknown or arguments are invalid.
* @throws {McpError} - based on the McpServer class code from the typescript MCP SDK
*/
this.server.setRequestHandler(CallToolRequestSchema, async (request) => {
const { name, arguments: args } = request.params;
Expand All @@ -135,27 +134,47 @@ export class ActorsMcpServer {

// Validate token
if (!apifyToken) {
throw new Error('APIFY_TOKEN is required but not set in the environment variables or passed as a parameter.');
const msg = 'APIFY_TOKEN is required. It must be set in the environment variables or passed as a parameter in the body.';
log.error(msg);
await this.server.sendLoggingMessage({ level: 'error', data: msg });
throw new McpError(
ErrorCode.InvalidParams,
msg,
);
}

// TODO - log errors
// TODO: handle errors better, server.sendLoggingMessage ( )
// TODO - do not raise but return mcp errors
// TODO - if connection is /mcp client will not receive notification on tool change

// Find tool by name or actor full name
const tool = Array.from(this.tools.values())
.find((t) => t.tool.name === name || (t.type === 'actor' && (t.tool as ActorTool).actorFullName === name));
if (!tool) {
await this.server.sendLoggingMessage({ level: 'info', data: `Unknown tool $\{name}, available tools ${this.getToolNames()}` });
throw new Error(`Unknown tool: ${name}`);
const msg = `Tool ${name} not found. Available tools: ${this.getToolNames().join(', ')}`;
log.error(msg);
await this.server.sendLoggingMessage({ level: 'error', data: msg });
throw new McpError(
ErrorCode.InvalidParams,
msg,
);
}
if (!args) {
throw new Error(`Missing arguments for tool: ${name}`);
const msg = `Missing arguments for tool ${name}`;
log.error(msg);
await this.server.sendLoggingMessage({ level: 'error', data: msg });
throw new McpError(
ErrorCode.InvalidParams,
msg,
);
}
log.info(`Validate arguments for tool: ${tool.tool.name} with arguments: ${JSON.stringify(args)}`);
if (!tool.tool.ajvValidate(args)) {
throw new Error(`Invalid arguments for tool ${tool.tool.name}: args: ${JSON.stringify(args)} error: ${JSON.stringify(tool?.tool.ajvValidate.errors)}`);
const msg = `Invalid arguments for tool ${tool.tool.name}: args: ${JSON.stringify(args)} error: ${JSON.stringify(tool?.tool.ajvValidate.errors)}`;
log.error(msg);
await this.server.sendLoggingMessage({ level: 'error', data: msg });
throw new McpError(
ErrorCode.InvalidParams,
msg,
);
}

try {
Expand Down Expand Up @@ -207,11 +226,23 @@ export class ActorsMcpServer {
return { content };
}
} catch (error) {
log.error(`Error calling tool: ${error}`);
throw new Error(`Error calling tool: ${error}`);
log.error(`Error calling tool ${name}: ${error}`);
throw new McpError(
ErrorCode.InternalError,
`An error occurred while calling the tool.`,
);
}

throw new Error(`Tool ${name} is not implemented`);
const msg = `Unknown tool: ${name}`;
log.error(msg);
await this.server.sendLoggingMessage({
level: 'error',
data: msg,
});
throw new McpError(
ErrorCode.InvalidParams,
msg,
);
});
}

Expand Down
3 changes: 2 additions & 1 deletion src/tools/actor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ async function getMCPServersAsTools(
): Promise<ToolWrap[]> {
const actorsMCPServerTools: ToolWrap[] = [];
for (const actorID of actors) {
// getFullActorDefinition uses cache, so we can call it twice (this is the second time)
const serverUrl = await getActorsMCPServerURL(actorID, apifyToken);
log.info(`ActorID: ${actorID} MCP server URL: ${serverUrl}`);

Expand All @@ -150,7 +151,7 @@ export async function getActorsAsTools(
// Actorized MCP servers
const actorsMCPServers: string[] = [];
for (const actorID of actors) {
// TODO: rework, we are fetching actor definition from API twice - in the getMCPServerTools
// getFullActorDefinition uses cache, so we can call it twice (second time in the getMCPServerTools)
if (await isActorMCPServer(actorID, apifyToken)) {
actorsMCPServers.push(actorID);
}
Expand Down
Loading