Skip to content

Commit 6bcdba2

Browse files
authored
[fel] Improve McpClient lifecycle management with closed state tracking (#146)
- Added a closed flag to DefaultMcpClient to track client lifecycle state. - Prevent method invocations after closure by adding checks in initialize(), getTools(), and callTool(). - Safely handle potential nulls in close() to avoid exceptions during partial initialization. - Ensures consistent behavior and prevents resource leaks or usage after the client has been closed. This change improves reliability and correctness by enforcing proper usage of the MCP client throughout its lifecycle.
1 parent 6417d6e commit 6bcdba2

File tree

1 file changed

+18
-3
lines changed
  • framework/fel/java/plugins/tool-mcp-client/src/main/java/modelengine/fel/tool/mcp/client/support

1 file changed

+18
-3
lines changed

framework/fel/java/plugins/tool-mcp-client/src/main/java/modelengine/fel/tool/mcp/client/support/DefaultMcpClient.java

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ public class DefaultMcpClient implements McpClient {
7070
private volatile String sessionId;
7171
private volatile ServerSchema serverSchema;
7272
private volatile boolean initialized = false;
73+
private volatile boolean closed = false;
7374
private final List<Tool> tools = new ArrayList<>();
7475
private final Object initializedLock = LockUtils.newSynchronizedLock();
7576
private final Object toolsLock = LockUtils.newSynchronizedLock();
@@ -99,6 +100,9 @@ public DefaultMcpClient(ObjectSerializer jsonSerializer, HttpClassicClient clien
99100

100101
@Override
101102
public void initialize() {
103+
if (this.closed) {
104+
throw new IllegalStateException("The MCP client is closed.");
105+
}
102106
HttpClassicClientRequest request =
103107
this.client.createRequest(HttpRequestMethod.GET, this.baseUri + this.sseEndpoint);
104108
Choir<TextEvent> messages = this.client.exchangeStream(request, TextEvent.class);
@@ -116,8 +120,8 @@ public void initialize() {
116120
.build();
117121
messages.subscribeOn(threadPool).subscribe(subscription -> {
118122
log.info("Prepare to create SSE channel.");
119-
subscription.request(Long.MAX_VALUE);
120123
this.subscription = subscription;
124+
subscription.request(Long.MAX_VALUE);
121125
},
122126
(subscription, textEvent) -> this.consumeTextEvent(textEvent),
123127
subscription -> log.info("SSE channel is completed."),
@@ -242,6 +246,9 @@ private void recordServerSchema(JsonRpc.Response<Long> response) {
242246

243247
@Override
244248
public List<Tool> getTools() {
249+
if (this.closed) {
250+
throw new IllegalStateException("The MCP client is closed.");
251+
}
245252
if (this.isNotInitialized()) {
246253
throw new IllegalStateException("MCP client is not initialized. Please wait a moment.");
247254
}
@@ -278,6 +285,9 @@ private void getTools0(JsonRpc.Response<Long> response) {
278285

279286
@Override
280287
public Object callTool(String name, Map<String, Object> arguments) {
288+
if (this.closed) {
289+
throw new IllegalStateException("The MCP client is closed.");
290+
}
281291
if (this.isNotInitialized()) {
282292
throw new IllegalStateException("MCP client is not initialized. Please wait a moment.");
283293
}
@@ -383,9 +393,14 @@ private boolean waitInitialized() {
383393

384394
@Override
385395
public void close() throws IOException {
386-
this.subscription.cancel();
396+
this.closed = true;
397+
if (this.subscription != null) {
398+
this.subscription.cancel();
399+
}
387400
try {
388-
this.pingScheduler.shutdown();
401+
if (this.pingScheduler != null) {
402+
this.pingScheduler.shutdown();
403+
}
389404
} catch (InterruptedException e) {
390405
Thread.currentThread().interrupt();
391406
throw new IOException(e);

0 commit comments

Comments
 (0)