Skip to content

Commit c8635d8

Browse files
ochafikclaude
andcommitted
feat: add MCP aggregator server example
Adds a new aggregator server that federates multiple MCP servers into one. It connects to backend servers and exposes all their tools and resources through a single unified interface. Features: - Tools namespaced by server name (e.g., `server-name/tool-name`) - Resources URIs rewritten with server prefix - Supports both HTTP and stdio transports - Lazy backend connection on session creation - Retry logic with exponential backoff for backend connections Configuration via environment variables: - BACKEND_SERVERS: JSON array of server URLs - PORT: Port to listen on (default: 3100) 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <[email protected]>
1 parent 894f793 commit c8635d8

File tree

2 files changed

+277
-0
lines changed

2 files changed

+277
-0
lines changed
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
{
2+
"name": "@modelcontextprotocol/ext-apps-aggregator-server",
3+
"version": "1.0.0",
4+
"type": "module",
5+
"private": true,
6+
"scripts": {
7+
"build": "echo 'No build required for aggregator-server'",
8+
"serve:http": "bun server.ts",
9+
"serve:stdio": "bun server.ts --stdio",
10+
"start": "npm run start:http",
11+
"start:http": "npm run serve:http",
12+
"start:stdio": "npm run serve:stdio"
13+
},
14+
"dependencies": {
15+
"@modelcontextprotocol/ext-apps": "../..",
16+
"@modelcontextprotocol/sdk": "^1.24.0"
17+
}
18+
}
Lines changed: 259 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,259 @@
1+
/**
2+
* MCP Aggregator Server - Federates multiple MCP servers into one.
3+
*
4+
* This server connects to multiple backend MCP servers and exposes all their
5+
* tools and resources through a single unified interface. Tools and resources
6+
* are namespaced by server name to avoid collisions.
7+
*
8+
* Configuration:
9+
* BACKEND_SERVERS: JSON array of server URLs
10+
* PORT: Port to listen on (default: 3100)
11+
*
12+
* Example:
13+
* BACKEND_SERVERS='["http://localhost:3102/mcp","http://localhost:3103/mcp"]' \
14+
* PORT=3100 bun examples/aggregator-server/server.ts
15+
*/
16+
17+
import { Client } from "@modelcontextprotocol/sdk/client/index.js";
18+
import { StreamableHTTPClientTransport } from "@modelcontextprotocol/sdk/client/streamableHttp.js";
19+
import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
20+
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
21+
import { SSEServerTransport } from "@modelcontextprotocol/sdk/server/sse.js";
22+
import { StreamableHTTPServerTransport } from "@modelcontextprotocol/sdk/server/streamableHttp.js";
23+
import { isInitializeRequest } from "@modelcontextprotocol/sdk/types.js";
24+
import type { CallToolResult, ReadResourceResult, Resource, Tool } from "@modelcontextprotocol/sdk/types.js";
25+
import { createMcpExpressApp } from "@modelcontextprotocol/sdk/server/express.js";
26+
import cors from "cors";
27+
import { randomUUID } from "node:crypto";
28+
import type { Request, Response } from "express";
29+
30+
const log = {
31+
info: console.log.bind(console, "[AGGREGATOR]"),
32+
warn: console.warn.bind(console, "[AGGREGATOR]"),
33+
error: console.error.bind(console, "[AGGREGATOR]"),
34+
};
35+
36+
interface BackendServer {
37+
name: string;
38+
url: string;
39+
client: Client;
40+
tools: Map<string, Tool>;
41+
resources: Map<string, Resource>;
42+
}
43+
44+
// Global state
45+
let backends: BackendServer[] = [];
46+
let backendsPromise: Promise<void> | null = null;
47+
48+
function sanitizeName(name: string): string {
49+
return name.toLowerCase().replace(/[^a-z0-9]+/g, "-").replace(/^-|-$/g, "");
50+
}
51+
52+
async function connectToBackend(url: string): Promise<BackendServer | null> {
53+
const maxRetries = 15;
54+
const baseDelay = 500;
55+
56+
for (let attempt = 1; attempt <= maxRetries; attempt++) {
57+
try {
58+
const client = new Client({ name: "MCP Aggregator", version: "1.0.0" });
59+
await client.connect(new StreamableHTTPClientTransport(new URL(url)));
60+
61+
const name = client.getServerVersion()?.name ?? new URL(url).hostname;
62+
const toolsList = await client.listTools();
63+
const tools = new Map(toolsList.tools.map((t) => [t.name, t]));
64+
65+
let resources = new Map<string, Resource>();
66+
try {
67+
const resourcesList = await client.listResources();
68+
resources = new Map(resourcesList.resources.map((r) => [r.uri, r]));
69+
} catch {
70+
// Server may not support resources
71+
}
72+
73+
log.info(`Connected to ${name}: ${tools.size} tools, ${resources.size} resources`);
74+
return { name, url, client, tools, resources };
75+
} catch {
76+
if (attempt < maxRetries) {
77+
await new Promise((r) => setTimeout(r, baseDelay * attempt));
78+
}
79+
}
80+
}
81+
log.warn(`Failed to connect to ${url} after ${maxRetries} attempts`);
82+
return null;
83+
}
84+
85+
function getBackendUrls(): string[] {
86+
if (process.env.BACKEND_SERVERS) {
87+
return JSON.parse(process.env.BACKEND_SERVERS);
88+
}
89+
// Default for testing: connect to basic-server-react
90+
return ["http://localhost:3102/mcp"];
91+
}
92+
93+
function startBackendDiscovery(): void {
94+
if (!backendsPromise) {
95+
backendsPromise = (async () => {
96+
const urls = getBackendUrls();
97+
log.info(`Discovering ${urls.length} backend servers...`);
98+
const results = await Promise.all(urls.map(connectToBackend));
99+
backends = results.filter((b): b is BackendServer => b !== null);
100+
log.info(`Connected to ${backends.length}/${urls.length} backends`);
101+
})();
102+
}
103+
}
104+
105+
function prefixUri(prefix: string, uri: string): string {
106+
return uri.startsWith("ui://") ? `ui://${prefix}/${uri.slice(5)}` : `${prefix}/${uri}`;
107+
}
108+
109+
function rewriteMeta(prefix: string, meta: Tool["_meta"]): Tool["_meta"] {
110+
if (!meta) return undefined;
111+
const result = { ...meta };
112+
const ui = result.ui as { resourceUri?: string } | undefined;
113+
if (ui?.resourceUri) result.ui = { ...ui, resourceUri: prefixUri(prefix, ui.resourceUri) };
114+
if (result["ui/resourceUri"]) result["ui/resourceUri"] = prefixUri(prefix, result["ui/resourceUri"] as string);
115+
return result;
116+
}
117+
118+
async function createServerAsync(): Promise<McpServer> {
119+
await backendsPromise;
120+
121+
const server = new McpServer({ name: "MCP Aggregator", version: "1.0.0" });
122+
123+
for (const backend of backends) {
124+
const prefix = sanitizeName(backend.name);
125+
126+
for (const [name, tool] of Array.from(backend.tools.entries())) {
127+
server.registerTool(
128+
`${prefix}/${name}`,
129+
{
130+
title: tool.title ?? name,
131+
description: `[${backend.name}] ${tool.description ?? ""}`.trim(),
132+
inputSchema: tool.inputSchema?.properties ?? {},
133+
_meta: rewriteMeta(prefix, tool._meta),
134+
},
135+
async (args): Promise<CallToolResult> => {
136+
log.info(`Forwarding: ${prefix}/${name}`);
137+
return (await backend.client.callTool({ name, arguments: args })) as CallToolResult;
138+
},
139+
);
140+
}
141+
142+
for (const [uri, resource] of Array.from(backend.resources.entries())) {
143+
server.registerResource(
144+
`[${backend.name}] ${resource.name}`,
145+
prefixUri(prefix, uri),
146+
{ description: resource.description, mimeType: resource.mimeType },
147+
async (): Promise<ReadResourceResult> => {
148+
log.info(`Forwarding resource: ${prefix}/${uri}`);
149+
const result = await backend.client.readResource({ uri });
150+
return { contents: result.contents.map((c) => ({ ...c, uri: prefixUri(prefix, c.uri) })) };
151+
},
152+
);
153+
}
154+
}
155+
156+
const toolCount = backends.reduce((n, b) => n + b.tools.size, 0);
157+
const resourceCount = backends.reduce((n, b) => n + b.resources.size, 0);
158+
log.info(`Session ready: ${toolCount} tools, ${resourceCount} resources`);
159+
160+
return server;
161+
}
162+
163+
type Transport = StreamableHTTPServerTransport | SSEServerTransport;
164+
interface Session { transport: Transport; server: McpServer }
165+
166+
async function startHttpServer(port: number): Promise<void> {
167+
const sessions = new Map<string, Session>();
168+
const app = createMcpExpressApp({ host: "0.0.0.0" });
169+
app.use(cors({ exposedHeaders: ["mcp-session-id"] }));
170+
171+
app.all("/mcp", async (req: Request, res: Response) => {
172+
try {
173+
const sessionId = req.headers["mcp-session-id"] as string | undefined;
174+
let session = sessionId ? sessions.get(sessionId) : undefined;
175+
176+
if (session && !(session.transport instanceof StreamableHTTPServerTransport)) {
177+
return res.status(400).json({ jsonrpc: "2.0", error: { code: -32000, message: "Session uses different transport" }, id: null });
178+
}
179+
180+
if (!session) {
181+
if (req.method !== "POST" || !isInitializeRequest(req.body)) {
182+
return res.status(400).json({ jsonrpc: "2.0", error: { code: -32000, message: "Bad request: not initialized" }, id: null });
183+
}
184+
185+
const serverInstance = await createServerAsync();
186+
const transport = new StreamableHTTPServerTransport({
187+
sessionIdGenerator: () => randomUUID(),
188+
onsessioninitialized: (id) => sessions.set(id, { transport, server: serverInstance }),
189+
});
190+
transport.onclose = () => { if (transport.sessionId) sessions.delete(transport.sessionId); };
191+
await serverInstance.connect(transport);
192+
session = { transport, server: serverInstance };
193+
}
194+
195+
await (session.transport as StreamableHTTPServerTransport).handleRequest(req, res, req.body);
196+
} catch (error) {
197+
console.error("MCP error:", error);
198+
if (!res.headersSent) {
199+
res.status(500).json({ jsonrpc: "2.0", error: { code: -32603, message: "Internal server error" }, id: null });
200+
}
201+
}
202+
});
203+
204+
app.get("/sse", async (_req: Request, res: Response) => {
205+
try {
206+
const serverInstance = await createServerAsync();
207+
const transport = new SSEServerTransport("/messages", res);
208+
sessions.set(transport.sessionId, { transport, server: serverInstance });
209+
res.on("close", () => sessions.delete(transport.sessionId));
210+
await serverInstance.connect(transport);
211+
} catch (error) {
212+
console.error("SSE error:", error);
213+
if (!res.headersSent) res.status(500).end();
214+
}
215+
});
216+
217+
app.post("/messages", async (req: Request, res: Response) => {
218+
try {
219+
const session = sessions.get(req.query.sessionId as string);
220+
if (!session || !(session.transport instanceof SSEServerTransport)) {
221+
return res.status(404).json({ jsonrpc: "2.0", error: { code: -32001, message: "Session not found" }, id: null });
222+
}
223+
await session.transport.handlePostMessage(req, res, req.body);
224+
} catch (error) {
225+
console.error("Message error:", error);
226+
if (!res.headersSent) {
227+
res.status(500).json({ jsonrpc: "2.0", error: { code: -32603, message: "Internal server error" }, id: null });
228+
}
229+
}
230+
});
231+
232+
return new Promise<void>((resolve, reject) => {
233+
const httpServer = app.listen(port);
234+
httpServer.on("listening", () => { log.info(`Listening on http://localhost:${port}/mcp`); resolve(); });
235+
httpServer.on("error", reject);
236+
237+
const shutdown = () => {
238+
log.info("Shutting down...");
239+
sessions.forEach((s) => s.transport.close().catch(() => {}));
240+
httpServer.close(() => process.exit(0));
241+
};
242+
process.on("SIGINT", shutdown);
243+
process.on("SIGTERM", shutdown);
244+
});
245+
}
246+
247+
async function main() {
248+
startBackendDiscovery();
249+
250+
if (process.argv.includes("--stdio")) {
251+
const server = await createServerAsync();
252+
await server.connect(new StdioServerTransport());
253+
} else {
254+
const port = parseInt(process.env.PORT ?? "3100", 10);
255+
await startHttpServer(port);
256+
}
257+
}
258+
259+
main().catch((e) => { console.error(e); process.exit(1); });

0 commit comments

Comments
 (0)