From 16611dd5ee99f852296f7689143a6c9af7642037 Mon Sep 17 00:00:00 2001 From: Rohit Agarwal Date: Thu, 5 Jun 2025 03:45:46 +0530 Subject: [PATCH 01/21] Added support for tool invocations via MCP to run directly on the gateway --- src/handlers/handlerUtils.ts | 116 +++- src/handlers/services/mcpService.ts | 748 ++++++++++++++++++++++++ src/handlers/services/requestContext.ts | 23 + src/middlewares/cache/index.ts | 4 +- src/types/requestBody.ts | 12 + 5 files changed, 897 insertions(+), 6 deletions(-) create mode 100644 src/handlers/services/mcpService.ts diff --git a/src/handlers/handlerUtils.ts b/src/handlers/handlerUtils.ts index 173d9642b..d3c153378 100644 --- a/src/handlers/handlerUtils.ts +++ b/src/handlers/handlerUtils.ts @@ -38,6 +38,7 @@ import { PreRequestValidatorService } from './services/preRequestValidatorServic import { ProviderContext } from './services/providerContext'; import { RequestContext } from './services/requestContext'; import { ResponseService } from './services/responseService'; +import { McpService } from './services/mcpService'; function constructRequestBody( requestContext: RequestContext, @@ -325,7 +326,6 @@ export async function tryPost( currentIndex: number | string, method: string = 'POST' ): Promise { - // console.log('1. requestBody', requestBody); const requestContext = new RequestContext( c, providerOption, @@ -378,6 +378,17 @@ export async function tryPost( requestContext.params = hookSpan.getContext().request.json; } + // NEW: Initialize MCP service if needed + await using mcpService = requestContext.shouldHandleMcp() + ? new McpService(requestContext) + : null; + if (mcpService) { + await mcpService.init(); + // Add MCP tools to the request + const mcpTools = mcpService.tools; + requestContext.addMcpTools(mcpTools); + } + // Attach the body of the request if (!providerContext.hasRequestHandler(requestContext)) { requestContext.transformToProviderRequestAndSave(); @@ -446,7 +457,8 @@ export async function tryPost( 0, hookSpan.id, providerContext, - hooksService + hooksService, + mcpService || undefined ); return responseService.create({ @@ -1134,7 +1146,8 @@ export async function recursiveAfterRequestHookHandler( retryAttemptsMade: any, hookSpanId: string, providerContext: ProviderContext, - hooksService: HooksService + hooksService: HooksService, + mcpService?: McpService ): Promise<{ mappedResponse: Response; retryCount: number; @@ -1193,6 +1206,50 @@ export async function recursiveAfterRequestHookHandler( areSyncHooksAvailable ); + if ( + mcpService && + !isStreamingMode && + mappedResponseJson?.choices?.[0]?.message?.tool_calls?.[0] + ) { + const mcpResult = await handleMcpToolCalls( + requestContext, + mappedResponseJson, + mcpService + ); + if (mcpResult.success) { + // TODO: the hookspan context might need to be updated here. + // Update hook span context for the new request + // hooksService.hookSpan.updateContext({ + // request: { json: requestContext.params }, + // }); + + // Construct the base object for the request + const providerMappedHeaders = + await providerContext.getHeaders(requestContext); + const fetchOptions: RequestInit = constructRequest( + providerMappedHeaders, + requestContext + ); + + // Recurse with updated conversation + return recursiveAfterRequestHookHandler( + requestContext, + fetchOptions, + 0, // Reset retry attempts for new LLM request + hookSpanId, + providerContext, + hooksService, + mcpService + ); + } else { + // MCP failed, log and continue with current response + console.warn( + 'MCP processing failed, returning current response:', + mcpResult.error + ); + } + } + const arhResponse = await afterRequestHookHandler( c, mappedResponse, @@ -1403,3 +1460,56 @@ export async function beforeRequestHookHandler( transformedBody: isTransformed ? span.getContext().request.json : null, }; } + +// handlerUtils.ts +async function handleMcpToolCalls( + requestContext: RequestContext, + responseJson: any, + mcpService: McpService +): Promise<{ success: boolean; error?: string }> { + try { + const toolCalls = responseJson.choices[0].message.tool_calls; + const conversation = [...(requestContext.params.messages || [])]; + + // Add assistant's response with tool calls to conversation + conversation.push(responseJson.choices[0].message); + + // Execute each tool call and add results to conversation + for (const toolCall of toolCalls) { + try { + const toolResult = await mcpService.executeTool( + toolCall.function.name, + JSON.parse(toolCall.function.arguments) + ); + conversation.push({ + role: 'tool', + tool_call_id: toolCall.id, + content: JSON.stringify(toolResult), + }); + } catch (toolError: any) { + console.error( + `MCP tool call failed for ${toolCall.function.name}:`, + toolError + ); + // Add error message as tool response + conversation.push({ + role: 'tool', + tool_call_id: toolCall.id, + content: JSON.stringify({ + error: 'Tool execution failed', + details: toolError.message, + }), + }); + } + } + + // Update the existing context + requestContext.updateMessages(conversation); + requestContext.transformToProviderRequestAndSave(); + + return { success: true }; + } catch (error: any) { + console.error('Error in handleMcpToolCalls:', error); + return { success: false, error: error.message }; + } +} diff --git a/src/handlers/services/mcpService.ts b/src/handlers/services/mcpService.ts new file mode 100644 index 000000000..696283de4 --- /dev/null +++ b/src/handlers/services/mcpService.ts @@ -0,0 +1,748 @@ +import { McpServerConfig, ToolCall } from '../../types/requestBody'; +import { RequestContext } from './requestContext'; + +// services/mcpService.ts +export class McpService { + private mcpConnections = new Map(); + private mcpTools = new Map(); + + constructor(private requestContext: RequestContext) {} + + async init(): Promise { + const mcpServers = this.requestContext.params.mcp_servers; + if (!mcpServers) { + return; + } + for (const server of mcpServers) { + try { + const client = await this.connectToMcpServer(server); + if (client) { + this.mcpConnections.set(server.name, client); + let tools = await client.listTools(); + if (server.tool_configuration.enabled) { + const allowedTools = server.tool_configuration.allowed_tools; + tools = tools.filter((tool) => allowedTools.includes(tool.name)); + } + const llmTools = this.transformToolsForLLM(server.name, tools); + this.mcpTools.set(server.name, llmTools); + } + } catch (error) { + console.error(`Error connecting to MCP server ${server.url}:`, error); + } + } + return; + } + + get tools(): LLMFunction[] { + return Array.from(this.mcpTools.values()).flat(); + } + + async executeTool( + functionName: string, + toolArgs: any + ): Promise { + const serverName = functionName.split('_')[0]; + const toolName = functionName.split('_').slice(1).join('_'); + const client = this.mcpConnections.get(serverName); + // console.log('Current MCP connections are', this.mcpConnections); + if (!client) { + throw new Error(`MCP server ${serverName} not found`); + } + // console.log('Executing tool', toolName, toolArgs); + return await client.executeTool(toolName, toolArgs); + } + + private transformToolsForLLM( + servername: string, + mcpTools: Tool[] + ): LLMFunction[] { + return mcpTools.map((tool) => ({ + type: 'function' as const, + function: { + name: `${servername}_${tool.name}`, + description: tool.description, + parameters: { + type: 'object' as const, + properties: tool.inputSchema.properties || {}, + required: tool.inputSchema.required || [], + // Preserve any additional schema properties like additionalProperties, etc. + ...Object.fromEntries( + Object.entries(tool.inputSchema).filter( + ([key]) => !['type', 'properties', 'required'].includes(key) + ) + ), + }, + }, + })); + } + + private async connectToMcpServer( + config: McpServerConfig + ): Promise { + const client = new MinimalMCPClient(config.url, config.authorization_token); + const result = await client.initialize(); + if (!result.capabilities.tools) { + throw new Error(`MCP server ${config.url} does not support tools`); + return null; + } + return client; + } + + async [Symbol.asyncDispose](): Promise { + // console.log('Disposing MCP service...'); + + for (const [name, client] of this.mcpConnections) { + try { + // console.log('Closing MCP connection to', name); + await client.close(); + } catch (error) { + console.error(`Error closing MCP connection to ${name}:`, error); + // Continue closing other connections even if one fails + } + } + + this.mcpConnections.clear(); + this.mcpTools.clear(); + + // console.log('MCP service disposed'); + } +} + +// Minimal MCP Client for fetching tools from remote servers +// Supports both StreamableHTTP and SSE transports + +interface JSONRPCRequest { + jsonrpc: '2.0'; + id: string | number; + method: string; + params?: any; +} + +interface JSONRPCResponse { + jsonrpc: '2.0'; + id: string | number; + result?: any; + error?: { + code: number; + message: string; + data?: any; + }; +} + +interface Tool { + name: string; + description?: string; + inputSchema: { + type: 'object'; + properties?: Record; + required?: string[]; + }; + outputSchema?: { + type: 'object'; + properties?: Record; + required?: string[]; + }; +} + +interface InitializeResult { + protocolVersion: string; + capabilities: { + tools?: any; + [key: string]: any; + }; + serverInfo: { + name: string; + version: string; + }; + instructions?: string; +} + +interface ListToolsResult { + tools: Tool[]; + nextCursor?: string; +} + +// LLM Function Call format (OpenAI-style) +export interface LLMFunction { + type: 'function'; + function: { + name: string; + description?: string; + parameters: { + type: 'object'; + properties?: Record; + required?: string[]; + [key: string]: any; + }; + }; +} + +// Minimal MCP Client for fetching tools from remote servers +// Supports both StreamableHTTP and SSE transports + +interface JSONRPCRequest { + jsonrpc: '2.0'; + id: string | number; + method: string; + params?: any; +} + +interface JSONRPCResponse { + jsonrpc: '2.0'; + id: string | number; + result?: any; + error?: { + code: number; + message: string; + data?: any; + }; +} + +interface Tool { + name: string; + description?: string; + inputSchema: { + type: 'object'; + properties?: Record; + required?: string[]; + }; + outputSchema?: { + type: 'object'; + properties?: Record; + required?: string[]; + }; +} + +interface InitializeResult { + protocolVersion: string; + capabilities: { + tools?: any; + [key: string]: any; + }; + serverInfo: { + name: string; + version: string; + }; + instructions?: string; +} + +interface ListToolsResult { + tools: Tool[]; + nextCursor?: string; +} + +interface ToolExecutionResult { + content?: Array<{ + type: 'text' | 'image' | 'audio' | 'resource'; + text?: string; + data?: string; + mimeType?: string; + resource?: any; + }>; + structuredContent?: Record; + isError?: boolean; +} + +class MinimalMCPClient { + private url: URL; + private accessToken: string; + private messageId = 0; + private sessionId?: string; + private isSSE = false; + private sseEndpoint?: URL; + private eventSource?: EventSource; + private pendingRequests = new Map< + string | number, + { + resolve: (value: any) => void; + reject: (error: Error) => void; + } + >(); + private sseConnectionResolve?: () => void; + private sseConnectionReject?: (error: Error) => void; + + constructor( + serverUrl: string, + accessToken: string, + options?: { messageEndpoint?: string } + ) { + this.url = new URL(serverUrl); + this.accessToken = accessToken; + + // Check if this looks like an SSE endpoint + this.isSSE = serverUrl.includes('/sse') || serverUrl.includes('sse'); + + // If custom message endpoint provided, use it + if (options?.messageEndpoint) { + this.sseEndpoint = new URL(options.messageEndpoint); + } + } + + private getNextMessageId(): number { + return ++this.messageId; + } + + private getAuthHeaders(): HeadersInit { + const headers: HeadersInit = { + Authorization: `Bearer ${this.accessToken}`, + }; + + if (!this.isSSE) { + headers['Content-Type'] = 'application/json'; + headers['Accept'] = 'application/json, text/event-stream'; + } + + return headers; + } + + private async initializeSSE(): Promise { + if (!this.isSSE) return; + + return new Promise((resolve, reject) => { + // Set up a timeout for the SSE connection + const timeout = setTimeout(() => { + reject(new Error('Timeout waiting for SSE endpoint from server')); + }, 10000); // 10 second timeout + + // Store resolve/reject to call when endpoint is received + const originalResolve = resolve; + const originalReject = reject; + + this.sseConnectionResolve = () => { + clearTimeout(timeout); + originalResolve(); + }; + + this.sseConnectionReject = (error: Error) => { + clearTimeout(timeout); + originalReject(error); + }; + + // Start the SSE connection + this.establishSSEConnection().catch(this.sseConnectionReject); + }); + } + + private async establishSSEConnection(): Promise { + const headers = new Headers(this.getAuthHeaders()); + headers.set('Accept', 'text/event-stream'); + + const response = await fetch(this.url, { + method: 'GET', + headers, + }); + + if (!response.ok) { + throw new Error( + `Failed to establish SSE connection: HTTP ${response.status}: ${response.statusText}` + ); + } + + // The server should send an 'endpoint' event with the POST URL + // We'll wait for this in the stream parser + this.sseEndpoint = undefined; + + // Parse SSE stream for endpoint information and responses + this.parseSSEStream(response); + } + + private parseSSEStream(response: Response): void { + if (!response.body) return; + + const reader = response.body + .pipeThrough(new TextDecoderStream()) + .getReader(); + + let buffer = ''; + let currentEvent = { + event: '', + data: '', + id: '', + }; + + const processStream = async () => { + try { + while (true) { + const { value, done } = await reader.read(); + if (done) break; + + buffer += value; + + // Process line by line + let lineEnd; + while ((lineEnd = buffer.indexOf('\n')) !== -1) { + const line = buffer.slice(0, lineEnd); + buffer = buffer.slice(lineEnd + 1); + + // Remove \r if present (for \r\n line endings) + const cleanLine = line.replace(/\r$/, ''); + + if (cleanLine === '') { + // Empty line = end of event, dispatch it + if (currentEvent.data || currentEvent.event) { + this.handleSSEEvent(currentEvent.event, currentEvent.data); + } + // Reset for next event + currentEvent = { event: '', data: '', id: '' }; + } else if (cleanLine.startsWith('event: ')) { + currentEvent.event = cleanLine.slice(7); + } else if (cleanLine.startsWith('data: ')) { + // Multiple data lines should be joined with \n + if (currentEvent.data) { + currentEvent.data += '\n' + cleanLine.slice(6); + } else { + currentEvent.data = cleanLine.slice(6); + } + } else if (cleanLine.startsWith('id: ')) { + currentEvent.id = cleanLine.slice(4); + } + // Ignore other fields like retry, etc. + } + } + } catch (error) { + console.error('SSE stream error:', error); + if (this.sseConnectionReject) { + this.sseConnectionReject(error as Error); + this.sseConnectionResolve = undefined; + this.sseConnectionReject = undefined; + } + } + }; + + processStream(); + } + + private handleSSEEvent(eventType: string, data: string): void { + if (eventType === 'endpoint') { + // Server is telling us the POST endpoint URL (usually includes sessionId) + try { + this.sseEndpoint = new URL(data, this.url); + // console.log('SSE POST endpoint received:', this.sseEndpoint.href); + + // Extract session ID from the endpoint URL if present + const sessionId = this.sseEndpoint.searchParams.get('sessionId'); + if (sessionId) { + this.sessionId = sessionId; + // console.log('Session ID extracted:', sessionId); + } + + // Resolve the SSE connection promise now that we have the endpoint + if (this.sseConnectionResolve) { + this.sseConnectionResolve(); + this.sseConnectionResolve = undefined; + this.sseConnectionReject = undefined; + } + } catch (error) { + console.warn('Invalid endpoint URL from SSE:', data, error); + if (this.sseConnectionReject) { + this.sseConnectionReject(new Error(`Invalid endpoint URL: ${data}`)); + this.sseConnectionResolve = undefined; + this.sseConnectionReject = undefined; + } + } + return; + } + + // Handle JSON-RPC responses (default event type or 'message') + if (!eventType || eventType === 'message') { + try { + // Try to parse as JSON + const jsonResponse: JSONRPCResponse = JSON.parse(data); + // console.log( + // 'Parsed JSON-RPC response:', + // jsonResponse.id, + // jsonResponse.error ? 'ERROR' : 'SUCCESS' + // ); + + const pending = this.pendingRequests.get(jsonResponse.id); + + if (pending) { + this.pendingRequests.delete(jsonResponse.id); + + if (jsonResponse.error) { + pending.reject( + new Error( + `MCP Error ${jsonResponse.error.code}: ${jsonResponse.error.message}` + ) + ); + } else { + pending.resolve(jsonResponse.result); + } + } else { + console.warn( + 'Received response for unknown request ID:', + jsonResponse.id + ); + } + } catch (error) { + console.error('Failed to parse JSON from SSE data:', error); + console.error('Raw data (first 500 chars):', data.substring(0, 500)); + console.error( + 'Raw data (last 100 chars):', + data.substring(Math.max(0, data.length - 100)) + ); + } + } + } + + private async sendRequest(request: JSONRPCRequest): Promise { + if (this.isSSE) { + return this.sendSSERequest(request); + } else { + return this.sendDirectRequest(request); + } + } + + private async sendSSERequest(request: JSONRPCRequest): Promise { + if (!this.sseEndpoint) { + throw new Error('SSE POST endpoint not yet received from server'); + } + + return new Promise((resolve, reject) => { + // Store the pending request + this.pendingRequests.set(request.id, { resolve, reject }); + + // Prepare POST URL - session ID should already be in the endpoint URL + const postUrl = new URL(this.sseEndpoint!.href); + + // Send POST request to the endpoint (session ID is in the URL) + const headers = new Headers(this.getAuthHeaders()); + headers.set('Content-Type', 'application/json'); + + fetch(postUrl, { + method: 'POST', + headers, + body: JSON.stringify(request), + }).catch((error) => { + this.pendingRequests.delete(request.id); + reject(new Error(`Failed to send SSE request: ${error.message}`)); + }); + + // Set timeout for the request + setTimeout(() => { + if (this.pendingRequests.has(request.id)) { + this.pendingRequests.delete(request.id); + reject(new Error('SSE request timeout')); + } + }, 30000); // 30 second timeout + }); + } + + private async sendDirectRequest(request: JSONRPCRequest): Promise { + const headers = new Headers(this.getAuthHeaders()); + + // Include session ID if we have one + if (this.sessionId) { + headers.set('mcp-session-id', this.sessionId); + } + + const response = await fetch(this.url, { + method: 'POST', + headers, + body: JSON.stringify(request), + }); + + // Capture session ID from response if present + const newSessionId = response.headers.get('mcp-session-id'); + if (newSessionId) { + this.sessionId = newSessionId; + } + + if (!response.ok) { + throw new Error(`HTTP ${response.status}: ${response.statusText}`); + } + + const contentType = response.headers.get('content-type'); + + if (contentType?.includes('application/json')) { + // Direct JSON response + const jsonResponse: JSONRPCResponse = await response.json(); + + if (jsonResponse.error) { + throw new Error( + `MCP Error ${jsonResponse.error.code}: ${jsonResponse.error.message}` + ); + } + + return jsonResponse.result; + } else if (contentType?.includes('text/event-stream')) { + // SSE response - we need to parse the stream + return this.parseSSEResponse(response); + } else { + throw new Error(`Unexpected content type: ${contentType}`); + } + } + + private async parseSSEResponse(response: Response): Promise { + if (!response.body) { + throw new Error('No response body for SSE stream'); + } + + const reader = response.body + .pipeThrough(new TextDecoderStream()) + .getReader(); + + try { + while (true) { + const { value, done } = await reader.read(); + if (done) break; + + // Parse SSE format + const lines = value.split('\n'); + for (const line of lines) { + if (line.startsWith('data: ')) { + const data = line.slice(6); + try { + const jsonResponse: JSONRPCResponse = JSON.parse(data); + + if (jsonResponse.error) { + throw new Error( + `MCP Error ${jsonResponse.error.code}: ${jsonResponse.error.message}` + ); + } + + return jsonResponse.result; + } catch (e) { + // Ignore parsing errors for non-JSON data lines + continue; + } + } + } + } + } finally { + reader.releaseLock(); + } + + throw new Error('No valid response received from SSE stream'); + } + + async initialize(): Promise { + // Initialize SSE connection if needed + if (this.isSSE) { + await this.initializeSSE(); + } + + const request: JSONRPCRequest = { + jsonrpc: '2.0', + id: this.getNextMessageId(), + method: 'initialize', + params: { + protocolVersion: '2025-03-26', + capabilities: {}, + clientInfo: { + name: 'minimal-mcp-client', + version: '1.0.0', + }, + }, + }; + + const result = await this.sendRequest(request); + + // Send initialized notification + await this.sendNotification('notifications/initialized'); + + return result; + } + + private async sendNotification(method: string, params?: any): Promise { + const notification = { + jsonrpc: '2.0' as const, + method, + params, + }; + + if (this.isSSE && this.sseEndpoint) { + // Send via SSE POST endpoint (session ID already in URL) + const headers = new Headers(this.getAuthHeaders()); + headers.set('Content-Type', 'application/json'); + + const response = await fetch(this.sseEndpoint, { + method: 'POST', + headers, + body: JSON.stringify(notification), + }); + + if (!response.ok) { + throw new Error( + `Failed to send SSE notification: HTTP ${response.status}` + ); + } + } else { + // Send via direct HTTP + const headers = new Headers(this.getAuthHeaders()); + + if (this.sessionId) { + headers.set('mcp-session-id', this.sessionId); + } + + const response = await fetch(this.url, { + method: 'POST', + headers, + body: JSON.stringify(notification), + }); + + if (!response.ok) { + throw new Error(`Failed to send notification: HTTP ${response.status}`); + } + } + } + + async listTools(): Promise { + const request: JSONRPCRequest = { + jsonrpc: '2.0', + id: this.getNextMessageId(), + method: 'tools/list', + }; + + const result: ListToolsResult = await this.sendRequest(request); + return result.tools; + } + + async executeTool( + name: string, + args?: Record + ): Promise { + const request: JSONRPCRequest = { + jsonrpc: '2.0', + id: this.getNextMessageId(), + method: 'tools/call', + params: { + name, + arguments: args || {}, + }, + }; + + const result = await this.sendRequest(request); + return result; + } + + async close(): Promise { + // Clean up pending requests + for (const [id, pending] of this.pendingRequests) { + pending.reject(new Error('Connection closed')); + } + this.pendingRequests.clear(); + + // Close EventSource if we have one + if (this.eventSource) { + this.eventSource.close(); + this.eventSource = undefined; + } + + // Attempt to terminate session if we have a session ID + if (this.sessionId && !this.isSSE) { + try { + const headers = new Headers(this.getAuthHeaders()); + headers.set('mcp-session-id', this.sessionId); + + await fetch(this.url, { + method: 'DELETE', + headers, + }); + } catch (error) { + // Ignore errors when terminating - server might not support it + console.warn('Failed to terminate session:', error); + } + } + } +} diff --git a/src/handlers/services/requestContext.ts b/src/handlers/services/requestContext.ts index 8cde148b0..c46bc3020 100644 --- a/src/handlers/services/requestContext.ts +++ b/src/handlers/services/requestContext.ts @@ -12,6 +12,7 @@ import { HEADER_KEYS, RETRY_STATUS_CODES } from '../../globals'; import { HookObject } from '../../middlewares/hooks/types'; import { HooksManager } from '../../middlewares/hooks'; import { transformToProviderRequest } from '../../services/transformToProviderRequest'; +import { LLMFunction } from './mcpService'; export class RequestContext { private _params: Params | null = null; @@ -218,4 +219,26 @@ export class RequestContext { requestOptions, ]); } + + shouldHandleMcp(): boolean { + return ( + !!this.params.mcp_servers && + this.params.mcp_servers.length > 0 && + this.endpoint === 'chatComplete' + ); + } + + addMcpTools(mcpTools: LLMFunction[]) { + if (mcpTools.length > 0) { + let newParams = { ...this.params }; + newParams.tools = [...(this.params.tools || []), ...mcpTools]; + this.params = newParams; + } + } + + updateMessages(messages: any[]) { + let newParams = { ...this.params }; + newParams.messages = messages; + this.params = newParams; + } } diff --git a/src/middlewares/cache/index.ts b/src/middlewares/cache/index.ts index e686ca103..b1945f0d3 100644 --- a/src/middlewares/cache/index.ts +++ b/src/middlewares/cache/index.ts @@ -35,13 +35,11 @@ export const getFromCache = async ( myText ); - // Convert arraybuffer to hex let cacheKey = Array.from(new Uint8Array(cacheDigest)) .map((b) => b.toString(16).padStart(2, '0')) .join(''); - // console.log("Get from cache", cacheKey, cacheKey in inMemoryCache, stringToHash); if (cacheKey in inMemoryCache) { @@ -105,7 +103,7 @@ export const memoryCache = () => { await next(); let requestOptions = c.get('requestOptions'); - console.log("requestOptions", requestOptions); + console.log('requestOptions', requestOptions); if ( requestOptions && diff --git a/src/types/requestBody.ts b/src/types/requestBody.ts index bb3d7d8ef..a8cd2396a 100644 --- a/src/types/requestBody.ts +++ b/src/types/requestBody.ts @@ -423,6 +423,18 @@ export interface Params { // Embeddings specific dimensions?: number; parameters?: any; + mcp_servers?: McpServerConfig[]; +} + +export interface McpServerConfig { + type: 'url' | 'local'; + url: string; + name: string; + authorization_token: string; + tool_configuration: { + enabled: boolean; + allowed_tools: string[]; + }; } interface Examples { From 855304afd59db544b6a742cb916a3a8c1861eb0c Mon Sep 17 00:00:00 2001 From: Rohit Agarwal Date: Fri, 6 Jun 2025 14:58:32 +0530 Subject: [PATCH 02/21] added mcp handlers --- src/handlers/services/mcpService.ts | 51 +++++++++++++++++++++++++---- 1 file changed, 44 insertions(+), 7 deletions(-) diff --git a/src/handlers/services/mcpService.ts b/src/handlers/services/mcpService.ts index 696283de4..9921d0027 100644 --- a/src/handlers/services/mcpService.ts +++ b/src/handlers/services/mcpService.ts @@ -19,7 +19,7 @@ export class McpService { if (client) { this.mcpConnections.set(server.name, client); let tools = await client.listTools(); - if (server.tool_configuration.enabled) { + if (server.tool_configuration?.enabled) { const allowedTools = server.tool_configuration.allowed_tools; tools = tools.filter((tool) => allowedTools.includes(tool.name)); } @@ -94,7 +94,12 @@ export class McpService { for (const [name, client] of this.mcpConnections) { try { // console.log('Closing MCP connection to', name); - await client.close(); + await Promise.race([ + client.close(), + new Promise((_, reject) => + setTimeout(() => reject(new Error('Close timeout')), 10000) + ), + ]); } catch (error) { console.error(`Error closing MCP connection to ${name}:`, error); // Continue closing other connections even if one fails @@ -251,6 +256,8 @@ class MinimalMCPClient { private isSSE = false; private sseEndpoint?: URL; private eventSource?: EventSource; + private abortController?: AbortController; // Add this + private streamReader?: ReadableStreamDefaultReader; // Add this private pendingRequests = new Map< string | number, { @@ -349,6 +356,9 @@ class MinimalMCPClient { private parseSSEStream(response: Response): void { if (!response.body) return; + // Create abort controller for this stream + this.abortController = new AbortController(); + const reader = response.body .pipeThrough(new TextDecoderStream()) .getReader(); @@ -366,6 +376,12 @@ class MinimalMCPClient { const { value, done } = await reader.read(); if (done) break; + // Check if aborted + if (this.abortController?.signal.aborted) { + console.log('SSE stream processing aborted'); + break; + } + buffer += value; // Process line by line @@ -400,11 +416,19 @@ class MinimalMCPClient { } } } catch (error) { - console.error('SSE stream error:', error); - if (this.sseConnectionReject) { - this.sseConnectionReject(error as Error); - this.sseConnectionResolve = undefined; - this.sseConnectionReject = undefined; + if (!this.abortController?.signal.aborted) { + console.error('SSE stream error:', error); + if (this.sseConnectionReject) { + this.sseConnectionReject(error as Error); + this.sseConnectionResolve = undefined; + this.sseConnectionReject = undefined; + } + } + } finally { + try { + reader.releaseLock(); + } catch (e) { + // Reader might already be released } } }; @@ -729,16 +753,29 @@ class MinimalMCPClient { this.eventSource = undefined; } + // For SSE connections, we need to abort any ongoing fetch operations + if (this.isSSE && this.abortController) { + console.log('Aborting SSE connection...'); + this.abortController.abort(); + } + // Attempt to terminate session if we have a session ID if (this.sessionId && !this.isSSE) { try { const headers = new Headers(this.getAuthHeaders()); headers.set('mcp-session-id', this.sessionId); + // Add a timeout to prevent hanging + const controller = new AbortController(); + const timeoutId = setTimeout(() => controller.abort(), 5000); // 5 second timeout + await fetch(this.url, { method: 'DELETE', headers, + signal: controller.signal, }); + + clearTimeout(timeoutId); } catch (error) { // Ignore errors when terminating - server might not support it console.warn('Failed to terminate session:', error); From d8f62256a6cb0623834c1cb41dfd215ba1c02307 Mon Sep 17 00:00:00 2001 From: Rohit Agarwal Date: Fri, 6 Jun 2025 15:08:16 +0530 Subject: [PATCH 03/21] WIP handlerUtils with mcp --- src/handlers/handlerUtils.ts | 31 ++++++++++++++++++++++++++++--- 1 file changed, 28 insertions(+), 3 deletions(-) diff --git a/src/handlers/handlerUtils.ts b/src/handlers/handlerUtils.ts index d3c153378..26a97e276 100644 --- a/src/handlers/handlerUtils.ts +++ b/src/handlers/handlerUtils.ts @@ -459,6 +459,7 @@ export async function tryPost( providerContext, hooksService, mcpService || undefined + logsService || undefined ); return responseService.create({ @@ -1148,6 +1149,7 @@ export async function recursiveAfterRequestHookHandler( providerContext: ProviderContext, hooksService: HooksService, mcpService?: McpService + logsService?: LogsService ): Promise<{ mappedResponse: Response; retryCount: number; @@ -1208,13 +1210,15 @@ export async function recursiveAfterRequestHookHandler( if ( mcpService && + logsService && !isStreamingMode && mappedResponseJson?.choices?.[0]?.message?.tool_calls?.[0] ) { const mcpResult = await handleMcpToolCalls( requestContext, mappedResponseJson, - mcpService + mcpService, + logsService ); if (mcpResult.success) { // TODO: the hookspan context might need to be updated here. @@ -1239,7 +1243,8 @@ export async function recursiveAfterRequestHookHandler( hookSpanId, providerContext, hooksService, - mcpService + mcpService, + logsService ); } else { // MCP failed, log and continue with current response @@ -1465,7 +1470,8 @@ export async function beforeRequestHookHandler( async function handleMcpToolCalls( requestContext: RequestContext, responseJson: any, - mcpService: McpService + mcpService: McpService, + logsService: LogsService ): Promise<{ success: boolean; error?: string }> { try { const toolCalls = responseJson.choices[0].message.tool_calls; @@ -1476,22 +1482,33 @@ async function handleMcpToolCalls( // Execute each tool call and add results to conversation for (const toolCall of toolCalls) { + const startTimeUnixNano = new Date().getTime(); try { const toolResult = await mcpService.executeTool( toolCall.function.name, JSON.parse(toolCall.function.arguments) ); + const endTimeUnixNano = new Date().getTime(); conversation.push({ role: 'tool', tool_call_id: toolCall.id, content: JSON.stringify(toolResult), }); + const toolCallSpan = logsService.createExecuteToolSpan( + toolCall, + toolResult.content, + startTimeUnixNano, + endTimeUnixNano, + requestContext.traceId + ); + logsService.addRequestLog(toolCallSpan); } catch (toolError: any) { console.error( `MCP tool call failed for ${toolCall.function.name}:`, toolError ); // Add error message as tool response + const endTimeUnixNano = new Date().getTime(); conversation.push({ role: 'tool', tool_call_id: toolCall.id, @@ -1500,6 +1517,14 @@ async function handleMcpToolCalls( details: toolError.message, }), }); + const toolCallSpan = logsService.createExecuteToolSpan( + toolCall, + { error: toolError.message }, + startTimeUnixNano, + endTimeUnixNano, + requestContext.traceId + ); + logsService.addRequestLog(toolCallSpan); } } From 648175285fb3630079378017ceca609cbc63eec4 Mon Sep 17 00:00:00 2001 From: Rohit Agarwal Date: Fri, 6 Jun 2025 15:08:28 +0530 Subject: [PATCH 04/21] WIP handlerUtils with mcp --- src/handlers/handlerUtils.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/handlers/handlerUtils.ts b/src/handlers/handlerUtils.ts index 26a97e276..d0f0ccbde 100644 --- a/src/handlers/handlerUtils.ts +++ b/src/handlers/handlerUtils.ts @@ -458,7 +458,7 @@ export async function tryPost( hookSpan.id, providerContext, hooksService, - mcpService || undefined + mcpService || undefined, logsService || undefined ); @@ -1148,7 +1148,7 @@ export async function recursiveAfterRequestHookHandler( hookSpanId: string, providerContext: ProviderContext, hooksService: HooksService, - mcpService?: McpService + mcpService?: McpService, logsService?: LogsService ): Promise<{ mappedResponse: Response; From 613329df8113ebeba939c690c107e7398c4fe2ee Mon Sep 17 00:00:00 2001 From: Rohit Agarwal Date: Fri, 6 Jun 2025 15:09:08 +0530 Subject: [PATCH 05/21] handle local log for the otlp_span --- src/middlewares/log/index.ts | 58 ++++++++++++++++++++---------------- 1 file changed, 33 insertions(+), 25 deletions(-) diff --git a/src/middlewares/log/index.ts b/src/middlewares/log/index.ts index 6eb01c8b2..a28dbc47a 100644 --- a/src/middlewares/log/index.ts +++ b/src/middlewares/log/index.ts @@ -62,33 +62,41 @@ async function processLog(c: Context, start: number) { return; } - try { - // console.log('requestOptionsArray', requestOptionsArray); - const response = requestOptionsArray[0].requestParams.stream - ? { message: 'The response was a stream.' } - : await c.res.clone().json(); - - const responseString = JSON.stringify(response); - if (responseString.length > MAX_RESPONSE_LENGTH) { - requestOptionsArray[0].response = - responseString.substring(0, MAX_RESPONSE_LENGTH) + '...'; - } else { - requestOptionsArray[0].response = response; + for (const requestOption of requestOptionsArray) { + if (requestOption.type === 'otlp_span') { + console.log('otlp_span', JSON.stringify(requestOption)); + continue; } - } catch (error) { - console.error('Error processing log:', error); - } - await broadcastLog( - JSON.stringify({ - time: new Date().toLocaleString(), - method: c.req.method, - endpoint: c.req.url.split(':8787')[1], - status: c.res.status, - duration: ms, - requestOptions: requestOptionsArray, - }) - ); + console.log(requestOption.type || 'requestOption', requestOption); + + try { + const response = requestOption.requestParams.stream + ? { message: 'The response was a stream.' } + : await c.res.clone().json(); + + const responseString = JSON.stringify(response); + if (responseString.length > MAX_RESPONSE_LENGTH) { + requestOption.response = + responseString.substring(0, MAX_RESPONSE_LENGTH) + '...'; + } else { + requestOption.response = response; + } + } catch (error) { + console.error('Error processing log:', error); + } + + await broadcastLog( + JSON.stringify({ + time: new Date().toLocaleString(), + method: c.req.method, + endpoint: c.req.url.split(':8787')[1], + status: c.res.status, + duration: ms, + requestOptions: requestOption, + }) + ); + } } export const logger = () => { From 04cf2ecc990e6a95a1df915b9e3becde5b7010ed Mon Sep 17 00:00:00 2001 From: Rohit Agarwal Date: Fri, 6 Jun 2025 15:09:41 +0530 Subject: [PATCH 06/21] imporve ToolCall type def --- src/types/requestBody.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/src/types/requestBody.ts b/src/types/requestBody.ts index a8cd2396a..28e46253b 100644 --- a/src/types/requestBody.ts +++ b/src/types/requestBody.ts @@ -250,6 +250,7 @@ export interface ToolCall { function: { name: string; arguments: string; + description?: string; }; } From 992b01ee98d50cfe25ee7aa2a94db9b1165032a8 Mon Sep 17 00:00:00 2001 From: Rohit Agarwal Date: Mon, 23 Jun 2025 17:49:19 +0530 Subject: [PATCH 07/21] remove 'using' --- src/handlers/handlerUtils.ts | 226 +++++++++++++++------------- src/handlers/services/mcpService.ts | 54 ------- 2 files changed, 119 insertions(+), 161 deletions(-) diff --git a/src/handlers/handlerUtils.ts b/src/handlers/handlerUtils.ts index e3986cb78..0f8380c15 100644 --- a/src/handlers/handlerUtils.ts +++ b/src/handlers/handlerUtils.ts @@ -401,129 +401,141 @@ export async function tryPost( } // Initialize MCP service if needed - await using mcpService = requestContext.shouldHandleMcp() + const mcpService = requestContext.shouldHandleMcp() ? new McpService(requestContext) : null; - if (mcpService) { - await mcpService.init(); - // Add MCP tools to the request - const mcpTools = mcpService.tools; - requestContext.addMcpTools(mcpTools); - } - // Attach the body of the request - if (!providerContext.hasRequestHandler(requestContext)) { - requestContext.transformToProviderRequestAndSave(); - } + try { + if (mcpService) { + await mcpService.init(); + // Add MCP tools to the request + const mcpTools = mcpService.tools; + requestContext.addMcpTools(mcpTools); + } - // Construct the base object for the request - const providerMappedHeaders = - await providerContext.getHeaders(requestContext); - const fetchOptions: RequestInit = constructRequest( - providerMappedHeaders, - requestContext - ); + // Attach the body of the request + if (!providerContext.hasRequestHandler(requestContext)) { + requestContext.transformToProviderRequestAndSave(); + } - // Cache Handler - const cacheService = new CacheService(c, hooksService); - const cacheResponseObject: CacheResponseObject = - await cacheService.getCachedResponse( - requestContext, - fetchOptions.headers || {} + // Construct the base object for the request + const providerMappedHeaders = + await providerContext.getHeaders(requestContext); + const fetchOptions: RequestInit = constructRequest( + providerMappedHeaders, + requestContext ); - logObject.addCache( - cacheResponseObject.cacheStatus, - cacheResponseObject.cacheKey - ); - if (cacheResponseObject.cacheResponse) { - const { response, originalResponseJson } = await responseService.create({ - response: cacheResponseObject.cacheResponse, - responseTransformer: requestContext.endpoint, - cache: { - isCacheHit: true, - cacheStatus: cacheResponseObject.cacheStatus, - cacheKey: cacheResponseObject.cacheKey, - }, - isResponseAlreadyMapped: true, - retryAttempt: 0, - fetchOptions, - createdAt: cacheResponseObject.createdAt, - executionTime: 0, - }); - logObject - .updateRequestContext(requestContext, fetchOptions.headers) - .addResponse(response, originalResponseJson) - .log(); + // Cache Handler + const cacheService = new CacheService(c, hooksService); + const cacheResponseObject: CacheResponseObject = + await cacheService.getCachedResponse( + requestContext, + fetchOptions.headers || {} + ); + logObject.addCache( + cacheResponseObject.cacheStatus, + cacheResponseObject.cacheKey + ); + if (cacheResponseObject.cacheResponse) { + const { response, originalResponseJson } = await responseService.create({ + response: cacheResponseObject.cacheResponse, + responseTransformer: requestContext.endpoint, + cache: { + isCacheHit: true, + cacheStatus: cacheResponseObject.cacheStatus, + cacheKey: cacheResponseObject.cacheKey, + }, + isResponseAlreadyMapped: true, + retryAttempt: 0, + fetchOptions, + createdAt: cacheResponseObject.createdAt, + executionTime: 0, + }); - return response; - } + logObject + .updateRequestContext(requestContext, fetchOptions.headers) + .addResponse(response, originalResponseJson) + .log(); - // Prerequest validator (For virtual key budgets) - const preRequestValidatorService = new PreRequestValidatorService( - c, - requestContext - ); - const preRequestValidatorResponse = - await preRequestValidatorService.getResponse(); - if (preRequestValidatorResponse) { - const { response, originalResponseJson } = await responseService.create({ - response: preRequestValidatorResponse, - responseTransformer: undefined, - isResponseAlreadyMapped: false, - cache: { - isCacheHit: false, - cacheStatus: cacheResponseObject.cacheStatus, - cacheKey: cacheResponseObject.cacheKey, - }, - retryAttempt: 0, - fetchOptions, - createdAt: new Date(), - }); + return response; + } + + // Prerequest validator (For virtual key budgets) + const preRequestValidatorService = new PreRequestValidatorService( + c, + requestContext + ); + const preRequestValidatorResponse = + await preRequestValidatorService.getResponse(); + if (preRequestValidatorResponse) { + const { response, originalResponseJson } = await responseService.create({ + response: preRequestValidatorResponse, + responseTransformer: undefined, + isResponseAlreadyMapped: false, + cache: { + isCacheHit: false, + cacheStatus: cacheResponseObject.cacheStatus, + cacheKey: cacheResponseObject.cacheKey, + }, + retryAttempt: 0, + fetchOptions, + createdAt: new Date(), + }); + + logObject + .updateRequestContext(requestContext, fetchOptions.headers) + .addResponse(response, originalResponseJson) + .log(); + + return response; + } + + // Request Handler (Including retries, recursion and hooks) + const { mappedResponse, retryCount, createdAt, originalResponseJson } = + await recursiveAfterRequestHookHandler( + requestContext, + fetchOptions, + 0, + hookSpan.id, + providerContext, + hooksService, + logObject, + mcpService || undefined + ); + + const { response, originalResponseJson: mappedOriginalResponseJson } = + await responseService.create({ + response: mappedResponse, + responseTransformer: undefined, + isResponseAlreadyMapped: true, + cache: { + isCacheHit: false, + cacheStatus: cacheResponseObject.cacheStatus, + cacheKey: cacheResponseObject.cacheKey, + }, + retryAttempt: retryCount, + fetchOptions, + createdAt, + originalResponseJson, + }); logObject .updateRequestContext(requestContext, fetchOptions.headers) - .addResponse(response, originalResponseJson) + .addResponse(response, mappedOriginalResponseJson) .log(); return response; + } finally { + // Clean up MCP service + if (mcpService) { + try { + await mcpService[Symbol.asyncDispose](); + } catch (error) { + console.error('Error disposing MCP service:', error); + } + } } - - // Request Handler (Including retries, recursion and hooks) - const { mappedResponse, retryCount, createdAt, originalResponseJson } = - await recursiveAfterRequestHookHandler( - requestContext, - fetchOptions, - 0, - hookSpan.id, - providerContext, - hooksService, - logObject, - mcpService || undefined - ); - - const { response, originalResponseJson: mappedOriginalResponseJson } = - await responseService.create({ - response: mappedResponse, - responseTransformer: undefined, - isResponseAlreadyMapped: true, - cache: { - isCacheHit: false, - cacheStatus: cacheResponseObject.cacheStatus, - cacheKey: cacheResponseObject.cacheKey, - }, - retryAttempt: retryCount, - fetchOptions, - createdAt, - originalResponseJson, - }); - - logObject - .updateRequestContext(requestContext, fetchOptions.headers) - .addResponse(response, mappedOriginalResponseJson) - .log(); - - return response; } export async function tryTargetsRecursively( diff --git a/src/handlers/services/mcpService.ts b/src/handlers/services/mcpService.ts index 9921d0027..dd120838f 100644 --- a/src/handlers/services/mcpService.ts +++ b/src/handlers/services/mcpService.ts @@ -113,60 +113,6 @@ export class McpService { } } -// Minimal MCP Client for fetching tools from remote servers -// Supports both StreamableHTTP and SSE transports - -interface JSONRPCRequest { - jsonrpc: '2.0'; - id: string | number; - method: string; - params?: any; -} - -interface JSONRPCResponse { - jsonrpc: '2.0'; - id: string | number; - result?: any; - error?: { - code: number; - message: string; - data?: any; - }; -} - -interface Tool { - name: string; - description?: string; - inputSchema: { - type: 'object'; - properties?: Record; - required?: string[]; - }; - outputSchema?: { - type: 'object'; - properties?: Record; - required?: string[]; - }; -} - -interface InitializeResult { - protocolVersion: string; - capabilities: { - tools?: any; - [key: string]: any; - }; - serverInfo: { - name: string; - version: string; - }; - instructions?: string; -} - -interface ListToolsResult { - tools: Tool[]; - nextCursor?: string; -} - // LLM Function Call format (OpenAI-style) export interface LLMFunction { type: 'function'; From 3fd787e1767fbe453802961f8671871c9685cc01 Mon Sep 17 00:00:00 2001 From: Rohit Agarwal Date: Mon, 23 Jun 2025 19:33:03 +0530 Subject: [PATCH 08/21] Merge branch 'feat/cleanup-handlerutils' into feat/mcp-agent --- src/handlers/handlerUtils.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/handlers/handlerUtils.ts b/src/handlers/handlerUtils.ts index 930d6d5d5..bc6ef2af3 100644 --- a/src/handlers/handlerUtils.ts +++ b/src/handlers/handlerUtils.ts @@ -369,7 +369,7 @@ export async function tryPost( const mcpTools = mcpService.tools; requestContext.addMcpTools(mcpTools); } - + // Attach the body of the request if (!providerContext.hasRequestHandler(requestContext)) { requestContext.transformToProviderRequestAndSave(); From e88ab70b39fe52592a887c5d8b5187ebcc0b79c7 Mon Sep 17 00:00:00 2001 From: Rohit Agarwal Date: Wed, 25 Jun 2025 02:18:57 +0530 Subject: [PATCH 09/21] Support for sending mcp servers as tools --- src/handlers/services/mcpService.ts | 39 +++++++++++------- src/handlers/services/requestContext.ts | 55 +++++++++++++++++++++++-- src/types/requestBody.ts | 23 ++++++++++- 3 files changed, 96 insertions(+), 21 deletions(-) diff --git a/src/handlers/services/mcpService.ts b/src/handlers/services/mcpService.ts index dd120838f..fc899b64a 100644 --- a/src/handlers/services/mcpService.ts +++ b/src/handlers/services/mcpService.ts @@ -1,4 +1,4 @@ -import { McpServerConfig, ToolCall } from '../../types/requestBody'; +import { McpServer, McpServerConfig, ToolCall } from '../../types/requestBody'; import { RequestContext } from './requestContext'; // services/mcpService.ts @@ -9,7 +9,7 @@ export class McpService { constructor(private requestContext: RequestContext) {} async init(): Promise { - const mcpServers = this.requestContext.params.mcp_servers; + const mcpServers: McpServer[] = this.requestContext.mcpServers; if (!mcpServers) { return; } @@ -17,17 +17,24 @@ export class McpService { try { const client = await this.connectToMcpServer(server); if (client) { - this.mcpConnections.set(server.name, client); + this.mcpConnections.set(server.server_label, client); let tools = await client.listTools(); - if (server.tool_configuration?.enabled) { - const allowedTools = server.tool_configuration.allowed_tools; - tools = tools.filter((tool) => allowedTools.includes(tool.name)); + if (server.allowed_tools && server.allowed_tools.length) { + tools = tools.filter((tool) => + server.allowed_tools!.includes(tool.name) + ); } - const llmTools = this.transformToolsForLLM(server.name, tools); - this.mcpTools.set(server.name, llmTools); + const llmTools = this.transformToolsForLLM( + server.server_label, + tools + ); + this.mcpTools.set(server.server_label, llmTools); } } catch (error) { - console.error(`Error connecting to MCP server ${server.url}:`, error); + console.error( + `Error connecting to MCP server ${server.server_url}:`, + error + ); } } return; @@ -77,12 +84,12 @@ export class McpService { } private async connectToMcpServer( - config: McpServerConfig + server: McpServer ): Promise { - const client = new MinimalMCPClient(config.url, config.authorization_token); + const client = new MinimalMCPClient(server.server_url, server.headers); const result = await client.initialize(); if (!result.capabilities.tools) { - throw new Error(`MCP server ${config.url} does not support tools`); + throw new Error(`MCP server ${server.server_url} does not support tools`); return null; } return client; @@ -196,7 +203,7 @@ interface ToolExecutionResult { class MinimalMCPClient { private url: URL; - private accessToken: string; + private headers: Record; private messageId = 0; private sessionId?: string; private isSSE = false; @@ -216,11 +223,11 @@ class MinimalMCPClient { constructor( serverUrl: string, - accessToken: string, + headers?: Record, options?: { messageEndpoint?: string } ) { this.url = new URL(serverUrl); - this.accessToken = accessToken; + this.headers = headers || {}; // Check if this looks like an SSE endpoint this.isSSE = serverUrl.includes('/sse') || serverUrl.includes('sse'); @@ -237,7 +244,7 @@ class MinimalMCPClient { private getAuthHeaders(): HeadersInit { const headers: HeadersInit = { - Authorization: `Bearer ${this.accessToken}`, + ...(this.headers || {}), }; if (!this.isSSE) { diff --git a/src/handlers/services/requestContext.ts b/src/handlers/services/requestContext.ts index 8b479cf6e..9053aecbc 100644 --- a/src/handlers/services/requestContext.ts +++ b/src/handlers/services/requestContext.ts @@ -6,6 +6,8 @@ import { Options, Params, RetrySettings, + McpServer, + McpTool, } from '../../types/requestBody'; import { endpointStrings } from '../../providers/types'; import { HEADER_KEYS, RETRY_STATUS_CODES } from '../../globals'; @@ -230,13 +232,60 @@ export class RequestContext { } shouldHandleMcp(): boolean { + // Should handle MCP if there are MCP servers and the endpoint is chatComplete + // or if there are tools of type `mcp` + const hasMcpTools = + this.params.tools?.some((tool) => tool.type === 'mcp') ?? false; return ( - !!this.params.mcp_servers && - this.params.mcp_servers.length > 0 && - this.endpoint === 'chatComplete' + (!!this.params.mcp_servers && + this.params.mcp_servers.length > 0 && + this.endpoint === 'chatComplete') || + hasMcpTools ); } + get mcpServers(): McpServer[] { + const mcpServers: McpServer[] = []; + if (!!this.params.mcp_servers) { + mcpServers.push( + ...this.params.mcp_servers.map((server) => ({ + type: 'mcp', + server_url: server.url, + server_label: server.name, + ...(server.tool_configuration && { + allowed_tools: server.tool_configuration.allowed_tools, + }), + ...(server.authorization_token && { + headers: { + Authorization: `Bearer ${server.authorization_token}`, + }, + }), + })) + ); + } + + if (!!this.params.tools) { + mcpServers.push( + ...( + this.params.tools.filter((tool) => tool.type === 'mcp') as McpTool[] + ).map((tool) => ({ + type: 'mcp', + server_url: tool.server_url, + server_label: tool.server_label, + ...(tool.allowed_tools && { + allowed_tools: tool.allowed_tools, + }), + ...(tool.require_approval && { + require_approval: tool.require_approval, + }), + ...(tool.headers && { headers: tool.headers }), + })) + ); + } + + return mcpServers; + } + addMcpTools(mcpTools: LLMFunction[]) { if (mcpTools.length > 0) { let newParams = { ...this.params }; diff --git a/src/types/requestBody.ts b/src/types/requestBody.ts index d2c4f75e5..512b2aa66 100644 --- a/src/types/requestBody.ts +++ b/src/types/requestBody.ts @@ -355,7 +355,7 @@ export type ToolChoice = ToolChoiceObject | 'none' | 'auto' | 'required'; */ export interface Tool extends PromptCache { /** The name of the function. */ - type: string; + type: Exclude; /** A description of the function. */ function: Function; computer?: { @@ -394,7 +394,7 @@ export interface Params { context?: string; examples?: Examples[]; top_k?: number; - tools?: Tool[]; + tools?: (Tool | McpTool)[]; tool_choice?: ToolChoice; response_format?: { type: 'json_object' | 'text' | 'json_schema'; @@ -445,6 +445,25 @@ export interface McpServerConfig { }; } +// A type of tool that is an MCP server +export interface McpTool { + type: 'mcp'; + server_url: string; + server_label: string; + allowed_tools?: string[]; + require_approval?: 'never' | 'always'; + headers?: Record; +} + +// Used to store MCP servers in the request context +export interface McpServer { + server_url: string; + server_label: string; + allowed_tools?: string[]; + require_approval?: 'never' | 'always'; + headers?: Record; +} + interface Examples { input?: Message; output?: Message; From 31d583fc45abc3b79cdd110fbe6db6e58855cff6 Mon Sep 17 00:00:00 2001 From: Rohit Agarwal Date: Wed, 25 Jun 2025 13:17:52 +0530 Subject: [PATCH 10/21] chore: support MCP tools --- src/handlers/services/requestContext.ts | 91 +++++++++++++------------ 1 file changed, 48 insertions(+), 43 deletions(-) diff --git a/src/handlers/services/requestContext.ts b/src/handlers/services/requestContext.ts index 9053aecbc..1f2aafb10 100644 --- a/src/handlers/services/requestContext.ts +++ b/src/handlers/services/requestContext.ts @@ -8,6 +8,7 @@ import { RetrySettings, McpServer, McpTool, + McpServerConfig, } from '../../types/requestBody'; import { endpointStrings } from '../../providers/types'; import { HEADER_KEYS, RETRY_STATUS_CODES } from '../../globals'; @@ -232,55 +233,59 @@ export class RequestContext { } shouldHandleMcp(): boolean { - // Should handle MCP if there are MCP servers and the endpoint is chatComplete - // or if there are tools of type `mcp` - const hasMcpTools = - this.params.tools?.some((tool) => tool.type === 'mcp') ?? false; - return ( - (!!this.params.mcp_servers && - this.params.mcp_servers.length > 0 && - this.endpoint === 'chatComplete') || - hasMcpTools - ); + // MCP applies only to chatComplete requests + if (this.endpoint !== 'chatComplete') return false; + + const { mcp_servers = [], tools = [] } = this.params ?? {}; + + if (mcp_servers.length > 0) return true; + + return tools.some((tool) => tool.type === 'mcp'); } get mcpServers(): McpServer[] { + const { mcp_servers = [], tools = [] } = this.params ?? {}; + if (mcp_servers.length === 0 && tools.length === 0) return []; + const mcpServers: McpServer[] = []; - if (!!this.params.mcp_servers) { - mcpServers.push( - ...this.params.mcp_servers.map((server) => ({ - type: 'mcp', - server_url: server.url, - server_label: server.name, - ...(server.tool_configuration && { - allowed_tools: server.tool_configuration.allowed_tools, - }), - ...(server.authorization_token && { - headers: { - Authorization: `Bearer ${server.authorization_token}`, - }, - }), - })) - ); + + if (mcp_servers) { + for (const srv of mcp_servers) { + // Build the one object you actually need + const entry: McpServer = { + server_url: srv.url, + server_label: srv.name, + }; + + // Optional pieces, added only when present — no throw-away spreads + const tc = srv.tool_configuration; + if (tc?.allowed_tools) entry.allowed_tools = tc.allowed_tools; + + if (srv.authorization_token) { + entry.headers = { Authorization: `Bearer ${srv.authorization_token}` }; + } + + mcpServers.push(entry); + } } - if (!!this.params.tools) { - mcpServers.push( - ...( - this.params.tools.filter((tool) => tool.type === 'mcp') as McpTool[] - ).map((tool) => ({ - type: 'mcp', - server_url: tool.server_url, - server_label: tool.server_label, - ...(tool.allowed_tools && { - allowed_tools: tool.allowed_tools, - }), - ...(tool.require_approval && { - require_approval: tool.require_approval, - }), - ...(tool.headers && { headers: tool.headers }), - })) - ); + if (tools) { + for (const tool of tools) { + if (tool.type !== 'mcp') continue; + + //typecast tool to McpTool + const mcpTool = tool as McpTool; + + const entry: McpServer = { + server_url: mcpTool.server_url, + server_label: mcpTool.server_label, + }; + if (mcpTool.allowed_tools) entry.allowed_tools = mcpTool.allowed_tools; + if (mcpTool.require_approval) entry.require_approval = mcpTool.require_approval; + if (mcpTool.headers) entry.headers = mcpTool.headers; + + mcpServers.push(entry); + } } return mcpServers; From 1f1015b909a87e00e84931bdd324404193ae39ba Mon Sep 17 00:00:00 2001 From: Rohit Agarwal Date: Wed, 25 Jun 2025 13:18:00 +0530 Subject: [PATCH 11/21] chore: support MCP tools --- src/handlers/services/requestContext.ts | 23 +++++++++++++---------- 1 file changed, 13 insertions(+), 10 deletions(-) diff --git a/src/handlers/services/requestContext.ts b/src/handlers/services/requestContext.ts index 1f2aafb10..8012d8aee 100644 --- a/src/handlers/services/requestContext.ts +++ b/src/handlers/services/requestContext.ts @@ -253,18 +253,20 @@ export class RequestContext { for (const srv of mcp_servers) { // Build the one object you actually need const entry: McpServer = { - server_url: srv.url, + server_url: srv.url, server_label: srv.name, }; - + // Optional pieces, added only when present — no throw-away spreads const tc = srv.tool_configuration; if (tc?.allowed_tools) entry.allowed_tools = tc.allowed_tools; - + if (srv.authorization_token) { - entry.headers = { Authorization: `Bearer ${srv.authorization_token}` }; + entry.headers = { + Authorization: `Bearer ${srv.authorization_token}`, + }; } - + mcpServers.push(entry); } } @@ -277,13 +279,14 @@ export class RequestContext { const mcpTool = tool as McpTool; const entry: McpServer = { - server_url: mcpTool.server_url, + server_url: mcpTool.server_url, server_label: mcpTool.server_label, }; - if (mcpTool.allowed_tools) entry.allowed_tools = mcpTool.allowed_tools; - if (mcpTool.require_approval) entry.require_approval = mcpTool.require_approval; - if (mcpTool.headers) entry.headers = mcpTool.headers; - + if (mcpTool.allowed_tools) entry.allowed_tools = mcpTool.allowed_tools; + if (mcpTool.require_approval) + entry.require_approval = mcpTool.require_approval; + if (mcpTool.headers) entry.headers = mcpTool.headers; + mcpServers.push(entry); } } From f1c1c60f7a0f551bd292a089bb11fa29c31f53b0 Mon Sep 17 00:00:00 2001 From: Rohit Agarwal Date: Fri, 18 Jul 2025 19:29:01 +0530 Subject: [PATCH 12/21] Allow parallel tool calls --- src/handlers/handlerUtils.ts | 30 ++++++++++++++++++++---------- 1 file changed, 20 insertions(+), 10 deletions(-) diff --git a/src/handlers/handlerUtils.ts b/src/handlers/handlerUtils.ts index 21c184535..b9957a845 100644 --- a/src/handlers/handlerUtils.ts +++ b/src/handlers/handlerUtils.ts @@ -1358,13 +1358,12 @@ export async function beforeRequestHookHandler( }; } -// handlerUtils.ts async function handleMcpToolCalls( requestContext: RequestContext, responseJson: any, mcpService: McpService ): Promise<{ success: boolean; error?: string }> { - let logsService = new LogsService(requestContext.honoContext); + const logsService = new LogsService(requestContext.honoContext); try { const toolCalls = responseJson.choices[0].message.tool_calls; @@ -1373,20 +1372,21 @@ async function handleMcpToolCalls( // Add assistant's response with tool calls to conversation conversation.push(responseJson.choices[0].message); - // Execute each tool call and add results to conversation - for (const toolCall of toolCalls) { + // Execute all tool calls in parallel for better performance + const toolCallPromises = toolCalls.map(async (toolCall: any) => { const start = new Date().getTime(); + try { const toolResult = await mcpService.executeTool( toolCall.function.name, JSON.parse(toolCall.function.arguments) ); - conversation.push({ + const toolResponse = { role: 'tool', tool_call_id: toolCall.id, content: JSON.stringify(toolResult), - }); + }; const toolCallSpan = logsService.createExecuteToolSpan( toolCall, @@ -1397,20 +1397,22 @@ async function handleMcpToolCalls( ); logsService.addRequestLog(toolCallSpan); + + return toolResponse; } catch (toolError: any) { console.error( `MCP tool call failed for ${toolCall.function.name}:`, toolError ); - // Add error message as tool response - conversation.push({ + + const errorResponse = { role: 'tool', tool_call_id: toolCall.id, content: JSON.stringify({ error: 'Tool execution failed', details: toolError.message, }), - }); + }; const toolCallSpan = logsService.createExecuteToolSpan( toolCall, @@ -1421,8 +1423,16 @@ async function handleMcpToolCalls( ); logsService.addRequestLog(toolCallSpan); + + return errorResponse; } - } + }); + + // Wait for all tool calls to complete + const toolResponses = await Promise.all(toolCallPromises); + + // Add all tool responses to conversation + conversation.push(...toolResponses); // Update the existing context requestContext.updateMessages(conversation); From 3cc3aa3abf5654c0af24faed30e0d64d52c2fbfe Mon Sep 17 00:00:00 2001 From: Rohit Agarwal Date: Fri, 18 Jul 2025 19:55:12 +0530 Subject: [PATCH 13/21] Cleanup --- src/handlers/services/mcpService.ts | 4 ++-- src/handlers/services/requestContext.ts | 2 ++ 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/src/handlers/services/mcpService.ts b/src/handlers/services/mcpService.ts index fc899b64a..134dc37aa 100644 --- a/src/handlers/services/mcpService.ts +++ b/src/handlers/services/mcpService.ts @@ -331,7 +331,7 @@ class MinimalMCPClient { // Check if aborted if (this.abortController?.signal.aborted) { - console.log('SSE stream processing aborted'); + // console.log('SSE stream processing aborted'); break; } @@ -708,7 +708,7 @@ class MinimalMCPClient { // For SSE connections, we need to abort any ongoing fetch operations if (this.isSSE && this.abortController) { - console.log('Aborting SSE connection...'); + // console.log('Aborting SSE connection...'); this.abortController.abort(); } diff --git a/src/handlers/services/requestContext.ts b/src/handlers/services/requestContext.ts index 8012d8aee..a4569a216 100644 --- a/src/handlers/services/requestContext.ts +++ b/src/handlers/services/requestContext.ts @@ -297,7 +297,9 @@ export class RequestContext { addMcpTools(mcpTools: LLMFunction[]) { if (mcpTools.length > 0) { let newParams = { ...this.params }; + // Remove any existing tool with type `mcp` newParams.tools = [...(this.params.tools || []), ...mcpTools]; + newParams.tools = newParams.tools?.filter((tool) => tool.type !== 'mcp'); this.params = newParams; } } From 1f612dc87c53186dd5ff874cee40ad9d825912f7 Mon Sep 17 00:00:00 2001 From: Rohit Agarwal Date: Fri, 18 Jul 2025 19:59:59 +0530 Subject: [PATCH 14/21] Handle mcp tool transformations in Anthropic and Bedrock providers. --- src/providers/anthropic/chatComplete.ts | 8 +++++++- src/providers/bedrock/chatComplete.ts | 5 +++++ src/providers/bedrock/uploadFileUtils.ts | 5 +++++ 3 files changed, 17 insertions(+), 1 deletion(-) diff --git a/src/providers/anthropic/chatComplete.ts b/src/providers/anthropic/chatComplete.ts index 9d5a97367..e1df8ce92 100644 --- a/src/providers/anthropic/chatComplete.ts +++ b/src/providers/anthropic/chatComplete.ts @@ -5,6 +5,8 @@ import { ContentType, SYSTEM_MESSAGE_ROLES, PromptCache, + McpTool, + Tool, } from '../../types/requestBody'; import { ChatCompletionResponse, @@ -347,7 +349,11 @@ export const AnthropicChatCompleteConfig: ProviderConfig = { transform: (params: Params) => { let tools: AnthropicTool[] = []; if (params.tools) { - params.tools.forEach((tool) => { + params.tools.forEach((tool: Tool | McpTool) => { + if (tool.type === 'mcp') { + return; + } + tool = tool as Tool; if (tool.function) { tools.push({ name: tool.function.name, diff --git a/src/providers/bedrock/chatComplete.ts b/src/providers/bedrock/chatComplete.ts index a5c673dea..7d9b2b6f4 100644 --- a/src/providers/bedrock/chatComplete.ts +++ b/src/providers/bedrock/chatComplete.ts @@ -10,6 +10,7 @@ import { ToolCall, SYSTEM_MESSAGE_ROLES, ContentType, + Tool, } from '../../types/requestBody'; import { ChatCompletionResponse, @@ -315,6 +316,10 @@ export const BedrockConverseChatCompleteConfig: ProviderConfig = { | { cachePoint: { type: string } } > = []; params.tools?.forEach((tool) => { + if (tool.type === 'mcp') { + return; + } + tool = tool as Tool; if (tool.function) { tools.push({ toolSpec: { diff --git a/src/providers/bedrock/uploadFileUtils.ts b/src/providers/bedrock/uploadFileUtils.ts index dfbc95717..ef02d7ba4 100644 --- a/src/providers/bedrock/uploadFileUtils.ts +++ b/src/providers/bedrock/uploadFileUtils.ts @@ -4,6 +4,7 @@ import { Message, MESSAGE_ROLES, Params, + Tool, } from '../../types/requestBody'; import { ChatCompletionResponse, @@ -226,6 +227,10 @@ const BedrockAnthropicChatCompleteConfig: ProviderConfig = { const tools: AnthropicTool[] = []; if (params.tools) { params.tools.forEach((tool) => { + if (tool.type === 'mcp') { + return; + } + tool = tool as Tool; if (tool.function) { tools.push({ name: tool.function.name, From c3ad7c6bcf2bd17e7eb32b062afe459a57cdf59f Mon Sep 17 00:00:00 2001 From: Rohit Agarwal Date: Sun, 20 Jul 2025 23:41:20 +0530 Subject: [PATCH 15/21] Allow mid step MCP logging. --- src/handlers/handlerUtils.ts | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/src/handlers/handlerUtils.ts b/src/handlers/handlerUtils.ts index b9957a845..827585672 100644 --- a/src/handlers/handlerUtils.ts +++ b/src/handlers/handlerUtils.ts @@ -1228,6 +1228,11 @@ export async function recursiveAfterRequestHookHandler( // request: { json: requestContext.params }, // }); + logObject + .updateRequestContext(requestContext, options.headers) + .addResponse(mappedResponse, mappedResponseJson) + .log(); + // Construct the base object for the request const fetchOptions: RequestInit = await constructRequest( providerContext, @@ -1358,6 +1363,16 @@ export async function beforeRequestHookHandler( }; } +/** + * Handles MCP tool calls for a given request context and response JSON. + * This function processes tool calls from the response and executes them using the MCP service. + * It updates the request context with the tool responses and transforms the request to the provider's format. + * + * @param requestContext - The request context containing the conversation and parameters + * @param responseJson - The response JSON containing tool calls + * @param mcpService - The MCP service for executing tool calls + * @returns { success: boolean; error?: string } - The result of the MCP tool calls + */ async function handleMcpToolCalls( requestContext: RequestContext, responseJson: any, From ca9a35c74aa8c1f745f45abdee23a0b6fe611815 Mon Sep 17 00:00:00 2001 From: Rohit Agarwal Date: Mon, 21 Jul 2025 18:52:56 +0530 Subject: [PATCH 16/21] Allow regular tool calling to work as usual --- src/handlers/handlerUtils.ts | 6 +++++- src/handlers/services/mcpService.ts | 6 ++++-- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/src/handlers/handlerUtils.ts b/src/handlers/handlerUtils.ts index 827585672..4f05f0ff1 100644 --- a/src/handlers/handlerUtils.ts +++ b/src/handlers/handlerUtils.ts @@ -1415,6 +1415,10 @@ async function handleMcpToolCalls( return toolResponse; } catch (toolError: any) { + if (toolError.message.includes('MCP_SERVER_TOOL_NOT_FOUND')) { + throw new Error('MCP_SERVER_TOOL_NOT_FOUND'); + } + console.error( `MCP tool call failed for ${toolCall.function.name}:`, toolError @@ -1455,7 +1459,7 @@ async function handleMcpToolCalls( return { success: true }; } catch (error: any) { - console.error('Error in handleMcpToolCalls:', error); + console.warn('Error in handleMcpToolCalls:', error); return { success: false, error: error.message }; } } diff --git a/src/handlers/services/mcpService.ts b/src/handlers/services/mcpService.ts index 134dc37aa..c88a93b6c 100644 --- a/src/handlers/services/mcpService.ts +++ b/src/handlers/services/mcpService.ts @@ -52,8 +52,10 @@ export class McpService { const toolName = functionName.split('_').slice(1).join('_'); const client = this.mcpConnections.get(serverName); // console.log('Current MCP connections are', this.mcpConnections); - if (!client) { - throw new Error(`MCP server ${serverName} not found`); + if (!client || !this.mcpTools.has(serverName)) { + throw new Error( + `MCP_SERVER_TOOL_NOT_FOUND: MCP server ${serverName} not found or tool name not loaded in the mcp server` + ); } // console.log('Executing tool', toolName, toolArgs); return await client.executeTool(toolName, toolArgs); From ca51fb146a185e505dfa91f61e2fcdadfbe54662 Mon Sep 17 00:00:00 2001 From: Rohit Agarwal Date: Mon, 28 Jul 2025 20:06:49 +0530 Subject: [PATCH 17/21] Improved edge cases with tests --- src/handlers/handlerUtils.ts | 29 +++- src/handlers/services/mcpService.ts | 250 ++++++++++++++++++++++------ 2 files changed, 225 insertions(+), 54 deletions(-) diff --git a/src/handlers/handlerUtils.ts b/src/handlers/handlerUtils.ts index 42d073eb5..6aecd3426 100644 --- a/src/handlers/handlerUtils.ts +++ b/src/handlers/handlerUtils.ts @@ -1388,17 +1388,35 @@ async function handleMcpToolCalls( responseJson: any, mcpService: McpService ): Promise<{ success: boolean; error?: string }> { + if (requestContext.endpoint !== 'chatComplete') { + return { + success: false, + error: 'MCP tool calls are only supported for /chat/completions endpoint', + }; + } + const logsService = new LogsService(requestContext.honoContext); try { const toolCalls = responseJson.choices[0].message.tool_calls; - const conversation = [...(requestContext.params.messages || [])]; + const conversation: any[] = [...(requestContext.params.messages || [])]; + + const { mcpToolsMap, nonMcpToolsMap } = mcpService.findMCPTools(toolCalls); + + if (nonMcpToolsMap.size > 0) { + return { + success: false, + error: 'Exiting, since some tool calls are not MCP tools', + }; + } + + const mcpTools = Array.from(mcpToolsMap.values()); // Add assistant's response with tool calls to conversation conversation.push(responseJson.choices[0].message); // Execute all tool calls in parallel for better performance - const toolCallPromises = toolCalls.map(async (toolCall: any) => { + const toolCallPromises = mcpTools.map(async (toolCall: any) => { const start = new Date().getTime(); try { @@ -1458,7 +1476,12 @@ async function handleMcpToolCalls( }); // Wait for all tool calls to complete - const toolResponses = await Promise.all(toolCallPromises); + let toolResponses = await Promise.all(toolCallPromises); + toolResponses = toolResponses.filter((response: any) => response !== null); + + if (toolResponses.length === 0) { + return { success: false, error: 'No tool responses received' }; + } // Add all tool responses to conversation conversation.push(...toolResponses); diff --git a/src/handlers/services/mcpService.ts b/src/handlers/services/mcpService.ts index c88a93b6c..48968a30a 100644 --- a/src/handlers/services/mcpService.ts +++ b/src/handlers/services/mcpService.ts @@ -1,10 +1,12 @@ import { McpServer, McpServerConfig, ToolCall } from '../../types/requestBody'; import { RequestContext } from './requestContext'; +import { GatewayError } from '../../errors/GatewayError'; // services/mcpService.ts export class McpService { private mcpConnections = new Map(); private mcpTools = new Map(); + private mcpToolToServerMap = new Map(); constructor(private requestContext: RequestContext) {} @@ -13,12 +15,14 @@ export class McpService { if (!mcpServers) { return; } + this.validateServerObjects(mcpServers); for (const server of mcpServers) { try { const client = await this.connectToMcpServer(server); if (client) { this.mcpConnections.set(server.server_label, client); let tools = await client.listTools(); + // console.log('MCP tools', tools); if (server.allowed_tools && server.allowed_tools.length) { tools = tools.filter((tool) => server.allowed_tools!.includes(tool.name) @@ -35,29 +39,129 @@ export class McpService { `Error connecting to MCP server ${server.server_url}:`, error ); + throw new GatewayError( + `Error connecting to MCP server \`${server.server_url}\`.` + ); } } return; } + private validateServerObjects(servers: McpServer[]): void { + if (!servers || servers.length === 0) { + return; + } + + // Pre-compile regex patterns for better performance + const labelRegex = /^[a-zA-Z][a-zA-Z0-9-_]*$/; + const urlRegex = /^https?:\/\/[^\s/$.?#].[^\s]*$/i; + + // Private IP ranges and localhost patterns + const privatePatterns = [ + /localhost/i, + /127\.0\.0\.1/, + /::1/, + /0\.0\.0\.0/, + // Additional private IP ranges for comprehensive SSRF protection + /10\.\d{1,3}\.\d{1,3}\.\d{1,3}/, // 10.0.0.0/8 + /172\.(1[6-9]|2\d|3[01])\.\d{1,3}\.\d{1,3}/, // 172.16.0.0/12 + /192\.168\.\d{1,3}\.\d{1,3}/, // 192.168.0.0/16 + /169\.254\.\d{1,3}\.\d{1,3}/, // 169.254.0.0/16 (link-local) + ]; + + const seenLabels = new Set(); + + for (const server of servers) { + // Validate required fields exist + if (!server.server_label) { + throw new GatewayError( + 'MCP_SERVER_LABEL_NOT_FOUND: MCP server label not found' + ); + } + + if (!server.server_url) { + throw new GatewayError( + 'MCP_SERVER_URL_NOT_FOUND: MCP server URL not found' + ); + } + + // Validate label format + if (!labelRegex.test(server.server_label)) { + throw new GatewayError( + 'MCP_SERVER_LABEL_INVALID: MCP server label must start with a letter and can only contain letters, numbers, hyphens and underscores' + ); + } + + // Check label uniqueness (O(1) lookup instead of O(n)) + if (seenLabels.has(server.server_label)) { + throw new GatewayError( + 'MCP_SERVER_LABEL_NOT_UNIQUE: MCP server label must be unique' + ); + } + seenLabels.add(server.server_label); + + // Validate URL format first (fail fast) + if (!urlRegex.test(server.server_url)) { + throw new GatewayError( + 'MCP_SERVER_URL_INVALID: MCP server URL must be a valid URL' + ); + } + + // Check for SSRF vulnerabilities + if (privatePatterns.some((pattern) => pattern.test(server.server_url))) { + throw new GatewayError( + 'MCP_SERVER_URL_INVALID: MCP server URL must not hit private IPs or localhost' + ); + } + } + } + get tools(): LLMFunction[] { return Array.from(this.mcpTools.values()).flat(); } + /** + * Find MCP tools and non-MCP tools from a list of tool calls + * based on the MCP tools loaded in the MCP service + * @param toolCalls - Tool calls to find MCP tools for + * @returns - MCP tools and non-MCP tools + */ + findMCPTools(toolCalls: ToolCall[]): { + mcpToolsMap: Map; + nonMcpToolsMap: Map; + } { + let mcpToolsMap: Map = new Map(), + nonMcpToolsMap: Map = new Map(); + const mcpToolNames = this.tools.map((tool) => tool.function.name); + toolCalls.forEach((toolCall: ToolCall) => { + if (mcpToolNames.includes(toolCall.function.name)) { + mcpToolsMap.set(toolCall.function.name, toolCall); + } else { + nonMcpToolsMap.set(toolCall.function.name, toolCall); + } + }); + return { mcpToolsMap, nonMcpToolsMap }; + } + async executeTool( functionName: string, toolArgs: any ): Promise { - const serverName = functionName.split('_')[0]; - const toolName = functionName.split('_').slice(1).join('_'); + const serverName = this.mcpToolToServerMap.get(functionName); + if (!serverName) { + throw new Error( + `MCP_SERVER_TOOL_NOT_FOUND: MCP server not found for tool ${functionName}` + ); + } const client = this.mcpConnections.get(serverName); - // console.log('Current MCP connections are', this.mcpConnections); + if (!client || !this.mcpTools.has(serverName)) { throw new Error( `MCP_SERVER_TOOL_NOT_FOUND: MCP server ${serverName} not found or tool name not loaded in the mcp server` ); } - // console.log('Executing tool', toolName, toolArgs); + + const toolName = functionName.substring(serverName.length + 1); return await client.executeTool(toolName, toolArgs); } @@ -65,24 +169,28 @@ export class McpService { servername: string, mcpTools: Tool[] ): LLMFunction[] { - return mcpTools.map((tool) => ({ - type: 'function' as const, - function: { - name: `${servername}_${tool.name}`, - description: tool.description, - parameters: { - type: 'object' as const, - properties: tool.inputSchema.properties || {}, - required: tool.inputSchema.required || [], - // Preserve any additional schema properties like additionalProperties, etc. - ...Object.fromEntries( - Object.entries(tool.inputSchema).filter( - ([key]) => !['type', 'properties', 'required'].includes(key) - ) - ), + return mcpTools.map((tool) => { + const functionName = `${servername}_${tool.name}`; + this.mcpToolToServerMap.set(functionName, servername); + return { + type: 'function' as const, + function: { + name: functionName, + description: tool.description, + parameters: { + type: 'object' as const, + properties: tool.inputSchema.properties || {}, + required: tool.inputSchema.required || [], + // Preserve any additional schema properties like additionalProperties, etc. + ...Object.fromEntries( + Object.entries(tool.inputSchema).filter( + ([key]) => !['type', 'properties', 'required'].includes(key) + ) + ), + }, }, - }, - })); + }; + }); } private async connectToMcpServer( @@ -515,6 +623,9 @@ class MinimalMCPClient { headers.set('mcp-session-id', this.sessionId); } + // IMPORTANT: Add Accept header for both JSON and SSE + headers.set('Accept', 'application/json, text/event-stream'); + const response = await fetch(this.url, { method: 'POST', headers, @@ -557,42 +668,79 @@ class MinimalMCPClient { throw new Error('No response body for SSE stream'); } - const reader = response.body - .pipeThrough(new TextDecoderStream()) - .getReader(); + const requestId = this.messageId; // Store the current request ID for matching - try { - while (true) { - const { value, done } = await reader.read(); - if (done) break; - - // Parse SSE format - const lines = value.split('\n'); - for (const line of lines) { - if (line.startsWith('data: ')) { - const data = line.slice(6); - try { - const jsonResponse: JSONRPCResponse = JSON.parse(data); - - if (jsonResponse.error) { - throw new Error( - `MCP Error ${jsonResponse.error.code}: ${jsonResponse.error.message}` - ); - } + return new Promise((resolve, reject) => { + const reader = response + .body!.pipeThrough(new TextDecoderStream()) + .getReader(); - return jsonResponse.result; - } catch (e) { - // Ignore parsing errors for non-JSON data lines - continue; + const processStream = async () => { + try { + let buffer = ''; + + while (true) { + const { value, done } = await reader.read(); + if (done) break; + + buffer += value; + + // Process line by line + let lineEnd; + while ((lineEnd = buffer.indexOf('\n')) !== -1) { + const line = buffer.slice(0, lineEnd); + buffer = buffer.slice(lineEnd + 1); + + // Remove \r if present (for \r\n line endings) + const cleanLine = line.replace(/\r$/, ''); + + if (cleanLine.startsWith('data: ')) { + const data = cleanLine.slice(6); + try { + const jsonResponse: JSONRPCResponse = JSON.parse(data); + + // Check if this response matches our request + if (jsonResponse.id === requestId) { + if (jsonResponse.error) { + reject( + new Error( + `MCP Error ${jsonResponse.error.code}: ${jsonResponse.error.message}` + ) + ); + } else { + resolve(jsonResponse.result); + } + return; // Exit the stream processing + } + + // If it's not our response, it might be a notification + // Pass it to the message handler if available + // if (this.onmessage && (!jsonResponse.id || jsonResponse.id !== requestId)) { + // this.onmessage(jsonResponse); + // } + } catch (e) { + // Ignore parsing errors for non-JSON data lines + continue; + } + } } } + + // If we reach here without getting our response, it's an error + reject(new Error('No matching response received from SSE stream')); + } catch (error) { + reject(error); + } finally { + try { + reader.releaseLock(); + } catch (e) { + // Reader might already be released + } } - } - } finally { - reader.releaseLock(); - } + }; - throw new Error('No valid response received from SSE stream'); + processStream(); + }); } async initialize(): Promise { From 77b79f2a8f4dcb514ed6d18554c403f0d29f7575 Mon Sep 17 00:00:00 2001 From: Rohit Agarwal Date: Tue, 5 Aug 2025 15:11:10 +0530 Subject: [PATCH 18/21] Renamed otlp_span to otel --- src/handlers/services/logsService.ts | 4 ++-- src/middlewares/log/index.ts | 4 ++-- tests/unit/src/handlers/services/logsService.test.ts | 2 +- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/handlers/services/logsService.ts b/src/handlers/services/logsService.ts index d4587634f..7c1582f50 100644 --- a/src/handlers/services/logsService.ts +++ b/src/handlers/services/logsService.ts @@ -61,7 +61,7 @@ export interface LogObject { } export interface otlpSpanObject { - type: 'otlp_span'; + type: 'otel'; traceId: string; spanId: string; parentSpanId: string; @@ -103,7 +103,7 @@ export class LogsService { spanId?: string ) { return { - type: 'otlp_span', + type: 'otel', traceId: traceId, spanId: spanId ?? crypto.randomUUID(), parentSpanId: parentSpanId, diff --git a/src/middlewares/log/index.ts b/src/middlewares/log/index.ts index a4c932136..ebc6c3bb2 100644 --- a/src/middlewares/log/index.ts +++ b/src/middlewares/log/index.ts @@ -57,8 +57,8 @@ async function processLog(c: Context, start: number) { } for (const requestOption of requestOptionsArray) { - if (requestOption.type === 'otlp_span') { - console.log('otlp_span', JSON.stringify(requestOption)); + if (requestOption.type === 'otel') { + console.log('otel', JSON.stringify(requestOption)); continue; } diff --git a/tests/unit/src/handlers/services/logsService.test.ts b/tests/unit/src/handlers/services/logsService.test.ts index ffdbc73c7..33fd81134 100644 --- a/tests/unit/src/handlers/services/logsService.test.ts +++ b/tests/unit/src/handlers/services/logsService.test.ts @@ -60,7 +60,7 @@ describe('LogsService', () => { ); expect(result).toEqual({ - type: 'otlp_span', + type: 'otel', traceId: 'trace-123', spanId: 'span-789', parentSpanId: 'parent-456', From ba3d5c76a9890c30f9296841bc788c2c81d39021 Mon Sep 17 00:00:00 2001 From: Rohit Agarwal Date: Tue, 5 Aug 2025 17:09:53 +0530 Subject: [PATCH 19/21] Better logging --- src/handlers/handlerUtils.ts | 80 ++++++++++++---------------- src/handlers/services/logsService.ts | 9 +++- 2 files changed, 42 insertions(+), 47 deletions(-) diff --git a/src/handlers/handlerUtils.ts b/src/handlers/handlerUtils.ts index f1099c6f9..5df6a8668 100644 --- a/src/handlers/handlerUtils.ts +++ b/src/handlers/handlerUtils.ts @@ -453,6 +453,8 @@ export async function tryPost( providerContext, hooksService, logObject, + responseService, + cacheResponseObject, mcpService || undefined ); @@ -472,10 +474,7 @@ export async function tryPost( originalResponseJson, }); - logObject - .updateRequestContext(requestContext, fetchOptions.headers) - .addResponse(response, mappedOriginalResponseJson) - .log(); + // The log is handled inside the recursiveAfterRequestHookHandler function return response; } finally { @@ -1149,6 +1148,8 @@ export async function recursiveAfterRequestHookHandler( providerContext: ProviderContext, hooksService: HooksService, logObject: LogObjectBuilder, + responseService: ResponseService, + cacheResponseObject: CacheResponseObject, mcpService?: McpService ): Promise<{ mappedResponse: Response; @@ -1158,11 +1159,7 @@ export async function recursiveAfterRequestHookHandler( }> { const { honoContext: c, - providerOption, isStreaming: isStreamingMode, - params: gatewayParams, - endpoint: fn, - strictOpenAiCompliance, requestTimeout, retryConfig: retry, } = requestContext; @@ -1187,26 +1184,27 @@ export async function recursiveAfterRequestHookHandler( retry.useRetryAfterHeader )); - // Check if sync hooks are available - // This will be used to determine if we need to parse the response body or simply passthrough the response as is - const areSyncHooksAvailable = hooksService.areSyncHooksAvailable; - const { - response: mappedResponse, + response: currentResponse, responseJson: mappedResponseJson, originalResponseJson, - } = await responseHandler( - response, - isStreamingMode, - providerOption, - fn, - url, - false, - gatewayParams, - strictOpenAiCompliance, - c.req.url, - areSyncHooksAvailable - ); + } = await responseService.create({ + response: response, + responseTransformer: undefined, + isResponseAlreadyMapped: false, + cache: { + isCacheHit: false, + cacheStatus: cacheResponseObject.cacheStatus, + cacheKey: cacheResponseObject.cacheKey, + }, + retryAttempt: retryCount || 0, + createdAt, + }); + + logObject + .updateRequestContext(requestContext, options.headers) + .addResponse(currentResponse, originalResponseJson) + .log(); if ( mcpService && @@ -1220,17 +1218,6 @@ export async function recursiveAfterRequestHookHandler( mcpService ); if (mcpResult.success) { - // TODO: the hookspan context might need to be updated here. - // Update hook span context for the new request - // hooksService.hookSpan.updateContext({ - // request: { json: requestContext.params }, - // }); - - logObject - .updateRequestContext(requestContext, options.headers) - .addResponse(mappedResponse, mappedResponseJson) - .log(); - // Construct the base object for the request const fetchOptions: RequestInit = await constructRequest( providerContext, @@ -1246,6 +1233,8 @@ export async function recursiveAfterRequestHookHandler( providerContext, hooksService, logObject, + responseService, + cacheResponseObject, mcpService ); } else { @@ -1259,12 +1248,18 @@ export async function recursiveAfterRequestHookHandler( const arhResponse = await afterRequestHookHandler( c, - mappedResponse, + currentResponse, mappedResponseJson, hookSpanId, retryAttemptsMade ); + logObject + .updateRequestContext(requestContext, options.headers) + .addResponse(arhResponse, originalResponseJson) + .addExecutionTime(createdAt) + .log(); + const remainingRetryCount = (retry?.attempts || 0) - (retryCount || 0) - retryAttemptsMade; @@ -1273,13 +1268,6 @@ export async function recursiveAfterRequestHookHandler( ); if (remainingRetryCount > 0 && !retrySkipped && isRetriableStatusCode) { - // Log the request here since we're about to retry - logObject - .updateRequestContext(requestContext, options.headers) - .addResponse(arhResponse, originalResponseJson) - .addExecutionTime(createdAt) - .log(); - return recursiveAfterRequestHookHandler( requestContext, options, @@ -1287,7 +1275,9 @@ export async function recursiveAfterRequestHookHandler( hookSpanId, providerContext, hooksService, - logObject + logObject, + responseService, + cacheResponseObject ); } diff --git a/src/handlers/services/logsService.ts b/src/handlers/services/logsService.ts index 7c1582f50..5604ebefb 100644 --- a/src/handlers/services/logsService.ts +++ b/src/handlers/services/logsService.ts @@ -90,6 +90,11 @@ export interface otlpSpanObject { }[]; } +function capitaliseSentence(str: string) { + // First letter of each word to uppercase + return str.replace(/\b\w/g, (char) => char.toUpperCase()); +} + export class LogsService { constructor(private honoContext: Context) {} @@ -107,12 +112,12 @@ export class LogsService { traceId: traceId, spanId: spanId ?? crypto.randomUUID(), parentSpanId: parentSpanId, - name: `execute_tool ${toolCall.function.name}`, + name: capitaliseSentence(toolCall.function.name.replaceAll('_', ' ')), kind: 'SPAN_KIND_INTERNAL', startTimeUnixNano: startTimeUnixNano, endTimeUnixNano: endTimeUnixNano, status: { - code: 'STATUS_CODE_OK', + code: 200, }, attributes: [ { From b6889380fc4a362c71f8e49ae777a010f97e138d Mon Sep 17 00:00:00 2001 From: Rohit Agarwal Date: Tue, 5 Aug 2025 18:34:51 +0530 Subject: [PATCH 20/21] bug fix: response transformer cannot be undefined --- src/handlers/handlerUtils.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/handlers/handlerUtils.ts b/src/handlers/handlerUtils.ts index 5df6a8668..168afc785 100644 --- a/src/handlers/handlerUtils.ts +++ b/src/handlers/handlerUtils.ts @@ -1190,7 +1190,7 @@ export async function recursiveAfterRequestHookHandler( originalResponseJson, } = await responseService.create({ response: response, - responseTransformer: undefined, + responseTransformer: requestContext.endpoint, isResponseAlreadyMapped: false, cache: { isCacheHit: false, From 7c44e697387dd9242b85f4dc2bce62558e4da989 Mon Sep 17 00:00:00 2001 From: Rohit Agarwal Date: Thu, 14 Aug 2025 19:58:39 +0530 Subject: [PATCH 21/21] Don't wrap tryPost in catch --- src/handlers/handlerUtils.ts | 219 +++++++++++++++++------------------ 1 file changed, 104 insertions(+), 115 deletions(-) diff --git a/src/handlers/handlerUtils.ts b/src/handlers/handlerUtils.ts index 168afc785..cb28929b8 100644 --- a/src/handlers/handlerUtils.ts +++ b/src/handlers/handlerUtils.ts @@ -359,134 +359,123 @@ export async function tryPost( ? new McpService(requestContext) : null; - try { - if (mcpService) { - await mcpService.init(); - // Add MCP tools to the request - const mcpTools = mcpService.tools; - requestContext.addMcpTools(mcpTools); - } + if (mcpService) { + await mcpService.init(); + // Add MCP tools to the request + const mcpTools = mcpService.tools; + requestContext.addMcpTools(mcpTools); + } - // Attach the body of the request - if (!providerContext.hasRequestHandler(requestContext)) { - requestContext.transformToProviderRequestAndSave(); - } + // Attach the body of the request + if (!providerContext.hasRequestHandler(requestContext)) { + requestContext.transformToProviderRequestAndSave(); + } - // Construct the base object for the request - const fetchOptions: RequestInit = await constructRequest( - providerContext, - requestContext - ); + // Construct the base object for the request + const fetchOptions: RequestInit = await constructRequest( + providerContext, + requestContext + ); - // Cache Handler - const cacheService = new CacheService(c, hooksService); - const cacheResponseObject: CacheResponseObject = - await cacheService.getCachedResponse( - requestContext, - fetchOptions.headers || {} - ); - logObject.addCache( - cacheResponseObject.cacheStatus, - cacheResponseObject.cacheKey + // Cache Handler + const cacheService = new CacheService(c, hooksService); + const cacheResponseObject: CacheResponseObject = + await cacheService.getCachedResponse( + requestContext, + fetchOptions.headers || {} ); - if (cacheResponseObject.cacheResponse) { - const { response, originalResponseJson } = await responseService.create({ - response: cacheResponseObject.cacheResponse, - responseTransformer: requestContext.endpoint, - cache: { - isCacheHit: true, - cacheStatus: cacheResponseObject.cacheStatus, - cacheKey: cacheResponseObject.cacheKey, - }, - isResponseAlreadyMapped: false, - retryAttempt: 0, - fetchOptions, - createdAt: cacheResponseObject.createdAt, - executionTime: 0, - }); + logObject.addCache( + cacheResponseObject.cacheStatus, + cacheResponseObject.cacheKey + ); + if (cacheResponseObject.cacheResponse) { + const { response, originalResponseJson } = await responseService.create({ + response: cacheResponseObject.cacheResponse, + responseTransformer: requestContext.endpoint, + cache: { + isCacheHit: true, + cacheStatus: cacheResponseObject.cacheStatus, + cacheKey: cacheResponseObject.cacheKey, + }, + isResponseAlreadyMapped: false, + retryAttempt: 0, + fetchOptions, + createdAt: cacheResponseObject.createdAt, + executionTime: 0, + }); - logObject - .updateRequestContext(requestContext, fetchOptions.headers) - .addResponse(response, originalResponseJson) - .log(); + logObject + .updateRequestContext(requestContext, fetchOptions.headers) + .addResponse(response, originalResponseJson) + .log(); - return response; - } + return response; + } - // Prerequest validator (For virtual key budgets) - const preRequestValidatorService = new PreRequestValidatorService( - c, - requestContext - ); - const preRequestValidatorResponse = - await preRequestValidatorService.getResponse(); - if (preRequestValidatorResponse) { - const { response, originalResponseJson } = await responseService.create({ - response: preRequestValidatorResponse, - responseTransformer: undefined, - isResponseAlreadyMapped: false, - cache: { - isCacheHit: false, - cacheStatus: cacheResponseObject.cacheStatus, - cacheKey: cacheResponseObject.cacheKey, - }, - retryAttempt: 0, - fetchOptions, - createdAt: new Date(), - }); + // Prerequest validator (For virtual key budgets) + const preRequestValidatorService = new PreRequestValidatorService( + c, + requestContext + ); + const preRequestValidatorResponse = + await preRequestValidatorService.getResponse(); + if (preRequestValidatorResponse) { + const { response, originalResponseJson } = await responseService.create({ + response: preRequestValidatorResponse, + responseTransformer: undefined, + isResponseAlreadyMapped: false, + cache: { + isCacheHit: false, + cacheStatus: cacheResponseObject.cacheStatus, + cacheKey: cacheResponseObject.cacheKey, + }, + retryAttempt: 0, + fetchOptions, + createdAt: new Date(), + }); - logObject - .updateRequestContext(requestContext, fetchOptions.headers) - .addResponse(response, originalResponseJson) - .log(); + logObject + .updateRequestContext(requestContext, fetchOptions.headers) + .addResponse(response, originalResponseJson) + .log(); - return response; - } + return response; + } - // Request Handler (Including retries, recursion and hooks) - const { mappedResponse, retryCount, createdAt, originalResponseJson } = - await recursiveAfterRequestHookHandler( - requestContext, - fetchOptions, - 0, - hookSpan.id, - providerContext, - hooksService, - logObject, - responseService, - cacheResponseObject, - mcpService || undefined - ); + // Request Handler (Including retries, recursion and hooks) + const { mappedResponse, retryCount, createdAt, originalResponseJson } = + await recursiveAfterRequestHookHandler( + requestContext, + fetchOptions, + 0, + hookSpan.id, + providerContext, + hooksService, + logObject, + responseService, + cacheResponseObject, + mcpService || undefined + ); - const { response, originalResponseJson: mappedOriginalResponseJson } = - await responseService.create({ - response: mappedResponse, - responseTransformer: undefined, - isResponseAlreadyMapped: true, - cache: { - isCacheHit: false, - cacheStatus: cacheResponseObject.cacheStatus, - cacheKey: cacheResponseObject.cacheKey, - }, - retryAttempt: retryCount, - fetchOptions, - createdAt, - originalResponseJson, - }); + const { response, originalResponseJson: mappedOriginalResponseJson } = + await responseService.create({ + response: mappedResponse, + responseTransformer: undefined, + isResponseAlreadyMapped: true, + cache: { + isCacheHit: false, + cacheStatus: cacheResponseObject.cacheStatus, + cacheKey: cacheResponseObject.cacheKey, + }, + retryAttempt: retryCount, + fetchOptions, + createdAt, + originalResponseJson, + }); - // The log is handled inside the recursiveAfterRequestHookHandler function + // The log is handled inside the recursiveAfterRequestHookHandler function - return response; - } finally { - // Clean up MCP service - if (mcpService) { - try { - await mcpService[Symbol.asyncDispose](); - } catch (error) { - console.error('Error disposing MCP service:', error); - } - } - } + return response; } export async function tryTargetsRecursively(