Skip to content

Commit 1fecce3

Browse files
committed
Merge branch '3.5.x'
2 parents a6dc374 + 6bcdba2 commit 1fecce3

File tree

1 file changed

+18
-3
lines changed
  • framework/fel/java/plugins/tool-mcp-client/src/main/java/modelengine/fel/tool/mcp/client/support

1 file changed

+18
-3
lines changed

framework/fel/java/plugins/tool-mcp-client/src/main/java/modelengine/fel/tool/mcp/client/support/DefaultMcpClient.java

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ public class DefaultMcpClient implements McpClient {
7070
private volatile String sessionId;
7171
private volatile ServerSchema serverSchema;
7272
private volatile boolean initialized = false;
73+
private volatile boolean closed = false;
7374
private final List<Tool> tools = new ArrayList<>();
7475
private final Object initializedLock = LockUtils.newSynchronizedLock();
7576
private final Object toolsLock = LockUtils.newSynchronizedLock();
@@ -99,6 +100,9 @@ public DefaultMcpClient(ObjectSerializer jsonSerializer, HttpClassicClient clien
99100

100101
@Override
101102
public void initialize() {
103+
if (this.closed) {
104+
throw new IllegalStateException("The MCP client is closed.");
105+
}
102106
HttpClassicClientRequest request =
103107
this.client.createRequest(HttpRequestMethod.GET, this.baseUri + this.sseEndpoint);
104108
Choir<TextEvent> messages = this.client.exchangeStream(request, TextEvent.class);
@@ -116,8 +120,8 @@ public void initialize() {
116120
.build();
117121
messages.subscribeOn(threadPool).subscribe(subscription -> {
118122
log.info("Prepare to create SSE channel.");
119-
subscription.request(Long.MAX_VALUE);
120123
this.subscription = subscription;
124+
subscription.request(Long.MAX_VALUE);
121125
},
122126
(subscription, textEvent) -> this.consumeTextEvent(textEvent),
123127
subscription -> log.info("SSE channel is completed."),
@@ -242,6 +246,9 @@ private void recordServerSchema(JsonRpc.Response<Long> response) {
242246

243247
@Override
244248
public List<Tool> getTools() {
249+
if (this.closed) {
250+
throw new IllegalStateException("The MCP client is closed.");
251+
}
245252
if (this.isNotInitialized()) {
246253
throw new IllegalStateException("MCP client is not initialized. Please wait a moment.");
247254
}
@@ -278,6 +285,9 @@ private void getTools0(JsonRpc.Response<Long> response) {
278285

279286
@Override
280287
public Object callTool(String name, Map<String, Object> arguments) {
288+
if (this.closed) {
289+
throw new IllegalStateException("The MCP client is closed.");
290+
}
281291
if (this.isNotInitialized()) {
282292
throw new IllegalStateException("MCP client is not initialized. Please wait a moment.");
283293
}
@@ -383,9 +393,14 @@ private boolean waitInitialized() {
383393

384394
@Override
385395
public void close() throws IOException {
386-
this.subscription.cancel();
396+
this.closed = true;
397+
if (this.subscription != null) {
398+
this.subscription.cancel();
399+
}
387400
try {
388-
this.pingScheduler.shutdown();
401+
if (this.pingScheduler != null) {
402+
this.pingScheduler.shutdown();
403+
}
389404
} catch (InterruptedException e) {
390405
Thread.currentThread().interrupt();
391406
throw new IOException(e);

0 commit comments

Comments
 (0)