Skip to content

Commit c38ae25

Browse files
author
3200105739
committed
修改工厂类逻辑
1 parent ae0ec6b commit c38ae25

File tree

8 files changed

+211
-110
lines changed

8 files changed

+211
-110
lines changed

framework/fel/java/plugins/tool-mcp-client/src/main/java/modelengine/fel/tool/mcp/client/support/DefaultMcpClientFactory.java

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,15 @@
66

77
package modelengine.fel.tool.mcp.client.support;
88

9+
import io.modelcontextprotocol.spec.McpSchema;
910
import modelengine.fel.tool.mcp.client.McpClient;
1011
import modelengine.fel.tool.mcp.client.McpClientFactory;
1112
import modelengine.fitframework.annotation.Component;
1213
import modelengine.fitframework.annotation.Value;
1314

15+
import java.util.function.Consumer;
16+
import java.util.function.Function;
17+
1418
/**
1519
* Represents a factory for creating instances of the {@link DefaultMcpStreamableClient}.
1620
* This class is responsible for initializing and configuring.
@@ -28,11 +32,32 @@ public class DefaultMcpClientFactory implements McpClientFactory {
2832
* @param requestTimeoutSeconds The timeout duration of requests. Units: seconds.
2933
*/
3034
public DefaultMcpClientFactory(@Value("${mcp.client.request.timeout-seconds}") int requestTimeoutSeconds) {
31-
this.requestTimeoutSeconds = requestTimeoutSeconds;
35+
this.requestTimeoutSeconds = requestTimeoutSeconds > 0 ? requestTimeoutSeconds : 180;
3236
}
3337

3438
@Override
3539
public McpClient create(String baseUri, String sseEndpoint) {
36-
return new DefaultMcpStreamableClient(baseUri, sseEndpoint, requestTimeoutSeconds);
40+
return create(baseUri, sseEndpoint, DefaultMcpClientMessageHandler::defaultLoggingMessageHandler, null);
41+
}
42+
43+
@Override
44+
public McpClient create(String baseUri, String sseEndpoint, Consumer<McpSchema.LoggingMessageNotification> loggingConsumer) {
45+
return create(baseUri, sseEndpoint, loggingConsumer, null);
46+
}
47+
48+
@Override
49+
public McpClient create(String baseUri, String sseEndpoint, Function<McpSchema.ElicitRequest, McpSchema.ElicitResult> elicitationHandler) {
50+
return create(baseUri, sseEndpoint, DefaultMcpClientMessageHandler::defaultLoggingMessageHandler, elicitationHandler);
51+
}
52+
53+
@Override
54+
public McpClient create(String baseUri, String sseEndpoint,
55+
Consumer<McpSchema.LoggingMessageNotification> loggingConsumer,
56+
Function<McpSchema.ElicitRequest, McpSchema.ElicitResult> elicitationHandler) {
57+
return new DefaultMcpStreamableClient(baseUri,
58+
sseEndpoint,
59+
requestTimeoutSeconds,
60+
loggingConsumer,
61+
elicitationHandler);
3762
}
3863
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
/*---------------------------------------------------------------------------------------------
2+
* Copyright (c) 2025 Huawei Technologies Co., Ltd. All rights reserved.
3+
* This file is a part of the ModelEngine Project.
4+
* Licensed under the MIT License. See License.txt in the project root for license information.
5+
*--------------------------------------------------------------------------------------------*/
6+
7+
package modelengine.fel.tool.mcp.client.support;
8+
9+
import io.modelcontextprotocol.spec.McpSchema;
10+
import modelengine.fitframework.annotation.Component;
11+
import modelengine.fitframework.log.Logger;
12+
13+
import java.util.HashMap;
14+
import java.util.Map;
15+
import java.util.Scanner;
16+
17+
/**
18+
* Handles MCP client messages received from MCP server,
19+
* including logging notifications and elicitation requests.
20+
*
21+
* @author 黄可欣
22+
* @since 2025-11-03
23+
*/
24+
@Component
25+
public class DefaultMcpClientMessageHandler {
26+
private static final Logger log = Logger.get(DefaultMcpClientMessageHandler.class);
27+
28+
/**
29+
* Handles logging messages received from the MCP server.
30+
*
31+
* @param notification The {@link McpSchema.LoggingMessageNotification} containing the log level and data.
32+
*/
33+
public static void defaultLoggingMessageHandler(McpSchema.LoggingMessageNotification notification) {
34+
log.info("[Client] log: {}-{}", notification.level(), notification.data());
35+
}
36+
}

framework/fel/java/plugins/tool-mcp-client/src/main/java/modelengine/fel/tool/mcp/client/support/DefaultMcpStreamableClient.java

Lines changed: 96 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,9 @@
2424
import java.util.HashMap;
2525
import java.util.List;
2626
import java.util.Map;
27+
import java.util.UUID;
28+
import java.util.function.Consumer;
29+
import java.util.function.Function;
2730
import java.util.stream.Collectors;
2831

2932
/**
@@ -35,6 +38,7 @@
3538
public class DefaultMcpStreamableClient implements McpClient {
3639
private static final Logger log = Logger.get(DefaultMcpStreamableClient.class);
3740

41+
private final String clientId;
3842
private final McpSyncClient mcpSyncClient;
3943
private volatile boolean initialized = false;
4044
private volatile boolean closed = false;
@@ -45,22 +49,44 @@ public class DefaultMcpStreamableClient implements McpClient {
4549
* @param baseUri The base URI of the MCP server.
4650
* @param sseEndpoint The endpoint for the Server-Sent Events (SSE) connection.
4751
* @param requestTimeoutSeconds The timeout duration of requests. Units: seconds.
52+
* @param loggingConsumer The consumer to handle logging messages from the MCP server.
53+
* @param elicitationHandler The function to handle elicitation requests from the MCP server.
4854
*/
49-
public DefaultMcpStreamableClient(String baseUri, String sseEndpoint, int requestTimeoutSeconds) {
55+
public DefaultMcpStreamableClient(String baseUri, String sseEndpoint, int requestTimeoutSeconds,
56+
Consumer<McpSchema.LoggingMessageNotification> loggingConsumer,
57+
Function<McpSchema.ElicitRequest, McpSchema.ElicitResult> elicitationHandler) {
58+
this.clientId = UUID.randomUUID().toString();
5059
notBlank(baseUri, "The MCP server base URI cannot be blank.");
5160
notBlank(sseEndpoint, "The MCP server SSE endpoint cannot be blank.");
61+
log.info("Creating MCP client [{}] for server: {}", clientId, baseUri);
5262
ObjectMapper mapper = new ObjectMapper();
5363
HttpClientStreamableHttpTransport transport = HttpClientStreamableHttpTransport.builder(baseUri)
5464
.jsonMapper(new JacksonMcpJsonMapper(mapper))
5565
.endpoint(sseEndpoint)
5666
.build();
57-
this.mcpSyncClient = io.modelcontextprotocol.client.McpClient.sync(transport)
58-
.requestTimeout(Duration.ofSeconds(requestTimeoutSeconds > 0 ? requestTimeoutSeconds : 5))
59-
.capabilities(McpSchema.ClientCapabilities.builder().elicitation().build())
60-
.loggingConsumer(McpClientMessageHandler::handleLoggingMessage)
61-
.elicitation(McpClientMessageHandler::handleElicitationRequest)
62-
.jsonSchemaValidator(new DefaultJsonSchemaValidator(mapper))
63-
.build();
67+
68+
if (elicitationHandler != null) {
69+
this.mcpSyncClient = io.modelcontextprotocol.client.McpClient.sync(transport)
70+
.requestTimeout(Duration.ofSeconds(requestTimeoutSeconds))
71+
.capabilities(McpSchema.ClientCapabilities.builder().elicitation().build())
72+
.loggingConsumer(loggingConsumer)
73+
.elicitation(elicitationHandler)
74+
.jsonSchemaValidator(new DefaultJsonSchemaValidator(mapper))
75+
.build();
76+
} else {
77+
this.mcpSyncClient = io.modelcontextprotocol.client.McpClient.sync(transport)
78+
.requestTimeout(Duration.ofSeconds(requestTimeoutSeconds))
79+
.capabilities(McpSchema.ClientCapabilities.builder().build())
80+
.loggingConsumer(loggingConsumer)
81+
.jsonSchemaValidator(new DefaultJsonSchemaValidator(mapper))
82+
.build();
83+
}
84+
85+
}
86+
87+
@Override
88+
public String getClientId() {
89+
return clientId;
6490
}
6591

6692
/**
@@ -70,12 +96,10 @@ public DefaultMcpStreamableClient(String baseUri, String sseEndpoint, int reques
7096
*/
7197
@Override
7298
public void initialize() {
73-
if (this.closed) {
74-
throw new IllegalStateException("The MCP client is closed.");
75-
}
99+
ensureNotClosed();
76100
mcpSyncClient.initialize();
77101
this.initialized = true;
78-
log.info("MCP client initialized successfully.");
102+
log.info("MCP client [{}] initialized successfully.", clientId);
79103
}
80104

81105
/**
@@ -87,27 +111,22 @@ public void initialize() {
87111
*/
88112
@Override
89113
public List<Tool> getTools() {
90-
if (this.closed) {
91-
throw new IllegalStateException("The MCP client is closed.");
92-
}
93-
if (!this.initialized) {
94-
throw new IllegalStateException("MCP client is not initialized. Please wait a moment.");
95-
}
114+
ensureReady();
96115

97116
try {
98117
McpSchema.ListToolsResult result = this.mcpSyncClient.listTools();
99118
if (result == null || result.tools() == null) {
100-
log.warn("Failed to get tools from MCP server: result is null.");
119+
log.warn("MCP client [{}] failed to get tools: result is null.", clientId);
101120
throw new IllegalStateException("Failed to get tools from MCP server: result is null.");
102121
}
103122

104123
List<Tool> tools = result.tools().stream().map(this::convertToFelTool).collect(Collectors.toList());
105124

106-
log.info("Successfully retrieved {} tools from MCP server.", tools.size());
107-
tools.forEach(tool -> log.info("Tool - Name: {}, Description: {}", tool.getName(), tool.getDescription()));
125+
log.info("MCP client [{}] successfully retrieved {} tools.", clientId, tools.size());
126+
tools.forEach(tool -> log.debug("Tool - Name: {}, Description: {}", tool.getName(), tool.getDescription()));
108127
return tools;
109128
} catch (Exception e) {
110-
log.error("Failed to get tools from MCP server: {}", e);
129+
log.error("MCP client [{}] failed to get tools: {}", clientId, e);
111130
throw new IllegalStateException("Failed to get tools from MCP server: " + e.getMessage(), e);
112131
}
113132
}
@@ -125,28 +144,43 @@ public List<Tool> getTools() {
125144
*/
126145
@Override
127146
public Object callTool(String name, Map<String, Object> arguments) {
128-
if (this.closed) {
129-
throw new IllegalStateException("The MCP client is closed.");
130-
}
131-
if (!this.initialized) {
132-
throw new IllegalStateException("MCP client is not initialized. Please wait a moment.");
133-
}
147+
ensureReady();
134148
try {
135-
log.info("Calling tool: {} with arguments: {}", name, arguments);
149+
log.info("MCP client [{}] calling tool: {} with arguments: {}", clientId, name, arguments);
136150
McpSchema.CallToolResult result =
137151
this.mcpSyncClient.callTool(new McpSchema.CallToolRequest(name, arguments));
138152

139153
if (result == null) {
140-
log.error("Failed to call tool '{}': result is null.", name);
154+
log.error("MCP client [{}] failed to call tool '{}': result is null.", clientId, name);
141155
throw new IllegalStateException("Failed to call tool '" + name + "': result is null.");
142156
}
143157
return processToolResult(result, name);
144158
} catch (Exception e) {
145-
log.error("Failed to call tool '{}' from MCP server.", name, e);
159+
log.error("MCP client [{}] failed to call tool '{}': {}", clientId, name, e);
146160
throw new IllegalStateException("Failed to call tool '" + name + "': " + e.getMessage(), e);
147161
}
148162
}
149163

164+
/**
165+
* Builds an error message from tool result content.
166+
*
167+
* @param name The name of the tool that was called.
168+
* @param content The content list from the tool result.
169+
* @return The formatted error message.
170+
*/
171+
private String buildToolErrorMessage(String name, List<McpSchema.Content> content) {
172+
String errorMsg = "Tool '" + name + "' returned an error";
173+
if (content != null && !content.isEmpty()) {
174+
McpSchema.Content errorContent = content.get(0);
175+
if (errorContent instanceof McpSchema.TextContent textContent) {
176+
errorMsg += ": " + textContent.text();
177+
} else {
178+
errorMsg += ": " + errorContent;
179+
}
180+
}
181+
return errorMsg;
182+
}
183+
150184
/**
151185
* Processes the tool call result and extracts the content.
152186
* Handles error cases and different content types (text, image, etc.).
@@ -160,33 +194,25 @@ public Object callTool(String name, Map<String, Object> arguments) {
160194
*/
161195
private Object processToolResult(McpSchema.CallToolResult result, String name) {
162196
if (result.isError() != null && result.isError()) {
163-
String errorMsg = "Tool '" + name + "' returned an error";
164-
if (result.content() != null && !result.content().isEmpty()) {
165-
Object errorContent = result.content().get(0);
166-
if (errorContent instanceof McpSchema.TextContent textContent) {
167-
errorMsg += ": " + textContent.text();
168-
} else {
169-
errorMsg += ": " + errorContent;
170-
}
171-
}
172-
log.error(errorMsg);
197+
String errorMsg = buildToolErrorMessage(name, result.content());
198+
log.error("MCP client [{}]: {}", clientId, errorMsg);
173199
throw new IllegalStateException(errorMsg);
174200
}
175201

176202
if (result.content() == null || result.content().isEmpty()) {
177-
log.warn("Tool '{}' returned empty content.", name);
203+
log.warn("MCP client [{}] tool '{}' returned empty content.", clientId, name);
178204
return null;
179205
}
180206

181207
Object content = result.content().get(0);
182208
if (content instanceof McpSchema.TextContent textContent) {
183-
log.info("Successfully called tool '{}', result: {}", name, textContent.text());
209+
log.info("MCP client [{}] successfully called tool '{}', result: {}", clientId, name, textContent.text());
184210
return textContent.text();
185211
} else if (content instanceof McpSchema.ImageContent imageContent) {
186-
log.info("Successfully called tool '{}', returned image content.", name);
212+
log.info("MCP client [{}] successfully called tool '{}', returned image content.", clientId, name);
187213
return imageContent;
188214
} else {
189-
log.info("Successfully called tool '{}', content type: {}", name, content.getClass().getSimpleName());
215+
log.info("MCP client [{}] successfully called tool '{}', content type: {}", clientId, name, content.getClass().getSimpleName());
190216
return content;
191217
}
192218
}
@@ -198,9 +224,10 @@ private Object processToolResult(McpSchema.CallToolResult result, String name) {
198224
*/
199225
@Override
200226
public void close() throws IOException {
227+
ensureNotClosed();
201228
this.closed = true;
202229
this.mcpSyncClient.closeGracefully();
203-
log.info("MCP client closed.");
230+
log.info("MCP client [{}] closed.", clientId);
204231
}
205232

206233
/**
@@ -230,4 +257,27 @@ private Tool convertToFelTool(McpSchema.Tool mcpTool) {
230257

231258
return tool;
232259
}
260+
261+
/**
262+
* Ensures the MCP client is not closed.
263+
*
264+
* @throws IllegalStateException if the client is closed.
265+
*/
266+
private void ensureNotClosed() {
267+
if (this.closed) {
268+
throw new IllegalStateException("The MCP client is closed.");
269+
}
270+
}
271+
272+
/**
273+
* Ensures the MCP client is ready for operations (not closed and initialized).
274+
*
275+
* @throws IllegalStateException if the client is closed or not initialized.
276+
*/
277+
private void ensureReady() {
278+
ensureNotClosed();
279+
if (!this.initialized) {
280+
throw new IllegalStateException("MCP client is not initialized. Please wait a moment.");
281+
}
282+
}
233283
}

framework/fel/java/plugins/tool-mcp-client/src/main/java/modelengine/fel/tool/mcp/client/support/McpClientMessageHandler.java

Lines changed: 0 additions & 59 deletions
This file was deleted.

0 commit comments

Comments
 (0)