Skip to content

Commit 05d70eb

Browse files
committed
增加SSE客户端支持
1 parent cc1be9c commit 05d70eb

File tree

4 files changed

+60
-15
lines changed

4 files changed

+60
-15
lines changed

framework/fel/java/plugins/tool-mcp-client/src/main/java/modelengine/fel/tool/mcp/client/support/DefaultMcpClientFactory.java

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,11 @@
66

77
package modelengine.fel.tool.mcp.client.support;
88

9+
import com.fasterxml.jackson.databind.ObjectMapper;
10+
11+
import io.modelcontextprotocol.client.transport.HttpClientSseClientTransport;
12+
import io.modelcontextprotocol.client.transport.HttpClientStreamableHttpTransport;
13+
import io.modelcontextprotocol.json.jackson.JacksonMcpJsonMapper;
914
import modelengine.fel.tool.mcp.client.McpClient;
1015
import modelengine.fel.tool.mcp.client.McpClientFactory;
1116
import modelengine.fitframework.annotation.Component;
@@ -32,7 +37,26 @@ public DefaultMcpClientFactory(@Value("${mcp.client.request.timeout-seconds}") i
3237
}
3338

3439
@Override
40+
public McpClient createStreamable(String baseUri, String sseEndpoint) {
41+
HttpClientStreamableHttpTransport transport = HttpClientStreamableHttpTransport.builder(baseUri)
42+
.jsonMapper(new JacksonMcpJsonMapper(new ObjectMapper()))
43+
.endpoint(sseEndpoint)
44+
.build();
45+
return new DefaultMcpStreamableClient(baseUri, sseEndpoint, this.requestTimeoutSeconds, transport);
46+
}
47+
48+
@Override
49+
public McpClient createSse(String baseUri, String sseEndpoint) {
50+
HttpClientSseClientTransport transport = HttpClientSseClientTransport.builder(baseUri)
51+
.jsonMapper(new JacksonMcpJsonMapper(new ObjectMapper()))
52+
.sseEndpoint(sseEndpoint)
53+
.build();
54+
return new DefaultMcpStreamableClient(baseUri, sseEndpoint, this.requestTimeoutSeconds, transport);
55+
}
56+
57+
@Override
58+
@Deprecated
3559
public McpClient create(String baseUri, String sseEndpoint) {
36-
return new DefaultMcpStreamableClient(baseUri, sseEndpoint, requestTimeoutSeconds);
60+
return this.createStreamable(baseUri, sseEndpoint);
3761
}
3862
}

framework/fel/java/plugins/tool-mcp-client/src/main/java/modelengine/fel/tool/mcp/client/support/DefaultMcpStreamableClient.java

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,8 @@
1111
import com.fasterxml.jackson.databind.ObjectMapper;
1212

1313
import io.modelcontextprotocol.client.McpSyncClient;
14-
import io.modelcontextprotocol.client.transport.HttpClientStreamableHttpTransport;
15-
import io.modelcontextprotocol.json.jackson.JacksonMcpJsonMapper;
1614
import io.modelcontextprotocol.json.schema.jackson.DefaultJsonSchemaValidator;
15+
import io.modelcontextprotocol.spec.McpClientTransport;
1716
import io.modelcontextprotocol.spec.McpSchema;
1817
import modelengine.fel.tool.mcp.client.McpClient;
1918
import modelengine.fel.tool.mcp.entity.Tool;
@@ -54,23 +53,18 @@ public class DefaultMcpStreamableClient implements McpClient {
5453
* @param sseEndpoint The endpoint for the Server-Sent Events (SSE) connection.
5554
* @param requestTimeoutSeconds The timeout duration of requests. Units: seconds.
5655
*/
57-
public DefaultMcpStreamableClient(String baseUri, String sseEndpoint, int requestTimeoutSeconds) {
56+
public DefaultMcpStreamableClient(String baseUri, String sseEndpoint, int requestTimeoutSeconds,
57+
McpClientTransport transport) {
5858
this.clientId = UuidUtils.randomUuidString();
5959
notBlank(baseUri, "The MCP server base URI cannot be blank.");
6060
notBlank(sseEndpoint, "The MCP server SSE endpoint cannot be blank.");
6161
log.info("Creating MCP client. [clientId={}, baseUri={}]", this.clientId, baseUri);
62-
ObjectMapper mapper = new ObjectMapper();
63-
HttpClientStreamableHttpTransport transport = HttpClientStreamableHttpTransport.builder(baseUri)
64-
.jsonMapper(new JacksonMcpJsonMapper(mapper))
65-
.endpoint(sseEndpoint)
66-
.build();
67-
6862
this.logHandler = new DefaultMcpClientLogHandler(this.clientId);
6963
this.mcpSyncClient = io.modelcontextprotocol.client.McpClient.sync(transport)
7064
.requestTimeout(Duration.ofSeconds(requestTimeoutSeconds))
7165
.capabilities(McpSchema.ClientCapabilities.builder().build())
7266
.loggingConsumer(this.logHandler::handleLoggingMessage)
73-
.jsonSchemaValidator(new DefaultJsonSchemaValidator(mapper))
67+
.jsonSchemaValidator(new DefaultJsonSchemaValidator(new ObjectMapper()))
7468
.build();
7569
}
7670

framework/fel/java/plugins/tool-mcp-test/src/main/java/modelengine/fel/tool/mcp/test/TestController.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,9 +51,17 @@ public TestController(McpClientFactory mcpClientFactory) {
5151
* @return A string indicating that the initialization was successful.
5252
*/
5353
@PostMapping(path = "/initialize")
54-
public String initialize(@RequestQuery(name = "baseUri") String baseUri,
54+
public String initializeStreamable(@RequestQuery(name = "baseUri") String baseUri,
5555
@RequestQuery(name = "sseEndpoint") String sseEndpoint) {
56-
this.client = this.mcpClientFactory.create(baseUri, sseEndpoint);
56+
this.client = this.mcpClientFactory.createStreamable(baseUri, sseEndpoint);
57+
this.client.initialize();
58+
return "Initialized";
59+
}
60+
61+
@PostMapping(path = "/initialize-sse")
62+
public String initializeSse(@RequestQuery(name = "baseUri") String baseUri,
63+
@RequestQuery(name = "sseEndpoint") String sseEndpoint) {
64+
this.client = this.mcpClientFactory.createSse(baseUri, sseEndpoint);
5765
this.client.initialize();
5866
return "Initialized";
5967
}

framework/fel/java/services/tool-mcp-client-service/src/main/java/modelengine/fel/tool/mcp/client/McpClientFactory.java

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,30 @@
1616
*/
1717
public interface McpClientFactory {
1818
/**
19-
* Creates a {@link McpClient} instance.
19+
* Creates a {@link McpClient} instance with streamable HTTP transport.
2020
*
2121
* @param baseUri The base URI of the MCP server.
2222
* @param sseEndpoint The SSE endpoint of the MCP server.
2323
* @return The connected {@link McpClient} instance.
2424
*/
25-
McpClient create(String baseUri, String sseEndpoint);
25+
public McpClient createStreamable(String baseUri, String sseEndpoint);
26+
27+
/**
28+
* Creates a {@link McpClient} instance with SSE transport.
29+
*
30+
* @param baseUri The base URI of the MCP server.
31+
* @param sseEndpoint The SSE endpoint of the MCP server.
32+
* @return The connected {@link McpClient} instance.
33+
*/
34+
public McpClient createSse(String baseUri, String sseEndpoint);
35+
36+
/**
37+
* Creates a {@link McpClient} instance with streamable HTTP transport.
38+
*
39+
* @param baseUri The base URI of the MCP server.
40+
* @param sseEndpoint The SSE endpoint of the MCP server.
41+
* @return The connected {@link McpClient} instance.
42+
*/
43+
@Deprecated
44+
public McpClient create(String baseUri, String sseEndpoint);
2645
}

0 commit comments

Comments
 (0)