@@ -4,14 +4,17 @@ import {
   createDataStreamResponse,
   smoothStream,
   streamText,
+  experimental_createMCPClient,
 } from 'ai';
+import { Experimental_StdioMCPTransport } from 'ai/mcp-stdio';
 import { auth } from '@/app/(auth)/auth';
 import { systemPrompt } from '@/lib/ai/prompts';
 import {
   deleteChatById,
   getChatById,
   saveChat,
   saveMessages,
+  getEnabledMcpServersByUserId,
 } from '@/lib/db/queries';
 import {
   generateUUID,
@@ -29,6 +32,8 @@ import { myProvider } from '@/lib/ai/providers';
 export const maxDuration = 60;
 
 export async function POST(request: Request) {
+  let mcpClientsToClose: Awaited<ReturnType<typeof experimental_createMCPClient>>[] = [];
+
   try {
     const {
       id,
@@ -45,6 +50,7 @@ export async function POST(request: Request) {
     if (!session || !session.user || !session.user.id) {
       return new Response('Unauthorized', { status: 401 });
     }
+    const userId = session.user.id;
 
     const userMessage = getMostRecentUserMessage(messages);
 
@@ -59,9 +65,9 @@ export async function POST(request: Request) {
         message: userMessage,
       });
 
-      await saveChat({ id, userId: session.user.id, title });
+      await saveChat({ id, userId: userId, title });
     } else {
-      if (chat.userId !== session.user.id) {
+      if (chat.userId !== userId) {
         return new Response('Unauthorized', { status: 401 });
       }
     }
@@ -80,87 +86,146 @@ export async function POST(request: Request) {
     });
 
     return createDataStreamResponse({
-      execute: (dataStream) => {
-        const result = streamText({
-          model: myProvider.languageModel(selectedChatModel),
-          system: systemPrompt({ selectedChatModel }),
-          messages,
-          maxSteps: 5,
-          experimental_activeTools:
-            selectedChatModel === 'chat-model-reasoning'
-              ? []
-              : [
-                  'getWeather',
-                  'createDocument',
-                  'updateDocument',
-                  'requestSuggestions',
-                ],
-          experimental_transform: smoothStream({ chunking: 'word' }),
-          experimental_generateMessageId: generateUUID,
-          tools: {
+      execute: async (dataStream) => {
+        try {
+          const staticTools = {
             getWeather,
             createDocument: createDocument({ session, dataStream }),
             updateDocument: updateDocument({ session, dataStream }),
             requestSuggestions: requestSuggestions({
               session,
               dataStream,
             }),
-          },
-          onFinish: async ({ response }) => {
-            if (session.user?.id) {
+          };
+          let combinedTools: Record<string, any> = { ...staticTools };
+
+          try {
+            const enabledServers = await getEnabledMcpServersByUserId({ userId });
+
+            for (const server of enabledServers) {
               try {
-                const assistantId = getTrailingMessageId({
-                  messages: response.messages.filter(
-                    (message) => message.role === 'assistant',
-                  ),
-                });
-
-                if (!assistantId) {
-                  throw new Error('No assistant message found!');
+                let transport;
+                const config = server.config as any;
+
+                if (config.transportType === 'sse') {
+                  transport = {
+                    type: 'sse' as const,
+                    url: config.url,
+                  };
+                } else if (config.transportType === 'stdio') {
+                  if (isProductionEnvironment) {
+                    console.warn(`SECURITY WARNING: Initializing MCP client with stdio transport in production for server: ${server.name} (ID: ${server.id})`);
+                  }
+                  transport = new Experimental_StdioMCPTransport({
+                    command: config.command,
+                    args: config.args || [],
+                  });
+                } else {
+                  console.warn(`Unsupported MCP transport type '${config.transportType}' for server ${server.name}`);
+                  continue;
                 }
 
-                const [, assistantMessage] = appendResponseMessages({
-                  messages: [userMessage],
-                  responseMessages: response.messages,
-                });
-
-                await saveMessages({
-                  messages: [
-                    {
-                      id: assistantId,
-                      chatId: id,
-                      role: assistantMessage.role,
-                      parts: assistantMessage.parts,
-                      attachments:
-                        assistantMessage.experimental_attachments ?? [],
-                      createdAt: new Date(),
-                    },
-                  ],
-                });
-              } catch (_) {
-                console.error('Failed to save chat');
+                const mcpClient = await experimental_createMCPClient({ transport });
+                mcpClientsToClose.push(mcpClient);
+
+                const mcpTools = await mcpClient.tools();
+                combinedTools = { ...combinedTools, ...mcpTools };
+                console.log(`Loaded ${Object.keys(mcpTools).length} tools from MCP server: ${server.name}`);
+
+              } catch (mcpError) {
+                console.error(`Failed to initialize or get tools from MCP server ${server.name} (ID: ${server.id}):`, mcpError);
               }
             }
-          },
-          experimental_telemetry: {
-            isEnabled: isProductionEnvironment,
-            functionId: 'stream-text',
-          },
-        });
-
-        result.consumeStream();
-
-        result.mergeIntoDataStream(dataStream, {
-          sendReasoning: true,
-        });
+          } catch (dbError) {
+            console.error('Failed to fetch enabled MCP servers:', dbError);
+          }
+
+          const activeToolsList = selectedChatModel === 'chat-model-reasoning'
+            ? []
+            : Object.keys(combinedTools);
+
+          const result = streamText({
+            model: myProvider.languageModel(selectedChatModel),
+            system: systemPrompt({ selectedChatModel }),
+            messages,
+            maxSteps: 5,
+            tools: combinedTools,
+            experimental_activeTools: activeToolsList,
+            experimental_transform: smoothStream({ chunking: 'word' }),
+            experimental_generateMessageId: generateUUID,
+            onFinish: async ({ response }) => {
+              if (session.user?.id) {
+                try {
+                  const assistantId = getTrailingMessageId({
+                    messages: response.messages.filter(
+                      (message) => message.role === 'assistant',
+                    ),
+                  });
+
+                  if (!assistantId) {
+                    throw new Error('No assistant message found!');
+                  }
+
+                  const [, assistantMessage] = appendResponseMessages({
+                    messages: [userMessage],
+                    responseMessages: response.messages,
+                  });
+
+                  await saveMessages({
+                    messages: [
+                      {
+                        id: assistantId,
+                        chatId: id,
+                        role: assistantMessage.role,
+                        parts: assistantMessage.parts,
+                        attachments:
+                          assistantMessage.experimental_attachments ?? [],
+                        createdAt: new Date(),
+                      },
+                    ],
+                  });
+                } catch (_) {
+                  console.error('Failed to save chat messages after stream completion');
+                }
+              }
+              console.log(`Closing ${mcpClientsToClose.length} MCP clients in onFinish...`);
+              for (const client of mcpClientsToClose) {
+                try {
+                  await client.close();
+                } catch (closeError: unknown) {
+                  console.error('Error closing MCP client in onFinish:', closeError);
+                }
+              }
+              mcpClientsToClose = [];
+            },
+            experimental_telemetry: {
+              isEnabled: isProductionEnvironment,
+              functionId: 'stream-text',
+            },
+          });
+
+          result.consumeStream();
+          result.mergeIntoDataStream(dataStream, { sendReasoning: true });
+
+        } catch (streamError) {
+          console.error('Error during streamText execution or MCP setup:', streamError);
+          throw streamError;
+        } finally {
+          console.log('Stream execute try/catch finished.');
+        }
       },
-      onError: () => {
-        return 'Oops, an error occured!';
+      onError: (error) => {
+        console.error('Data stream error:', error);
+        return 'Oops, an error occured!';
       },
     });
   } catch (error) {
+    console.error('Error in POST /api/chat route (initial setup):', error);
+    for (const client of mcpClientsToClose) {
+      client.close().catch((closeError: unknown) => console.error('Error closing MCP client during outer catch:', closeError));
+    }
     return new Response('An error occurred while processing your request!', {
-      status: 404,
+      status: 500,
     });
   }
 }
@@ -190,6 +255,7 @@ export async function DELETE(request: Request) {
 
     return new Response('Chat deleted', { status: 200 });
   } catch (error) {
+    console.error('Error deleting chat:', error);
     return new Response('An error occurred while processing your request!', {
       status: 500,
     });
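
Note on the data the new POST logic reads: each enabled server row's `config` is cast to `any` and inspected through a `transportType` discriminator plus either a `url` or a `command`/`args` pair. The type below is only a sketch of that assumed shape; it is not defined anywhere in this diff.

// Hypothetical shape of the per-server `config` value consumed in the route above.
// Only transportType, url, command, and args are actually referenced by the new code.
type McpServerConfig =
  | { transportType: 'sse'; url: string }
  | { transportType: 'stdio'; command: string; args?: string[] };

// Illustrative examples of stored configs (values are placeholders, not real servers):
const sseExample: McpServerConfig = { transportType: 'sse', url: 'https://example.com/mcp' };
const stdioExample: McpServerConfig = { transportType: 'stdio', command: 'npx', args: ['-y', 'example-mcp-server'] };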
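The POST handler also depends on `getEnabledMcpServersByUserId`, newly imported from `@/lib/db/queries` but not shown in these hunks. A minimal sketch of what such a helper might look like, assuming the project's existing Drizzle setup and a hypothetical `mcpServer` table with `userId` and `enabled` columns (the table and column names are assumptions, not taken from this diff):

import { and, eq } from 'drizzle-orm';
// Assumed to exist next to the other query helpers: a shared Drizzle client
// and an `mcpServer` table definition in the schema (both hypothetical here).
import { db } from './db';
import { mcpServer } from './schema';

export async function getEnabledMcpServersByUserId({ userId }: { userId: string }) {
  try {
    // Return only the servers this user has enabled; each row is expected to
    // expose the id, name, and config fields that the chat route reads.
    return await db
      .select()
      .from(mcpServer)
      .where(and(eq(mcpServer.userId, userId), eq(mcpServer.enabled, true)));
  } catch (error) {
    console.error('Failed to get enabled MCP servers by user id from database');
    throw error;
  }
}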