Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions .actor/ACTOR.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,13 +67,17 @@ Interested in building and monetizing your own AI agent on Apify? Check out our
## Tools

### Actors

Any [Apify Actor](https://apify.com/store) can be used as a tool.
By default, the server is pre-configured with the Actors specified below, but this can be overridden by providing Actor input.
By default, the server is pre-configured with the Actors specified below, but this can be overridden by providing the `?actors` URL query parameter.

```text
'apify/rag-web-browser'
```
For example, to additionally load the `apify/instagram-scraper` Actor, you can start the server with the following URL:
```text
https://actors-mcp-server.apify.actor?token=<APIFY_TOKEN>&actors=apify/rag-web-browser,apify/instagram-scraper
```

The MCP server loads the Actor input schema and creates MCP tools corresponding to the Actors.
See this example of input schema for the [RAG Web Browser](https://apify.com/apify/rag-web-browser/input-schema).

Expand Down
68 changes: 36 additions & 32 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,10 @@
"dependencies": {
"@apify/datastructures": "^2.0.3",
"@apify/log": "^2.5.16",
"@modelcontextprotocol/sdk": "^1.11.5",
"@modelcontextprotocol/sdk": "^1.13.2",
"ajv": "^8.17.1",
"apify": "^3.4.0",
"apify-client": "^2.12.3",
"apify": "^3.4.2",
"apify-client": "^2.12.6",
"express": "^4.21.2",
"yargs": "^17.7.2",
"zod": "^3.24.1",
Expand Down
78 changes: 65 additions & 13 deletions src/actor/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,9 @@ import express from 'express';

import log from '@apify/log';

import { type ActorsMcpServer } from '../mcp/server.js';
import { parseInputParamsFromUrl, processParamsGetTools } from '../mcp/utils.js';
import { ActorsMcpServer } from '../mcp/server.js';
import { parseInputParamsFromUrl } from '../mcp/utils.js';
import { getActorsAsTools } from '../tools/actor.js';
import { getHelpMessage, HEADER_READINESS_PROBE, Routes } from './const.js';
import { getActorRunData } from './utils.js';

Expand All @@ -34,10 +35,15 @@ async function loadToolsAndActors(mcpServer: ActorsMcpServer, url: string, apify

export function createExpressApp(
host: string,
mcpServer: ActorsMcpServer,
mcpServerOptions: {
enableAddingActors?: boolean;
enableDefaultActors?: boolean;
actors?: string[];
},
): express.Express {
const app = express();
let transportSSE: SSEServerTransport;
const mcpServers: { [sessionId: string]: ActorsMcpServer } = {};
const transportsSSE: { [sessionId: string]: SSEServerTransport } = {};
const transports: { [sessionId: string]: StreamableHTTPServerTransport } = {};

function respondWithError(res: Response, error: unknown, logMessage: string, statusCode = 500) {
Expand All @@ -62,11 +68,6 @@ export function createExpressApp(
}
try {
log.info(`Received GET message at: ${Routes.ROOT}`);
// TODO: I think we should remove this logic, root should return only help message
const tools = await processParamsGetTools(req.url, process.env.APIFY_TOKEN as string);
if (tools) {
mcpServer.upsertTools(tools);
}
res.setHeader('Content-Type', 'text/event-stream');
res.setHeader('Cache-Control', 'no-cache');
res.setHeader('Connection', 'keep-alive');
Expand All @@ -83,9 +84,25 @@ export function createExpressApp(
app.get(Routes.SSE, async (req: Request, res: Response) => {
try {
log.info(`Received GET message at: ${Routes.SSE}`);
const mcpServer = new ActorsMcpServer(mcpServerOptions, false);
// Load tools from Actor input for backwards compatibility
if (mcpServerOptions.actors && mcpServerOptions.actors.length > 0) {
const tools = await getActorsAsTools(mcpServerOptions.actors, process.env.APIFY_TOKEN as string);
mcpServer.upsertTools(tools);
}
await loadToolsAndActors(mcpServer, req.url, process.env.APIFY_TOKEN as string);
transportSSE = new SSEServerTransport(Routes.MESSAGE, res);
await mcpServer.connect(transportSSE);
const transport = new SSEServerTransport(Routes.MESSAGE, res);
transportsSSE[transport.sessionId] = transport;
mcpServers[transport.sessionId] = mcpServer;
await mcpServer.connect(transport);

res.on('close', () => {
log.info('Connection closed, cleaning up', {
sessionId: transport.sessionId,
});
delete transportsSSE[transport.sessionId];
delete mcpServers[transport.sessionId];
});
} catch (error) {
respondWithError(res, error, `Error in GET ${Routes.SSE}`);
}
Expand All @@ -94,8 +111,22 @@ export function createExpressApp(
app.post(Routes.MESSAGE, async (req: Request, res: Response) => {
try {
log.info(`Received POST message at: ${Routes.MESSAGE}`);
if (transportSSE) {
await transportSSE.handlePostMessage(req, res);
const sessionId = new URL(req.url, `http://${req.headers.host}`).searchParams.get('sessionId');
if (!sessionId) {
log.error('No session ID provided in POST request');
res.status(400).json({
jsonrpc: '2.0',
error: {
code: -32000,
message: 'Bad Request: No session ID provided',
},
id: null,
});
return;
}
const transport = transportsSSE[sessionId];
if (transport) {
await transport.handlePostMessage(req, res);
} else {
log.error('Server is not connected to the client.');
res.status(400).json({
Expand Down Expand Up @@ -132,6 +163,12 @@ export function createExpressApp(
sessionIdGenerator: () => randomUUID(),
enableJsonResponse: false, // Use SSE response mode
});
const mcpServer = new ActorsMcpServer(mcpServerOptions, false);
// Load tools from Actor input for backwards compatibility
if (mcpServerOptions.actors && mcpServerOptions.actors.length > 0) {
const tools = await getActorsAsTools(mcpServerOptions.actors, process.env.APIFY_TOKEN as string);
mcpServer.upsertTools(tools);
}
// Load MCP server tools
await loadToolsAndActors(mcpServer, req.url, process.env.APIFY_TOKEN as string);
// Connect the transport to the MCP server BEFORE handling the request
Expand All @@ -143,6 +180,7 @@ export function createExpressApp(
// Store the transport by session ID for future requests
if (transport.sessionId) {
transports[transport.sessionId] = transport;
mcpServers[transport.sessionId] = mcpServer;
}
return; // Already handled
} else {
Expand Down Expand Up @@ -172,6 +210,20 @@ export function createExpressApp(
res.status(405).set('Allow', 'POST').send('Method Not Allowed');
});

app.delete(Routes.MCP, async (req: Request, res: Response) => {
const sessionId = req.headers['mcp-session-id'] as string | undefined;

const transport = transports[sessionId || ''] as StreamableHTTPServerTransport | undefined;
if (transport) {
log.info(`Deleting MCP session with ID: ${sessionId}`);
await transport.handleRequest(req, res, req.body);
return;
}

log.error('Session not found', { sessionId });
res.status(400).send('Bad Request: Session not found').end();
});

// Catch-all for undefined routes
app.use((req: Request, res: Response) => {
res.status(404).json({ message: `There is nothing at route ${req.method} ${req.originalUrl}. ${getHelpMessage(host)}` }).end();
Expand Down
6 changes: 4 additions & 2 deletions src/const.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@ export const ACTOR_ADDITIONAL_INSTRUCTIONS = `Never call/execute tool/Actor unle
You can always use ${HelperTools.DATASET_GET_ITEMS} tool to get more items from the dataset.
Actor run input is always stored in the key-value store, recordKey: INPUT.`;

export const TOOL_CACHE_MAX_SIZE = 500;
export const TOOL_CACHE_TTL_SECS = 30 * 60;
export const ACTOR_CACHE_MAX_SIZE = 500;
export const ACTOR_CACHE_TTL_SECS = 30 * 60; // 30 minutes

export const ACTOR_PRICING_MODEL = {
/** Rental actors */
Expand All @@ -72,3 +72,5 @@ export const ACTOR_PRICING_MODEL = {
* so we can safely filter out rental Actors from the search and ensure we return some results.
*/
export const ACTOR_SEARCH_ABOVE_LIMIT = 50;

export const MCP_STREAMABLE_ENDPOINT = '/mcp';
32 changes: 19 additions & 13 deletions src/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,7 @@ import log from '@apify/log';

import { createExpressApp } from './actor/server.js';
import { processInput } from './input.js';
import { ActorsMcpServer } from './mcp/server.js';
import { callActorGetDataset, getActorsAsTools } from './tools/index.js';
import { callActorGetDataset } from './tools/index.js';
import type { Input } from './types.js';

const STANDBY_MODE = Actor.getEnv().metaOrigin === 'STANDBY';
Expand All @@ -30,22 +29,23 @@ const input = processInput((await Actor.getInput<Partial<Input>>()) ?? ({} as In
log.info(`Loaded input: ${JSON.stringify(input)} `);

if (STANDBY_MODE) {
const mcpServer = new ActorsMcpServer({
enableAddingActors: Boolean(input.enableAddingActors),
enableDefaultActors: false,
});

const app = createExpressApp(HOST, mcpServer);
log.info('Actor is running in the STANDBY mode.');

let actorsToLoad: string[] = [];
// TODO: in standby mode the input loading does not actually work,
// we should remove this since we are using the URL query parameters to load Actors
// Load only Actors specified in the input
// If you wish to start without any Actor, create a task and leave the input empty
if (input.actors && input.actors.length > 0) {
const { actors } = input;
const actorsToLoad = Array.isArray(actors) ? actors : actors.split(',');
const tools = await getActorsAsTools(actorsToLoad, process.env.APIFY_TOKEN as string);
mcpServer.upsertTools(tools);
actorsToLoad = Array.isArray(actors) ? actors : actors.split(',');
}
// Include Actors to load in the MCP server options for backwards compatibility
const app = createExpressApp(HOST, {
enableAddingActors: Boolean(input.enableAddingActors),
enableDefaultActors: false,
actors: actorsToLoad,
});
log.info('Actor is running in the STANDBY mode.');

app.listen(PORT, () => {
log.info(`The Actor web server is listening for user requests at ${HOST}`);
});
Expand All @@ -62,3 +62,9 @@ if (STANDBY_MODE) {
log.info(`Pushed ${datasetInfo?.itemCount} items to the dataset`);
await Actor.exit();
}

// So Ctrl+C works locally
process.on('SIGINT', async () => {
log.info('Received SIGINT, shutting down gracefully...');
await Actor.exit();
});
Loading