Skip to content
This repository was archived by the owner on Feb 14, 2025. It is now read-only.

Commit 7abf118

Browse files
committed
refactor(transport): improve MCP transport layer design
- Rename DefaultMcpTransport to AbstractMcpTransport and make it abstract - Add graceful shutdown support via closeGracefully() method - Enhance McpTransport interface with default close() implementation - Improve documentation across transport layer classes
1 parent 1b6d661 commit 7abf118

File tree

7 files changed

+339
-249
lines changed

7 files changed

+339
-249
lines changed

mcp/pom.xml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
<properties>
2525
<java.version>21</java.version>
2626
<maven.compiler.release>21</maven.compiler.release>
27-
<!-- <spring-ai.version>1.0.0-M4</spring-ai.version> -->
2827
</properties>
2928
<dependencies>
3029
<dependency>
Lines changed: 84 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,18 @@
11
/*
2-
* Copyright 2024 - 2024 the original author or authors.
3-
*
4-
* Licensed under the Apache License, Version 2.0 (the "License");
5-
* you may not use this file except in compliance with the License.
6-
* You may obtain a copy of the License at
7-
*
8-
* https://www.apache.org/licenses/LICENSE-2.0
9-
*
10-
* Unless required by applicable law or agreed to in writing, software
11-
* distributed under the License is distributed on an "AS IS" BASIS,
12-
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13-
* See the License for the specific language governing permissions and
14-
* limitations under the License.
15-
*/
2+
* Copyright 2024 - 2024 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
1616
package spring.ai.experimental.mcp.client;
1717

1818
import java.time.Duration;
@@ -21,41 +21,85 @@
2121
import spring.ai.experimental.mcp.spec.McpTransport;
2222

2323
/**
24-
* The MCP client is the main entry point for interacting with the Model Context Protocol
25-
* (MCP) server.
24+
* Factory class providing static methods for creating Model Context Protocol (MCP) clients.
25+
* This class serves as the main entry point for establishing connections with MCP servers,
26+
* offering both synchronous and asynchronous client implementations.
27+
*
28+
* <p>The class provides factory methods to create either:
29+
* <ul>
30+
* <li>{@link McpAsyncClient} for non-blocking operations
31+
* <li>{@link McpSyncClient} for blocking operations
32+
* </ul>
33+
*
34+
* <p>Each client type can be instantiated with default settings or with custom configuration
35+
* including request timeout and JSON object mapping.
36+
*
37+
* <p>Future implementations will introduce a builder pattern for more flexible client configuration.
2638
*
2739
* @author Christian Tzolov
2840
* @author Dariusz Jędrzejczyk
2941
*/
3042
public class McpClient {
3143

32-
private McpClient() {
33-
}
34-
35-
// TODO: introduce a builder like:
36-
// McpClient.using(transport)
37-
// .withRequestTimeout(Duration.ofSeconds(5))
38-
// .withObjectMapper(objectMapper); <- or even a more sophisticated
39-
// JSONtoPOJOCodec type
40-
// .sync();
41-
42-
public static McpAsyncClient async(McpTransport transport) {
43-
return new McpAsyncClient(transport);
44-
}
44+
/**
45+
* Private constructor to prevent instantiation as this is a utility class
46+
* containing only static factory methods.
47+
*/
48+
private McpClient() {
49+
}
4550

46-
public static McpAsyncClient async(McpTransport transport, Duration requestTimeout,
47-
ObjectMapper objectMapper) {
48-
return new McpAsyncClient(transport, requestTimeout, objectMapper);
49-
}
51+
// TODO: introduce a builder like:
52+
// McpClient.using(transport)
53+
// .withRequestTimeout(Duration.ofSeconds(5))
54+
// .withObjectMapper(objectMapper); <- or even a more sophisticated
55+
// JSONtoPOJOCodec type
56+
// .sync();
5057

51-
public static McpSyncClient sync(McpTransport transport) {
52-
return new McpSyncClient(async(transport));
53-
}
58+
/**
59+
* Creates an asynchronous MCP client with default configuration.
60+
*
61+
* @param transport The transport layer implementation for MCP communication
62+
* @return A new instance of {@link McpAsyncClient}
63+
*/
64+
public static McpAsyncClient async(McpTransport transport) {
65+
return new McpAsyncClient(transport);
66+
}
5467

55-
public static McpSyncClient sync(McpTransport transport, Duration requestTimeout,
56-
ObjectMapper objectMapper) {
57-
return new McpSyncClient(async(transport, requestTimeout, objectMapper));
58-
}
68+
/**
69+
* Creates an asynchronous MCP client with custom configuration.
70+
*
71+
* @param transport The transport layer implementation for MCP communication
72+
* @param requestTimeout The duration to wait before timing out requests
73+
* @param objectMapper The Jackson object mapper for JSON serialization/deserialization
74+
* @return A new instance of {@link McpAsyncClient}
75+
*/
76+
public static McpAsyncClient async(McpTransport transport, Duration requestTimeout,
77+
ObjectMapper objectMapper) {
78+
return new McpAsyncClient(transport, requestTimeout, objectMapper);
79+
}
5980

81+
/**
82+
* Creates a synchronous MCP client with default configuration.
83+
* This method wraps an asynchronous client to provide synchronous operations.
84+
*
85+
* @param transport The transport layer implementation for MCP communication
86+
* @return A new instance of {@link McpSyncClient}
87+
*/
88+
public static McpSyncClient sync(McpTransport transport) {
89+
return new McpSyncClient(async(transport));
90+
}
6091

92+
/**
93+
* Creates a synchronous MCP client with custom configuration.
94+
* This method wraps an asynchronous client to provide synchronous operations.
95+
*
96+
* @param transport The transport layer implementation for MCP communication
97+
* @param requestTimeout The duration to wait before timing out requests
98+
* @param objectMapper The Jackson object mapper for JSON serialization/deserialization
99+
* @return A new instance of {@link McpSyncClient}
100+
*/
101+
public static McpSyncClient sync(McpTransport transport, Duration requestTimeout,
102+
ObjectMapper objectMapper) {
103+
return new McpSyncClient(async(transport, requestTimeout, objectMapper));
104+
}
61105
}

mcp/src/main/java/spring/ai/experimental/mcp/client/stdio/StdioServerTransport.java

Lines changed: 40 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,11 @@
1010

1111
import com.fasterxml.jackson.core.type.TypeReference;
1212
import com.fasterxml.jackson.databind.ObjectMapper;
13+
import reactor.core.publisher.Mono;
1314
import reactor.core.scheduler.Scheduler;
1415
import reactor.core.scheduler.Schedulers;
1516
import spring.ai.experimental.mcp.client.util.Assert;
16-
import spring.ai.experimental.mcp.spec.DefaultMcpTransport;
17+
import spring.ai.experimental.mcp.spec.AbstractMcpTransport;
1718
import spring.ai.experimental.mcp.spec.McpSchema.JSONRPCMessage;
1819
import spring.ai.experimental.mcp.spec.McpSchema.JSONRPCNotification;
1920
import spring.ai.experimental.mcp.spec.McpSchema.JSONRPCRequest;
@@ -25,7 +26,7 @@
2526
* @author Christian Tzolov
2627
* @author Dariusz Jędrzejczyk
2728
*/
28-
public class StdioServerTransport extends DefaultMcpTransport {
29+
public class StdioServerTransport extends AbstractMcpTransport {
2930

3031
private final static TypeReference<HashMap<String, Object>> MAP_TYPE_REF = new TypeReference<HashMap<String, Object>>() {
3132
};
@@ -194,47 +195,46 @@ private void startOutboundProcessing() {
194195
.subscribe();
195196
}
196197

197-
// TODO: provide a non-blocking variant with graceful option
198-
public void stop() {
199-
this.outboundScheduler.dispose();
200-
this.inboundScheduler.dispose();
201-
this.errorScheduler.dispose();
202-
203-
// Destroy process
204-
if (this.process != null) {
205-
this.process.destroyForcibly();
206-
}
207-
}
208-
209198
@Override
210-
public void close() {
199+
public Mono<Void> closeGracefully() {
211200

212-
this.stop();
201+
this.isRunning = false;
213202

214-
super.close(); // Do we need this?
215-
216-
// Close resources
217-
if (this.processReader != null) {
218-
try {
219-
this.processReader.close();
220-
} catch (IOException e) {
221-
e.printStackTrace();
222-
}
223-
}
224-
if (this.processWriter != null) {
225-
try {
226-
this.processWriter.close();
227-
} catch (IOException e) {
228-
e.printStackTrace();
229-
}
230-
}
231-
if (this.processErrorReader != null) {
232-
try {
233-
this.processErrorReader.close();
234-
} catch (IOException e) {
235-
e.printStackTrace();
236-
}
237-
}
203+
return Mono.whenDelayError(
204+
Mono.fromRunnable(() -> {
205+
try {
206+
this.processErrorReader.close();
207+
} catch (IOException e) {
208+
throw new RuntimeException(e);
209+
}
210+
}).subscribeOn(errorScheduler),
211+
Mono.fromRunnable(() -> {
212+
try {
213+
this.processReader.close();
214+
} catch (IOException e) {
215+
throw new RuntimeException(e);
216+
}
217+
}).subscribeOn(inboundScheduler),
218+
Mono.fromRunnable(() -> {
219+
try {
220+
this.processWriter.close();
221+
} catch (IOException e) {
222+
throw new RuntimeException(e);
223+
}
224+
}).subscribeOn(outboundScheduler))
225+
.then(Mono.whenDelayError(inboundScheduler.disposeGracefully(),
226+
outboundScheduler.disposeGracefully(),
227+
errorScheduler.disposeGracefully()))
228+
.then(Mono.fromRunnable(() -> {
229+
if (this.process != null) {
230+
var process = this.process.destroyForcibly();
231+
if (process.exitValue() != 0) {
232+
throw new RuntimeException("Failed to destroy process");
233+
}
234+
}
235+
}))
236+
.then()
237+
.subscribeOn(Schedulers.boundedElastic());
238238
}
239239

240240
}

mcp/src/main/java/spring/ai/experimental/mcp/spec/DefaultMcpTransport.java renamed to mcp/src/main/java/spring/ai/experimental/mcp/spec/AbstractMcpTransport.java

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
* @author Christian Tzolov
2828
* @author Dariusz Jędrzejczyk
2929
*/
30-
public class DefaultMcpTransport implements McpTransport {
30+
public abstract class AbstractMcpTransport implements McpTransport {
3131

3232
protected final ObjectMapper objectMapper;
3333

@@ -39,11 +39,11 @@ public class DefaultMcpTransport implements McpTransport {
3939
.println("Received message: " + message);
4040
private Consumer<String> errorHandler = error -> System.err.println("Received error: " + error);
4141

42-
public DefaultMcpTransport() {
42+
public AbstractMcpTransport() {
4343
this(new ObjectMapper());
4444
}
4545

46-
public DefaultMcpTransport(ObjectMapper objectMapper) {
46+
public AbstractMcpTransport(ObjectMapper objectMapper) {
4747

4848
Assert.notNull(objectMapper, "ObjectMapper must not be null");
4949
this.objectMapper = objectMapper;
@@ -73,7 +73,7 @@ private void handleIncomingMessages() {
7373
private void handleIncomingErrors() {
7474
this.errorSink.asFlux().subscribe(e -> {
7575
this.errorHandler.accept(e);
76-
System.err.println(e);
76+
// System.err.println(e); TODO: log the error
7777
});
7878
}
7979

@@ -103,12 +103,6 @@ public void start() {
103103
this.handleIncomingMessages();
104104
}
105105

106-
// Close the transport
107-
@Override
108-
public void close() {
109-
// this.onClose();
110-
}
111-
112106
@Override
113107
public Mono<Void> sendMessage(JSONRPCMessage message) {
114108
if (this.outboundSink.tryEmitNext(message).isSuccess()) {

mcp/src/main/java/spring/ai/experimental/mcp/spec/McpSession.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,6 @@ public Mono<Void> sendNotification(String method, Map<String, Object> params) {
111111
McpSchema.JSONRPCNotification jsonrpcNotification = new McpSchema.JSONRPCNotification(McpSchema.JSONRPC_VERSION,
112112
method, params);
113113
try {
114-
// TODO: make it non-blocking
115114
this.transport.sendMessage(jsonrpcNotification);
116115
} catch (Exception e) {
117116
return Mono.error(new McpError(e));

0 commit comments

Comments
 (0)