Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion .github/workflows/check.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,13 @@ jobs:
run: npm ci

- name: Lint
run: npm run lint
run: npm run lint:fix

- name: Build
run: npm run build

- name: Test
run: npm run test

- name: Type checks
run: npm run type-check
1 change: 1 addition & 0 deletions .nvmrc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
22.13.1
6 changes: 3 additions & 3 deletions eslint.config.mjs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import apify from '@apify/eslint-config';
import apifyTypeScriptConfig from '@apify/eslint-config/ts.js';

// eslint-disable-next-line import/no-default-export
export default [
{ ignores: ['**/dist', '**/.venv'] }, // Ignores need to happen first
...apify,
{ ignores: ['**/dist'] }, // Ignores need to happen first
...apifyTypeScriptConfig,
{
languageOptions: {
sourceType: 'module',
Expand Down
375 changes: 187 additions & 188 deletions package-lock.json

Large diffs are not rendered by default.

12 changes: 6 additions & 6 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
],
"dependencies": {
"@apify/log": "^2.5.16",
"@modelcontextprotocol/sdk": "^1.9.0",
"@modelcontextprotocol/sdk": "github:jirispilka/mcp-typescript-sdk#fix/add-src-dir",
"ajv": "^8.17.1",
"apify": "^3.4.0",
"apify-client": "^2.12.1",
Expand Down Expand Up @@ -59,13 +59,13 @@
"start": "npm run start:dev",
"start:prod": "node dist/main.js",
"start:dev": "tsx src/main.ts",
"lint": "./node_modules/.bin/eslint .",
"lint:fix": "./node_modules/.bin/eslint . --fix",
"build": "tsc",
"build:watch": "tsc -w",
"lint": "eslint .",
"lint:fix": "eslint . --fix",
"build": "tsc -b src",
"build:watch": "tsc -b src -w",
"type-check": "tsc --noEmit",
"inspector": "npx @modelcontextprotocol/inspector dist/stdio.js",
"test": "vitest run",
"type-check": "tsc --noEmit",
"clean": "tsc -b src --clean"
},
"author": "Apify",
Expand Down
8 changes: 8 additions & 0 deletions src/actor/const.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,14 @@ export const HEADER_READINESS_PROBE = 'x-apify-container-server-readiness-probe'

export enum Routes {
ROOT = '/',
MCP = '/mcp',
SSE = '/sse',
MESSAGE = '/message',
}

export const getHelpMessage = (host: string) => `To interact with the server you can either:
- send request to ${host}${Routes.MCP}?token=YOUR-APIFY-TOKEN and receive a response
or
- connect for Server-Sent Events (SSE) via GET request to: ${host}${Routes.SSE}?token=YOUR-APIFY-TOKEN
- send messages via POST request to: ${host}${Routes.MESSAGE}?token=YOUR-APIFY-TOKEN
(Include your message content in the request body.)`;
199 changes: 139 additions & 60 deletions src/actor/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,91 +2,170 @@
* Express server implementation used for standby Actor mode.
*/

import { randomUUID } from 'node:crypto';

import { SSEServerTransport } from '@modelcontextprotocol/sdk/server/sse.js';
import { StreamableHTTPServerTransport } from '@modelcontextprotocol/sdk/server/streamableHttp.js';
import type { Request, Response } from 'express';
import express from 'express';

import log from '@apify/log';

import { HEADER_READINESS_PROBE, Routes } from './const.js';
import { type ActorsMcpServer } from '../mcp-server.js';
import { getHelpMessage, HEADER_READINESS_PROBE, Routes } from './const.js';
import { getActorRunData, processParamsGetTools } from './utils.js';

export function createExpressApp(
host: string,
mcpServer: ActorsMcpServer,
): express.Express {
const HELP_MESSAGE = `Connect to the server with GET request to ${host}/sse?token=YOUR-APIFY-TOKEN`
+ ` and then send POST requests to ${host}/message?token=YOUR-APIFY-TOKEN`;

const app = express();
app.use(express.json());
let transportSSE: SSEServerTransport;
const transports: { [sessionId: string]: StreamableHTTPServerTransport } = {};

let transport: SSEServerTransport;
function respondWithError(res: Response, error: unknown, logMessage: string, statusCode = 500) {
log.error(`${logMessage}: ${error}`);
if (!res.headersSent) {
res.status(statusCode).json({
jsonrpc: '2.0',
error: {
code: statusCode === 500 ? -32603 : -32000,
message: statusCode === 500 ? 'Internal server error' : 'Bad Request',
},
id: null,
});
}
}

app.route(Routes.ROOT)
.get(async (req: Request, res: Response) => {
if (req.headers && req.get(HEADER_READINESS_PROBE) !== undefined) {
log.debug('Received readiness probe');
res.status(200).json({ message: 'Server is ready' }).end();
return;
app.get(Routes.ROOT, async (req: Request, res: Response) => {
if (req.headers && req.get(HEADER_READINESS_PROBE) !== undefined) {
log.debug('Received readiness probe');
res.status(200).json({ message: 'Server is ready' }).end();
return;
}
try {
log.info(`Received GET message at: ${Routes.ROOT}`);
const tools = await processParamsGetTools(req.url);
if (tools) {
mcpServer.updateTools(tools);
}
try {
log.info(`Received GET message at: ${Routes.ROOT}`);
const tools = await processParamsGetTools(req.url);
if (tools) {
mcpServer.updateTools(tools);
}
res.setHeader('Content-Type', 'text/event-stream');
res.setHeader('Cache-Control', 'no-cache');
res.setHeader('Connection', 'keep-alive');
res.status(200).json({ message: `Actor is using Model Context Protocol. ${HELP_MESSAGE}`, data: getActorRunData() }).end();
} catch (error) {
log.error(`Error in GET ${Routes.ROOT} ${error}`);
res.status(500).json({ message: 'Internal Server Error' }).end();
res.setHeader('Content-Type', 'text/event-stream');
res.setHeader('Cache-Control', 'no-cache');
res.setHeader('Connection', 'keep-alive');
res.status(200).json({ message: `Actor is using Model Context Protocol. ${getHelpMessage(host)}`, data: getActorRunData() }).end();
} catch (error) {
respondWithError(res, error, `Error in GET ${Routes.ROOT}`);
}
});

app.head(Routes.ROOT, (_req: Request, res: Response) => {
res.status(200).end();
});

app.get(Routes.SSE, async (req: Request, res: Response) => {
try {
log.info(`Received GET message at: ${Routes.SSE}`);
const tools = await processParamsGetTools(req.url);
if (tools) {
mcpServer.updateTools(tools);
}
})
.head((_req: Request, res: Response) => {
res.status(200).end();
});

app.route(Routes.SSE)
.get(async (req: Request, res: Response) => {
try {
log.info(`Received GET message at: ${Routes.SSE}`);
const tools = await processParamsGetTools(req.url);
if (tools) {
mcpServer.updateTools(tools);
}
transport = new SSEServerTransport(Routes.MESSAGE, res);
await mcpServer.connect(transport);
} catch (error) {
log.error(`Error in GET ${Routes.SSE}: ${error}`);
res.status(500).json({ message: 'Internal Server Error' }).end();
transportSSE = new SSEServerTransport(Routes.MESSAGE, res);
await mcpServer.connect(transportSSE);
} catch (error) {
respondWithError(res, error, `Error in GET ${Routes.SSE}`);
}
});

app.post(Routes.MESSAGE, async (req: Request, res: Response) => {
try {
log.info(`Received POST message at: ${Routes.MESSAGE}`);
if (transportSSE) {
await transportSSE.handlePostMessage(req, res);
} else {
log.error('Server is not connected to the client.');
res.status(400).json({
jsonrpc: '2.0',
error: {
code: -32000,
message: 'Bad Request: Server is not connected to the client. '
+ 'Connect to the server with GET request to /sse endpoint',
},
id: null,
});
}
});

app.route(Routes.MESSAGE)
.post(async (req: Request, res: Response) => {
try {
log.info(`Received POST message at: ${Routes.MESSAGE}`);
if (transport) {
await transport.handlePostMessage(req, res);
} else {
res.status(400).json({
message: 'Server is not connected to the client. '
+ 'Connect to the server with GET request to /sse endpoint',
});
} catch (error) {
respondWithError(res, error, `Error in POST ${Routes.MESSAGE}`);
}
});

app.post(Routes.MCP, async (req: Request, res: Response) => {
log.info('Received MCP request:', req.body);
try {
// Check for existing session ID
const sessionId = req.headers['mcp-session-id'] as string | undefined;
let transport: StreamableHTTPServerTransport;

if (sessionId && transports[sessionId]) {
// Reuse existing transport
transport = transports[sessionId];
} else if (!sessionId && isInitializeRequest(req.body)) {
// New initialization request - use JSON response mode
transport = new StreamableHTTPServerTransport({
sessionIdGenerator: () => randomUUID(),
enableJsonResponse: true, // Enable JSON response mode
});

// Connect the transport to the MCP server BEFORE handling the request
await mcpServer.connect(transport);

// After handling the request, if we get a session ID back, store the transport
await transport.handleRequest(req, res, req.body);

// Store the transport by session ID for future requests
if (transport.sessionId) {
transports[transport.sessionId] = transport;
}
} catch (error) {
log.error(`Error in POST ${Routes.MESSAGE}: ${error}`);
res.status(500).json({ message: 'Internal Server Error' }).end();
return; // Already handled
} else {
// Invalid request - no session ID or not initialization request
res.status(400).json({
jsonrpc: '2.0',
error: {
code: -32000,
message: 'Bad Request: No valid session ID provided or not initialization request',
},
id: null,
});
return;
}
});

// Handle the request with existing transport - no need to reconnect
await transport.handleRequest(req, res, req.body);
} catch (error) {
respondWithError(res, error, 'Error handling MCP request');
}
});

// Handle GET requests for SSE streams according to spec
app.get(Routes.MCP, async (_req: Request, res: Response) => {
// We don't support GET requests for this server
// The spec requires returning 405 Method Not Allowed in this case
res.status(405).set('Allow', 'POST').send('Method Not Allowed');
});

// Catch-all for undefined routes
app.use((req: Request, res: Response) => {
res.status(404).json({ message: `There is nothing at route ${req.method} ${req.originalUrl}. ${HELP_MESSAGE}` }).end();
res.status(404).json({ message: `There is nothing at route ${req.method} ${req.originalUrl}. ${getHelpMessage(host)}` }).end();
});

return app;
}

// Helper function to detect initialize requests
function isInitializeRequest(body: unknown): boolean {
if (Array.isArray(body)) {
return body.some((msg) => typeof msg === 'object' && msg !== null && 'method' in msg && msg.method === 'initialize');
}
return typeof body === 'object' && body !== null && 'method' in body && body.method === 'initialize';
}
4 changes: 2 additions & 2 deletions src/actor/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@ import { parse } from 'node:querystring';

import { Actor } from 'apify';

import { processInput } from './input.js';
import type { ActorRunData, Input } from './types.js';
import { addTool, getActorsAsTools, removeTool } from '../tools/index.js';
import type { ToolWrap } from '../types.js';
import { processInput } from './input.js';
import type { ActorRunData, Input } from './types.js';

export function parseInputParamsFromUrl(url: string): Input {
const query = url.split('?')[1] || '';
Expand Down
2 changes: 1 addition & 1 deletion src/const.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ export const ACTOR_MAX_MEMORY_MBYTES = 4_096; // If the Actor requires 8GB of me

// MCP Server
export const SERVER_NAME = 'apify-mcp-server';
export const SERVER_VERSION = '0.1.0';
export const SERVER_VERSION = '1.0.0';

// User agent headers
export const USER_AGENT_ORIGIN = 'Origin/mcp-server';
Expand Down
13 changes: 8 additions & 5 deletions src/examples/clientSse.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,15 @@
* It requires the `APIFY_TOKEN` in the `.env` file.
*/

import path from 'path';
import { fileURLToPath } from 'url';
import path from 'node:path';
import { fileURLToPath } from 'node:url';

import { Client } from '@modelcontextprotocol/sdk/client/index.js';
import { SSEClientTransport } from '@modelcontextprotocol/sdk/client/sse.js';
import { CallToolResultSchema } from '@modelcontextprotocol/sdk/types.js';
import dotenv from 'dotenv';
import { EventSource, EventSourceInit } from 'eventsource';
import dotenv from 'dotenv'; // eslint-disable-line import/no-extraneous-dependencies
import type { EventSourceInit } from 'eventsource';
import { EventSource } from 'eventsource'; // eslint-disable-line import/no-extraneous-dependencies

import { actorNameToToolName } from '../tools/utils.js';

Expand All @@ -36,13 +37,15 @@ if (!process.env.APIFY_TOKEN) {

// Declare EventSource on globalThis if not available (needed for Node.js environment)
declare global {

// eslint-disable-next-line no-var, vars-on-top
var EventSource: {
new(url: string, eventSourceInitDict?: EventSourceInit): EventSource;
prototype: EventSource;
CONNECTING: 0;
OPEN: 1;
CLOSED: 2;
}; // eslint-disable-line no-var
};
}

if (typeof globalThis.EventSource === 'undefined') {
Expand Down
8 changes: 4 additions & 4 deletions src/examples/clientStdio.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,14 @@
* You can choose actors to run in the server, for example: `apify/rag-web-browser`.
*/

import { execSync } from 'child_process';
import path from 'path';
import { fileURLToPath } from 'url';
import { execSync } from 'node:child_process';
import path from 'node:path';
import { fileURLToPath } from 'node:url';

import { Client } from '@modelcontextprotocol/sdk/client/index.js';
import { StdioClientTransport } from '@modelcontextprotocol/sdk/client/stdio.js';
import { CallToolResultSchema } from '@modelcontextprotocol/sdk/types.js';
import dotenv from 'dotenv';
import dotenv from 'dotenv'; // eslint-disable-line import/no-extraneous-dependencies

import { actorNameToToolName } from '../tools/utils.js';

Expand Down
Loading