Skip to content

Commit 27ef5a6

Browse files
committed
Support server to client notifications from the stateless transport
The MCP spec allows stateless servers to send notifications to the client during a request. The response needs to be upgraded to SSE and the notifications are send in a stream until the final result is sent. This commit adds a `sendNotification` method to the transport context allowing each transport implementation to implement it or not. In this commit, HttpServletStatelessServerTransport implements the method and when the caller first sends a notification, the response is changed to `TEXT_EVENT_STREAM` and events are then streamed until the final result. This change will allow future features such as logging, list changes, etc. should we ever decide to support sessions in some manner. Even if we don't support sessions, sending progress notifications is a useful feature by itself.
1 parent a14ef42 commit 27ef5a6

File tree

4 files changed

+213
-18
lines changed

4 files changed

+213
-18
lines changed

mcp/src/main/java/io/modelcontextprotocol/server/McpTransportContext.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,4 +47,13 @@ public interface McpTransportContext {
4747
*/
4848
McpTransportContext copy();
4949

50+
/**
51+
* Sends a notification from the server to the client.
52+
* @param method notification method name
53+
* @param params any parameters or {@code null}
54+
*/
55+
default void sendNotification(String method, Object params) {
56+
throw new UnsupportedOperationException("Not supported in this implementation of MCP transport context");
57+
}
58+
5059
}
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/*
2+
* Copyright 2024-2025 the original author or authors.
3+
*/
4+
5+
package io.modelcontextprotocol.server;
6+
7+
import java.util.function.BiConsumer;
8+
9+
public class StatelessMcpTransportContext implements McpTransportContext {
10+
11+
private final McpTransportContext delegate;
12+
13+
private final BiConsumer<String, Object> notificationHandler;
14+
15+
/**
16+
* Create an empty instance.
17+
*/
18+
public StatelessMcpTransportContext(BiConsumer<String, Object> notificationHandler) {
19+
this(new DefaultMcpTransportContext(), notificationHandler);
20+
}
21+
22+
private StatelessMcpTransportContext(McpTransportContext delegate, BiConsumer<String, Object> notificationHandler) {
23+
this.delegate = delegate;
24+
this.notificationHandler = notificationHandler;
25+
}
26+
27+
@Override
28+
public Object get(String key) {
29+
return this.delegate.get(key);
30+
}
31+
32+
@Override
33+
public void put(String key, Object value) {
34+
this.delegate.put(key, value);
35+
}
36+
37+
public McpTransportContext copy() {
38+
return new StatelessMcpTransportContext(delegate.copy(), notificationHandler);
39+
}
40+
41+
@Override
42+
public void sendNotification(String method, Object params) {
43+
notificationHandler.accept(method, params);
44+
}
45+
46+
}

mcp/src/main/java/io/modelcontextprotocol/server/transport/HttpServletStatelessServerTransport.java

Lines changed: 66 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -4,19 +4,11 @@
44

55
package io.modelcontextprotocol.server.transport;
66

7-
import java.io.BufferedReader;
8-
import java.io.IOException;
9-
import java.io.PrintWriter;
10-
11-
import org.slf4j.Logger;
12-
import org.slf4j.LoggerFactory;
13-
147
import com.fasterxml.jackson.databind.ObjectMapper;
15-
16-
import io.modelcontextprotocol.server.DefaultMcpTransportContext;
178
import io.modelcontextprotocol.server.McpStatelessServerHandler;
189
import io.modelcontextprotocol.server.McpTransportContext;
1910
import io.modelcontextprotocol.server.McpTransportContextExtractor;
11+
import io.modelcontextprotocol.server.StatelessMcpTransportContext;
2012
import io.modelcontextprotocol.spec.McpError;
2113
import io.modelcontextprotocol.spec.McpSchema;
2214
import io.modelcontextprotocol.spec.McpStatelessServerTransport;
@@ -26,8 +18,17 @@
2618
import jakarta.servlet.http.HttpServlet;
2719
import jakarta.servlet.http.HttpServletRequest;
2820
import jakarta.servlet.http.HttpServletResponse;
21+
import org.slf4j.Logger;
22+
import org.slf4j.LoggerFactory;
2923
import reactor.core.publisher.Mono;
3024

25+
import java.io.BufferedReader;
26+
import java.io.IOException;
27+
import java.io.PrintWriter;
28+
import java.util.concurrent.atomic.AtomicBoolean;
29+
import java.util.concurrent.atomic.AtomicInteger;
30+
import java.util.function.BiConsumer;
31+
3132
/**
3233
* Implementation of an HttpServlet based {@link McpStatelessServerTransport}.
3334
*
@@ -123,7 +124,11 @@ protected void doPost(HttpServletRequest request, HttpServletResponse response)
123124
return;
124125
}
125126

126-
McpTransportContext transportContext = this.contextExtractor.extract(request, new DefaultMcpTransportContext());
127+
AtomicInteger nextId = new AtomicInteger(0);
128+
AtomicBoolean upgradedToSse = new AtomicBoolean(false);
129+
BiConsumer<String, Object> notificationHandler = buildNotificationHandler(response, upgradedToSse, nextId);
130+
McpTransportContext transportContext = this.contextExtractor.extract(request,
131+
new StatelessMcpTransportContext(notificationHandler));
127132

128133
String accept = request.getHeader(ACCEPT);
129134
if (accept == null || !(accept.contains(APPLICATION_JSON) && accept.contains(TEXT_EVENT_STREAM))) {
@@ -149,14 +154,19 @@ protected void doPost(HttpServletRequest request, HttpServletResponse response)
149154
.contextWrite(ctx -> ctx.put(McpTransportContext.KEY, transportContext))
150155
.block();
151156

152-
response.setContentType(APPLICATION_JSON);
153-
response.setCharacterEncoding(UTF_8);
154-
response.setStatus(HttpServletResponse.SC_OK);
155-
156157
String jsonResponseText = objectMapper.writeValueAsString(jsonrpcResponse);
157-
PrintWriter writer = response.getWriter();
158-
writer.write(jsonResponseText);
159-
writer.flush();
158+
if (upgradedToSse.get()) {
159+
sendEvent(response.getWriter(), jsonResponseText, nextId.getAndIncrement());
160+
}
161+
else {
162+
response.setContentType(APPLICATION_JSON);
163+
response.setCharacterEncoding(UTF_8);
164+
response.setStatus(HttpServletResponse.SC_OK);
165+
166+
PrintWriter writer = response.getWriter();
167+
writer.write(jsonResponseText);
168+
writer.flush();
169+
}
160170
}
161171
catch (Exception e) {
162172
logger.error("Failed to handle request: {}", e.getMessage());
@@ -303,4 +313,43 @@ public HttpServletStatelessServerTransport build() {
303313

304314
}
305315

316+
private BiConsumer<String, Object> buildNotificationHandler(HttpServletResponse response,
317+
AtomicBoolean upgradedToSse, AtomicInteger nextId) {
318+
AtomicBoolean responseInitialized = new AtomicBoolean(false);
319+
320+
return (notificationMethod, params) -> {
321+
upgradedToSse.set(true);
322+
323+
if (responseInitialized.compareAndSet(false, true)) {
324+
response.setContentType(TEXT_EVENT_STREAM);
325+
response.setCharacterEncoding(UTF_8);
326+
response.setStatus(HttpServletResponse.SC_OK);
327+
}
328+
329+
McpSchema.JSONRPCNotification notification = new McpSchema.JSONRPCNotification(McpSchema.JSONRPC_VERSION,
330+
notificationMethod, params);
331+
try {
332+
sendEvent(response.getWriter(), objectMapper.writeValueAsString(notification),
333+
nextId.getAndIncrement());
334+
}
335+
catch (IOException e) {
336+
logger.error("Failed to handle notification: {}", e.getMessage());
337+
throw new McpError(new McpSchema.JSONRPCResponse.JSONRPCError(McpSchema.ErrorCodes.INTERNAL_ERROR,
338+
e.getMessage(), null));
339+
}
340+
};
341+
}
342+
343+
private void sendEvent(PrintWriter writer, String data, int id) throws IOException {
344+
// tested with MCP inspector. Event must consist of these two fields and only
345+
// these two fields
346+
writer.write("id: " + id + "\n");
347+
writer.write("data: " + data + "\n\n");
348+
writer.flush();
349+
350+
if (writer.checkError()) {
351+
throw new IOException("Client disconnected");
352+
}
353+
}
354+
306355
}

mcp/src/test/java/io/modelcontextprotocol/server/HttpServletStatelessIntegrationTests.java

Lines changed: 92 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,12 +35,19 @@
3535
import org.springframework.mock.web.MockHttpServletResponse;
3636
import org.springframework.web.client.RestClient;
3737

38+
import java.net.URI;
39+
import java.net.http.HttpClient;
40+
import java.net.http.HttpRequest;
41+
import java.net.http.HttpResponse;
3842
import java.time.Duration;
43+
import java.util.Iterator;
3944
import java.util.List;
4045
import java.util.Map;
46+
import java.util.UUID;
4147
import java.util.concurrent.ConcurrentHashMap;
4248
import java.util.concurrent.atomic.AtomicReference;
4349
import java.util.function.BiFunction;
50+
import java.util.stream.Stream;
4451

4552
import static io.modelcontextprotocol.server.transport.HttpServletStatelessServerTransport.APPLICATION_JSON;
4653
import static io.modelcontextprotocol.server.transport.HttpServletStatelessServerTransport.TEXT_EVENT_STREAM;
@@ -61,10 +68,13 @@ class HttpServletStatelessIntegrationTests {
6168

6269
private Tomcat tomcat;
6370

71+
private ObjectMapper objectMapper;
72+
6473
@BeforeEach
6574
public void before() {
75+
objectMapper = new ObjectMapper();
6676
this.mcpStatelessServerTransport = HttpServletStatelessServerTransport.builder()
67-
.objectMapper(new ObjectMapper())
77+
.objectMapper(objectMapper)
6878
.messageEndpoint(CUSTOM_MESSAGE_ENDPOINT)
6979
.build();
7080

@@ -219,6 +229,87 @@ void testCompletionShouldReturnExpectedSuggestions(String clientType) {
219229
mcpServer.close();
220230
}
221231

232+
@Test
233+
void testNotifications() throws Exception {
234+
235+
Tool tool = Tool.builder().name("test").build();
236+
237+
final int PROGRESS_QTY = 1000;
238+
final String progressMessage = "We're working on it...";
239+
240+
var progressToken = UUID.randomUUID().toString();
241+
var callResponse = new CallToolResult(List.of(), null, null, Map.of("progressToken", progressToken));
242+
McpStatelessServerFeatures.SyncToolSpecification toolSpecification = new McpStatelessServerFeatures.SyncToolSpecification(
243+
tool, (transportContext, request) -> {
244+
// Simulate sending progress notifications - send enough to ensure
245+
// that cunked transfer encoding is used
246+
for (int i = 0; i < PROGRESS_QTY; i++) {
247+
transportContext.sendNotification(McpSchema.METHOD_NOTIFICATION_PROGRESS,
248+
new McpSchema.ProgressNotification(progressToken, i, 5.0, progressMessage));
249+
}
250+
return callResponse;
251+
});
252+
253+
var mcpServer = McpServer.sync(mcpStatelessServerTransport)
254+
.capabilities(ServerCapabilities.builder().tools(true).build())
255+
.tools(toolSpecification)
256+
.build();
257+
258+
HttpClient client = HttpClient.newBuilder().version(HttpClient.Version.HTTP_1_1).build();
259+
HttpRequest request = HttpRequest.newBuilder()
260+
.method("POST",
261+
HttpRequest.BodyPublishers.ofString(
262+
objectMapper.writeValueAsString(new McpSchema.JSONRPCRequest(McpSchema.JSONRPC_VERSION,
263+
"tools/call", "1", new McpSchema.CallToolRequest("test", Map.of())))))
264+
.header("Content-Type", APPLICATION_JSON)
265+
.header("Accept", APPLICATION_JSON + "," + TEXT_EVENT_STREAM)
266+
.uri(URI.create("http://localhost:" + PORT + CUSTOM_MESSAGE_ENDPOINT))
267+
.build();
268+
269+
HttpResponse<Stream<String>> response = client.send(request, HttpResponse.BodyHandlers.ofLines());
270+
assertThat(response.headers().firstValue("Transfer-Encoding")).contains("chunked");
271+
272+
List<String> responseBody = response.body().toList();
273+
274+
assertThat(responseBody).hasSize((PROGRESS_QTY + 1) * 4); // 4 lines per progress
275+
// notification + 4
276+
// for
277+
// the call result
278+
279+
Iterator<String> iterator = responseBody.iterator();
280+
for (int i = 0; i < PROGRESS_QTY; ++i) {
281+
String eventLine = iterator.next();
282+
String idLine = iterator.next();
283+
String dataLine = iterator.next();
284+
String blankLine = iterator.next();
285+
286+
McpSchema.ProgressNotification expectedNotification = new McpSchema.ProgressNotification(progressToken, i,
287+
5.0, progressMessage);
288+
McpSchema.JSONRPCNotification expectedJsonRpcNotification = new McpSchema.JSONRPCNotification(
289+
McpSchema.JSONRPC_VERSION, McpSchema.METHOD_NOTIFICATION_PROGRESS, expectedNotification);
290+
291+
assertThat(eventLine).isEqualTo("event: notification");
292+
assertThat(idLine).isEqualTo("id: " + i);
293+
assertThat(dataLine).isEqualTo("data: " + objectMapper.writeValueAsString(expectedJsonRpcNotification));
294+
assertThat(blankLine).isBlank();
295+
}
296+
297+
String eventLine = iterator.next();
298+
String idLine = iterator.next();
299+
String dataLine = iterator.next();
300+
String blankLine = iterator.next();
301+
302+
assertThat(eventLine).isEqualTo("event: result");
303+
assertThat(idLine).isEqualTo("id: " + PROGRESS_QTY);
304+
assertThat(dataLine).isEqualTo("data: " + objectMapper
305+
.writeValueAsString(new McpSchema.JSONRPCResponse(McpSchema.JSONRPC_VERSION, "1", callResponse, null)));
306+
assertThat(blankLine).isBlank();
307+
308+
assertThat(iterator.hasNext()).isFalse();
309+
310+
mcpServer.close();
311+
}
312+
222313
// ---------------------------------------
223314
// Tool Structured Output Schema Tests
224315
// ---------------------------------------

0 commit comments

Comments
 (0)