1
+ # -*- coding: utf-8 -*-
1
2
import asyncio
2
3
import json
3
4
import logging
18
19
19
20
class MCPTool (BaseTool ):
20
21
"""Langchain tool wrapper for MCP tools"""
21
-
22
+
22
23
name : str = Field (..., description = "Tool name" )
23
24
description : str = Field (..., description = "Tool description" )
24
25
mcp_client : MCPClient = Field (..., description = "MCP client instance" )
25
26
tool_id : str = Field (..., description = "MCP tool ID" )
26
-
27
+
27
28
class Config :
28
29
arbitrary_types_allowed = True
29
-
30
+
30
31
def _run (self , ** kwargs ) -> str :
31
32
"""Synchronous tool execution"""
32
33
try :
@@ -35,7 +36,7 @@ def _run(self, **kwargs) -> str:
35
36
except Exception as e :
36
37
logger .error (f"Tool { self .tool_id } execution failed: { e } " )
37
38
return f"Error executing tool: { str (e )} "
38
-
39
+
39
40
async def _arun (self , ** kwargs ) -> str :
40
41
"""Asynchronous tool execution"""
41
42
# Run in thread pool since MCP client might not be async
@@ -44,27 +45,27 @@ async def _arun(self, **kwargs) -> str:
44
45
45
46
class LangchainMCPAgent :
46
47
"""Langchain agent that integrates with MCP Gateway"""
47
-
48
+
48
49
def __init__ (self , config : AgentConfig ):
49
50
self .config = config
50
51
self .mcp_client = MCPClient .from_env (config .mcp_gateway_url )
51
52
self .mcp_client .debug = config .debug_mode
52
-
53
+
53
54
self .llm = ChatOpenAI (
54
55
model = config .default_model ,
55
56
temperature = config .temperature ,
56
57
streaming = config .streaming_enabled
57
58
)
58
-
59
+
59
60
self .tools : List [MCPTool ] = []
60
61
self .agent_executor : Optional [AgentExecutor ] = None
61
62
self ._initialized = False
62
-
63
+
63
64
@classmethod
64
65
def from_config (cls , config : AgentConfig ) -> "LangchainMCPAgent" :
65
66
"""Create agent from configuration"""
66
67
return cls (config )
67
-
68
+
68
69
async def initialize (self ):
69
70
"""Initialize the agent and load tools"""
70
71
try :
@@ -77,24 +78,24 @@ async def initialize(self):
77
78
# Auto-discover from gateway
78
79
logger .info ("Auto-discovering tools from MCP Gateway" )
79
80
await self ._load_mcp_tools ()
80
-
81
+
81
82
# Create the agent
82
83
await self ._create_agent ()
83
-
84
+
84
85
self ._initialized = True
85
86
logger .info (f"Agent initialized with { len (self .tools )} tools" )
86
-
87
+
87
88
except Exception as e :
88
89
logger .error (f"Failed to initialize agent: { e } " )
89
90
raise
90
-
91
+
91
92
async def _load_allowlisted_tools (self ):
92
93
"""Load only tools specified in the allowlist (no autodiscovery)"""
93
94
try :
94
95
# Clean the allowlist
95
96
allowlist = [tool .strip () for tool in self .config .tools_allowlist if tool .strip ()]
96
97
logger .info (f"Loading allowlisted tools: { allowlist } " )
97
-
98
+
98
99
self .tools = []
99
100
for tool_id in allowlist :
100
101
# Create a basic tool definition for allowlisted tools
@@ -107,27 +108,27 @@ async def _load_allowlisted_tools(self):
107
108
)
108
109
self .tools .append (mcp_tool )
109
110
logger .info (f"Added allowlisted tool: { tool_id } " )
110
-
111
+
111
112
except Exception as e :
112
113
logger .error (f"Failed to load allowlisted tools: { e } " )
113
114
raise
114
-
115
+
115
116
async def _load_mcp_tools (self ):
116
117
"""Load tools from MCP Gateway"""
117
118
try :
118
119
# Add debug info about the connection
119
120
logger .info (f"Connecting to MCP Gateway at: { self .mcp_client .base_url } " )
120
121
logger .info (f"Using token: { 'Yes' if self .mcp_client .token else 'No' } " )
121
-
122
+
122
123
tool_defs = self .mcp_client .list_tools ()
123
124
logger .info (f"Found { len (tool_defs )} tools from MCP Gateway" )
124
-
125
+
125
126
if len (tool_defs ) == 0 :
126
127
logger .warning ("No tools found from MCP Gateway. Check if:" )
127
128
logger .warning (" 1. Gateway is running on the expected URL" )
128
129
logger .warning (" 2. Authentication token is valid" )
129
130
logger .warning (" 3. Gateway has tools configured" )
130
-
131
+
131
132
self .tools = []
132
133
for tool_def in tool_defs :
133
134
mcp_tool = MCPTool (
@@ -138,17 +139,17 @@ async def _load_mcp_tools(self):
138
139
)
139
140
self .tools .append (mcp_tool )
140
141
logger .info (f"Loaded tool: { tool_def .id } ({ tool_def .name } )" )
141
-
142
+
142
143
except Exception as e :
143
144
logger .error (f"Failed to load MCP tools: { e } " )
144
145
raise
145
-
146
+
146
147
async def _create_agent (self ):
147
148
"""Create the Langchain agent executor"""
148
149
try :
149
150
# Define the system prompt
150
151
system_prompt = """You are a helpful AI assistant with access to various tools through the MCP (Model Context Protocol) Gateway.
151
-
152
+
152
153
Use the available tools to help answer questions and complete tasks. When using tools:
153
154
1. Read tool descriptions carefully to understand their purpose
154
155
2. Provide the correct arguments as specified in the tool schema
@@ -158,22 +159,22 @@ async def _create_agent(self):
158
159
Available tools: {tool_names}
159
160
160
161
Always strive to be helpful, accurate, and honest in your responses."""
161
-
162
+
162
163
# Create prompt template
163
164
prompt = ChatPromptTemplate .from_messages ([
164
165
("system" , system_prompt ),
165
166
MessagesPlaceholder (variable_name = "chat_history" ),
166
167
("human" , "{input}" ),
167
168
MessagesPlaceholder (variable_name = "agent_scratchpad" ),
168
169
])
169
-
170
+
170
171
# Create the agent
171
172
agent = create_openai_functions_agent (
172
173
llm = self .llm ,
173
174
tools = self .tools ,
174
175
prompt = prompt
175
176
)
176
-
177
+
177
178
# Create agent executor
178
179
self .agent_executor = AgentExecutor (
179
180
agent = agent ,
@@ -182,29 +183,29 @@ async def _create_agent(self):
182
183
verbose = self .config .debug_mode ,
183
184
return_intermediate_steps = True
184
185
)
185
-
186
+
186
187
logger .info ("Langchain agent created successfully" )
187
-
188
+
188
189
except Exception as e :
189
190
logger .error (f"Failed to create agent: { e } " )
190
191
raise
191
-
192
+
192
193
def is_initialized (self ) -> bool :
193
194
"""Check if agent is initialized"""
194
195
return self ._initialized
195
-
196
+
196
197
async def check_readiness (self ) -> bool :
197
198
"""Check if agent is ready to handle requests"""
198
199
try :
199
200
return (
200
- self ._initialized and
201
- self .agent_executor is not None and
201
+ self ._initialized and
202
+ self .agent_executor is not None and
202
203
len (self .tools ) >= 0 and # Allow 0 tools for testing
203
204
await self .test_gateway_connection ()
204
205
)
205
206
except Exception :
206
207
return False
207
-
208
+
208
209
async def test_gateway_connection (self ) -> bool :
209
210
"""Test connection to MCP Gateway"""
210
211
try :
@@ -214,17 +215,17 @@ async def test_gateway_connection(self) -> bool:
214
215
except Exception as e :
215
216
logger .error (f"Gateway connection test failed: { e } " )
216
217
return False
217
-
218
+
218
219
def get_available_tools (self ) -> List [ToolDef ]:
219
220
"""Get list of available tools"""
220
221
try :
221
222
return self .mcp_client .list_tools ()
222
223
except Exception :
223
224
return []
224
-
225
+
225
226
async def run_async (
226
- self ,
227
- messages : List [Dict [str , str ]],
227
+ self ,
228
+ messages : List [Dict [str , str ]],
228
229
model : Optional [str ] = None ,
229
230
max_tokens : Optional [int ] = None ,
230
231
temperature : Optional [float ] = None ,
@@ -233,15 +234,15 @@ async def run_async(
233
234
"""Run the agent asynchronously"""
234
235
if not self ._initialized :
235
236
raise RuntimeError ("Agent not initialized. Call initialize() first." )
236
-
237
+
237
238
try :
238
239
# Convert messages to input format
239
240
if messages :
240
241
latest_message = messages [- 1 ]
241
242
input_text = latest_message .get ("content" , "" )
242
243
else :
243
244
input_text = ""
244
-
245
+
245
246
# Prepare chat history (all messages except the last one)
246
247
chat_history = []
247
248
for msg in messages [:- 1 ]:
@@ -251,20 +252,20 @@ async def run_async(
251
252
chat_history .append (AIMessage (content = msg ["content" ]))
252
253
elif msg ["role" ] == "system" :
253
254
chat_history .append (SystemMessage (content = msg ["content" ]))
254
-
255
+
255
256
# Run the agent
256
257
result = await self .agent_executor .ainvoke ({
257
258
"input" : input_text ,
258
259
"chat_history" : chat_history ,
259
260
"tool_names" : [tool .name for tool in self .tools ]
260
261
})
261
-
262
+
262
263
return result ["output" ]
263
-
264
+
264
265
except Exception as e :
265
266
logger .error (f"Agent execution failed: { e } " )
266
267
return f"I encountered an error while processing your request: { str (e )} "
267
-
268
+
268
269
async def stream_async (
269
270
self ,
270
271
messages : List [Dict [str , str ]],
0 commit comments