Skip to content

Commit 1d72d81

Browse files
authored
[fel] Enhanced MCP Client and Server Interaction with Tool Management Improvements (#141)
This commit introduces several enhancements to the MCP (Model Communication Protocol) client and server interaction, focusing on improving tool management and communication capabilities. Key changes include: 1. ToolInfo Interface Expansion: - Added a new static method parseIdentifier to parse tool identifiers into namespace and tool name components, enhancing tool identification and management. 2. MCP Client Enhancements: - Updated the DefaultMcpClient class to support more robust initialization and communication with the MCP server. - Introduced new methods for handling Server-Sent Events (SSE) and improved error handling for tool calls. - Enhanced the callTool method to return results directly and manage pending requests more efficiently. 3. MCP Server Adjustments: - Modified the McpServer interface and its implementations to better support tool addition and retrieval. - Updated the DefaultMcpServer to provide a more structured server schema and improved tool registration logic. 4. Weather Service Example: - Introduced a new weather service example (WeatherService and WeatherServiceImpl) to demonstrate tool implementation and usage within the MCP framework. 5. Code Structure and Cleanup: - Reorganized and cleaned up code in various classes to improve readability and maintainability. - Removed deprecated or unused code related to the MCP server entity. 6. Dependency and Plugin Updates: - Added new dependencies and updated plugins in the pom.xml file to support the enhanced MCP functionality. 7. Test Code Updates: - Expanded test cases for the MCP client and server to ensure compatibility and functionality of the new features. These changes collectively enhance the MCP framework's ability to manage and interact with tools, providing a more robust and flexible platform for model communication and execution.
1 parent 43566e8 commit 1d72d81

File tree

38 files changed

+593
-435
lines changed

38 files changed

+593
-435
lines changed

framework/fel/java/fel-core/src/main/java/modelengine/fel/core/tool/ToolInfo.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,4 +141,21 @@ static String identify(ToolInfo toolInfo) {
141141
static String identify(String namespace, String toolName) {
142142
return StringUtils.format("{0}:{1}", namespace, toolName);
143143
}
144+
145+
/**
146+
* Parses the tool identifier.
147+
*
148+
* @param identifier The identifier to be parsed.
149+
* @return An array containing the namespace and tool name.
150+
*/
151+
static String[] parseIdentifier(String identifier) {
152+
if (identifier == null || identifier.isEmpty()) {
153+
throw new IllegalArgumentException("Identifier cannot be null or empty.");
154+
}
155+
String[] parts = identifier.split(":", 2);
156+
if (parts.length != 2) {
157+
throw new IllegalArgumentException("Invalid identifier format. Expected 'namespace:name'.");
158+
}
159+
return parts;
160+
}
144161
}

framework/fel/java/plugins/tool-executor/src/main/java/modelengine/fel/tool/support/DefaultToolExecutor.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,11 @@
88

99
import static modelengine.fitframework.inspection.Validation.notNull;
1010

11-
import modelengine.fel.tool.ToolInfoEntity;
11+
import modelengine.fel.core.tool.ToolInfo;
1212
import modelengine.fel.tool.Tool;
1313
import modelengine.fel.tool.ToolFactory;
1414
import modelengine.fel.tool.ToolFactoryRepository;
15+
import modelengine.fel.tool.ToolInfoEntity;
1516
import modelengine.fel.tool.service.ToolExecuteService;
1617
import modelengine.fel.tool.service.ToolRepository;
1718
import modelengine.fitframework.annotation.Component;
@@ -70,13 +71,15 @@ public String execute(String group, String toolName, Map<String, Object> jsonObj
7071
@Override
7172
@Fitable(id = "standard")
7273
public String execute(String uniqueName, String jsonArgs) {
73-
return this.execute("Common", uniqueName, jsonArgs);
74+
String[] strings = ToolInfo.parseIdentifier(uniqueName);
75+
return this.execute(strings[0], strings[1], jsonArgs);
7476
}
7577

7678
@Override
7779
@Fitable(id = "standard")
7880
public String execute(String uniqueName, Map<String, Object> jsonObject) {
79-
return this.execute("Common", uniqueName, jsonObject);
81+
String[] strings = ToolInfo.parseIdentifier(uniqueName);
82+
return this.execute(strings[0], strings[1], jsonObject);
8083
}
8184

8285
private Tool getTool(String group, String toolName) {

framework/fel/java/plugins/tool-mcp-client/src/main/java/modelengine/fel/tool/mcp/client/support/DefaultMcpClient.java

Lines changed: 107 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,10 @@
99
import static modelengine.fitframework.util.ObjectUtils.cast;
1010

1111
import modelengine.fel.tool.mcp.client.McpClient;
12+
import modelengine.fel.tool.mcp.entity.ClientSchema;
1213
import modelengine.fel.tool.mcp.entity.JsonRpc;
1314
import modelengine.fel.tool.mcp.entity.Method;
14-
import modelengine.fel.tool.mcp.entity.Server;
15+
import modelengine.fel.tool.mcp.entity.ServerSchema;
1516
import modelengine.fel.tool.mcp.entity.Tool;
1617
import modelengine.fit.http.client.HttpClassicClient;
1718
import modelengine.fit.http.client.HttpClassicClientRequest;
@@ -26,8 +27,11 @@
2627
import modelengine.fitframework.schedule.ThreadPoolExecutor;
2728
import modelengine.fitframework.schedule.ThreadPoolScheduler;
2829
import modelengine.fitframework.serialization.ObjectSerializer;
30+
import modelengine.fitframework.util.CollectionUtils;
2931
import modelengine.fitframework.util.LockUtils;
32+
import modelengine.fitframework.util.MapBuilder;
3033
import modelengine.fitframework.util.ObjectUtils;
34+
import modelengine.fitframework.util.StringUtils;
3135
import modelengine.fitframework.util.ThreadUtils;
3236
import modelengine.fitframework.util.UuidUtils;
3337

@@ -54,37 +58,43 @@ public class DefaultMcpClient implements McpClient {
5458

5559
private final ObjectSerializer jsonSerializer;
5660
private final HttpClassicClient client;
57-
private final String connectionString;
61+
private final String baseUri;
62+
private final String sseEndpoint;
5863
private final String name;
5964
private final AtomicLong id = new AtomicLong(0);
6065

61-
private volatile String messageUrl;
66+
private volatile String messageEndpoint;
6267
private volatile String sessionId;
63-
private volatile Server server;
68+
private volatile ServerSchema serverSchema;
6469
private volatile boolean initialized = false;
6570
private final List<Tool> tools = new ArrayList<>();
6671
private final Object initializedLock = LockUtils.newSynchronizedLock();
6772
private final Object toolsLock = LockUtils.newSynchronizedLock();
6873
private final Map<Long, Consumer<JsonRpc.Response<Long>>> responseConsumers = new ConcurrentHashMap<>();
6974
private final Map<Long, Boolean> pendingRequests = new ConcurrentHashMap<>();
75+
private final Map<Long, Object> pendingResults = new ConcurrentHashMap<>();
7076

7177
/**
7278
* Constructs a new instance of the DefaultMcpClient.
7379
*
7480
* @param jsonSerializer The serializer used for JSON serialization and deserialization.
7581
* @param client The HTTP client used for communication with the MCP server.
76-
* @param connectionString The connection string used to establish the initial connection.
82+
* @param baseUri The base URI of the MCP server.
83+
* @param sseEndpoint The endpoint for the Server-Sent Events (SSE) connection.
7784
*/
78-
public DefaultMcpClient(ObjectSerializer jsonSerializer, HttpClassicClient client, String connectionString) {
85+
public DefaultMcpClient(ObjectSerializer jsonSerializer, HttpClassicClient client, String baseUri,
86+
String sseEndpoint) {
7987
this.jsonSerializer = jsonSerializer;
8088
this.client = client;
81-
this.connectionString = connectionString;
89+
this.baseUri = baseUri;
90+
this.sseEndpoint = sseEndpoint;
8291
this.name = UuidUtils.randomUuidString();
8392
}
8493

8594
@Override
8695
public void initialize() {
87-
HttpClassicClientRequest request = this.client.createRequest(HttpRequestMethod.GET, connectionString);
96+
HttpClassicClientRequest request =
97+
this.client.createRequest(HttpRequestMethod.GET, this.baseUri + this.sseEndpoint);
8898
Choir<TextEvent> messages = this.client.exchangeStream(request, TextEvent.class);
8999
ThreadPoolExecutor threadPool = ThreadPoolExecutor.custom()
90100
.threadPoolName("mcp-client-" + this.name)
@@ -125,7 +135,13 @@ public void initialize() {
125135
}
126136

127137
private void consumeTextEvent(TextEvent textEvent) {
128-
log.info("Receive message from MCP server. [message={}]", textEvent.data());
138+
log.info("Receive message from MCP server. [id={}, event={}, message={}]",
139+
textEvent.id(),
140+
textEvent.event(),
141+
textEvent.data());
142+
if (StringUtils.isBlank(textEvent.event()) || StringUtils.isBlank((String) textEvent.data())) {
143+
return;
144+
}
129145
if (Objects.equals(textEvent.event(), "endpoint")) {
130146
this.initializeMcpServer(textEvent);
131147
return;
@@ -157,7 +173,8 @@ private void pingServer() {
157173
log.info("MCP client is not initialized and {} method will be delayed.", Method.PING.code());
158174
return;
159175
}
160-
HttpClassicClientRequest request = this.client.createRequest(HttpRequestMethod.POST, this.messageUrl);
176+
HttpClassicClientRequest request =
177+
this.client.createRequest(HttpRequestMethod.POST, this.baseUri + this.messageEndpoint);
161178
long currentId = this.getNextId();
162179
JsonRpc.Request<Long> rpcRequest = JsonRpc.createRequest(currentId, Method.PING.code());
163180
request.entity(Entity.createObject(request, rpcRequest));
@@ -183,12 +200,17 @@ private void pingServer() {
183200
}
184201

185202
private void initializeMcpServer(TextEvent textEvent) {
186-
this.messageUrl = textEvent.data().toString();
187-
this.sessionId = textEvent.id();
188-
HttpClassicClientRequest request = this.client.createRequest(HttpRequestMethod.POST, this.messageUrl);
203+
this.messageEndpoint = textEvent.data().toString();
204+
HttpClassicClientRequest request =
205+
this.client.createRequest(HttpRequestMethod.POST, this.baseUri + this.messageEndpoint);
206+
this.sessionId =
207+
request.queries().first("session_id").orElseThrow(() -> new IllegalStateException("no session_id"));
189208
long currentId = this.getNextId();
190209
this.responseConsumers.put(currentId, this::initializedMcpServer);
191-
JsonRpc.Request<Long> rpcRequest = JsonRpc.createRequest(currentId, Method.INITIALIZE.code());
210+
ClientSchema schema = new ClientSchema("2024-11-05",
211+
new ClientSchema.Capabilities(),
212+
new ClientSchema.Info("FIT MCP Client", "3.5.0-SNAPSHOT"));
213+
JsonRpc.Request<Long> rpcRequest = JsonRpc.createRequest(currentId, Method.INITIALIZE.code(), schema);
192214
request.entity(Entity.createObject(request, rpcRequest));
193215
log.info("Send {} method to MCP server. [sessionId={}, request={}]",
194216
Method.INITIALIZE.code(),
@@ -223,9 +245,9 @@ private void initializedMcpServer(JsonRpc.Response<Long> response) {
223245
this.initialized = true;
224246
this.initializedLock.notifyAll();
225247
}
226-
this.server = ObjectUtils.toCustomObject(response.result(), Server.class);
227-
log.info("MCP server has initialized. [server={}]", this.server);
228-
HttpClassicClientRequest request = this.client.createRequest(HttpRequestMethod.POST, this.messageUrl);
248+
this.recordServerSchema(response);
249+
HttpClassicClientRequest request =
250+
this.client.createRequest(HttpRequestMethod.POST, this.baseUri + this.messageEndpoint);
229251
JsonRpc.Notification notification = JsonRpc.createNotification(Method.NOTIFICATION_INITIALIZED.code());
230252
request.entity(Entity.createObject(request, notification));
231253
log.info("Send {} method to MCP server. [sessionId={}, notification={}]",
@@ -249,12 +271,19 @@ private void initializedMcpServer(JsonRpc.Response<Long> response) {
249271
}
250272
}
251273

274+
private void recordServerSchema(JsonRpc.Response<Long> response) {
275+
Map<String, Object> mapResult = cast(response.result());
276+
this.serverSchema = ServerSchema.create(mapResult);
277+
log.info("MCP server has initialized. [server={}]", this.serverSchema);
278+
}
279+
252280
@Override
253281
public List<Tool> getTools() {
254282
if (this.isNotInitialized()) {
255283
throw new IllegalStateException("MCP client is not initialized. Please wait a moment.");
256284
}
257-
HttpClassicClientRequest request = this.client.createRequest(HttpRequestMethod.POST, this.messageUrl);
285+
HttpClassicClientRequest request =
286+
this.client.createRequest(HttpRequestMethod.POST, this.baseUri + this.messageEndpoint);
258287
long currentId = this.getNextId();
259288
this.responseConsumers.put(currentId, this::getTools0);
260289
this.pendingRequests.put(currentId, true);
@@ -292,6 +321,7 @@ private void getTools0(JsonRpc.Response<Long> response) {
292321
log.error("Failed to get tools list from MCP server. [sessionId={}, response={}]",
293322
this.sessionId,
294323
response);
324+
this.pendingRequests.put(response.id(), false);
295325
return;
296326
}
297327
Map<String, Object> result = cast(response.result());
@@ -301,16 +331,73 @@ private void getTools0(JsonRpc.Response<Long> response) {
301331
this.tools.addAll(rawTools.stream()
302332
.map(rawTool -> ObjectUtils.<Tool>toCustomObject(rawTool, Tool.class))
303333
.toList());
304-
this.pendingRequests.put(response.id(), false);
305334
}
335+
this.pendingRequests.put(response.id(), false);
306336
}
307337

308338
@Override
309339
public Object callTool(String name, Map<String, Object> arguments) {
310340
if (this.isNotInitialized()) {
311341
throw new IllegalStateException("MCP client is not initialized. Please wait a moment.");
312342
}
313-
return null;
343+
HttpClassicClientRequest request =
344+
this.client.createRequest(HttpRequestMethod.POST, this.baseUri + this.messageEndpoint);
345+
long currentId = this.getNextId();
346+
this.responseConsumers.put(currentId, this::callTools0);
347+
this.pendingRequests.put(currentId, true);
348+
JsonRpc.Request<Long> rpcRequest = JsonRpc.createRequest(currentId,
349+
Method.TOOLS_CALL.code(),
350+
MapBuilder.<String, Object>get().put("name", name).put("arguments", arguments).build());
351+
request.entity(Entity.createObject(request, rpcRequest));
352+
log.info("Send {} method to MCP server. [sessionId={}, request={}]",
353+
Method.TOOLS_CALL.code(),
354+
this.sessionId,
355+
rpcRequest);
356+
try (HttpClassicClientResponse<Object> exchange = request.exchange(Object.class)) {
357+
if (exchange.statusCode() >= 200 && exchange.statusCode() < 300) {
358+
log.info("Send {} method to MCP server successfully. [sessionId={}, statusCode={}]",
359+
Method.TOOLS_CALL.code(),
360+
this.sessionId,
361+
exchange.statusCode());
362+
} else {
363+
log.error("Failed to {} MCP server. [sessionId={}, statusCode={}]",
364+
Method.TOOLS_CALL.code(),
365+
this.sessionId,
366+
exchange.statusCode());
367+
}
368+
} catch (IOException e) {
369+
throw new IllegalStateException(e);
370+
}
371+
while (this.pendingRequests.get(currentId)) {
372+
ThreadUtils.sleep(100);
373+
}
374+
return this.pendingResults.get(currentId);
375+
}
376+
377+
private void callTools0(JsonRpc.Response<Long> response) {
378+
if (response.error() != null) {
379+
log.error("Failed to call tool from MCP server. [sessionId={}, response={}]", this.sessionId, response);
380+
this.pendingRequests.put(response.id(), false);
381+
return;
382+
}
383+
Map<String, Object> result = cast(response.result());
384+
boolean isError = cast(result.get("isError"));
385+
if (isError) {
386+
log.error("Failed to call tool from MCP server. [sessionId={}, result={}]", this.sessionId, result);
387+
this.pendingRequests.put(response.id(), false);
388+
return;
389+
}
390+
List<Map<String, Object>> rawContents = cast(result.get("content"));
391+
if (CollectionUtils.isEmpty(rawContents)) {
392+
log.error("Failed to call tool from MCP server: no result returned. [sessionId={}, result={}]",
393+
this.sessionId,
394+
result);
395+
this.pendingRequests.put(response.id(), false);
396+
return;
397+
}
398+
Map<String, Object> rawContent = rawContents.get(0);
399+
this.pendingResults.put(response.id(), rawContent.get("text"));
400+
this.pendingRequests.put(response.id(), false);
314401
}
315402

316403
private long getNextId() {

framework/fel/java/plugins/tool-mcp-client/src/main/java/modelengine/fel/tool/mcp/client/support/DefaultMcpClientFactory.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ public DefaultMcpClientFactory(HttpClassicClientFactory clientFactory,
4444
}
4545

4646
@Override
47-
public McpClient create(String connectionString) {
48-
return new DefaultMcpClient(this.jsonSerializer, this.client, connectionString);
47+
public McpClient create(String baseUri, String sseEndpoint) {
48+
return new DefaultMcpClient(this.jsonSerializer, this.client, baseUri, sseEndpoint);
4949
}
5050
}

framework/fel/java/plugins/tool-mcp-server/src/main/java/modelengine/fel/tool/mcp/server/McpServer.java

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66

77
package modelengine.fel.tool.mcp.server;
88

9-
import modelengine.fel.tool.mcp.entity.Server;
9+
import modelengine.fel.tool.mcp.entity.ServerSchema;
1010
import modelengine.fel.tool.mcp.entity.Tool;
1111

1212
import java.util.List;
@@ -20,21 +20,21 @@
2020
*/
2121
public interface McpServer {
2222
/**
23-
* Gets MCP Server Info.
23+
* Gets MCP server schema.
2424
*
25-
* @return The MCP Server Info as a {@link Map}{@code <}{@link String}{@code , }{@link Object}{@code >}.
25+
* @return The MCP server schema as a {@link ServerSchema}.
2626
*/
27-
Server getInfo();
27+
ServerSchema getSchema();
2828

2929
/**
30-
* Gets MCP Server Tools.
30+
* Gets MCP server tools.
3131
*
32-
* @return The MCP Server Tools as a {@link List}{@code <}{@link Tool}{@code >}.
32+
* @return The MCP server tools as a {@link List}{@code <}{@link Tool}{@code >}.
3333
*/
3434
List<Tool> getTools();
3535

3636
/**
37-
* Calls MCP Server Tool.
37+
* Calls MCP server tool.
3838
*
3939
* @param name The tool name as a {@link String}.
4040
* @param arguments The tool arguments as a {@link Map}{@code <}{@link String}{@code , }{@link Object}{@code >}.
@@ -43,18 +43,18 @@ public interface McpServer {
4343
Object callTool(String name, Map<String, Object> arguments);
4444

4545
/**
46-
* Registers MCP Server Tools Changed Observer.
46+
* Registers MCP server tools changed observer.
4747
*
48-
* @param observer The MCP Server Tools Changed Observer as a {@link ToolsChangedObserver}.
48+
* @param observer The MCP server tools changed observer as a {@link ToolsChangedObserver}.
4949
*/
5050
void registerToolsChangedObserver(ToolsChangedObserver observer);
5151

5252
/**
53-
* Represents the MCP Server Tools Changed Observer.
53+
* Represents the MCP server tools changed observer.
5454
*/
5555
interface ToolsChangedObserver {
5656
/**
57-
* Called when MCP Server Tools changed.
57+
* Called when MCP server tools changed.
5858
*/
5959
void onToolsChanged();
6060
}

0 commit comments

Comments
 (0)