Skip to content

Commit 6a8e6d4

Browse files
committed
add mcp
1 parent ac3cf29 commit 6a8e6d4

File tree

8 files changed

+1085
-46
lines changed

8 files changed

+1085
-46
lines changed
Lines changed: 154 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,154 @@
1+
// maybe this'll get built-into the SDK: https://github.com/modelcontextprotocol/typescript-sdk/issues/260
2+
import { type Transport } from '@modelcontextprotocol/sdk/shared/transport.js'
3+
import {
4+
type JSONRPCMessage,
5+
JSONRPCMessageSchema,
6+
} from '@modelcontextprotocol/sdk/types.js'
7+
8+
/**
9+
* Server transport for SSE using Standard Fetch: this will send messages over
10+
* an SSE connection and receive messages from HTTP POST requests.
11+
*/
12+
export class FetchSSEServerTransport implements Transport {
13+
#stream?: ReadableStreamDefaultController<string>
14+
#sessionId: string
15+
#endpoint: string
16+
17+
onclose?: () => void
18+
onerror?: (error: Error) => void
19+
onmessage?: (message: JSONRPCMessage) => void
20+
21+
/**
22+
* Creates a new SSE server transport, which will direct the client to POST
23+
* messages to the relative or absolute URL identified by `endpoint`.
24+
*/
25+
constructor(endpoint: string, sessionId?: string | null) {
26+
this.#endpoint = endpoint
27+
this.#sessionId = sessionId ?? crypto.randomUUID()
28+
}
29+
30+
/**
31+
* Starts processing messages on the transport.
32+
* This is called by the Server class and should not be called directly.
33+
*/
34+
async start(): Promise<void> {
35+
if (this.#stream) {
36+
throw new Error(
37+
'FetchSSEServerTransport already started! If using Server class, note that connect() calls start() automatically.',
38+
)
39+
}
40+
}
41+
42+
/**
43+
* Handles the initial SSE connection request.
44+
* This should be called from your Remix loader to establish the SSE stream.
45+
*/
46+
async handleSSERequest(request: Request): Promise<Response> {
47+
const stream = new ReadableStream<string>({
48+
start: (controller) => {
49+
this.#stream = controller
50+
51+
// Send headers
52+
controller.enqueue(': ping\n\n') // Keep connection alive
53+
54+
// Send the endpoint event
55+
controller.enqueue(
56+
`event: endpoint\ndata: ${encodeURI(
57+
this.#endpoint,
58+
)}?sessionId=${this.#sessionId}\n\n`,
59+
)
60+
61+
// Handle cleanup when the connection closes
62+
request.signal.addEventListener('abort', () => {
63+
controller.close()
64+
this.#stream = undefined
65+
this.onclose?.()
66+
})
67+
},
68+
cancel: () => {
69+
this.#stream = undefined
70+
this.onclose?.()
71+
},
72+
})
73+
74+
return new Response(stream, {
75+
headers: {
76+
'Content-Type': 'text/event-stream',
77+
'Cache-Control': 'no-cache',
78+
Connection: 'keep-alive',
79+
'Mcp-Session-Id': this.#sessionId,
80+
},
81+
})
82+
}
83+
84+
/**
85+
* Handles incoming POST messages.
86+
* This should be called from your Remix action to handle incoming messages.
87+
*/
88+
async handlePostMessage(request: Request): Promise<Response> {
89+
if (!this.#stream) {
90+
const message = 'SSE connection not established'
91+
return new Response(message, { status: 500 })
92+
}
93+
94+
let body: unknown
95+
try {
96+
const contentType = request.headers.get('content-type')
97+
if (contentType !== 'application/json') {
98+
throw new Error(`Unsupported content-type: ${contentType}`)
99+
}
100+
101+
body = await request.json()
102+
} catch (error) {
103+
this.onerror?.(error as Error)
104+
return new Response(String(error), { status: 400 })
105+
}
106+
107+
try {
108+
await this.handleMessage(body)
109+
} catch (error) {
110+
console.error(error)
111+
return new Response(`Invalid message: ${body}`, { status: 400 })
112+
}
113+
114+
return new Response('Accepted', { status: 202 })
115+
}
116+
117+
/**
118+
* Handle a client message, regardless of how it arrived.
119+
*/
120+
async handleMessage(message: unknown): Promise<void> {
121+
let parsedMessage: JSONRPCMessage
122+
try {
123+
parsedMessage = JSONRPCMessageSchema.parse(message)
124+
} catch (error) {
125+
this.onerror?.(error as Error)
126+
throw error
127+
}
128+
129+
this.onmessage?.(parsedMessage)
130+
}
131+
132+
async close(): Promise<void> {
133+
this.#stream?.close()
134+
this.#stream = undefined
135+
this.onclose?.()
136+
}
137+
138+
async send(message: JSONRPCMessage): Promise<void> {
139+
if (!this.#stream) {
140+
throw new Error('Not connected')
141+
}
142+
143+
// Send the message through the event stream
144+
this.#stream.enqueue(`event: message\ndata: ${JSON.stringify(message)}\n\n`)
145+
}
146+
147+
/**
148+
* Returns the session ID for this transport.
149+
* This can be used to route incoming POST requests.
150+
*/
151+
get sessionId(): string {
152+
return this.#sessionId
153+
}
154+
}

app/routes/mcp+/index.ts

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
import {
2+
type LoaderFunctionArgs,
3+
type ActionFunctionArgs,
4+
} from '@remix-run/router'
5+
import { invariantResponse } from '#app/utils/misc.js'
6+
import { connect, getTransport, requestStorage } from './mcp.server.ts'
7+
8+
export async function loader({ request }: LoaderFunctionArgs) {
9+
const response = await requestStorage.run(request, async () => {
10+
const url = new URL(request.url)
11+
const sessionId = url.searchParams.get('sessionId')
12+
const transport = await connect(sessionId)
13+
return transport.handleSSERequest(request)
14+
})
15+
16+
return response
17+
}
18+
19+
export async function action({ request }: ActionFunctionArgs) {
20+
const response = await requestStorage.run(request, async () => {
21+
const url = new URL(request.url)
22+
const sessionId = url.searchParams.get('sessionId')
23+
invariantResponse(sessionId, 'No session ID')
24+
25+
const transport = await getTransport(sessionId)
26+
invariantResponse(transport, 'No transport', { status: 404 })
27+
28+
return transport.handlePostMessage(request)
29+
})
30+
31+
return response
32+
}

0 commit comments

Comments
 (0)