Skip to content

Commit 902f6e7

Browse files
committed
fix(assistant): use internal fetch for MCP on workers
1 parent 1f29857 commit 902f6e7

File tree

1 file changed

+78
-7
lines changed
  • .pnpm-patch-docus/modules/assistant/runtime/server/api

1 file changed

+78
-7
lines changed

.pnpm-patch-docus/modules/assistant/runtime/server/api/search.ts

Lines changed: 78 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,81 @@
11
import type { ToolCallPart, ToolSet, UIMessageStreamWriter } from 'ai'
22
import { createMCPClient } from '@ai-sdk/mcp'
33
import { convertToModelMessages, createUIMessageStream, createUIMessageStreamResponse, streamText } from 'ai'
4+
import type { H3Event } from 'h3'
5+
6+
interface McpTransport {
7+
start(): Promise<void>
8+
close(): Promise<void>
9+
send(message: Record<string, unknown>): Promise<void>
10+
onmessage?: ((message: Record<string, unknown>) => void) | undefined
11+
onerror?: ((error: Error) => void) | undefined
12+
onclose?: (() => void) | undefined
13+
}
414

515
const MAX_STEPS = 10
616

17+
/**
18+
* MCP transport that routes through Nitro's internal localFetch (event.fetch)
19+
* instead of globalThis.$fetch (event.$fetch) which uses the global fetch()
20+
* and triggers CF Workers self-fetch error 1042.
21+
*/
22+
function createInternalMcpTransport(event: H3Event, mcpPath: string): McpTransport {
23+
let _onmessage: ((message: Record<string, unknown>) => void) | undefined
24+
let _onerror: ((error: Error) => void) | undefined
25+
let _onclose: (() => void) | undefined
26+
27+
return {
28+
async start() {},
29+
async close() { _onclose?.() },
30+
get onmessage() { return _onmessage },
31+
set onmessage(fn) { _onmessage = fn },
32+
get onerror() { return _onerror },
33+
set onerror(fn) { _onerror = fn },
34+
get onclose() { return _onclose },
35+
set onclose(fn) { _onclose = fn },
36+
async send(message: Record<string, unknown>) {
37+
try {
38+
const response = await event.fetch(mcpPath, {
39+
method: 'POST',
40+
body: JSON.stringify(message),
41+
headers: {
42+
'Content-Type': 'application/json',
43+
'Accept': 'application/json, text/event-stream',
44+
},
45+
})
46+
47+
const contentType = response.headers.get('content-type') || ''
48+
49+
if (contentType.includes('text/event-stream') && response.body) {
50+
const reader = response.body.pipeThrough(new TextDecoderStream()).getReader()
51+
let buffer = ''
52+
while (true) {
53+
const { done, value } = await reader.read()
54+
if (done) break
55+
buffer += value
56+
const lines = buffer.split('\n')
57+
buffer = lines.pop() || ''
58+
for (const line of lines) {
59+
if (line.startsWith('data: '))
60+
_onmessage?.(JSON.parse(line.slice(6)))
61+
}
62+
}
63+
if (buffer.startsWith('data: '))
64+
_onmessage?.(JSON.parse(buffer.slice(6)))
65+
}
66+
else {
67+
const body = await response.json()
68+
const messages = Array.isArray(body) ? body : [body]
69+
for (const m of messages) _onmessage?.(m as Record<string, unknown>)
70+
}
71+
}
72+
catch (error) {
73+
_onerror?.(error as Error)
74+
}
75+
},
76+
}
77+
}
78+
779
// eslint-disable-next-line @typescript-eslint/no-explicit-any
880
function stopWhenResponseComplete({ steps }: { steps: any[] }): boolean {
981
const lastStep = steps.at(-1)
@@ -62,15 +134,14 @@ export default defineEventHandler(async (event) => {
62134

63135
const mcpServer = config.assistant.mcpServer
64136
const isExternalUrl = mcpServer.startsWith('http://') || mcpServer.startsWith('https://')
65-
const mcpUrl = isExternalUrl
66-
? mcpServer
137+
138+
const transport = isExternalUrl
139+
? { type: 'http' as const, url: mcpServer }
67140
: import.meta.dev
68-
? `http://localhost:3000${mcpServer}`
69-
: `${getRequestURL(event).origin}${mcpServer}`
141+
? { type: 'http' as const, url: `http://localhost:3000${mcpServer}` }
142+
: createInternalMcpTransport(event, mcpServer)
70143

71-
const httpClient = await createMCPClient({
72-
transport: { type: 'http', url: mcpUrl },
73-
})
144+
const httpClient = await createMCPClient({ transport })
74145
const mcpTools = await httpClient.tools()
75146

76147
const stream = createUIMessageStream({

0 commit comments

Comments
 (0)