Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion .github/workflows/check.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,13 @@ jobs:
run: npm ci

- name: Lint
run: npm run lint
run: npm run lint:fix

- name: Build
run: npm run build

- name: Test
run: npm run test

- name: Type checks
run: npm run type-check
1 change: 1 addition & 0 deletions .nvmrc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
22.13.1
6 changes: 3 additions & 3 deletions eslint.config.mjs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import apify from '@apify/eslint-config';
import apifyTypeScriptConfig from '@apify/eslint-config/ts.js';

// eslint-disable-next-line import/no-default-export
export default [
{ ignores: ['**/dist', '**/.venv'] }, // Ignores need to happen first
...apify,
{ ignores: ['**/dist'] }, // Ignores need to happen first
...apifyTypeScriptConfig,
{
languageOptions: {
sourceType: 'module',
Expand Down
375 changes: 187 additions & 188 deletions package-lock.json

Large diffs are not rendered by default.

12 changes: 6 additions & 6 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
],
"dependencies": {
"@apify/log": "^2.5.16",
"@modelcontextprotocol/sdk": "^1.9.0",
"@modelcontextprotocol/sdk": "github:jirispilka/mcp-typescript-sdk#fix/add-src-dir",
"ajv": "^8.17.1",
"apify": "^3.4.0",
"apify-client": "^2.12.1",
Expand Down Expand Up @@ -59,13 +59,13 @@
"start": "npm run start:dev",
"start:prod": "node dist/main.js",
"start:dev": "tsx src/main.ts",
"lint": "./node_modules/.bin/eslint .",
"lint:fix": "./node_modules/.bin/eslint . --fix",
"build": "tsc",
"build:watch": "tsc -w",
"lint": "eslint .",
"lint:fix": "eslint . --fix",
"build": "tsc -b src",
"build:watch": "tsc -b src -w",
"type-check": "tsc --noEmit",
"inspector": "npx @modelcontextprotocol/inspector dist/stdio.js",
"test": "vitest run",
"type-check": "tsc --noEmit",
"clean": "tsc -b src --clean"
},
"author": "Apify",
Expand Down
8 changes: 8 additions & 0 deletions src/actor/const.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,14 @@ export const HEADER_READINESS_PROBE = 'x-apify-container-server-readiness-probe'

export enum Routes {
ROOT = '/',
MCP = '/mcp',
SSE = '/sse',
MESSAGE = '/message',
}

export const getHelpMessage = (host: string) => `To interact with the server you can either:
- send request to ${host}${Routes.MCP}?token=YOUR-APIFY-TOKEN and receive a response
or
- connect for Server-Sent Events (SSE) via GET request to: ${host}${Routes.SSE}?token=YOUR-APIFY-TOKEN
- send messages via POST request to: ${host}${Routes.MESSAGE}?token=YOUR-APIFY-TOKEN
(Include your message content in the request body.)`;
221 changes: 161 additions & 60 deletions src/actor/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,91 +2,192 @@
* Express server implementation used for standby Actor mode.
*/

import { randomUUID } from 'node:crypto';

import { SSEServerTransport } from '@modelcontextprotocol/sdk/server/sse.js';
import { StreamableHTTPServerTransport } from '@modelcontextprotocol/sdk/server/streamableHttp.js';
import type { Request, Response } from 'express';
import express from 'express';

import log from '@apify/log';

import { HEADER_READINESS_PROBE, Routes } from './const.js';
import { type ActorsMcpServer } from '../mcp-server.js';
import { getHelpMessage, HEADER_READINESS_PROBE, Routes } from './const.js';
import { getActorRunData, processParamsGetTools } from './utils.js';

export function createExpressApp(
host: string,
mcpServer: ActorsMcpServer,
): express.Express {
const HELP_MESSAGE = `Connect to the server with GET request to ${host}/sse?token=YOUR-APIFY-TOKEN`
+ ` and then send POST requests to ${host}/message?token=YOUR-APIFY-TOKEN`;

const app = express();
app.use(express.json());
let transportSSE: SSEServerTransport;
const transports: { [sessionId: string]: StreamableHTTPServerTransport } = {};

let transport: SSEServerTransport;
app.get(Routes.ROOT, async (req: Request, res: Response) => {
if (req.headers && req.get(HEADER_READINESS_PROBE) !== undefined) {
log.debug('Received readiness probe');
res.status(200).json({ message: 'Server is ready' }).end();
return;
}
try {
log.info(`Received GET message at: ${Routes.ROOT}`);
const tools = await processParamsGetTools(req.url);
if (tools) {
mcpServer.updateTools(tools);
}
res.setHeader('Content-Type', 'text/event-stream');
res.setHeader('Cache-Control', 'no-cache');
res.setHeader('Connection', 'keep-alive');
res.status(200).json({ message: `Actor is using Model Context Protocol. ${getHelpMessage(host)}`, data: getActorRunData() }).end();
} catch (error) {
log.error(`Error in GET ${Routes.ROOT} ${error}`);
res.status(500).json({
jsonrpc: '2.0',
error: {
code: -32603,
message: 'Internal server error',
},
id: null,
});
}
});

app.route(Routes.ROOT)
.get(async (req: Request, res: Response) => {
if (req.headers && req.get(HEADER_READINESS_PROBE) !== undefined) {
log.debug('Received readiness probe');
res.status(200).json({ message: 'Server is ready' }).end();
return;
app.head(Routes.ROOT, (_req: Request, res: Response) => {
res.status(200).end();
});

app.get(Routes.SSE, async (req: Request, res: Response) => {
try {
log.info(`Received GET message at: ${Routes.SSE}`);
const tools = await processParamsGetTools(req.url);
if (tools) {
mcpServer.updateTools(tools);
}
try {
log.info(`Received GET message at: ${Routes.ROOT}`);
const tools = await processParamsGetTools(req.url);
if (tools) {
mcpServer.updateTools(tools);
}
res.setHeader('Content-Type', 'text/event-stream');
res.setHeader('Cache-Control', 'no-cache');
res.setHeader('Connection', 'keep-alive');
res.status(200).json({ message: `Actor is using Model Context Protocol. ${HELP_MESSAGE}`, data: getActorRunData() }).end();
} catch (error) {
log.error(`Error in GET ${Routes.ROOT} ${error}`);
res.status(500).json({ message: 'Internal Server Error' }).end();
transportSSE = new SSEServerTransport(Routes.MESSAGE, res);
await mcpServer.connect(transportSSE);
} catch (error) {
log.error(`Error in GET ${Routes.SSE}: ${error}`);
res.status(500).json({
jsonrpc: '2.0',
error: {
code: -32603,
message: 'Internal server error',
},
id: null,
});
}
});

app.post(Routes.MESSAGE, async (req: Request, res: Response) => {
try {
log.info(`Received POST message at: ${Routes.MESSAGE}`);
if (transportSSE) {
await transportSSE.handlePostMessage(req, res);
} else {
log.error('Server is not connected to the client.');
res.status(400).json({
jsonrpc: '2.0',
error: {
code: -32000,
message: 'Bad Request: Server is not connected to the client. '
+ 'Connect to the server with GET request to /sse endpoint',
},
id: null,
});
}
})
.head((_req: Request, res: Response) => {
res.status(200).end();
});

app.route(Routes.SSE)
.get(async (req: Request, res: Response) => {
try {
log.info(`Received GET message at: ${Routes.SSE}`);
const tools = await processParamsGetTools(req.url);
if (tools) {
mcpServer.updateTools(tools);
}
transport = new SSEServerTransport(Routes.MESSAGE, res);
await mcpServer.connect(transport);
} catch (error) {
log.error(`Error in GET ${Routes.SSE}: ${error}`);
res.status(500).json({ message: 'Internal Server Error' }).end();
} catch (error) {
log.error(`Error in POST ${Routes.MESSAGE}: ${error}`);
if (!res.headersSent) {
res.status(500).json({
jsonrpc: '2.0',
error: {
code: -32603,
message: 'Internal server error',
},
id: null,
});
}
});

app.route(Routes.MESSAGE)
.post(async (req: Request, res: Response) => {
try {
log.info(`Received POST message at: ${Routes.MESSAGE}`);
if (transport) {
await transport.handlePostMessage(req, res);
} else {
res.status(400).json({
message: 'Server is not connected to the client. '
+ 'Connect to the server with GET request to /sse endpoint',
});
}
});

app.post(Routes.MCP, async (req: Request, res: Response) => {
console.log('Received MCP request:', req.body); // eslint-disable-line no-console
try {
// Check for existing session ID
const sessionId = req.headers['mcp-session-id'] as string | undefined;
let transport: StreamableHTTPServerTransport;

if (sessionId && transports[sessionId]) {
// Reuse existing transport
transport = transports[sessionId];
} else if (!sessionId && isInitializeRequest(req.body)) {
// New initialization request - use JSON response mode
transport = new StreamableHTTPServerTransport({
sessionIdGenerator: () => randomUUID(),
enableJsonResponse: true, // Enable JSON response mode
});

// Connect the transport to the MCP server BEFORE handling the request
await mcpServer.connect(transport);

// After handling the request, if we get a session ID back, store the transport
await transport.handleRequest(req, res, req.body);

// Store the transport by session ID for future requests
if (transport.sessionId) {
transports[transport.sessionId] = transport;
}
} catch (error) {
log.error(`Error in POST ${Routes.MESSAGE}: ${error}`);
res.status(500).json({ message: 'Internal Server Error' }).end();
return; // Already handled
} else {
// Invalid request - no session ID or not initialization request
res.status(400).json({
jsonrpc: '2.0',
error: {
code: -32000,
message: 'Bad Request: No valid session ID provided or not initialization request',
},
id: null,
});
return;
}
});

// Handle the request with existing transport - no need to reconnect
await transport.handleRequest(req, res, req.body);
} catch (error) {
console.error('Error handling MCP request:', error); // eslint-disable-line no-console
if (!res.headersSent) {
res.status(500).json({
jsonrpc: '2.0',
error: {
code: -32603,
message: 'Internal server error',
},
id: null,
});
}
}
});

// Handle GET requests for SSE streams according to spec
app.get(Routes.MCP, async (_req: Request, res: Response) => {
// We don't support GET requests for this server
// The spec requires returning 405 Method Not Allowed in this case
res.status(405).set('Allow', 'POST').send('Method Not Allowed');
});

// Catch-all for undefined routes
app.use((req: Request, res: Response) => {
res.status(404).json({ message: `There is nothing at route ${req.method} ${req.originalUrl}. ${HELP_MESSAGE}` }).end();
res.status(404).json({ message: `There is nothing at route ${req.method} ${req.originalUrl}. ${getHelpMessage(host)}` }).end();
});

return app;
}

// Helper function to detect initialize requests
function isInitializeRequest(body: unknown): boolean {
if (Array.isArray(body)) {
return body.some((msg) => typeof msg === 'object' && msg !== null && 'method' in msg && msg.method === 'initialize');
}
return typeof body === 'object' && body !== null && 'method' in body && body.method === 'initialize';
}
4 changes: 2 additions & 2 deletions src/actor/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@ import { parse } from 'node:querystring';

import { Actor } from 'apify';

import { processInput } from './input.js';
import type { ActorRunData, Input } from './types.js';
import { addTool, getActorsAsTools, removeTool } from '../tools/index.js';
import type { ToolWrap } from '../types.js';
import { processInput } from './input.js';
import type { ActorRunData, Input } from './types.js';

export function parseInputParamsFromUrl(url: string): Input {
const query = url.split('?')[1] || '';
Expand Down
2 changes: 1 addition & 1 deletion src/const.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ export const ACTOR_MAX_MEMORY_MBYTES = 4_096; // If the Actor requires 8GB of me

// MCP Server
export const SERVER_NAME = 'apify-mcp-server';
export const SERVER_VERSION = '0.1.0';
export const SERVER_VERSION = '1.0.0';

// User agent headers
export const USER_AGENT_ORIGIN = 'Origin/mcp-server';
Expand Down
13 changes: 8 additions & 5 deletions src/examples/clientSse.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,15 @@
* It requires the `APIFY_TOKEN` in the `.env` file.
*/

import path from 'path';
import { fileURLToPath } from 'url';
import path from 'node:path';
import { fileURLToPath } from 'node:url';

import { Client } from '@modelcontextprotocol/sdk/client/index.js';
import { SSEClientTransport } from '@modelcontextprotocol/sdk/client/sse.js';
import { CallToolResultSchema } from '@modelcontextprotocol/sdk/types.js';
import dotenv from 'dotenv';
import { EventSource, EventSourceInit } from 'eventsource';
import dotenv from 'dotenv'; // eslint-disable-line import/no-extraneous-dependencies
import type { EventSourceInit } from 'eventsource';
import { EventSource } from 'eventsource'; // eslint-disable-line import/no-extraneous-dependencies

import { actorNameToToolName } from '../tools/utils.js';

Expand All @@ -36,13 +37,15 @@ if (!process.env.APIFY_TOKEN) {

// Declare EventSource on globalThis if not available (needed for Node.js environment)
declare global {

// eslint-disable-next-line no-var, vars-on-top
var EventSource: {
new(url: string, eventSourceInitDict?: EventSourceInit): EventSource;
prototype: EventSource;
CONNECTING: 0;
OPEN: 1;
CLOSED: 2;
}; // eslint-disable-line no-var
};
}

if (typeof globalThis.EventSource === 'undefined') {
Expand Down
Loading