Skip to content

Commit 6417d6e

Browse files
authored
[fel] Add close support in McpClient and release resources properly (#144)
- Added Closeable interface to McpClient to support proper resource cleanup. - Implemented close() method in DefaultMcpClient to: - Cancel SSE subscription. - Shutdown ping scheduler gracefully. - Log client closure with context. - Added /close endpoint in TestController for testing purposes to trigger client shutdown via HTTP. This change ensures that the MCP client releases all held resources when no longer needed, improving reliability and preventing leaks.
1 parent 1d73b5d commit 6417d6e

File tree

3 files changed

+38
-3
lines changed
  • framework/fel/java

3 files changed

+38
-3
lines changed

framework/fel/java/plugins/tool-mcp-client/src/main/java/modelengine/fel/tool/mcp/client/support/DefaultMcpClient.java

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import modelengine.fit.http.entity.TextEvent;
2323
import modelengine.fit.http.protocol.HttpRequestMethod;
2424
import modelengine.fitframework.flowable.Choir;
25+
import modelengine.fitframework.flowable.Subscription;
2526
import modelengine.fitframework.log.Logger;
2627
import modelengine.fitframework.schedule.ExecutePolicy;
2728
import modelengine.fitframework.schedule.Task;
@@ -76,6 +77,9 @@ public class DefaultMcpClient implements McpClient {
7677
private final Map<Long, Boolean> pendingRequests = new ConcurrentHashMap<>();
7778
private final Map<Long, Object> pendingResults = new ConcurrentHashMap<>();
7879

80+
private volatile Subscription subscription;
81+
private volatile ThreadPoolScheduler pingScheduler;
82+
7983
/**
8084
* Constructs a new instance of the DefaultMcpClient.
8185
*
@@ -113,11 +117,12 @@ public void initialize() {
113117
messages.subscribeOn(threadPool).subscribe(subscription -> {
114118
log.info("Prepare to create SSE channel.");
115119
subscription.request(Long.MAX_VALUE);
120+
this.subscription = subscription;
116121
},
117122
(subscription, textEvent) -> this.consumeTextEvent(textEvent),
118123
subscription -> log.info("SSE channel is completed."),
119124
(subscription, cause) -> log.error("SSE channel is failed.", cause));
120-
ThreadPoolScheduler pingScheduler = ThreadPoolScheduler.custom()
125+
this.pingScheduler = ThreadPoolScheduler.custom()
121126
.threadPoolName("mcp-client-ping-" + this.name)
122127
.awaitTermination(3, TimeUnit.SECONDS)
123128
.isImmediateShutdown(true)
@@ -127,7 +132,7 @@ public void initialize() {
127132
.workQueueCapacity(Integer.MAX_VALUE)
128133
.isDaemonThread(true)
129134
.build();
130-
pingScheduler.schedule(Task.builder()
135+
this.pingScheduler.schedule(Task.builder()
131136
.runnable(this::pingServer)
132137
.policy(ExecutePolicy.fixedDelay(DELAY_MILLIS))
133138
.build(), DELAY_MILLIS);
@@ -375,4 +380,16 @@ private boolean waitInitialized() {
375380
}
376381
return this.initialized;
377382
}
383+
384+
@Override
385+
public void close() throws IOException {
386+
this.subscription.cancel();
387+
try {
388+
this.pingScheduler.shutdown();
389+
} catch (InterruptedException e) {
390+
Thread.currentThread().interrupt();
391+
throw new IOException(e);
392+
}
393+
log.info("Close MCP client. [name={}, sessionId={}]", this.name, this.sessionId);
394+
}
378395
}

framework/fel/java/plugins/tool-mcp-test/src/main/java/modelengine/fel/tool/mcp/test/TestController.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import modelengine.fit.http.annotation.RequestQuery;
1717
import modelengine.fitframework.annotation.Component;
1818

19+
import java.io.IOException;
1920
import java.util.List;
2021
import java.util.Map;
2122

@@ -55,6 +56,22 @@ public String initialize(@RequestQuery(name = "baseUri") String baseUri,
5556
return "Initialized";
5657
}
5758

59+
/**
60+
* Closes the MCP client and releases any resources associated with it.
61+
* This method ensures that the MCP client is properly closed and resources are released.
62+
*
63+
* @return A string indicating that the close operation was successful.
64+
*/
65+
@PostMapping(path = "/close")
66+
public String close() {
67+
try {
68+
this.client.close();
69+
} catch (IOException e) {
70+
throw new IllegalStateException("Failed to close.", e);
71+
}
72+
return "Closed";
73+
}
74+
5875
/**
5976
* Retrieves a list of available tools from the MCP server.
6077
* This method calls the MCP client to fetch the list of tools and returns it to the caller.

framework/fel/java/services/tool-mcp-client-service/src/main/java/modelengine/fel/tool/mcp/client/McpClient.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88

99
import modelengine.fel.tool.mcp.entity.Tool;
1010

11+
import java.io.Closeable;
1112
import java.util.List;
1213
import java.util.Map;
1314

@@ -20,7 +21,7 @@
2021
* @author 季聿阶
2122
* @since 2025-05-21
2223
*/
23-
public interface McpClient {
24+
public interface McpClient extends Closeable {
2425
/**
2526
* Initializes the MCP Client.
2627
*/

0 commit comments

Comments
 (0)