diff --git a/src/actor/server.ts b/src/actor/server.ts index b1c52248..83d1beae 100644 --- a/src/actor/server.ts +++ b/src/actor/server.ts @@ -11,6 +11,7 @@ import express from 'express'; import log from '@apify/log'; +import { ApifyClient } from '../apify-client.js'; import { ActorsMcpServer } from '../mcp/server.js'; import { getHelpMessage, HEADER_READINESS_PROBE, Routes, TransportType } from './const.js'; import { getActorRunData } from './utils.js'; @@ -69,13 +70,14 @@ export function createExpressApp( rt: Routes.SSE, tr: TransportType.SSE, }); - const mcpServer = new ActorsMcpServer(false); + const mcpServer = new ActorsMcpServer({ setupSigintHandler: false }); const transport = new SSEServerTransport(Routes.MESSAGE, res); // Load MCP server tools const apifyToken = process.env.APIFY_TOKEN as string; log.debug('Loading tools from URL', { sessionId: transport.sessionId, tr: TransportType.SSE }); - await mcpServer.loadToolsFromUrl(req.url, apifyToken); + const apifyClient = new ApifyClient({ token: apifyToken }); + await mcpServer.loadToolsFromUrl(req.url, apifyClient); transportsSSE[transport.sessionId] = transport; mcpServers[transport.sessionId] = mcpServer; @@ -152,12 +154,13 @@ export function createExpressApp( sessionIdGenerator: () => randomUUID(), enableJsonResponse: false, // Use SSE response mode }); - const mcpServer = new ActorsMcpServer(false); + const mcpServer = new ActorsMcpServer({ setupSigintHandler: false }); // Load MCP server tools const apifyToken = process.env.APIFY_TOKEN as string; log.debug('Loading tools from URL', { sessionId: transport.sessionId, tr: TransportType.HTTP }); - await mcpServer.loadToolsFromUrl(req.url, apifyToken); + const apifyClient = new ApifyClient({ token: apifyToken }); + await mcpServer.loadToolsFromUrl(req.url, apifyClient); // Connect the transport to the MCP server BEFORE handling the request await mcpServer.connect(transport); diff --git a/src/apify-client.ts b/src/apify-client.ts index 026ba79d..e3c6cc8e 
100644 --- a/src/apify-client.ts +++ b/src/apify-client.ts @@ -4,6 +4,11 @@ import type { AxiosRequestConfig } from 'axios'; import { USER_AGENT_ORIGIN } from './const.js'; +interface ExtendedApifyClientOptions extends Omit { + token?: string | null | undefined; + skyfirePayId?: string; +} + /** * Adds a User-Agent header to the request config. * @param config @@ -23,22 +28,37 @@ export function getApifyAPIBaseUrl(): string { } export class ApifyClient extends _ApifyClient { - constructor(options: ApifyClientOptions) { + constructor(options: ExtendedApifyClientOptions) { /** * In order to publish to DockerHub, we need to run their build task to validate our MCP server. * This was failing since we were sending this dummy token to Apify in order to build the Actor tools. * So if we encounter this dummy value, we remove it to use Apify client as unauthenticated, which is sufficient * for server start and listing of tools. */ - if (options.token?.toLowerCase() === 'your-apify-token') { + if (options.token?.toLowerCase() === 'your-apify-token' || options.token === null) { // eslint-disable-next-line no-param-reassign delete options.token; } + const { skyfirePayId, ...clientOptions } = options; + const requestInterceptors = [addUserAgent]; + /** + * Add skyfire-pay-id header if provided. + */ + if (skyfirePayId) { + requestInterceptors.push((config) => { + const updatedConfig = { ...config }; + updatedConfig.headers = updatedConfig.headers ?? 
{}; + updatedConfig.headers['skyfire-pay-id'] = skyfirePayId; + return updatedConfig; + }); + } + super({ - ...options, + // token null case is handled, we can assert type here + ...clientOptions as ApifyClientOptions, baseUrl: getApifyAPIBaseUrl(), - requestInterceptors: [addUserAgent], + requestInterceptors, }); } } diff --git a/src/const.ts b/src/const.ts index d66e742a..104338b2 100644 --- a/src/const.ts +++ b/src/const.ts @@ -53,6 +53,14 @@ export const defaults = { }; export const ACTOR_ADDITIONAL_INSTRUCTIONS = 'Never call/execute tool/Actor unless confirmed by the user.'; +export const SKYFIRE_MIN_CHARGE_USD = 5.0; +export const SKYFIRE_SELLER_ID = process.env.SKYFIRE_SELLER_SERVICE_ID; + +export const SKYFIRE_TOOL_INSTRUCTIONS = `To run the Actor, you need to provide a Skyfire payment ID in the \`skyfire-pay-id\` input property. You first need to create the Skyfire PAY token by calling the \`create-pay-token\` tool from the Skyfire MCP server and then provide the created token in the \`skyfire-pay-id\` input property when calling Actor tool. The Skyfire PAY token is used for billing purposes and needs to be charged with at least ${SKYFIRE_MIN_CHARGE_USD} USD.${SKYFIRE_SELLER_ID ? ` The Skyfire seller ID for Apify MCP server is "${SKYFIRE_SELLER_ID}".` : ''} When retrieving Actor output using the \`get-actor-output\` tool, you also need to provide the same Skyfire PAY token ID in the \`skyfire-pay-id\` input property.`; + +export const SKYFIRE_PAY_ID_PROPERTY_DESCRIPTION = `Skyfire PAY token ID used for billing purposes.`; + +export const SKYFIRE_README_CONTENT = `The Apify MCP Server allows clients to interact with Apify Actors, storage, and documentation. Before running any Apify Actor, you need to set up a Skyfire PAY token for billing purposes. To set up the PAY token you need to use the create-pay-token tool from the Skyfire MCP server. The PAY token must be charged with a minimum of ${SKYFIRE_MIN_CHARGE_USD} USD to be valid.${SKYFIRE_SELLER_ID ? 
` The Skyfire seller ID for Apify MCP server is "${SKYFIRE_SELLER_ID}".` : ''} When running an Actor using the call-actor tool, you must provide the Skyfire PAY token ID in the \`skyfire-pay-id\` input property. Similarly, when retrieving Actor output using the get-actor-output tool, you must also provide the same Skyfire PAY token ID in the \`skyfire-pay-id\` input property.`; // Cache export const ACTOR_CACHE_MAX_SIZE = 500; diff --git a/src/index-internals.ts b/src/index-internals.ts index 364fe5ae..0c805246 100644 --- a/src/index-internals.ts +++ b/src/index-internals.ts @@ -2,6 +2,7 @@ This file provides essential internal functions for Apify MCP servers, serving as an internal library. */ +import { ApifyClient } from './apify-client.js'; import { defaults, HelperTools } from './const.js'; import { processParamsGetTools } from './mcp/utils.js'; import { addTool } from './tools/helpers.js'; @@ -12,6 +13,7 @@ import { getExpectedToolNamesByCategories, getToolPublicFieldOnly } from './util import { TTLLRUCache } from './utils/ttl-lru.js'; export { + ApifyClient, getExpectedToolNamesByCategories, TTLLRUCache, actorNameToToolName, diff --git a/src/main.ts b/src/main.ts index 5150d199..070e0fb9 100644 --- a/src/main.ts +++ b/src/main.ts @@ -9,6 +9,7 @@ import type { ActorCallOptions } from 'apify-client'; import log from '@apify/log'; import { createExpressApp } from './actor/server.js'; +import { ApifyClient } from './apify-client.js'; import { processInput } from './input.js'; import { callActorGetDataset } from './tools/index.js'; import type { Input } from './types.js'; @@ -44,7 +45,9 @@ if (STANDBY_MODE) { await Actor.fail('If you need to debug a specific Actor, please provide the debugActor and debugActorInput fields in the input'); } const options = { memory: input.maxActorMemoryBytes } as ActorCallOptions; - const callResult = await callActorGetDataset(input.debugActor!, input.debugActorInput!, process.env.APIFY_TOKEN, options); + + const apifyClient = new 
ApifyClient({ token: process.env.APIFY_TOKEN }); + const callResult = await callActorGetDataset(input.debugActor!, input.debugActorInput!, apifyClient, options); if (callResult && callResult.previewItems.length > 0) { await Actor.pushData(callResult.previewItems); diff --git a/src/mcp/actors.ts b/src/mcp/actors.ts index 197d071c..64d62991 100644 --- a/src/mcp/actors.ts +++ b/src/mcp/actors.ts @@ -64,19 +64,3 @@ export async function getRealActorID(actorIdOrName: string, apifyToken: string): export async function getActorStandbyURL(realActorId: string, standbyBaseUrl = 'apify.actor'): Promise { return `https://${realActorId}.${standbyBaseUrl}`; } - -export async function getActorDefinition(actorID: string, apifyToken: string): Promise { - const apifyClient = new ApifyClient({ token: apifyToken }); - const actor = apifyClient.actor(actorID); - const defaultBuildClient = await actor.defaultBuild(); - const buildInfo = await defaultBuildClient.get(); - if (!buildInfo) { - throw new Error(`Default build for Actor ${actorID} not found`); - } - const { actorDefinition } = buildInfo; - if (!actorDefinition) { - throw new Error(`Actor default build ${actorID} does not have Actor definition`); - } - - return actorDefinition; -} diff --git a/src/mcp/server.ts b/src/mcp/server.ts index 9d5267a4..2847af9e 100644 --- a/src/mcp/server.ts +++ b/src/mcp/server.ts @@ -15,6 +15,7 @@ import { ListResourceTemplatesRequestSchema, ListToolsRequestSchema, McpError, + ReadResourceRequestSchema, ServerNotificationSchema, SetLevelRequestSchema, } from '@modelcontextprotocol/sdk/types.js'; @@ -23,9 +24,14 @@ import { type ActorCallOptions, ApifyApiError } from 'apify-client'; import log from '@apify/log'; +import { ApifyClient } from '../apify-client.js'; import { + HelperTools, SERVER_NAME, SERVER_VERSION, + SKYFIRE_PAY_ID_PROPERTY_DESCRIPTION, + SKYFIRE_README_CONTENT, + SKYFIRE_TOOL_INSTRUCTIONS, } from '../const.js'; import { prompts } from '../prompts/index.js'; import { 
callActorGetDataset, defaultTools, getActorsAsTools, toolCategories } from '../tools/index.js'; @@ -40,6 +46,14 @@ import { processParamsGetTools } from './utils.js'; type ToolsChangedHandler = (toolNames: string[]) => void; +interface ActorsMcpServerOptions { + setupSigintHandler?: boolean; + /** + * Switch to enable Skyfire agentic payment mode. + */ + skyfireMode?: boolean; +} + /** * Create Apify MCP server */ @@ -49,8 +63,11 @@ export class ActorsMcpServer { private toolsChangedHandler: ToolsChangedHandler | undefined; private sigintHandler: (() => Promise) | undefined; private currentLogLevel = 'info'; + public readonly options: ActorsMcpServerOptions; - constructor(setupSigintHandler = true) { + constructor(options: ActorsMcpServerOptions = {}) { + this.options = options; + const { setupSigintHandler = true } = options; this.server = new Server( { name: SERVER_NAME, @@ -161,7 +178,7 @@ export class ActorsMcpServer { * @param toolNames - Array of tool names to ensure are loaded * @param apifyToken - Apify API token for authentication */ - public async loadToolsByName(toolNames: string[], apifyToken: string) { + public async loadToolsByName(toolNames: string[], apifyClient: ApifyClient) { const loadedTools = this.listAllToolNames(); const actorsToLoad: string[] = []; const toolsToLoad: ToolEntry[] = []; @@ -186,7 +203,7 @@ export class ActorsMcpServer { } if (actorsToLoad.length > 0) { - await this.loadActorsAsTools(actorsToLoad, apifyToken); + await this.loadActorsAsTools(actorsToLoad, apifyClient); } } @@ -197,8 +214,8 @@ export class ActorsMcpServer { * @param apifyToken - Apify API token for authentication * @returns Promise - Array of loaded tool entries */ - public async loadActorsAsTools(actorIdsOrNames: string[], apifyToken: string): Promise { - const actorTools = await getActorsAsTools(actorIdsOrNames, apifyToken); + public async loadActorsAsTools(actorIdsOrNames: string[], apifyClient: ApifyClient): Promise { + const actorTools = await 
getActorsAsTools(actorIdsOrNames, apifyClient); if (actorTools.length > 0) { this.upsertTools(actorTools, true); } @@ -212,8 +229,8 @@ export class ActorsMcpServer { * * Used primarily for SSE. */ - public async loadToolsFromUrl(url: string, apifyToken: string) { - const tools = await processParamsGetTools(url, apifyToken); + public async loadToolsFromUrl(url: string, apifyClient: ApifyClient) { + const tools = await processParamsGetTools(url, apifyClient); if (tools.length > 0) { log.debug('Loading tools from query parameters'); this.upsertTools(tools, false); @@ -307,10 +324,44 @@ export class ActorsMcpServer { private setupResourceHandlers(): void { this.server.setRequestHandler(ListResourcesRequestSchema, async () => { - // No resources available, return empty response + /** + * Return the usage guide resource only if Skyfire mode is enabled. No resources otherwise for normal mode. + */ + if (this.options.skyfireMode) { + return { + resources: [ + { + uri: 'file://readme.md', + name: 'readme', + description: `Apify MCP Server usage guide. Read this to understand how to use the server, especially in Skyfire mode before interacting with it.`, + mimeType: 'text/markdown', + }, + ], + }; + } return { resources: [] }; }); + if (this.options.skyfireMode) { + this.server.setRequestHandler(ReadResourceRequestSchema, async (request) => { + const { uri } = request.params; + if (uri === 'file://readme.md') { + return { + contents: [{ + uri: 'file://readme.md', + mimeType: 'text/markdown', + text: SKYFIRE_README_CONTENT, + }], + }; + } + return { + contents: [{ + uri, mimeType: 'text/plain', text: `Resource ${uri} not found`, + }], + }; + }); + } + this.server.setRequestHandler(ListResourceTemplatesRequestSchema, async () => { // No resource templates available, return empty response return { resourceTemplates: [] }; @@ -368,6 +419,26 @@ export class ActorsMcpServer { * @returns {object} - The response object containing the tools. 
*/ this.server.setRequestHandler(ListToolsRequestSchema, async () => { + /** + * Hack for the Skyfire agentic payments, we check if Skyfire mode is enabled we ad-hoc add + * the `skyfire-pay-id` input property to all Actor tools and `call-actor` and `get-actor-output` tool. + */ + if (this.options.skyfireMode) { + for (const toolEntry of this.tools.values()) { + if (toolEntry.type === 'actor' + || (toolEntry.type === 'internal' && toolEntry.tool.name === HelperTools.ACTOR_CALL) + || (toolEntry.type === 'internal' && toolEntry.tool.name === HelperTools.ACTOR_OUTPUT_GET)) { + if (toolEntry.tool.inputSchema && 'properties' in toolEntry.tool.inputSchema) { + (toolEntry.tool.inputSchema.properties as Record)['skyfire-pay-id'] = { + type: 'string', + description: SKYFIRE_PAY_ID_PROPERTY_DESCRIPTION, + }; + } + // Update description to include Skyfire instructions + toolEntry.tool.description += `\n\n${SKYFIRE_TOOL_INSTRUCTIONS}`; + } + } + } const tools = Array.from(this.tools.values()).map((tool) => getToolPublicFieldOnly(tool.tool)); return { tools }; }); @@ -391,7 +462,7 @@ export class ActorsMcpServer { delete request.params.userRentedActorIds; // Validate token - if (!apifyToken) { + if (!apifyToken && !this.options.skyfireMode) { const msg = 'APIFY_TOKEN is required. 
It must be set in the environment variables or passed as a parameter in the body.'; log.error(msg); await this.server.sendLoggingMessage({ level: 'error', data: msg }); @@ -516,6 +587,17 @@ export class ActorsMcpServer { // Handle actor tool if (tool.type === 'actor') { + if (this.options.skyfireMode + && args['skyfire-pay-id'] === undefined + ) { + return { + content: [{ + type: 'text', + text: SKYFIRE_TOOL_INSTRUCTIONS, + }], + }; + } + const actorTool = tool.tool as ActorTool; // Create progress tracker if progressToken is available @@ -523,12 +605,20 @@ export class ActorsMcpServer { const callOptions: ActorCallOptions = { memory: actorTool.memoryMbytes }; + /** + * Create Apify token, for Skyfire mode use `skyfire-pay-id` and for normal mode use `apifyToken`. + */ + const { 'skyfire-pay-id': skyfirePayId, ...actorArgs } = args as Record; + const apifyClient = this.options.skyfireMode && typeof skyfirePayId === 'string' + ? new ApifyClient({ skyfirePayId }) + : new ApifyClient({ token: apifyToken }); + try { - log.info('Calling Actor', { actorName: actorTool.actorFullName, input: args }); + log.info('Calling Actor', { actorName: actorTool.actorFullName, input: actorArgs }); const callResult = await callActorGetDataset( actorTool.actorFullName, - args, - apifyToken as string, + actorArgs, + apifyClient, callOptions, progressTracker, extra.signal, diff --git a/src/mcp/utils.ts b/src/mcp/utils.ts index 8c682c06..9963646b 100644 --- a/src/mcp/utils.ts +++ b/src/mcp/utils.ts @@ -1,6 +1,8 @@ import { createHash } from 'node:crypto'; import { parse } from 'node:querystring'; +import type { ApifyClient } from 'apify-client'; + import { processInput } from '../input.js'; import type { Input } from '../types.js'; import { loadToolsFromInput } from '../utils/tools-loader.js'; @@ -39,9 +41,9 @@ export function getProxyMCPServerToolName(url: string, toolName: string): string * @param url * @param apifyToken */ -export async function processParamsGetTools(url: string, 
apifyToken: string) { +export async function processParamsGetTools(url: string, apifyClient: ApifyClient) { const input = parseInputParamsFromUrl(url); - return await loadToolsFromInput(input, apifyToken); + return await loadToolsFromInput(input, apifyClient); } export function parseInputParamsFromUrl(url: string): Input { diff --git a/src/stdio.ts b/src/stdio.ts index a288857b..a0b96b24 100644 --- a/src/stdio.ts +++ b/src/stdio.ts @@ -22,6 +22,7 @@ import { hideBin } from 'yargs/helpers'; import log from '@apify/log'; +import { ApifyClient } from './apify-client.js'; import { processInput } from './input.js'; import { ActorsMcpServer } from './mcp/server.js'; import type { Input, ToolSelector } from './types.js'; @@ -118,8 +119,9 @@ async function main() { // Normalize (merges actors into tools for backward compatibility) const normalized = processInput(input); + const apifyClient = new ApifyClient({ token: process.env.APIFY_TOKEN }); // Use the shared tools loading logic - const tools = await loadToolsFromInput(normalized, process.env.APIFY_TOKEN as string); + const tools = await loadToolsFromInput(normalized, apifyClient); mcpServer.upsertTools(tools); diff --git a/src/tools/actor.ts b/src/tools/actor.ts index 1d1ca683..139ac558 100644 --- a/src/tools/actor.ts +++ b/src/tools/actor.ts @@ -10,13 +10,14 @@ import { ACTOR_ADDITIONAL_INSTRUCTIONS, ACTOR_MAX_MEMORY_MBYTES, HelperTools, + SKYFIRE_TOOL_INSTRUCTIONS, TOOL_MAX_OUTPUT_CHARS, } from '../const.js'; import { getActorMCPServerPath, getActorMCPServerURL } from '../mcp/actors.js'; import { connectMCPClient } from '../mcp/client.js'; import { getMCPServerTools } from '../mcp/proxy.js'; import { actorDefinitionPrunedCache } from '../state.js'; -import type { ActorDefinitionStorage, ActorInfo, DatasetItem, ToolEntry } from '../types.js'; +import type { ActorDefinitionStorage, ActorInfo, ApifyToken, DatasetItem, ToolEntry } from '../types.js'; import { ensureOutputWithinCharLimit, 
getActorDefinitionStorageFieldNames } from '../utils/actor.js'; import { fetchActorDetails } from '../utils/actor-details.js'; import { buildActorResponseContent } from '../utils/actor-response.js'; @@ -46,9 +47,9 @@ export type CallActorGetDatasetResult = { * If the `APIFY_IS_AT_HOME` the dataset items are pushed to the Apify dataset. * * @param {string} actorName - The name of the Actor to call. - * @param {ActorCallOptions} callOptions - The options to pass to the Actor. * @param {unknown} input - The input to pass to the actor. - * @param {string} apifyToken - The Apify token to use for authentication. + * @param {ApifyClient} apifyClient - The Apify client to use for authentication. + * @param {ActorCallOptions} callOptions - The options to pass to the Actor. * @param {ProgressTracker} progressTracker - Optional progress tracker for real-time updates. * @param {AbortSignal} abortSignal - Optional abort signal to cancel the actor run. * @returns {Promise} - A promise that resolves to an object containing the actor run and dataset items. 
@@ -57,84 +58,77 @@ export type CallActorGetDatasetResult = { export async function callActorGetDataset( actorName: string, input: unknown, - apifyToken: string, + apifyClient: ApifyClient, callOptions: ActorCallOptions | undefined = undefined, progressTracker?: ProgressTracker | null, abortSignal?: AbortSignal, ): Promise { const CLIENT_ABORT = Symbol('CLIENT_ABORT'); // Just internal symbol to identify client abort - // TODO: we should remove this throw, we are just catching and then rethrowing with generic message - try { - const client = new ApifyClient({ token: apifyToken }); - const actorClient = client.actor(actorName); + const actorClient = apifyClient.actor(actorName); - // Start the actor run - const actorRun: ActorRun = await actorClient.start(input, callOptions); - - // Start progress tracking if tracker is provided - if (progressTracker) { - progressTracker.startActorRunUpdates(actorRun.id, apifyToken, actorName); - } + // Start the actor run + const actorRun: ActorRun = await actorClient.start(input, callOptions); - // Create abort promise that handles both API abort and race rejection - const abortPromise = async () => new Promise((resolve) => { - abortSignal?.addEventListener('abort', async () => { - // Abort the actor run via API - try { - await client.run(actorRun.id).abort({ gracefully: false }); - } catch (e) { - log.error('Error aborting Actor run', { error: e, runId: actorRun.id }); - } - // Reject to stop waiting - resolve(CLIENT_ABORT); - }, { once: true }); - }); + // Start progress tracking if tracker is provided + if (progressTracker) { + progressTracker.startActorRunUpdates(actorRun.id, apifyClient, actorName); + } - // Wait for completion or cancellation - const potentialAbortedRun = await Promise.race([ - client.run(actorRun.id).waitForFinish(), - ...(abortSignal ? 
[abortPromise()] : []), - ]); + // Create abort promise that handles both API abort and race rejection + const abortPromise = async () => new Promise((resolve) => { + abortSignal?.addEventListener('abort', async () => { + // Abort the actor run via API + try { + await apifyClient.run(actorRun.id).abort({ gracefully: false }); + } catch (e) { + log.error('Error aborting Actor run', { error: e, runId: actorRun.id }); + } + // Reject to stop waiting + resolve(CLIENT_ABORT); + }, { once: true }); + }); + + // Wait for completion or cancellation + const potentialAbortedRun = await Promise.race([ + apifyClient.run(actorRun.id).waitForFinish(), + ...(abortSignal ? [abortPromise()] : []), + ]); - if (potentialAbortedRun === CLIENT_ABORT) { - log.info('Actor run aborted by client', { actorName, input }); - return null; - } - const completedRun = potentialAbortedRun as ActorRun; - - // Process the completed run - const dataset = client.dataset(completedRun.defaultDatasetId); - const [datasetItems, defaultBuild] = await Promise.all([ - dataset.listItems(), - (await actorClient.defaultBuild()).get(), - ]); - - // Generate schema using the shared utility - const generatedSchema = generateSchemaFromItems(datasetItems.items, { - clean: true, - arrayMode: 'all', - }); - const schema = generatedSchema || { type: 'object', properties: {} }; - - /** - * Get important fields that are using in any dataset view as they MAY be used in filtering to ensure the output fits - * the tool output limits. Client has to use the get-actor-output tool to retrieve the full dataset or filtered out fields. 
- */ - const storageDefinition = defaultBuild?.actorDefinition?.storages?.dataset as ActorDefinitionStorage | undefined; - const importantProperties = getActorDefinitionStorageFieldNames(storageDefinition || {}); - const previewItems = ensureOutputWithinCharLimit(datasetItems.items, importantProperties, TOOL_MAX_OUTPUT_CHARS); - - return { - runId: actorRun.id, - datasetId: completedRun.defaultDatasetId, - itemCount: datasetItems.count, - schema, - previewItems, - }; - } catch (error) { - log.error('Error calling Actor', { error, actorName, input }); - throw new Error(`Error calling Actor: ${error}`); + if (potentialAbortedRun === CLIENT_ABORT) { + log.info('Actor run aborted by client', { actorName, input }); + return null; } + const completedRun = potentialAbortedRun as ActorRun; + + // Process the completed run + const dataset = apifyClient.dataset(completedRun.defaultDatasetId); + const [datasetItems, defaultBuild] = await Promise.all([ + dataset.listItems(), + (await actorClient.defaultBuild()).get(), + ]); + + // Generate schema using the shared utility + const generatedSchema = generateSchemaFromItems(datasetItems.items, { + clean: true, + arrayMode: 'all', + }); + const schema = generatedSchema || { type: 'object', properties: {} }; + + /** + * Get important fields that are using in any dataset view as they MAY be used in filtering to ensure the output fits + * the tool output limits. Client has to use the get-actor-output tool to retrieve the full dataset or filtered out fields. 
+ */ + const storageDefinition = defaultBuild?.actorDefinition?.storages?.dataset as ActorDefinitionStorage | undefined; + const importantProperties = getActorDefinitionStorageFieldNames(storageDefinition || {}); + const previewItems = ensureOutputWithinCharLimit(datasetItems.items, importantProperties, TOOL_MAX_OUTPUT_CHARS); + + return { + runId: actorRun.id, + datasetId: completedRun.defaultDatasetId, + itemCount: datasetItems.count, + schema, + previewItems, + }; } /** @@ -191,7 +185,8 @@ Instructions: ${ACTOR_ADDITIONAL_INSTRUCTIONS}`, properties: {}, required: [], }, - ajvValidate: fixedAjvCompile(ajv, actorDefinitionPruned.input || {}), + // Additional props true to allow skyfire-pay-id + ajvValidate: fixedAjvCompile(ajv, { ...actorDefinitionPruned.input, additionalProperties: true }), memoryMbytes: memoryMbytes > ACTOR_MAX_MEMORY_MBYTES ? ACTOR_MAX_MEMORY_MBYTES : memoryMbytes, }, }; @@ -206,8 +201,16 @@ Instructions: ${ACTOR_ADDITIONAL_INSTRUCTIONS}`, async function getMCPServersAsTools( actorsInfo: ActorInfo[], - apifyToken: string, + apifyToken: ApifyToken, ): Promise { + /** + * This is case for the Skyfire request without any Apify token, we do not support + * standby Actors in this case so we can skip MCP servers since they would fail anyway (they are standby Actors). 
+ */ + if (apifyToken === null || apifyToken === undefined) { + return []; + } + const actorsMCPServerTools: ToolEntry[] = []; for (const actorInfo of actorsInfo) { const actorId = actorInfo.actorDefinitionPruned.id; @@ -243,7 +246,7 @@ async function getMCPServersAsTools( export async function getActorsAsTools( actorIdsOrNames: string[], - apifyToken: string, + apifyClient: ApifyClient, ): Promise { log.debug('Fetching Actors as tools', { actorNames: actorIdsOrNames }); @@ -258,7 +261,7 @@ export async function getActorsAsTools( } as ActorInfo; } - const actorDefinitionPruned = await getActorDefinition(actorIdOrName, apifyToken); + const actorDefinitionPruned = await getActorDefinition(actorIdOrName, apifyClient); if (!actorDefinitionPruned) { log.error('Actor not found or definition is not available', { actorName: actorIdOrName }); return null; @@ -280,7 +283,7 @@ export async function getActorsAsTools( const [normalTools, mcpServerTools] = await Promise.all([ getNormalActorsAsTools(normalActorsInfo), - getMCPServersAsTools(actorMCPServersInfo, apifyToken), + getMCPServersAsTools(actorMCPServersInfo, apifyClient.token), ]); return [...normalTools, ...mcpServerTools]; @@ -335,26 +338,62 @@ Step 2: Call Actor (step="call") The step parameter enforces this workflow - you cannot call an Actor without first getting its info.`, inputSchema: zodToJsonSchema(callActorArgs), - ajvValidate: ajv.compile(zodToJsonSchema(callActorArgs)), + ajvValidate: ajv.compile({ + ...zodToJsonSchema(callActorArgs), + // Additional props true to allow skyfire-pay-id + additionalProperties: true, + }), call: async (toolArgs) => { - const { args, apifyToken, progressTracker, extra } = toolArgs; + const { args, apifyToken, progressTracker, extra, apifyMcpServer } = toolArgs; const { actor: actorName, step, input, callOptions } = callActorArgs.parse(args); try { if (step === 'info') { + const apifyClient = new ApifyClient({ token: apifyToken }); // Step 1: Return Actor card and schema directly 
- const details = await fetchActorDetails(apifyToken, actorName); + const details = await fetchActorDetails(apifyClient, actorName); if (!details) { return { content: [{ type: 'text', text: `Actor information for '${actorName}' was not found. Please check the Actor ID or name and ensure the Actor exists.` }], }; } + const content = [ + { type: 'text', text: `**Input Schema:**\n${JSON.stringify(details.inputSchema, null, 0)}` }, + ]; + /** + * Add Skyfire instructions also in the info step since clients are most likely truncating the long tool description of the call-actor. + */ + if (apifyMcpServer.options.skyfireMode) { + content.push({ + type: 'text', + text: SKYFIRE_TOOL_INSTRUCTIONS, + }); + } + return { content }; + } + + /** + * In Skyfire mode, we check for the presence of `skyfire-pay-id`. + * If it is missing, we return instructions to the LLM on how to create it and pass it to the tool. + */ + if (apifyMcpServer.options.skyfireMode + && args['skyfire-pay-id'] === undefined + ) { return { - content: [ - { type: 'text', text: `**Input Schema:**\n${JSON.stringify(details.inputSchema, null, 0)}` }, - ], + content: [{ + type: 'text', + text: SKYFIRE_TOOL_INSTRUCTIONS, + }], }; } + + /** + * Create Apify token, for Skyfire mode use `skyfire-pay-id` and for normal mode use `apifyToken`. + */ + const apifyClient = apifyMcpServer.options.skyfireMode && typeof args['skyfire-pay-id'] === 'string' + ? 
new ApifyClient({ skyfirePayId: args['skyfire-pay-id'] }) + : new ApifyClient({ token: apifyToken }); + // Step 2: Call the Actor if (!input) { return { @@ -364,7 +403,7 @@ The step parameter enforces this workflow - you cannot call an Actor without fir }; } - const [actor] = await getActorsAsTools([actorName], apifyToken); + const [actor] = await getActorsAsTools([actorName], apifyClient); if (!actor) { return { @@ -389,7 +428,7 @@ The step parameter enforces this workflow - you cannot call an Actor without fir const callResult = await callActorGetDataset( actorName, input, - apifyToken, + apifyClient, callOptions, progressTracker, extra.signal, diff --git a/src/tools/build.ts b/src/tools/build.ts index 64fffeb5..be6044db 100644 --- a/src/tools/build.ts +++ b/src/tools/build.ts @@ -26,11 +26,10 @@ import { filterSchemaProperties, shortenProperties } from './utils.js'; */ export async function getActorDefinition( actorIdOrName: string, - apifyToken: string, + apifyClient: ApifyClient, limit: number = ACTOR_README_MAX_LENGTH, ): Promise { - const client = new ApifyClient({ token: apifyToken }); - const actorClient = client.actor(actorIdOrName); + const actorClient = apifyClient.actor(actorIdOrName); try { // Fetch actor details const actor = await actorClient.get(); @@ -123,7 +122,8 @@ export const actorDefinitionTool: ToolEntry = { const { args, apifyToken } = toolArgs; const parsed = getActorDefinitionArgsSchema.parse(args); - const v = await getActorDefinition(parsed.actorName, apifyToken, parsed.limit); + const apifyClient = new ApifyClient({ token: apifyToken }); + const v = await getActorDefinition(parsed.actorName, apifyClient, parsed.limit); if (!v) { return { content: [{ type: 'text', text: `Actor '${parsed.actorName}' not found.` }] }; } diff --git a/src/tools/fetch-actor-details.ts b/src/tools/fetch-actor-details.ts index ef3dbd74..32742be1 100644 --- a/src/tools/fetch-actor-details.ts +++ b/src/tools/fetch-actor-details.ts @@ -1,6 +1,7 @@ import { z } 
from 'zod'; import zodToJsonSchema from 'zod-to-json-schema'; +import { ApifyClient } from '../apify-client.js'; import { HelperTools } from '../const.js'; import type { InternalTool, ToolEntry } from '../types.js'; import { fetchActorDetails } from '../utils/actor-details.js'; @@ -30,7 +31,8 @@ export const fetchActorDetailsTool: ToolEntry = { call: async (toolArgs) => { const { args, apifyToken } = toolArgs; const parsed = fetchActorDetailsToolArgsSchema.parse(args); - const details = await fetchActorDetails(apifyToken, parsed.actor); + const apifyClient = new ApifyClient({ token: apifyToken }); + const details = await fetchActorDetails(apifyClient, parsed.actor); if (!details) { return { content: [{ type: 'text', text: `Actor information for '${parsed.actor}' was not found. Please check the Actor ID or name and ensure the Actor exists.` }], diff --git a/src/tools/get-actor-output.ts b/src/tools/get-actor-output.ts index 488d30e1..b6b6b1fe 100644 --- a/src/tools/get-actor-output.ts +++ b/src/tools/get-actor-output.ts @@ -2,7 +2,7 @@ import { z } from 'zod'; import zodToJsonSchema from 'zod-to-json-schema'; import { ApifyClient } from '../apify-client.js'; -import { HelperTools, TOOL_MAX_OUTPUT_CHARS } from '../const.js'; +import { HelperTools, SKYFIRE_TOOL_INSTRUCTIONS, TOOL_MAX_OUTPUT_CHARS } from '../const.js'; import type { InternalTool, ToolEntry } from '../types.js'; import { ajv } from '../utils/ajv.js'; import { getValuesByDotKeys, parseCommaSeparatedList } from '../utils/generic.js'; @@ -73,11 +73,35 @@ You can also retrieve only specific fields from the output if needed. Use this t Note: This tool is automatically included if the Apify MCP Server is configured with any Actor tools (e.g. \`apify-slash-rag-web-browser\`) or tools that can interact with Actors (e.g. 
\`call-actor\`, \`add-actor\`).`, inputSchema: zodToJsonSchema(getActorOutputArgs), - ajvValidate: ajv.compile(zodToJsonSchema(getActorOutputArgs)), + /** + * Allow additional properties for Skyfire mode to pass `skyfire-pay-id`. + */ + ajvValidate: ajv.compile({ ...zodToJsonSchema(getActorOutputArgs), additionalProperties: true }), call: async (toolArgs) => { - const { args, apifyToken } = toolArgs; + const { args, apifyToken, apifyMcpServer } = toolArgs; + + /** + * In Skyfire mode, we check for the presence of `skyfire-pay-id`. + * If it is missing, we return instructions to the LLM on how to create it and pass it to the tool. + */ + if (apifyMcpServer.options.skyfireMode + && args['skyfire-pay-id'] === undefined + ) { + return { + content: [{ + type: 'text', + text: SKYFIRE_TOOL_INSTRUCTIONS, + }], + }; + } + + /** + * Create Apify token, for Skyfire mode use `skyfire-pay-id` and for normal mode use `apifyToken`. + */ + const apifyClient = apifyMcpServer.options.skyfireMode && typeof args['skyfire-pay-id'] === 'string' + ? new ApifyClient({ skyfirePayId: args['skyfire-pay-id'] }) + : new ApifyClient({ token: apifyToken }); const parsed = getActorOutputArgs.parse(args); - const client = new ApifyClient({ token: apifyToken }); // Parse fields into array const fieldsArray = parseCommaSeparatedList(parsed.fields); @@ -88,7 +112,7 @@ Note: This tool is automatically included if the Apify MCP Server is configured * If a dot is present, filtering is done here and not at the API level. */ const hasDot = fieldsArray.some((field) => field.includes('.')); - const response = await client.dataset(parsed.datasetId).listItems({ + const response = await apifyClient.dataset(parsed.datasetId).listItems({ offset: parsed.offset, limit: parsed.limit, fields: fieldsArray.length > 0 && !hasDot ? 
fieldsArray : undefined, diff --git a/src/tools/helpers.ts b/src/tools/helpers.ts index 47268897..d030d793 100644 --- a/src/tools/helpers.ts +++ b/src/tools/helpers.ts @@ -1,6 +1,7 @@ import { z } from 'zod'; import zodToJsonSchema from 'zod-to-json-schema'; +import { ApifyClient } from '../apify-client.js'; import { HelperTools } from '../const.js'; import type { InternalTool, ToolEntry } from '../types.js'; import { ajv } from '../utils/ajv.js'; @@ -35,7 +36,8 @@ export const addTool: ToolEntry = { }; } - const tools = await apifyMcpServer.loadActorsAsTools([parsed.actor], apifyToken); + const apifyClient = new ApifyClient({ token: apifyToken }); + const tools = await apifyMcpServer.loadActorsAsTools([parsed.actor], apifyClient); /** * If no tools were found, return a message that the Actor was not found * instead of returning that non existent tool was added since the diff --git a/src/types.ts b/src/types.ts index bff0e89d..a90de611 100644 --- a/src/types.ts +++ b/src/types.ts @@ -296,3 +296,9 @@ export type PromptBase = Prompt & { export type ActorInputSchemaProperties = Record; export type DatasetItem = Record; +/** + * Apify token type. + * + * Can be null or undefined in case of Skyfire requests. 
+ */ +export type ApifyToken = string | null | undefined; diff --git a/src/utils/actor-details.ts b/src/utils/actor-details.ts index 494db6fb..3a8915bc 100644 --- a/src/utils/actor-details.ts +++ b/src/utils/actor-details.ts @@ -1,6 +1,6 @@ import type { Actor, Build } from 'apify-client'; -import { ApifyClient } from '../apify-client.js'; +import type { ApifyClient } from '../apify-client.js'; import { filterSchemaProperties, shortenProperties } from '../tools/utils.js'; import type { IActorInputSchema } from '../types.js'; import { formatActorToActorCard } from './actor-card.js'; @@ -14,11 +14,10 @@ export interface ActorDetailsResult { readme: string; } -export async function fetchActorDetails(apifyToken: string, actorName: string): Promise { - const client = new ApifyClient({ token: apifyToken }); +export async function fetchActorDetails(apifyClient: ApifyClient, actorName: string): Promise { const [actorInfo, buildInfo]: [Actor | undefined, Build | undefined] = await Promise.all([ - client.actor(actorName).get(), - client.actor(actorName).defaultBuild().then(async (build) => build.get()), + apifyClient.actor(actorName).get(), + apifyClient.actor(actorName).defaultBuild().then(async (build) => build.get()), ]); if (!actorInfo || !buildInfo || !buildInfo.actorDefinition) return null; const inputSchema = (buildInfo.actorDefinition.input || { diff --git a/src/utils/progress.ts b/src/utils/progress.ts index 385c90ff..893051c2 100644 --- a/src/utils/progress.ts +++ b/src/utils/progress.ts @@ -1,6 +1,6 @@ import type { ProgressNotification } from '@modelcontextprotocol/sdk/types.js'; -import { ApifyClient } from '../apify-client.js'; +import type { ApifyClient } from '../apify-client.js'; import { PROGRESS_NOTIFICATION_INTERVAL_MS } from '../const.js'; export class ProgressTracker { @@ -36,15 +36,14 @@ export class ProgressTracker { } } - startActorRunUpdates(runId: string, apifyToken: string, actorName: string): void { + startActorRunUpdates(runId: string, 
apifyClient: ApifyClient, actorName: string): void { this.stop(); - const client = new ApifyClient({ token: apifyToken }); let lastStatus = ''; let lastStatusMessage = ''; this.intervalId = setInterval(async () => { try { - const run = await client.run(runId).get(); + const run = await apifyClient.run(runId).get(); if (!run) return; const { status, statusMessage } = run; diff --git a/src/utils/tools-loader.ts b/src/utils/tools-loader.ts index f16953f4..f1ee806d 100644 --- a/src/utils/tools-loader.ts +++ b/src/utils/tools-loader.ts @@ -3,6 +3,8 @@ * This eliminates duplication between stdio.ts and processParamsGetTools. */ +import type { ApifyClient } from 'apify'; + import log from '@apify/log'; import { defaults } from '../const.js'; @@ -35,7 +37,7 @@ function getInternalToolByNameMap(): Map { */ export async function loadToolsFromInput( input: Input, - apifyToken: string, + apifyClient: ApifyClient, ): Promise { // Helpers for readability const normalizeSelectors = (value: Input['tools']): (string | ToolCategory)[] | undefined => { @@ -120,7 +122,7 @@ export async function loadToolsFromInput( // Actor tools (if any) if (actorNamesToLoad.length > 0) { - const actorTools = await getActorsAsTools(actorNamesToLoad, apifyToken); + const actorTools = await getActorsAsTools(actorNamesToLoad, apifyClient); result.push(...actorTools); } diff --git a/tests/integration/actor.server-sse.test.ts b/tests/integration/actor.server-sse.test.ts index 6142cfc1..a75408d7 100644 --- a/tests/integration/actor.server-sse.test.ts +++ b/tests/integration/actor.server-sse.test.ts @@ -7,12 +7,13 @@ import log from '@apify/log'; import { createExpressApp } from '../../src/actor/server.js'; import { createMcpSseClient } from '../helpers.js'; import { createIntegrationTestsSuite } from './suite.js'; +import { getAvailablePort } from './utils/port.js'; let app: Express; let httpServer: HttpServer; -const httpServerPort = 50000; -const httpServerHost = `http://localhost:${httpServerPort}`; 
-const mcpUrl = `${httpServerHost}/sse`; +let httpServerPort: number; +let httpServerHost: string; +let mcpUrl: string; createIntegrationTestsSuite({ suiteName: 'Apify MCP Server SSE', @@ -21,6 +22,11 @@ createIntegrationTestsSuite({ beforeAllFn: async () => { log.setLevel(log.LEVELS.OFF); + // Get an available port + httpServerPort = await getAvailablePort(); + httpServerHost = `http://localhost:${httpServerPort}`; + mcpUrl = `${httpServerHost}/sse`; + // Create an express app app = createExpressApp(httpServerHost); diff --git a/tests/integration/actor.server-streamable.test.ts b/tests/integration/actor.server-streamable.test.ts index 56aa5226..c21923b3 100644 --- a/tests/integration/actor.server-streamable.test.ts +++ b/tests/integration/actor.server-streamable.test.ts @@ -7,12 +7,13 @@ import log from '@apify/log'; import { createExpressApp } from '../../src/actor/server.js'; import { createMcpStreamableClient } from '../helpers.js'; import { createIntegrationTestsSuite } from './suite.js'; +import { getAvailablePort } from './utils/port.js'; let app: Express; let httpServer: HttpServer; -const httpServerPort = 50001; -const httpServerHost = `http://localhost:${httpServerPort}`; -const mcpUrl = `${httpServerHost}/mcp`; +let httpServerPort: number; +let httpServerHost: string; +let mcpUrl: string; createIntegrationTestsSuite({ suiteName: 'Apify MCP Server Streamable HTTP', @@ -21,6 +22,11 @@ createIntegrationTestsSuite({ beforeAllFn: async () => { log.setLevel(log.LEVELS.OFF); + // Get an available port + httpServerPort = await getAvailablePort(); + httpServerHost = `http://localhost:${httpServerPort}`; + mcpUrl = `${httpServerHost}/mcp`; + // Create an express app app = createExpressApp(httpServerHost); diff --git a/tests/integration/internals.test.ts b/tests/integration/internals.test.ts index 800d4ab5..9f348bb0 100644 --- a/tests/integration/internals.test.ts +++ b/tests/integration/internals.test.ts @@ -3,6 +3,7 @@ import { beforeAll, describe, expect, it } 
from 'vitest'; import log from '@apify/log'; import { actorNameToToolName } from '../../dist/tools/utils.js'; +import { ApifyClient } from '../../src/apify-client.js'; import { ActorsMcpServer } from '../../src/index.js'; import { addTool } from '../../src/tools/helpers.js'; import { getActorsAsTools } from '../../src/tools/index.js'; @@ -17,14 +18,15 @@ beforeAll(() => { describe('MCP server internals integration tests', () => { it('should load and restore tools from a tool list', async () => { - const actorsMcpServer = new ActorsMcpServer(false); + const actorsMcpServer = new ActorsMcpServer({ setupSigintHandler: false }); + const apifyClient = new ApifyClient({ token: process.env.APIFY_TOKEN }); const initialTools = await loadToolsFromInput({ enableAddingActors: true, - } as Input, process.env.APIFY_TOKEN as string); + } as Input, apifyClient); actorsMcpServer.upsertTools(initialTools); // Load new tool - const newTool = await getActorsAsTools([ACTOR_PYTHON_EXAMPLE], process.env.APIFY_TOKEN as string); + const newTool = await getActorsAsTools([ACTOR_PYTHON_EXAMPLE], apifyClient); actorsMcpServer.upsertTools(newTool); // Store the tool name list @@ -42,7 +44,7 @@ describe('MCP server internals integration tests', () => { expect(actorsMcpServer.listAllToolNames()).toEqual([]); // Load the tool state from the tool name list - await actorsMcpServer.loadToolsByName(names, process.env.APIFY_TOKEN as string); + await actorsMcpServer.loadToolsByName(names, apifyClient); // Check if the tool name list is restored expectArrayWeakEquals(actorsMcpServer.listAllToolNames(), expectedToolNames); @@ -59,14 +61,15 @@ describe('MCP server internals integration tests', () => { toolNotificationCount++; }; - const actorsMCPServer = new ActorsMcpServer(false); - const seeded = await loadToolsFromInput({ enableAddingActors: true } as Input, process.env.APIFY_TOKEN as string); + const actorsMCPServer = new ActorsMcpServer({ setupSigintHandler: false }); + const apifyClient = new 
ApifyClient({ token: process.env.APIFY_TOKEN }); + const seeded = await loadToolsFromInput({ enableAddingActors: true } as Input, apifyClient); actorsMCPServer.upsertTools(seeded); actorsMCPServer.registerToolsChangedHandler(onToolsChanged); // Add a new Actor const actor = ACTOR_PYTHON_EXAMPLE; - const newTool = await getActorsAsTools([actor], process.env.APIFY_TOKEN as string); + const newTool = await getActorsAsTools([actor], apifyClient); actorsMCPServer.upsertTools(newTool, true); // Check if the notification was received with the correct tools @@ -96,14 +99,15 @@ describe('MCP server internals integration tests', () => { notificationCount++; }; - const actorsMCPServer = new ActorsMcpServer(false); - const seeded = await loadToolsFromInput({ enableAddingActors: true } as Input, process.env.APIFY_TOKEN as string); + const actorsMCPServer = new ActorsMcpServer({ setupSigintHandler: false }); + const apifyClient = new ApifyClient({ token: process.env.APIFY_TOKEN }); + const seeded = await loadToolsFromInput({ enableAddingActors: true } as Input, apifyClient); actorsMCPServer.upsertTools(seeded); actorsMCPServer.registerToolsChangedHandler(onToolsChanged); // Add a new Actor const actor = ACTOR_PYTHON_EXAMPLE; - const newTool = await getActorsAsTools([actor], process.env.APIFY_TOKEN as string); + const newTool = await getActorsAsTools([actor], apifyClient); actorsMCPServer.upsertTools(newTool, true); // Check if the notification was received diff --git a/tests/integration/utils/port.ts b/tests/integration/utils/port.ts new file mode 100644 index 00000000..30d6b329 --- /dev/null +++ b/tests/integration/utils/port.ts @@ -0,0 +1,17 @@ +import { createServer } from 'node:net'; + +/** + * Finds an available port by letting the OS assign one dynamically. + * This is to prevent the address already in use errors to prevent flaky tests. 
+ * @returns Promise<number> - A port the OS reported as free; the probe server is closed before resolving, so the caller must bind it promptly
+ */
+export async function getAvailablePort(): Promise<number> {
+    return new Promise<number>((resolve, reject) => {
+        const server = createServer();
+        server.on('error', reject); // attached before listen() so a failed bind rejects the promise
+        server.listen(0, () => { // port 0 asks the OS to pick any free port
+            const { port } = server.address() as { port: number };
+            server.close(() => resolve(port)); // resolve only after the probe server has released the port
+        });
+    });
+}