from mcp_module import mcp_service
from utils.logger import logger
import inspect
+ import asyncio
+ import time
+ import hashlib
+ import json
from agent.tools.utils.mcp_connection_manager import MCPConnectionManager
from agent.tools.utils.custom_mcp_handler import CustomMCPHandler
from agent.tools.utils.dynamic_tool_builder import DynamicToolBuilder
from agent.tools.utils.mcp_tool_executor import MCPToolExecutor
+ from services import redis as redis_service


+ class MCPSchemaRedisCache:
+     # Caches MCP tool schemas in Redis, keyed by an MD5 hash of the serialized server config.
+     def __init__(self, ttl_seconds: int = 3600, key_prefix: str = "mcp_schema:"):
+         self._ttl = ttl_seconds
+         self._key_prefix = key_prefix
+         self._redis_client = None
+
+     async def _ensure_redis(self):
+         # Lazily obtain a Redis client; the cache degrades to a no-op when Redis is unavailable.
+         if not self._redis_client:
+             try:
+                 self._redis_client = await redis_service.get_client()
+             except Exception as e:
+                 logger.warning(f"Redis not available for MCP cache: {e}")
+                 return False
+         return True
+
+     def _get_cache_key(self, config: Dict[str, Any]) -> str:
+         config_str = json.dumps(config, sort_keys=True)
+         config_hash = hashlib.md5(config_str.encode()).hexdigest()
+         return f"{self._key_prefix}{config_hash}"
+
+     async def get(self, config: Dict[str, Any]) -> Optional[Dict[str, Any]]:
+         if not await self._ensure_redis():
+             return None
+
+         try:
+             key = self._get_cache_key(config)
+             cached_data = await self._redis_client.get(key)
+
+             if cached_data:
+                 logger.debug(f"⚡ Redis cache hit for MCP: {config.get('name', config.get('qualifiedName', 'Unknown'))}")
+                 return json.loads(cached_data)
+             else:
+                 logger.debug(f"Redis cache miss for MCP: {config.get('name', config.get('qualifiedName', 'Unknown'))}")
+                 return None
+
+         except Exception as e:
+             logger.warning(f"Error reading from Redis cache: {e}")
+             return None
+
+     async def set(self, config: Dict[str, Any], data: Dict[str, Any]):
+         if not await self._ensure_redis():
+             return
+
+         try:
+             key = self._get_cache_key(config)
+             serialized_data = json.dumps(data)
+
+             await self._redis_client.setex(key, self._ttl, serialized_data)
+             logger.debug(f"✅ Cached MCP schema in Redis for {config.get('name', config.get('qualifiedName', 'Unknown'))} (TTL: {self._ttl}s)")
+
+         except Exception as e:
+             logger.warning(f"Error writing to Redis cache: {e}")
+
+     async def clear_pattern(self, pattern: Optional[str] = None):
+         if not await self._ensure_redis():
+             return
+         try:
+             if pattern:
+                 search_pattern = f"{self._key_prefix}{pattern}*"
+             else:
+                 search_pattern = f"{self._key_prefix}*"
+
+             keys = []
+             async for key in self._redis_client.scan_iter(match=search_pattern):
+                 keys.append(key)
+
+             if keys:
+                 await self._redis_client.delete(*keys)
+                 logger.info(f"Cleared {len(keys)} MCP schema cache entries from Redis")
+
+         except Exception as e:
+             logger.warning(f"Error clearing Redis cache: {e}")
+
+     async def get_stats(self) -> Dict[str, Any]:
+         if not await self._ensure_redis():
+             return {"available": False}
+         try:
+             count = 0
+             async for _ in self._redis_client.scan_iter(match=f"{self._key_prefix}*"):
+                 count += 1
+
+             return {
+                 "available": True,
+                 "cached_schemas": count,
+                 "ttl_seconds": self._ttl,
+                 "key_prefix": self._key_prefix
+             }
+         except Exception as e:
+             logger.warning(f"Error getting cache stats: {e}")
+             return {"available": False, "error": str(e)}
+
+
+ # Module-level schema cache shared by all wrapper instances (1-hour TTL).
+ _redis_cache = MCPSchemaRedisCache(ttl_seconds=3600)
+
class MCPToolWrapper(Tool):
-     def __init__(self, mcp_configs: Optional[List[Dict[str, Any]]] = None):
+     def __init__(self, mcp_configs: Optional[List[Dict[str, Any]]] = None, use_cache: bool = True):
        self.mcp_manager = mcp_service
        self.mcp_configs = mcp_configs or []
        self._initialized = False
        self._schemas: Dict[str, List[ToolSchema]] = {}
        self._dynamic_tools = {}
        self._custom_tools = {}
+         self.use_cache = use_cache

        self.connection_manager = MCPConnectionManager()
        self.custom_handler = CustomMCPHandler(self.connection_manager)
@@ -32,23 +132,109 @@ async def _ensure_initialized(self):
        self._initialized = True

    async def _initialize_servers(self):
+         start_time = time.time()
+
        standard_configs = [cfg for cfg in self.mcp_configs if not cfg.get('isCustom', False)]
        custom_configs = [cfg for cfg in self.mcp_configs if cfg.get('isCustom', False)]

+         cached_configs = []
+         cached_tools_data = []
+
+         initialization_tasks = []
+
        if standard_configs:
-             await self._initialize_standard_servers(standard_configs)
+             for config in standard_configs:
+                 if self.use_cache:
+                     cached_data = await _redis_cache.get(config)
+                     if cached_data:
+                         cached_configs.append(config.get('qualifiedName', 'Unknown'))
+                         cached_tools_data.append(cached_data)
+                         continue
+
+                 task = self._initialize_single_standard_server(config)
+                 initialization_tasks.append(('standard', config, task))

        if custom_configs:
-             await self.custom_handler.initialize_custom_mcps(custom_configs)
+             for config in custom_configs:
+                 if self.use_cache:
+                     cached_data = await _redis_cache.get(config)
+                     if cached_data:
+                         cached_configs.append(config.get('name', 'Unknown'))
+                         cached_tools_data.append(cached_data)
+                         continue
+
+                 task = self._initialize_single_custom_mcp(config)
+                 initialization_tasks.append(('custom', config, task))
+
+         if cached_tools_data:
+             logger.info(f"⚡ Loaded {len(cached_configs)} MCP schemas from Redis cache: {', '.join(cached_configs)}")
+             for cached_data in cached_tools_data:
+                 try:
+                     if cached_data.get('type') == 'standard':
+                         logger.debug("Standard MCP tools found in cache but require connection to restore")
+                     elif cached_data.get('type') == 'custom':
+                         custom_tools = cached_data.get('tools', {})
+                         if custom_tools:
+                             self.custom_handler.custom_tools.update(custom_tools)
+                             logger.debug(f"Restored {len(custom_tools)} custom tools from cache")
+                 except Exception as e:
+                     logger.warning(f"Failed to restore cached tools: {e}")
+
+         if initialization_tasks:
+             logger.info(f"🚀 Initializing {len(initialization_tasks)} MCP servers in parallel (cache enabled: {self.use_cache})...")
+
+             tasks = [task for _, _, task in initialization_tasks]
+             results = await asyncio.gather(*tasks, return_exceptions=True)
+
+             successful = 0
+             failed = 0
+
+             for i, result in enumerate(results):
+                 task_type, config, _ = initialization_tasks[i]
+                 if isinstance(result, Exception):
+                     failed += 1
+                     config_name = config.get('name', config.get('qualifiedName', 'Unknown'))
+                     logger.error(f"Failed to initialize MCP server '{config_name}': {result}")
+                 else:
+                     successful += 1
+                     if self.use_cache and result:
+                         await _redis_cache.set(config, result)
+
+             elapsed_time = time.time() - start_time
+             logger.info(f"⚡ MCP initialization completed in {elapsed_time:.2f}s - {successful} successful, {failed} failed, {len(cached_configs)} from cache")
+         else:
+             if cached_configs:
+                 elapsed_time = time.time() - start_time
+                 logger.info(f"⚡ All {len(cached_configs)} MCP schemas loaded from Redis cache in {elapsed_time:.2f}s - instant startup!")
+             else:
+                 logger.info("No MCP servers to initialize")
+
+     async def _initialize_single_standard_server(self, config: Dict[str, Any]):
+         try:
+             logger.debug(f"Connecting to standard MCP server: {config['qualifiedName']}")
+             await self.mcp_manager.connect_server(config)
+             logger.debug(f"✓ Connected to MCP server: {config['qualifiedName']}")
+
+             tools_info = self.mcp_manager.get_all_tools_openapi()
+             return {'tools': tools_info, 'type': 'standard', 'timestamp': time.time()}
+         except Exception as e:
+             logger.error(f"✗ Failed to connect to MCP server {config['qualifiedName']}: {e}")
+             raise e
+
+     async def _initialize_single_custom_mcp(self, config: Dict[str, Any]):
+         try:
+             logger.debug(f"Initializing custom MCP: {config.get('name', 'Unknown')}")
+             await self.custom_handler._initialize_single_custom_mcp(config)
+             logger.debug(f"✓ Initialized custom MCP: {config.get('name', 'Unknown')}")
+
+             custom_tools = self.custom_handler.get_custom_tools()
+             return {'tools': custom_tools, 'type': 'custom', 'timestamp': time.time()}
+         except Exception as e:
+             logger.error(f"✗ Failed to initialize custom MCP {config.get('name', 'Unknown')}: {e}")
+             raise e

    async def _initialize_standard_servers(self, standard_configs: List[Dict[str, Any]]):
-         for config in standard_configs:
-             try:
-                 logger.info(f"Attempting to connect to MCP server: {config['qualifiedName']}")
-                 await self.mcp_manager.connect_server(config)
-                 logger.info(f"Successfully connected to MCP server: {config['qualifiedName']}")
-             except Exception as e:
-                 logger.error(f"Failed to connect to MCP server {config['qualifiedName']}: {e}")
+         pass

    async def _create_dynamic_tools(self):
        try:
@@ -77,7 +263,6 @@ async def _create_dynamic_tools(self):

            logger.info(f"Created {len(self._dynamic_tools)} dynamic MCP tool methods")

-             # Re-register schemas to pick up the dynamic methods
            self._register_schemas()
            logger.info(f"Re-registered schemas after creating dynamic tools - total: {len(self._schemas)}")

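Usage sketch (not part of the diff above): a minimal example of how the cache-aware wrapper might be exercised, assuming the file is importable as `mcp_tool_wrapper` and a Redis instance is reachable via `services.redis`; the sample config and module name are hypothetical.

    import asyncio
    from mcp_tool_wrapper import MCPToolWrapper, _redis_cache  # assumed module name

    async def demo():
        # Hypothetical config; real entries depend on the MCP servers registered for the agent.
        configs = [{"qualifiedName": "example/server", "isCustom": False}]
        wrapper = MCPToolWrapper(mcp_configs=configs, use_cache=True)

        # First run connects to the server and writes its schema to Redis;
        # runs within the 1-hour TTL are then served from the cache.
        await wrapper._ensure_initialized()

        print(await _redis_cache.get_stats())   # e.g. {"available": True, "cached_schemas": 1, ...}
        await _redis_cache.clear_pattern()      # drop all cached MCP schemas

    asyncio.run(demo())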