Skip to content

Commit 7a1c520

Browse files
committed
feat: implement MCP-compliant keep-alive functionality for server transports
- Add KeepAliveScheduler utility class for configurable periodic session pings - Integrate keep-alive support in WebFlux, WebMVC, and HttpServlet SSE transport providers - Add keepAliveInterval configuration option to all transport provider builders - Deprecate existing constructors in favor of builder pattern with enhanced configuration - Update graceful shutdown to properly clean up keep-alive schedulers - Add unit tests for KeepAliveScheduler functionality Implements MCP specification recommendations for connection health detection: - Configurable ping frequency to suit different network environments - Optional keep-alive (disabled by default) to avoid excessive network overhead - Proper resource cleanup to prevent connection leaks https://modelcontextprotocol.io/specification/2025-06-18/basic/utilities/ping#implementation-considerations Signed-off-by: Christian Tzolov <[email protected]>
1 parent bde1b6b commit 7a1c520

File tree

5 files changed

+809
-17
lines changed

5 files changed

+809
-17
lines changed

mcp-spring/mcp-spring-webflux/src/main/java/io/modelcontextprotocol/server/transport/WebFluxSseServerTransportProvider.java

Lines changed: 72 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package io.modelcontextprotocol.server.transport;
22

33
import java.io.IOException;
4+
import java.time.Duration;
45
import java.util.concurrent.ConcurrentHashMap;
56

67
import com.fasterxml.jackson.core.type.TypeReference;
@@ -11,6 +12,8 @@
1112
import io.modelcontextprotocol.spec.McpServerTransport;
1213
import io.modelcontextprotocol.spec.McpServerTransportProvider;
1314
import io.modelcontextprotocol.util.Assert;
15+
import io.modelcontextprotocol.util.KeepAliveScheduler;
16+
1417
import org.slf4j.Logger;
1518
import org.slf4j.LoggerFactory;
1619
import reactor.core.Exceptions;
@@ -109,6 +112,12 @@ public class WebFluxSseServerTransportProvider implements McpServerTransportProv
109112
*/
110113
private volatile boolean isClosing = false;
111114

115+
/**
116+
* Keep-alive scheduler for managing session pings. Activated if keepAliveInterval is
117+
* set. Disabled by default.
118+
*/
119+
private KeepAliveScheduler keepAliveScheduler;
120+
112121
/**
113122
* Constructs a new WebFlux SSE server transport provider instance with the default
114123
* SSE endpoint.
@@ -118,7 +127,10 @@ public class WebFluxSseServerTransportProvider implements McpServerTransportProv
118127
* messages. This endpoint will be communicated to clients during SSE connection
119128
* setup. Must not be null.
120129
* @throws IllegalArgumentException if either parameter is null
130+
* @deprecated Use the builder {@link #builder()} instead for better configuration
131+
* options.
121132
*/
133+
@Deprecated
122134
public WebFluxSseServerTransportProvider(ObjectMapper objectMapper, String messageEndpoint) {
123135
this(objectMapper, messageEndpoint, DEFAULT_SSE_ENDPOINT);
124136
}
@@ -131,7 +143,10 @@ public WebFluxSseServerTransportProvider(ObjectMapper objectMapper, String messa
131143
* messages. This endpoint will be communicated to clients during SSE connection
132144
* setup. Must not be null.
133145
* @throws IllegalArgumentException if either parameter is null
146+
* @deprecated Use the builder {@link #builder()} instead for better configuration
147+
* options.
134148
*/
149+
@Deprecated
135150
public WebFluxSseServerTransportProvider(ObjectMapper objectMapper, String messageEndpoint, String sseEndpoint) {
136151
this(objectMapper, DEFAULT_BASE_URL, messageEndpoint, sseEndpoint);
137152
}
@@ -145,9 +160,32 @@ public WebFluxSseServerTransportProvider(ObjectMapper objectMapper, String messa
145160
* messages. This endpoint will be communicated to clients during SSE connection
146161
* setup. Must not be null.
147162
* @throws IllegalArgumentException if either parameter is null
163+
* @deprecated Use the builder {@link #builder()} instead for better configuration
164+
* options.
148165
*/
166+
@Deprecated
149167
public WebFluxSseServerTransportProvider(ObjectMapper objectMapper, String baseUrl, String messageEndpoint,
150168
String sseEndpoint) {
169+
this(objectMapper, baseUrl, messageEndpoint, sseEndpoint, null);
170+
}
171+
172+
/**
173+
* Constructs a new WebFlux SSE server transport provider instance.
174+
* @param objectMapper The ObjectMapper to use for JSON serialization/deserialization
175+
* of MCP messages. Must not be null.
176+
* @param baseUrl webflux message base path
177+
* @param messageEndpoint The endpoint URI where clients should send their JSON-RPC
178+
* messages. This endpoint will be communicated to clients during SSE connection
179+
* setup. Must not be null.
180+
* @param sseEndpoint The SSE endpoint path. Must not be null.
181+
* @param keepAliveInterval The interval for sending keep-alive pings to clients.
182+
* @throws IllegalArgumentException if either parameter is null
183+
* @deprecated Use the builder {@link #builder()} instead for better configuration
184+
* options.
185+
*/
186+
@Deprecated
187+
public WebFluxSseServerTransportProvider(ObjectMapper objectMapper, String baseUrl, String messageEndpoint,
188+
String sseEndpoint, Duration keepAliveInterval) {
151189
Assert.notNull(objectMapper, "ObjectMapper must not be null");
152190
Assert.notNull(baseUrl, "Message base path must not be null");
153191
Assert.notNull(messageEndpoint, "Message endpoint must not be null");
@@ -161,6 +199,17 @@ public WebFluxSseServerTransportProvider(ObjectMapper objectMapper, String baseU
161199
.GET(this.sseEndpoint, this::handleSseConnection)
162200
.POST(this.messageEndpoint, this::handleMessage)
163201
.build();
202+
203+
if (keepAliveInterval != null) {
204+
205+
this.keepAliveScheduler = KeepAliveScheduler
206+
.builder(() -> (isClosing) ? Flux.empty() : Flux.fromIterable(sessions.values()))
207+
.initialDelay(keepAliveInterval)
208+
.interval(keepAliveInterval)
209+
.build();
210+
211+
this.keepAliveScheduler.start();
212+
}
164213
}
165214

166215
@Override
@@ -209,23 +258,21 @@ public Mono<Void> notifyClients(String method, Object params) {
209258
/**
210259
* Initiates a graceful shutdown of all the sessions. This method ensures all active
211260
* sessions are properly closed and cleaned up.
212-
*
213-
* <p>
214-
* The shutdown process:
215-
* <ul>
216-
* <li>Marks the transport as closing to prevent new connections</li>
217-
* <li>Closes each active session</li>
218-
* <li>Removes closed sessions from the sessions map</li>
219-
* <li>Times out after 5 seconds if shutdown takes too long</li>
220-
* </ul>
221261
* @return A Mono that completes when all sessions have been closed
222262
*/
223263
@Override
224264
public Mono<Void> closeGracefully() {
225265
return Flux.fromIterable(sessions.values())
226266
.doFirst(() -> logger.debug("Initiating graceful shutdown with {} active sessions", sessions.size()))
227267
.flatMap(McpServerSession::closeGracefully)
228-
.then();
268+
.then()
269+
.doOnSuccess(v -> {
270+
logger.debug("Graceful shutdown completed");
271+
sessions.clear();
272+
if (this.keepAliveScheduler != null) {
273+
this.keepAliveScheduler.shutdown();
274+
}
275+
});
229276
}
230277

231278
/**
@@ -396,6 +443,8 @@ public static class Builder {
396443

397444
private String sseEndpoint = DEFAULT_SSE_ENDPOINT;
398445

446+
private Duration keepAliveInterval;
447+
399448
/**
400449
* Sets the ObjectMapper to use for JSON serialization/deserialization of MCP
401450
* messages.
@@ -446,6 +495,17 @@ public Builder sseEndpoint(String sseEndpoint) {
446495
return this;
447496
}
448497

498+
/**
499+
* Sets the interval for sending keep-alive pings to clients.
500+
* @param keepAliveInterval The keep-alive interval duration. If null, keep-alive
501+
* is disabled.
502+
* @return this builder instance
503+
*/
504+
public Builder keepAliveInterval(Duration keepAliveInterval) {
505+
this.keepAliveInterval = keepAliveInterval;
506+
return this;
507+
}
508+
449509
/**
450510
* Builds a new instance of {@link WebFluxSseServerTransportProvider} with the
451511
* configured settings.
@@ -456,7 +516,8 @@ public WebFluxSseServerTransportProvider build() {
456516
Assert.notNull(objectMapper, "ObjectMapper must be set");
457517
Assert.notNull(messageEndpoint, "Message endpoint must be set");
458518

459-
return new WebFluxSseServerTransportProvider(objectMapper, baseUrl, messageEndpoint, sseEndpoint);
519+
return new WebFluxSseServerTransportProvider(objectMapper, baseUrl, messageEndpoint, sseEndpoint,
520+
keepAliveInterval);
460521
}
461522

462523
}

mcp-spring/mcp-spring-webmvc/src/main/java/io/modelcontextprotocol/server/transport/WebMvcSseServerTransportProvider.java

Lines changed: 154 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
import io.modelcontextprotocol.spec.McpServerTransportProvider;
1919
import io.modelcontextprotocol.spec.McpServerSession;
2020
import io.modelcontextprotocol.util.Assert;
21+
import io.modelcontextprotocol.util.KeepAliveScheduler;
22+
2123
import org.slf4j.Logger;
2224
import org.slf4j.LoggerFactory;
2325
import reactor.core.publisher.Flux;
@@ -107,6 +109,8 @@ public class WebMvcSseServerTransportProvider implements McpServerTransportProvi
107109
*/
108110
private volatile boolean isClosing = false;
109111

112+
private KeepAliveScheduler keepAliveScheduler;
113+
110114
/**
111115
* Constructs a new WebMvcSseServerTransportProvider instance with the default SSE
112116
* endpoint.
@@ -116,7 +120,10 @@ public class WebMvcSseServerTransportProvider implements McpServerTransportProvi
116120
* messages via HTTP POST. This endpoint will be communicated to clients through the
117121
* SSE connection's initial endpoint event.
118122
* @throws IllegalArgumentException if either objectMapper or messageEndpoint is null
123+
* @deprecated Use the builder {@link #builder()} instead for better configuration
124+
* options.
119125
*/
126+
@Deprecated
120127
public WebMvcSseServerTransportProvider(ObjectMapper objectMapper, String messageEndpoint) {
121128
this(objectMapper, messageEndpoint, DEFAULT_SSE_ENDPOINT);
122129
}
@@ -130,7 +137,10 @@ public WebMvcSseServerTransportProvider(ObjectMapper objectMapper, String messag
130137
* SSE connection's initial endpoint event.
131138
* @param sseEndpoint The endpoint URI where clients establish their SSE connections.
132139
* @throws IllegalArgumentException if any parameter is null
140+
* @deprecated Use the builder {@link #builder()} instead for better configuration
141+
* options.
133142
*/
143+
@Deprecated
134144
public WebMvcSseServerTransportProvider(ObjectMapper objectMapper, String messageEndpoint, String sseEndpoint) {
135145
this(objectMapper, "", messageEndpoint, sseEndpoint);
136146
}
@@ -146,9 +156,33 @@ public WebMvcSseServerTransportProvider(ObjectMapper objectMapper, String messag
146156
* SSE connection's initial endpoint event.
147157
* @param sseEndpoint The endpoint URI where clients establish their SSE connections.
148158
* @throws IllegalArgumentException if any parameter is null
159+
* @deprecated Use the builder {@link #builder()} instead for better configuration
160+
* options.
149161
*/
162+
@Deprecated
150163
public WebMvcSseServerTransportProvider(ObjectMapper objectMapper, String baseUrl, String messageEndpoint,
151164
String sseEndpoint) {
165+
this(objectMapper, baseUrl, messageEndpoint, sseEndpoint, null);
166+
}
167+
168+
/**
169+
* Constructs a new WebMvcSseServerTransportProvider instance.
170+
* @param objectMapper The ObjectMapper to use for JSON serialization/deserialization
171+
* of messages.
172+
* @param baseUrl The base URL for the message endpoint, used to construct the full
173+
* endpoint URL for clients.
174+
* @param messageEndpoint The endpoint URI where clients should send their JSON-RPC
175+
* messages via HTTP POST. This endpoint will be communicated to clients through the
176+
* SSE connection's initial endpoint event.
177+
* @param sseEndpoint The endpoint URI where clients establish their SSE connections.
178+
* * @param keepAliveInterval The interval for sending keep-alive messages to
179+
* @throws IllegalArgumentException if any parameter is null
180+
* @deprecated Use the builder {@link #builder()} instead for better configuration
181+
* options.
182+
*/
183+
@Deprecated
184+
public WebMvcSseServerTransportProvider(ObjectMapper objectMapper, String baseUrl, String messageEndpoint,
185+
String sseEndpoint, Duration keepAliveInterval) {
152186
Assert.notNull(objectMapper, "ObjectMapper must not be null");
153187
Assert.notNull(baseUrl, "Message base URL must not be null");
154188
Assert.notNull(messageEndpoint, "Message endpoint must not be null");
@@ -162,6 +196,17 @@ public WebMvcSseServerTransportProvider(ObjectMapper objectMapper, String baseUr
162196
.GET(this.sseEndpoint, this::handleSseConnection)
163197
.POST(this.messageEndpoint, this::handleMessage)
164198
.build();
199+
200+
if (keepAliveInterval != null) {
201+
202+
this.keepAliveScheduler = KeepAliveScheduler
203+
.builder(() -> (isClosing) ? Flux.empty() : Flux.fromIterable(sessions.values()))
204+
.initialDelay(keepAliveInterval)
205+
.interval(keepAliveInterval)
206+
.build();
207+
208+
this.keepAliveScheduler.start();
209+
}
165210
}
166211

167212
@Override
@@ -209,10 +254,13 @@ public Mono<Void> closeGracefully() {
209254
return Flux.fromIterable(sessions.values()).doFirst(() -> {
210255
this.isClosing = true;
211256
logger.debug("Initiating graceful shutdown with {} active sessions", sessions.size());
212-
})
213-
.flatMap(McpServerSession::closeGracefully)
214-
.then()
215-
.doOnSuccess(v -> logger.debug("Graceful shutdown completed"));
257+
}).flatMap(McpServerSession::closeGracefully).then().doOnSuccess(v -> {
258+
logger.debug("Graceful shutdown completed");
259+
sessions.clear();
260+
if (this.keepAliveScheduler != null) {
261+
this.keepAliveScheduler.shutdown();
262+
}
263+
});
216264
}
217265

218266
/**
@@ -435,4 +483,106 @@ public void close() {
435483

436484
}
437485

486+
/**
487+
* Creates a new Builder instance for configuring and creating instances of
488+
* WebMvcSseServerTransportProvider.
489+
* @return A new Builder instance
490+
*/
491+
public static Builder builder() {
492+
return new Builder();
493+
}
494+
495+
/**
496+
* Builder for creating instances of WebMvcSseServerTransportProvider.
497+
* <p>
498+
* This builder provides a fluent API for configuring and creating instances of
499+
* WebMvcSseServerTransportProvider with custom settings.
500+
*/
501+
public static class Builder {
502+
503+
private ObjectMapper objectMapper = new ObjectMapper();
504+
505+
private String baseUrl = "";
506+
507+
private String messageEndpoint;
508+
509+
private String sseEndpoint = DEFAULT_SSE_ENDPOINT;
510+
511+
private Duration keepAliveInterval;
512+
513+
/**
514+
* Sets the JSON object mapper to use for message serialization/deserialization.
515+
* @param objectMapper The object mapper to use
516+
* @return This builder instance for method chaining
517+
*/
518+
public Builder objectMapper(ObjectMapper objectMapper) {
519+
Assert.notNull(objectMapper, "ObjectMapper must not be null");
520+
this.objectMapper = objectMapper;
521+
return this;
522+
}
523+
524+
/**
525+
* Sets the base URL for the server transport.
526+
* @param baseUrl The base URL to use
527+
* @return This builder instance for method chaining
528+
*/
529+
public Builder baseUrl(String baseUrl) {
530+
Assert.notNull(baseUrl, "Base URL must not be null");
531+
this.baseUrl = baseUrl;
532+
return this;
533+
}
534+
535+
/**
536+
* Sets the endpoint path where clients will send their messages.
537+
* @param messageEndpoint The message endpoint path
538+
* @return This builder instance for method chaining
539+
*/
540+
public Builder messageEndpoint(String messageEndpoint) {
541+
Assert.hasText(messageEndpoint, "Message endpoint must not be empty");
542+
this.messageEndpoint = messageEndpoint;
543+
return this;
544+
}
545+
546+
/**
547+
* Sets the endpoint path where clients will establish SSE connections.
548+
* <p>
549+
* If not specified, the default value of {@link #DEFAULT_SSE_ENDPOINT} will be
550+
* used.
551+
* @param sseEndpoint The SSE endpoint path
552+
* @return This builder instance for method chaining
553+
*/
554+
public Builder sseEndpoint(String sseEndpoint) {
555+
Assert.hasText(sseEndpoint, "SSE endpoint must not be empty");
556+
this.sseEndpoint = sseEndpoint;
557+
return this;
558+
}
559+
560+
/**
561+
* Sets the interval for keep-alive pings.
562+
* <p>
563+
* If not specified, keep-alive pings will be disabled.
564+
* @param keepAliveInterval The interval duration for keep-alive pings
565+
* @return This builder instance for method chaining
566+
*/
567+
public Builder keepAliveInterval(Duration keepAliveInterval) {
568+
this.keepAliveInterval = keepAliveInterval;
569+
return this;
570+
}
571+
572+
/**
573+
* Builds a new instance of WebMvcSseServerTransportProvider with the configured
574+
* settings.
575+
* @return A new WebMvcSseServerTransportProvider instance
576+
* @throws IllegalStateException if objectMapper or messageEndpoint is not set
577+
*/
578+
public WebMvcSseServerTransportProvider build() {
579+
if (messageEndpoint == null) {
580+
throw new IllegalStateException("MessageEndpoint must be set");
581+
}
582+
return new WebMvcSseServerTransportProvider(objectMapper, baseUrl, messageEndpoint, sseEndpoint,
583+
keepAliveInterval);
584+
}
585+
586+
}
587+
438588
}

0 commit comments

Comments
 (0)