Skip to content
This repository was archived by the owner on Feb 14, 2025. It is now read-only.

Commit 9081b63

Browse files
chemicLtzolov
authored andcommitted
refactor: improve process handling and graceful shutdown
- Enhance process handling in StdioServerTransport with proper cleanup - Refactor graceful shutdown across McpSession implementations - Add proper JSON serialization annotations for Role and LoggingLevel enums - Fix typo in setInboundMessageHandler method name - Improve test coverage using StepVerifier - Add documentation and cleanup code
1 parent 912e815 commit 9081b63

File tree

10 files changed

+109
-114
lines changed

10 files changed

+109
-114
lines changed

spring-ai-mcp-core/src/main/java/org/springframework/ai/mcp/client/McpSyncClient.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ public McpSyncClient(McpAsyncClient delegate) {
4444

4545
@Override
4646
public void close() {
47-
this.delegate.closeGracefully(Duration.ofMillis(DEFAULT_CLOSE_TIMEOUT_MS)).block();
47+
this.delegate.closeGracefully().block(Duration.ofMillis(DEFAULT_CLOSE_TIMEOUT_MS));
4848
}
4949

5050
/**

spring-ai-mcp-core/src/main/java/org/springframework/ai/mcp/client/stdio/StdioServerTransport.java

Lines changed: 23 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.util.ArrayList;
2323
import java.util.HashMap;
2424
import java.util.List;
25+
import java.util.concurrent.CompletableFuture;
2526
import java.util.concurrent.Executors;
2627

2728
import com.fasterxml.jackson.core.type.TypeReference;
@@ -87,6 +88,11 @@ public StdioServerTransport(ServerParameters params, ObjectMapper objectMapper)
8788

8889
@Override
8990
public void start() {
91+
// Let's kick off the abstraction layer that will consume the logical messages
92+
// pushed via sinks. The code that follows is actually feeding the sinks with
93+
// data.
94+
super.start();
95+
9096
// Prepare command and environment
9197
List<String> fullCommand = new ArrayList<>();
9298
fullCommand.add(params.getCommand());
@@ -234,39 +240,43 @@ public Mono<Void> closeGracefully() {
234240

235241
this.isRunning = false;
236242

237-
return Mono.whenDelayError(Mono.fromRunnable(() -> {
243+
return Mono.fromFuture(() -> {
244+
System.out.println("Sending TERM to process");
245+
if (this.process != null) {
246+
this.process.destroy();
247+
return process.onExit();
248+
}
249+
else {
250+
return CompletableFuture.failedFuture(new RuntimeException("Process not started"));
251+
}
252+
}).doOnNext(process -> {
253+
if (process.exitValue() != 0) {
254+
System.out.println("Process terminated with code " + process.exitValue());
255+
}
256+
}).then(Mono.whenDelayError(Mono.fromRunnable(() -> {
238257
try {
239258
this.processErrorReader.close();
240259
}
241260
catch (IOException e) {
242261
throw new RuntimeException(e);
243262
}
244-
}).subscribeOn(errorScheduler), Mono.fromRunnable(() -> {
263+
}), Mono.fromRunnable(() -> {
245264
try {
246265
this.processReader.close();
247266
}
248267
catch (IOException e) {
249268
throw new RuntimeException(e);
250269
}
251-
}).subscribeOn(inboundScheduler), Mono.fromRunnable(() -> {
270+
}), Mono.fromRunnable(() -> {
252271
try {
253272
this.processWriter.close();
254273
}
255274
catch (IOException e) {
256275
throw new RuntimeException(e);
257276
}
258-
}).subscribeOn(outboundScheduler))
277+
})))
259278
.then(Mono.whenDelayError(inboundScheduler.disposeGracefully(), outboundScheduler.disposeGracefully(),
260279
errorScheduler.disposeGracefully()))
261-
.then(Mono.fromRunnable(() -> {
262-
if (this.process != null) {
263-
var process = this.process.destroyForcibly();
264-
if (process.exitValue() != 0) {
265-
throw new RuntimeException("Failed to destroy process");
266-
}
267-
}
268-
}))
269-
.then()
270280
.subscribeOn(Schedulers.boundedElastic());
271281
}
272282

spring-ai-mcp-core/src/main/java/org/springframework/ai/mcp/spec/AbstractMcpTransport.java

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,9 @@
2929
* @author Christian Tzolov
3030
* @author Dariusz Jędrzejczyk
3131
*/
32+
// TODO: The API is not finalized. Implementing SSE will dictate a more concrete
33+
// structure and relationship between the base and the actual implementation including
34+
// proper encapsulation and state management.
3235
public abstract class AbstractMcpTransport implements McpTransport {
3336

3437
protected final ObjectMapper objectMapper;
@@ -53,17 +56,9 @@ public AbstractMcpTransport(ObjectMapper objectMapper) {
5356
Assert.notNull(objectMapper, "ObjectMapper must not be null");
5457
this.objectMapper = objectMapper;
5558

56-
// TODO: consider the effects of buffering here -> the inter-process pipes are
57-
// independent and the notifications can flood the client/server.
58-
// Potentially, the interest in reading could be communicated from one party
59-
// to the other so the Blocking IO Threads can pause consuming the stream
60-
// buffers when there is no expectation for reading.
61-
6259
this.errorSink = Sinks.many().unicast().onBackpressureBuffer();
6360
this.inboundSink = Sinks.many().unicast().onBackpressureBuffer();
6461
this.outboundSink = Sinks.many().unicast().onBackpressureBuffer();
65-
66-
this.handleIncomingMessages();
6762
}
6863

6964
public ObjectMapper getObjectMapper() {
@@ -77,7 +72,6 @@ private void handleIncomingMessages() {
7772
private void handleIncomingErrors() {
7873
this.errorSink.asFlux().subscribe(e -> {
7974
this.errorHandler.accept(e);
80-
// System.err.println(e); TODO: log the error
8175
});
8276
}
8377

@@ -93,7 +87,7 @@ protected Sinks.Many<String> getErrorSink() {
9387
return errorSink;
9488
}
9589

96-
public void setInboudMessageHandler(Consumer<JSONRPCMessage> inboundMessageHandler) {
90+
public void setInboundMessageHandler(Consumer<JSONRPCMessage> inboundMessageHandler) {
9791
this.inboundMessageHandler = inboundMessageHandler;
9892
}
9993

spring-ai-mcp-core/src/main/java/org/springframework/ai/mcp/spec/DefaultMcpSession.java

Lines changed: 19 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ public DefaultMcpSession(Duration requestTimeout, ObjectMapper objectMapper, Mcp
5454
this.objectMapper = objectMapper;
5555
this.transport = transport;
5656

57-
this.transport.setInboudMessageHandler(message -> {
57+
this.transport.setInboundMessageHandler(message -> {
5858

5959
if (message instanceof McpSchema.JSONRPCResponse response) {
6060
var sink = pendingResponses.remove(response.id());
@@ -80,33 +80,27 @@ else if (message instanceof McpSchema.JSONRPCNotification notification) {
8080

8181
@Override
8282
public <T> Mono<T> sendRequest(String method, Object requestParams, TypeReference<T> typeRef) {
83+
// TODO: UUID API is blocking. Consider non-blocking alternatives to generate
84+
// the ID.
8385
String requestId = UUID.randomUUID().toString();
8486

8587
return Mono.<McpSchema.JSONRPCResponse>create(sink -> {
8688
this.pendingResponses.put(requestId, sink);
8789
McpSchema.JSONRPCRequest jsonrpcRequest = new McpSchema.JSONRPCRequest(McpSchema.JSONRPC_VERSION, method,
8890
requestId, requestParams);
89-
try {
90-
// TODO: This is non-blocking, but it's actually a synchronous call,
91-
// perhaps there's no need to make it return Mono?
92-
this.transport.sendMessage(jsonrpcRequest)
93-
// TODO: It's most efficient to create a dedicated
94-
// Subscriber here
95-
.subscribe(v -> {
96-
}, e -> {
97-
this.pendingResponses.remove(requestId);
98-
sink.error(e);
99-
});
100-
}
101-
catch (Exception e) {
102-
sink.error(e);
103-
}
91+
this.transport.sendMessage(jsonrpcRequest)
92+
// TODO: It's most efficient to create a dedicated Subscriber here
93+
.subscribe(v -> {
94+
}, e -> {
95+
this.pendingResponses.remove(requestId);
96+
sink.error(e);
97+
});
10498
}).timeout(this.requestTimeout).handle((jsonRpcResponse, s) -> {
10599
if (jsonRpcResponse.error() != null) {
106100
s.error(new McpError(jsonRpcResponse.error()));
107101
}
108102
else {
109-
if (typeRef.getType().getTypeName().equals("java.lang.Void")) {
103+
if (typeRef.getType().equals(Void.class)) {
110104
s.complete();
111105
}
112106
else {
@@ -120,19 +114,17 @@ public <T> Mono<T> sendRequest(String method, Object requestParams, TypeReferenc
120114
public Mono<Void> sendNotification(String method, Map<String, Object> params) {
121115
McpSchema.JSONRPCNotification jsonrpcNotification = new McpSchema.JSONRPCNotification(McpSchema.JSONRPC_VERSION,
122116
method, params);
123-
try {
124-
this.transport.sendMessage(jsonrpcNotification);
125-
}
126-
catch (Exception e) {
127-
return Mono.error(new McpError(e));
128-
}
129-
return Mono.empty();
117+
return this.transport.sendMessage(jsonrpcNotification);
118+
}
119+
120+
@Override
121+
public Mono<Void> closeGracefully() {
122+
return transport.closeGracefully();
130123
}
131124

132125
@Override
133-
public Mono<Void> closeGracefully(Duration timeout) {
134-
// TODO handle the timeout in transport
135-
return Mono.fromRunnable(this.transport::close);
126+
public void close() {
127+
transport.close();
136128
}
137129

138130
}

spring-ai-mcp-core/src/main/java/org/springframework/ai/mcp/spec/McpSchema.java

Lines changed: 24 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -164,17 +164,24 @@ public record Implementation(// @formatter:off
164164
} // @formatter:on
165165

166166
// Existing Enums and Base Types (from previous implementation)
167-
public enum Role {
167+
public enum Role {// @formatter:off
168168

169-
USER, ASSISTANT
170-
171-
}
169+
@JsonProperty("user") USER,
170+
@JsonProperty("assistant") ASSISTANT
171+
}// @formatter:on
172172

173-
public enum LoggingLevel {
173+
public enum LoggingLevel {// @formatter:off
174174

175-
DEBUG, INFO, NOTICE, WARNING, ERROR, CRITICAL, ALERT, EMERGENCY
175+
@JsonProperty("debug") DEBUG,
176+
@JsonProperty("info") INFO,
177+
@JsonProperty("notice") NOTICE,
178+
@JsonProperty("warning") WARNING,
179+
@JsonProperty("error") ERROR,
180+
@JsonProperty("critical") CRITICAL,
181+
@JsonProperty("alert") ALERT,
182+
@JsonProperty("emergency") EMERGENCY
176183

177-
}
184+
} // @formatter:on
178185

179186
// ---------------------------
180187
// Resource Interfaces
@@ -448,20 +455,22 @@ public record CreateMessageRequest(List<SamplingMessage> messages, ModelPreferen
448455
String systemPrompt, ContextInclusionStrategy includeContext, Double temperature, int maxTokens,
449456
List<String> stopSequences, Map<String, Object> metadata) implements Request {
450457

451-
public enum ContextInclusionStrategy {
452-
453-
NONE, THIS_SERVER, ALL_SERVERS
454-
455-
}
458+
public enum ContextInclusionStrategy {// @formatter:off
459+
@JsonProperty("none") NONE,
460+
@JsonProperty("this_server") THIS_SERVER,
461+
@JsonProperty("all_server") ALL_SERVERS
462+
} // @formatter:on
456463
}
457464

458465
@JsonInclude(JsonInclude.Include.NON_ABSENT)
459466
public record CreateMessageResult(Role role, Content content, String model, StopReason stopReason) {
460467
public enum StopReason {
461468

462-
END_TURN, STOP_SEQUENCE, MAX_TOKENS
463-
464-
}
469+
// @formatter:off
470+
@JsonProperty("end_turn") END_TURN,
471+
@JsonProperty("stop_sequence") STOP_SEQUENCE,
472+
@JsonProperty("max_tokens") MAX_TOKENS
473+
} // @formatter:on
465474
}
466475

467476
// ---------------------------

spring-ai-mcp-core/src/main/java/org/springframework/ai/mcp/spec/McpSession.java

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -81,15 +81,14 @@ default Mono<Void> sendNotification(String method) {
8181
Mono<Void> sendNotification(String method, Map<String, Object> params);
8282

8383
/**
84-
* Gracefully closes the session with a specified timeout.
85-
*
86-
* <p>
87-
* This method attempts to close the session cleanly, waiting for any pending
88-
* operations to complete up to the specified timeout duration.
89-
* </p>
90-
* @param timeout the maximum duration to wait for the session to close
91-
* @return a Mono that completes when the session has been closed
84+
* Closes the session and releases any associated resources asynchronously.
85+
* @return a {@link Mono<Void>} that completes when the session has been closed.
86+
*/
87+
Mono<Void> closeGracefully();
88+
89+
/**
90+
* Closes the session and releases any associated resources.
9291
*/
93-
Mono<Void> closeGracefully(Duration timeout);
92+
void close();
9493

9594
}

spring-ai-mcp-core/src/main/java/org/springframework/ai/mcp/spec/McpTransport.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ default void close() {
9191
* @param inboundMessageHandler a consumer that processes incoming
9292
* {@link JSONRPCMessage}s
9393
*/
94-
void setInboudMessageHandler(Consumer<JSONRPCMessage> inboundMessageHandler);
94+
void setInboundMessageHandler(Consumer<JSONRPCMessage> inboundMessageHandler);
9595

9696
/**
9797
* Sets the handler for processing transport-level errors.

0 commit comments

Comments
 (0)