-
Notifications
You must be signed in to change notification settings - Fork 10
Expand file tree
/
Copy pathmcpServer.ts
More file actions
141 lines (112 loc) · 3.56 KB
/
mcpServer.ts
File metadata and controls
141 lines (112 loc) · 3.56 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
import { Hono } from 'hono';
import { streamSSE } from 'hono/streaming';
import { McpServer } from '@modelcontextprotocol/sdk/server/mcp.js';
import { Transport } from '@modelcontextprotocol/sdk/shared/transport.js';
import { JSONRPCMessage, JSONRPCMessageSchema } from '@modelcontextprotocol/sdk/types.js';
import { ToolRegistry } from './src/registry/registry.js';
const app = new Hono();
class HonoSSETransport implements Transport {
sessionId: string;
private sseStream: any; // Hono SSE stream type
private endpoint: string;
onclose?: () => void;
onerror?: (error: Error) => void;
onmessage?: (message: JSONRPCMessage) => void;
constructor(endpoint: string, stream: any) {
this.endpoint = endpoint;
this.sessionId = crypto.randomUUID();
this.sseStream = stream;
}
async start(): Promise<void> {
// Send initial ping
await this.sseStream.writeSSE({ data: 'ping', event: 'heartbeat' });
// Send endpoint information
await this.sseStream.writeSSE({
data: `${encodeURI(this.endpoint)}?sessionId=${this.sessionId}`,
event: 'endpoint',
});
}
async handlePostMessage(request: Request): Promise<Response> {
if (!this.sseStream) {
return new Response('SSE connection not established', { status: 500 });
}
try {
const contentType = request.headers.get('content-type');
if (contentType !== 'application/json') {
throw new Error(`Unsupported content-type: ${contentType}`);
}
const body = await request.json();
let parsedMessage: JSONRPCMessage;
try {
parsedMessage = JSONRPCMessageSchema.parse(body);
} catch (error) {
this.onerror?.(error as Error);
throw error;
}
this.onmessage?.(parsedMessage);
return new Response('Accepted', { status: 202 });
} catch (error) {
this.onerror?.(error as Error);
return new Response(String(error), { status: 400 });
}
}
async close(): Promise<void> {
this.onclose?.();
}
async send(message: JSONRPCMessage): Promise<void> {
if (!this.sseStream) {
throw new Error('Not connected');
}
await this.sseStream.writeSSE({
data: JSON.stringify(message),
event: 'message',
});
}
get id(): string {
return this.sessionId;
}
}
// Create server instance
const server = new McpServer({
name: 'askSentientAI',
version: '1.0.0',
capabilities: {
tools: {},
},
});
ToolRegistry.registerMcpTools(server);
// Store active transports by session ID
const transports: Record<string, HonoSSETransport> = {};
app.get('/sse', c => {
return streamSSE(c, async stream => {
const transport = new HonoSSETransport('/messages', stream);
// Store the transport with its session ID
transports[transport.id] = transport;
// Clean up when connection closes
stream.onAbort(() => {
delete transports[transport.id];
transport.close();
});
// Connect to MCP server
await server.connect(transport);
await transport.start();
// Keep the connection alive
while (true) {
await stream.sleep(30000);
await stream.writeSSE({ data: 'ping', event: 'heartbeat' });
}
});
});
app.post('/messages', async c => {
const sessionId = c.req.query('sessionId');
const transport = sessionId ? transports[sessionId] : null;
if (!transport) {
return c.text('No active SSE connection for the provided session ID', 400);
}
return transport.handlePostMessage(c.req.raw);
});
export default {
port: process.env.PORT ? parseInt(process.env.PORT) : 3000,
fetch: app.fetch,
idleTimeout: 120,
};