Skip to content

Commit dff4acc

Browse files
authored
[fel] Refactor DefaultMcpClient to centralize request logic, improve error handling, and enhance initialization robustness (#143)
- Extracted common HTTP + JSON-RPC request logic into a reusable post2McpServer method to reduce duplication across ping, initialize, getTools, and callTool. - Introduced BiConsumer<HttpClassicClientRequest, Long> for custom per-method setup (e.g., session ID extraction, response handler registration). - Replaced polling loop in initialize() with explicit failure handling via IllegalStateException when initialization fails. - Used Event.ENDPOINT.code() instead of hardcoded "endpoint" for better type safety and maintainability. - Centralized and improved logging and exception handling; wrapped IO exceptions with contextual messages. - Added timeout support in waitInitialized() using wait(60_000L) to prevent indefinite waiting during initialization. - Simplified and unified RPC method calling pattern across the class. These changes increase code maintainability, improve error visibility, and make future enhancements easier.
1 parent 73ec891 commit dff4acc

File tree

1 file changed

+58
-113
lines changed
  • framework/fel/java/plugins/tool-mcp-client/src/main/java/modelengine/fel/tool/mcp/client/support

1 file changed

+58
-113
lines changed

framework/fel/java/plugins/tool-mcp-client/src/main/java/modelengine/fel/tool/mcp/client/support/DefaultMcpClient.java

Lines changed: 58 additions & 113 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010

1111
import modelengine.fel.tool.mcp.client.McpClient;
1212
import modelengine.fel.tool.mcp.entity.ClientSchema;
13+
import modelengine.fel.tool.mcp.entity.Event;
1314
import modelengine.fel.tool.mcp.entity.JsonRpc;
1415
import modelengine.fel.tool.mcp.entity.Method;
1516
import modelengine.fel.tool.mcp.entity.ServerSchema;
@@ -43,6 +44,7 @@
4344
import java.util.concurrent.ConcurrentHashMap;
4445
import java.util.concurrent.TimeUnit;
4546
import java.util.concurrent.atomic.AtomicLong;
47+
import java.util.function.BiConsumer;
4648
import java.util.function.Consumer;
4749

4850
/**
@@ -129,8 +131,8 @@ public void initialize() {
129131
.runnable(this::pingServer)
130132
.policy(ExecutePolicy.fixedDelay(DELAY_MILLIS))
131133
.build(), DELAY_MILLIS);
132-
while (!this.waitInitialized()) {
133-
ThreadUtils.sleep(100);
134+
if (!this.waitInitialized()) {
135+
throw new IllegalStateException("Failed to initialize.");
134136
}
135137
}
136138

@@ -142,7 +144,7 @@ private void consumeTextEvent(TextEvent textEvent) {
142144
if (StringUtils.isBlank(textEvent.event()) || StringUtils.isBlank((String) textEvent.data())) {
143145
return;
144146
}
145-
if (Objects.equals(textEvent.event(), "endpoint")) {
147+
if (Objects.equals(textEvent.event(), Event.ENDPOINT.code())) {
146148
this.initializeMcpServer(textEvent);
147149
return;
148150
}
@@ -173,64 +175,20 @@ private void pingServer() {
173175
log.info("MCP client is not initialized and {} method will be delayed.", Method.PING.code());
174176
return;
175177
}
176-
HttpClassicClientRequest request =
177-
this.client.createRequest(HttpRequestMethod.POST, this.baseUri + this.messageEndpoint);
178-
long currentId = this.getNextId();
179-
JsonRpc.Request<Long> rpcRequest = JsonRpc.createRequest(currentId, Method.PING.code());
180-
request.entity(Entity.createObject(request, rpcRequest));
181-
log.info("Send {} method to MCP server. [sessionId={}, request={}]",
182-
Method.PING.code(),
183-
this.sessionId,
184-
rpcRequest);
185-
try (HttpClassicClientResponse<Object> exchange = request.exchange(Object.class)) {
186-
if (exchange.statusCode() >= 200 && exchange.statusCode() < 300) {
187-
log.info("Send {} method to MCP server successfully. [sessionId={}, statusCode={}]",
188-
Method.PING.code(),
189-
this.sessionId,
190-
exchange.statusCode());
191-
} else {
192-
log.error("Failed to {} MCP server. [sessionId={}, statusCode={}]",
193-
Method.PING.code(),
194-
this.sessionId,
195-
exchange.statusCode());
196-
}
197-
} catch (IOException e) {
198-
throw new IllegalStateException(e);
199-
}
178+
this.post2McpServer(Method.PING, null, null);
200179
}
201180

202181
private void initializeMcpServer(TextEvent textEvent) {
203182
this.messageEndpoint = textEvent.data().toString();
204-
HttpClassicClientRequest request =
205-
this.client.createRequest(HttpRequestMethod.POST, this.baseUri + this.messageEndpoint);
206-
this.sessionId =
207-
request.queries().first("session_id").orElseThrow(() -> new IllegalStateException("no session_id"));
208-
long currentId = this.getNextId();
209-
this.responseConsumers.put(currentId, this::initializedMcpServer);
210183
ClientSchema schema = new ClientSchema("2024-11-05",
211184
new ClientSchema.Capabilities(),
212185
new ClientSchema.Info("FIT MCP Client", "3.5.0-SNAPSHOT"));
213-
JsonRpc.Request<Long> rpcRequest = JsonRpc.createRequest(currentId, Method.INITIALIZE.code(), schema);
214-
request.entity(Entity.createObject(request, rpcRequest));
215-
log.info("Send {} method to MCP server. [sessionId={}, request={}]",
216-
Method.INITIALIZE.code(),
217-
this.sessionId,
218-
rpcRequest);
219-
try (HttpClassicClientResponse<Object> exchange = request.exchange(Object.class)) {
220-
if (exchange.statusCode() >= 200 && exchange.statusCode() < 300) {
221-
log.info("Send {} method to MCP server successfully. [sessionId={}, statusCode={}]",
222-
Method.INITIALIZE.code(),
223-
this.sessionId,
224-
exchange.statusCode());
225-
} else {
226-
log.error("Failed to {} MCP server. [sessionId={}, statusCode={}]",
227-
Method.INITIALIZE.code(),
228-
this.sessionId,
229-
exchange.statusCode());
230-
}
231-
} catch (IOException e) {
232-
throw new IllegalStateException(e);
233-
}
186+
this.post2McpServer(Method.INITIALIZE, schema, (request, currentId) -> {
187+
this.sessionId = request.queries()
188+
.first("session_id")
189+
.orElseThrow(() -> new IllegalStateException("The session_id cannot be empty."));
190+
this.responseConsumers.put(currentId, this::initializedMcpServer);
191+
});
234192
}
235193

236194
private void initializedMcpServer(JsonRpc.Response<Long> response) {
@@ -282,33 +240,11 @@ public List<Tool> getTools() {
282240
if (this.isNotInitialized()) {
283241
throw new IllegalStateException("MCP client is not initialized. Please wait a moment.");
284242
}
285-
HttpClassicClientRequest request =
286-
this.client.createRequest(HttpRequestMethod.POST, this.baseUri + this.messageEndpoint);
287-
long currentId = this.getNextId();
288-
this.responseConsumers.put(currentId, this::getTools0);
289-
this.pendingRequests.put(currentId, true);
290-
JsonRpc.Request<Long> rpcRequest = JsonRpc.createRequest(currentId, Method.TOOLS_LIST.code());
291-
request.entity(Entity.createObject(request, rpcRequest));
292-
log.info("Send {} method to MCP server. [sessionId={}, request={}]",
293-
Method.TOOLS_LIST.code(),
294-
this.sessionId,
295-
rpcRequest);
296-
try (HttpClassicClientResponse<Object> exchange = request.exchange(Object.class)) {
297-
if (exchange.statusCode() >= 200 && exchange.statusCode() < 300) {
298-
log.info("Send {} method to MCP server successfully. [sessionId={}, statusCode={}]",
299-
Method.TOOLS_LIST.code(),
300-
this.sessionId,
301-
exchange.statusCode());
302-
} else {
303-
log.error("Failed to {} MCP server. [sessionId={}, statusCode={}]",
304-
Method.TOOLS_LIST.code(),
305-
this.sessionId,
306-
exchange.statusCode());
307-
}
308-
} catch (IOException e) {
309-
throw new IllegalStateException(e);
310-
}
311-
while (this.pendingRequests.get(currentId)) {
243+
long requestId = this.post2McpServer(Method.TOOLS_LIST, null, (request, currentId) -> {
244+
this.responseConsumers.put(currentId, this::getTools0);
245+
this.pendingRequests.put(currentId, true);
246+
});
247+
while (this.pendingRequests.get(requestId)) {
312248
ThreadUtils.sleep(100);
313249
}
314250
synchronized (this.toolsLock) {
@@ -340,38 +276,16 @@ public Object callTool(String name, Map<String, Object> arguments) {
340276
if (this.isNotInitialized()) {
341277
throw new IllegalStateException("MCP client is not initialized. Please wait a moment.");
342278
}
343-
HttpClassicClientRequest request =
344-
this.client.createRequest(HttpRequestMethod.POST, this.baseUri + this.messageEndpoint);
345-
long currentId = this.getNextId();
346-
this.responseConsumers.put(currentId, this::callTools0);
347-
this.pendingRequests.put(currentId, true);
348-
JsonRpc.Request<Long> rpcRequest = JsonRpc.createRequest(currentId,
349-
Method.TOOLS_CALL.code(),
350-
MapBuilder.<String, Object>get().put("name", name).put("arguments", arguments).build());
351-
request.entity(Entity.createObject(request, rpcRequest));
352-
log.info("Send {} method to MCP server. [sessionId={}, request={}]",
353-
Method.TOOLS_CALL.code(),
354-
this.sessionId,
355-
rpcRequest);
356-
try (HttpClassicClientResponse<Object> exchange = request.exchange(Object.class)) {
357-
if (exchange.statusCode() >= 200 && exchange.statusCode() < 300) {
358-
log.info("Send {} method to MCP server successfully. [sessionId={}, statusCode={}]",
359-
Method.TOOLS_CALL.code(),
360-
this.sessionId,
361-
exchange.statusCode());
362-
} else {
363-
log.error("Failed to {} MCP server. [sessionId={}, statusCode={}]",
364-
Method.TOOLS_CALL.code(),
365-
this.sessionId,
366-
exchange.statusCode());
367-
}
368-
} catch (IOException e) {
369-
throw new IllegalStateException(e);
370-
}
371-
while (this.pendingRequests.get(currentId)) {
279+
long requestId = this.post2McpServer(Method.TOOLS_CALL,
280+
MapBuilder.<String, Object>get().put("name", name).put("arguments", arguments).build(),
281+
(request, currentId) -> {
282+
this.responseConsumers.put(currentId, this::callTools0);
283+
this.pendingRequests.put(currentId, true);
284+
});
285+
while (this.pendingRequests.get(requestId)) {
372286
ThreadUtils.sleep(100);
373287
}
374-
return this.pendingResults.get(currentId);
288+
return this.pendingResults.get(requestId);
375289
}
376290

377291
private void callTools0(JsonRpc.Response<Long> response) {
@@ -400,6 +314,37 @@ private void callTools0(JsonRpc.Response<Long> response) {
400314
this.pendingRequests.put(response.id(), false);
401315
}
402316

317+
private long post2McpServer(Method method, Object requestParams,
318+
BiConsumer<HttpClassicClientRequest, Long> requestConsumer) {
319+
HttpClassicClientRequest request =
320+
this.client.createRequest(HttpRequestMethod.POST, this.baseUri + this.messageEndpoint);
321+
long currentId = this.getNextId();
322+
if (requestConsumer != null) {
323+
requestConsumer.accept(request, currentId);
324+
}
325+
JsonRpc.Request<Long> rpcRequest = JsonRpc.createRequest(currentId, method.code(), requestParams);
326+
request.entity(Entity.createObject(request, rpcRequest));
327+
log.info("Send {} method to MCP server. [sessionId={}, request={}]", method.code(), this.sessionId, rpcRequest);
328+
try (HttpClassicClientResponse<Object> exchange = request.exchange(Object.class)) {
329+
if (exchange.statusCode() >= 200 && exchange.statusCode() < 300) {
330+
log.info("Send {} method to MCP server successfully. [sessionId={}, statusCode={}]",
331+
method.code(),
332+
this.sessionId,
333+
exchange.statusCode());
334+
} else {
335+
log.error("Failed to {} MCP server. [sessionId={}, statusCode={}]",
336+
method.code(),
337+
this.sessionId,
338+
exchange.statusCode());
339+
}
340+
} catch (IOException e) {
341+
throw new IllegalStateException(StringUtils.format("Failed to {0} MCP server. [sessionId={1}]",
342+
method.code(),
343+
this.sessionId), e);
344+
}
345+
return currentId;
346+
}
347+
403348
private long getNextId() {
404349
long tmpId = this.id.getAndIncrement();
405350
if (tmpId < 0) {
@@ -422,10 +367,10 @@ private boolean waitInitialized() {
422367
return true;
423368
}
424369
try {
425-
this.initializedLock.wait();
370+
this.initializedLock.wait(60_000L);
426371
} catch (InterruptedException e) {
427372
Thread.currentThread().interrupt();
428-
throw new IllegalStateException(e);
373+
throw new IllegalStateException("Failed to initialize.", e);
429374
}
430375
}
431376
return this.initialized;

0 commit comments

Comments
 (0)