Skip to content

Commit c2527af

Browse files
authored
feat: add support for Actorized MCP servers streamable transport. Refactor Actors as a tool adding logic. Update Apify client and SDK and MCP SDK. Refactor standby Actor MCP web server to support multiple concurrent clients. (#151)
* refactor Actor tool fetching logic with caching, server streamable add DELETE endpoint, rewrite server logic to use multiple internal MCP servers - this had to be done as the legacy SSE transport test were hanging for some reason, bump MCP sdk and apify and apify-client versions, split tests touching MCP server internals into separate file, remove tool loading from the root get endpoint (!!!), prepare support for streamable Actorized MCP servers * add actorized mcp servers integration test, internal mcp server mcp client add support for streamable - use it first then fallback to legacy sse * update readme about default Actors loading using query param, add TODO to remove Actor loading from Actor input directly since it probably does not work * lint * try to route actorized server notifications
1 parent 7a790a2 commit c2527af

23 files changed

+715
-360
lines changed

.actor/ACTOR.md

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,13 +67,17 @@ Interested in building and monetizing your own AI agent on Apify? Check out our
6767
## Tools
6868

6969
### Actors
70-
7170
Any [Apify Actor](https://apify.com/store) can be used as a tool.
72-
By default, the server is pre-configured with the Actors specified below, but this can be overridden by providing Actor input.
71+
By default, the server is pre-configured with the Actors specified below, but this can be overridden by providing the `?actors` URL query parameter.
7372

7473
```text
7574
'apify/rag-web-browser'
7675
```
76+
For example, to additionally load the `apify/instagram-scraper` Actor, you can start the server with the following URL:
77+
```text
78+
https://actors-mcp-server.apify.actor?token=<APIFY_TOKEN>&actors=apify/rag-web-browser,apify/instagram-scraper
79+
```
80+
7781
The MCP server loads the Actor input schema and creates MCP tools corresponding to the Actors.
7882
See this example of input schema for the [RAG Web Browser](https://apify.com/apify/rag-web-browser/input-schema).
7983

package-lock.json

Lines changed: 36 additions & 32 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,10 +32,10 @@
3232
"dependencies": {
3333
"@apify/datastructures": "^2.0.3",
3434
"@apify/log": "^2.5.16",
35-
"@modelcontextprotocol/sdk": "^1.11.5",
35+
"@modelcontextprotocol/sdk": "^1.13.2",
3636
"ajv": "^8.17.1",
37-
"apify": "^3.4.0",
38-
"apify-client": "^2.12.3",
37+
"apify": "^3.4.2",
38+
"apify-client": "^2.12.6",
3939
"express": "^4.21.2",
4040
"yargs": "^17.7.2",
4141
"zod": "^3.24.1",

src/actor/server.ts

Lines changed: 65 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,9 @@ import express from 'express';
1111

1212
import log from '@apify/log';
1313

14-
import { type ActorsMcpServer } from '../mcp/server.js';
15-
import { parseInputParamsFromUrl, processParamsGetTools } from '../mcp/utils.js';
14+
import { ActorsMcpServer } from '../mcp/server.js';
15+
import { parseInputParamsFromUrl } from '../mcp/utils.js';
16+
import { getActorsAsTools } from '../tools/actor.js';
1617
import { getHelpMessage, HEADER_READINESS_PROBE, Routes } from './const.js';
1718
import { getActorRunData } from './utils.js';
1819

@@ -34,10 +35,15 @@ async function loadToolsAndActors(mcpServer: ActorsMcpServer, url: string, apify
3435

3536
export function createExpressApp(
3637
host: string,
37-
mcpServer: ActorsMcpServer,
38+
mcpServerOptions: {
39+
enableAddingActors?: boolean;
40+
enableDefaultActors?: boolean;
41+
actors?: string[];
42+
},
3843
): express.Express {
3944
const app = express();
40-
let transportSSE: SSEServerTransport;
45+
const mcpServers: { [sessionId: string]: ActorsMcpServer } = {};
46+
const transportsSSE: { [sessionId: string]: SSEServerTransport } = {};
4147
const transports: { [sessionId: string]: StreamableHTTPServerTransport } = {};
4248

4349
function respondWithError(res: Response, error: unknown, logMessage: string, statusCode = 500) {
@@ -62,11 +68,6 @@ export function createExpressApp(
6268
}
6369
try {
6470
log.info(`Received GET message at: ${Routes.ROOT}`);
65-
// TODO: I think we should remove this logic, root should return only help message
66-
const tools = await processParamsGetTools(req.url, process.env.APIFY_TOKEN as string);
67-
if (tools) {
68-
mcpServer.upsertTools(tools);
69-
}
7071
res.setHeader('Content-Type', 'text/event-stream');
7172
res.setHeader('Cache-Control', 'no-cache');
7273
res.setHeader('Connection', 'keep-alive');
@@ -83,9 +84,25 @@ export function createExpressApp(
8384
app.get(Routes.SSE, async (req: Request, res: Response) => {
8485
try {
8586
log.info(`Received GET message at: ${Routes.SSE}`);
87+
const mcpServer = new ActorsMcpServer(mcpServerOptions, false);
88+
// Load tools from Actor input for backwards compatibility
89+
if (mcpServerOptions.actors && mcpServerOptions.actors.length > 0) {
90+
const tools = await getActorsAsTools(mcpServerOptions.actors, process.env.APIFY_TOKEN as string);
91+
mcpServer.upsertTools(tools);
92+
}
8693
await loadToolsAndActors(mcpServer, req.url, process.env.APIFY_TOKEN as string);
87-
transportSSE = new SSEServerTransport(Routes.MESSAGE, res);
88-
await mcpServer.connect(transportSSE);
94+
const transport = new SSEServerTransport(Routes.MESSAGE, res);
95+
transportsSSE[transport.sessionId] = transport;
96+
mcpServers[transport.sessionId] = mcpServer;
97+
await mcpServer.connect(transport);
98+
99+
res.on('close', () => {
100+
log.info('Connection closed, cleaning up', {
101+
sessionId: transport.sessionId,
102+
});
103+
delete transportsSSE[transport.sessionId];
104+
delete mcpServers[transport.sessionId];
105+
});
89106
} catch (error) {
90107
respondWithError(res, error, `Error in GET ${Routes.SSE}`);
91108
}
@@ -94,8 +111,22 @@ export function createExpressApp(
94111
app.post(Routes.MESSAGE, async (req: Request, res: Response) => {
95112
try {
96113
log.info(`Received POST message at: ${Routes.MESSAGE}`);
97-
if (transportSSE) {
98-
await transportSSE.handlePostMessage(req, res);
114+
const sessionId = new URL(req.url, `http://${req.headers.host}`).searchParams.get('sessionId');
115+
if (!sessionId) {
116+
log.error('No session ID provided in POST request');
117+
res.status(400).json({
118+
jsonrpc: '2.0',
119+
error: {
120+
code: -32000,
121+
message: 'Bad Request: No session ID provided',
122+
},
123+
id: null,
124+
});
125+
return;
126+
}
127+
const transport = transportsSSE[sessionId];
128+
if (transport) {
129+
await transport.handlePostMessage(req, res);
99130
} else {
100131
log.error('Server is not connected to the client.');
101132
res.status(400).json({
@@ -132,6 +163,12 @@ export function createExpressApp(
132163
sessionIdGenerator: () => randomUUID(),
133164
enableJsonResponse: false, // Use SSE response mode
134165
});
166+
const mcpServer = new ActorsMcpServer(mcpServerOptions, false);
167+
// Load tools from Actor input for backwards compatibility
168+
if (mcpServerOptions.actors && mcpServerOptions.actors.length > 0) {
169+
const tools = await getActorsAsTools(mcpServerOptions.actors, process.env.APIFY_TOKEN as string);
170+
mcpServer.upsertTools(tools);
171+
}
135172
// Load MCP server tools
136173
await loadToolsAndActors(mcpServer, req.url, process.env.APIFY_TOKEN as string);
137174
// Connect the transport to the MCP server BEFORE handling the request
@@ -143,6 +180,7 @@ export function createExpressApp(
143180
// Store the transport by session ID for future requests
144181
if (transport.sessionId) {
145182
transports[transport.sessionId] = transport;
183+
mcpServers[transport.sessionId] = mcpServer;
146184
}
147185
return; // Already handled
148186
} else {
@@ -172,6 +210,20 @@ export function createExpressApp(
172210
res.status(405).set('Allow', 'POST').send('Method Not Allowed');
173211
});
174212

213+
app.delete(Routes.MCP, async (req: Request, res: Response) => {
214+
const sessionId = req.headers['mcp-session-id'] as string | undefined;
215+
216+
const transport = transports[sessionId || ''] as StreamableHTTPServerTransport | undefined;
217+
if (transport) {
218+
log.info(`Deleting MCP session with ID: ${sessionId}`);
219+
await transport.handleRequest(req, res, req.body);
220+
return;
221+
}
222+
223+
log.error('Session not found', { sessionId });
224+
res.status(400).send('Bad Request: Session not found').end();
225+
});
226+
175227
// Catch-all for undefined routes
176228
app.use((req: Request, res: Response) => {
177229
res.status(404).json({ message: `There is nothing at route ${req.method} ${req.originalUrl}. ${getHelpMessage(host)}` }).end();

src/const.ts

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,8 +54,8 @@ export const ACTOR_ADDITIONAL_INSTRUCTIONS = `Never call/execute tool/Actor unle
5454
You can always use ${HelperTools.DATASET_GET_ITEMS} tool to get more items from the dataset.
5555
Actor run input is always stored in the key-value store, recordKey: INPUT.`;
5656

57-
export const TOOL_CACHE_MAX_SIZE = 500;
58-
export const TOOL_CACHE_TTL_SECS = 30 * 60;
57+
export const ACTOR_CACHE_MAX_SIZE = 500;
58+
export const ACTOR_CACHE_TTL_SECS = 30 * 60; // 30 minutes
5959

6060
export const ACTOR_PRICING_MODEL = {
6161
/** Rental actors */
@@ -72,3 +72,5 @@ export const ACTOR_PRICING_MODEL = {
7272
* so we can safely filter out rental Actors from the search and ensure we return some results.
7373
*/
7474
export const ACTOR_SEARCH_ABOVE_LIMIT = 50;
75+
76+
export const MCP_STREAMABLE_ENDPOINT = '/mcp';

src/main.ts

Lines changed: 19 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,7 @@ import log from '@apify/log';
1010

1111
import { createExpressApp } from './actor/server.js';
1212
import { processInput } from './input.js';
13-
import { ActorsMcpServer } from './mcp/server.js';
14-
import { callActorGetDataset, getActorsAsTools } from './tools/index.js';
13+
import { callActorGetDataset } from './tools/index.js';
1514
import type { Input } from './types.js';
1615

1716
const STANDBY_MODE = Actor.getEnv().metaOrigin === 'STANDBY';
@@ -30,22 +29,23 @@ const input = processInput((await Actor.getInput<Partial<Input>>()) ?? ({} as In
3029
log.info(`Loaded input: ${JSON.stringify(input)} `);
3130

3231
if (STANDBY_MODE) {
33-
const mcpServer = new ActorsMcpServer({
34-
enableAddingActors: Boolean(input.enableAddingActors),
35-
enableDefaultActors: false,
36-
});
37-
38-
const app = createExpressApp(HOST, mcpServer);
39-
log.info('Actor is running in the STANDBY mode.');
40-
32+
let actorsToLoad: string[] = [];
33+
// TODO: in standby mode the input loading does not actually work,
34+
// we should remove this since we are using the URL query parameters to load Actors
4135
// Load only Actors specified in the input
4236
// If you wish to start without any Actor, create a task and leave the input empty
4337
if (input.actors && input.actors.length > 0) {
4438
const { actors } = input;
45-
const actorsToLoad = Array.isArray(actors) ? actors : actors.split(',');
46-
const tools = await getActorsAsTools(actorsToLoad, process.env.APIFY_TOKEN as string);
47-
mcpServer.upsertTools(tools);
39+
actorsToLoad = Array.isArray(actors) ? actors : actors.split(',');
4840
}
41+
// Include Actors to load in the MCP server options for backwards compatibility
42+
const app = createExpressApp(HOST, {
43+
enableAddingActors: Boolean(input.enableAddingActors),
44+
enableDefaultActors: false,
45+
actors: actorsToLoad,
46+
});
47+
log.info('Actor is running in the STANDBY mode.');
48+
4949
app.listen(PORT, () => {
5050
log.info(`The Actor web server is listening for user requests at ${HOST}`);
5151
});
@@ -62,3 +62,9 @@ if (STANDBY_MODE) {
6262
log.info(`Pushed ${datasetInfo?.itemCount} items to the dataset`);
6363
await Actor.exit();
6464
}
65+
66+
// So Ctrl+C works locally
67+
process.on('SIGINT', async () => {
68+
log.info('Received SIGINT, shutting down gracefully...');
69+
await Actor.exit();
70+
});

0 commit comments

Comments
 (0)