Skip to content

Commit 3f83ef2

Browse files
committed
Support server to client notifications from the stateless transport
The MCP spec allows stateless servers to send notifications to the client during a request. The response needs to be upgraded to SSE and the notifications are send in a stream until the final result is sent. This commit adds a `sendNotification` method to the transport context allowing each transport implementation to implement it or not. In this commit, HttpServletStatelessServerTransport implements the method and when the caller first sends a notification, the response is changed to `TEXT_EVENT_STREAM` and events are then streamed until the final result. This change will allow future features such as logging, list changes, etc. should we ever decide to support sessions in some manner. Even if we don't support sessions, sending progress notifications is a useful feature by itself.
1 parent a14ef42 commit 3f83ef2

File tree

4 files changed

+299
-34
lines changed

4 files changed

+299
-34
lines changed

mcp/src/main/java/io/modelcontextprotocol/server/McpTransportContext.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,4 +47,13 @@ public interface McpTransportContext {
4747
*/
4848
McpTransportContext copy();
4949

50+
/**
51+
* Sends a notification from the server to the client.
52+
* @param method notification method name
53+
* @param params any parameters or {@code null}
54+
*/
55+
default void sendNotification(String method, Object params) {
56+
throw new UnsupportedOperationException("Not supported in this implementation of MCP transport context");
57+
}
58+
5059
}
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/*
2+
* Copyright 2024-2025 the original author or authors.
3+
*/
4+
5+
package io.modelcontextprotocol.server;
6+
7+
import java.util.function.BiConsumer;
8+
9+
public class StatelessMcpTransportContext implements McpTransportContext {
10+
11+
private final McpTransportContext delegate;
12+
13+
private final BiConsumer<String, Object> notificationHandler;
14+
15+
/**
16+
* Create an empty instance.
17+
*/
18+
public StatelessMcpTransportContext(BiConsumer<String, Object> notificationHandler) {
19+
this(new DefaultMcpTransportContext(), notificationHandler);
20+
}
21+
22+
private StatelessMcpTransportContext(McpTransportContext delegate, BiConsumer<String, Object> notificationHandler) {
23+
this.delegate = delegate;
24+
this.notificationHandler = notificationHandler;
25+
}
26+
27+
@Override
28+
public Object get(String key) {
29+
return this.delegate.get(key);
30+
}
31+
32+
@Override
33+
public void put(String key, Object value) {
34+
this.delegate.put(key, value);
35+
}
36+
37+
public McpTransportContext copy() {
38+
return new StatelessMcpTransportContext(delegate.copy(), notificationHandler);
39+
}
40+
41+
@Override
42+
public void sendNotification(String method, Object params) {
43+
notificationHandler.accept(method, params);
44+
}
45+
46+
}

mcp/src/main/java/io/modelcontextprotocol/server/transport/HttpServletStatelessServerTransport.java

Lines changed: 96 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -4,19 +4,11 @@
44

55
package io.modelcontextprotocol.server.transport;
66

7-
import java.io.BufferedReader;
8-
import java.io.IOException;
9-
import java.io.PrintWriter;
10-
11-
import org.slf4j.Logger;
12-
import org.slf4j.LoggerFactory;
13-
147
import com.fasterxml.jackson.databind.ObjectMapper;
15-
16-
import io.modelcontextprotocol.server.DefaultMcpTransportContext;
178
import io.modelcontextprotocol.server.McpStatelessServerHandler;
189
import io.modelcontextprotocol.server.McpTransportContext;
1910
import io.modelcontextprotocol.server.McpTransportContextExtractor;
11+
import io.modelcontextprotocol.server.StatelessMcpTransportContext;
2012
import io.modelcontextprotocol.spec.McpError;
2113
import io.modelcontextprotocol.spec.McpSchema;
2214
import io.modelcontextprotocol.spec.McpStatelessServerTransport;
@@ -26,8 +18,17 @@
2618
import jakarta.servlet.http.HttpServlet;
2719
import jakarta.servlet.http.HttpServletRequest;
2820
import jakarta.servlet.http.HttpServletResponse;
21+
import org.slf4j.Logger;
22+
import org.slf4j.LoggerFactory;
2923
import reactor.core.publisher.Mono;
3024

25+
import java.io.BufferedReader;
26+
import java.io.IOException;
27+
import java.io.PrintWriter;
28+
import java.util.concurrent.atomic.AtomicBoolean;
29+
import java.util.concurrent.atomic.AtomicInteger;
30+
import java.util.function.BiConsumer;
31+
3132
/**
3233
* Implementation of an HttpServlet based {@link McpStatelessServerTransport}.
3334
*
@@ -123,11 +124,16 @@ protected void doPost(HttpServletRequest request, HttpServletResponse response)
123124
return;
124125
}
125126

126-
McpTransportContext transportContext = this.contextExtractor.extract(request, new DefaultMcpTransportContext());
127+
AtomicInteger nextId = new AtomicInteger(0);
128+
AtomicBoolean upgradedToSse = new AtomicBoolean(false);
129+
BiConsumer<String, Object> notificationHandler = buildNotificationHandler(response, upgradedToSse, nextId);
130+
McpTransportContext transportContext = this.contextExtractor.extract(request,
131+
new StatelessMcpTransportContext(notificationHandler));
127132

128133
String accept = request.getHeader(ACCEPT);
129134
if (accept == null || !(accept.contains(APPLICATION_JSON) && accept.contains(TEXT_EVENT_STREAM))) {
130-
this.responseError(response, HttpServletResponse.SC_BAD_REQUEST,
135+
this.responseError(response, HttpServletResponse.SC_BAD_REQUEST, null, upgradedToSse.get(),
136+
nextId.getAndIncrement(),
131137
new McpError("Both application/json and text/event-stream required in Accept header"));
132138
return;
133139
}
@@ -149,18 +155,24 @@ protected void doPost(HttpServletRequest request, HttpServletResponse response)
149155
.contextWrite(ctx -> ctx.put(McpTransportContext.KEY, transportContext))
150156
.block();
151157

152-
response.setContentType(APPLICATION_JSON);
153-
response.setCharacterEncoding(UTF_8);
154-
response.setStatus(HttpServletResponse.SC_OK);
155-
156158
String jsonResponseText = objectMapper.writeValueAsString(jsonrpcResponse);
157-
PrintWriter writer = response.getWriter();
158-
writer.write(jsonResponseText);
159-
writer.flush();
159+
if (upgradedToSse.get()) {
160+
sendEvent(response.getWriter(), jsonResponseText, nextId.getAndIncrement());
161+
}
162+
else {
163+
response.setContentType(APPLICATION_JSON);
164+
response.setCharacterEncoding(UTF_8);
165+
response.setStatus(HttpServletResponse.SC_OK);
166+
167+
PrintWriter writer = response.getWriter();
168+
writer.write(jsonResponseText);
169+
writer.flush();
170+
}
160171
}
161172
catch (Exception e) {
162173
logger.error("Failed to handle request: {}", e.getMessage());
163-
this.responseError(response, HttpServletResponse.SC_INTERNAL_SERVER_ERROR,
174+
this.responseError(response, HttpServletResponse.SC_INTERNAL_SERVER_ERROR, jsonrpcRequest.id(),
175+
upgradedToSse.get(), nextId.getAndIncrement(),
164176
new McpError("Failed to handle request: " + e.getMessage()));
165177
}
166178
}
@@ -173,41 +185,53 @@ else if (message instanceof McpSchema.JSONRPCNotification jsonrpcNotification) {
173185
}
174186
catch (Exception e) {
175187
logger.error("Failed to handle notification: {}", e.getMessage());
176-
this.responseError(response, HttpServletResponse.SC_INTERNAL_SERVER_ERROR,
188+
this.responseError(response, HttpServletResponse.SC_INTERNAL_SERVER_ERROR, null,
189+
upgradedToSse.get(), nextId.getAndIncrement(),
177190
new McpError("Failed to handle notification: " + e.getMessage()));
178191
}
179192
}
180193
else {
181-
this.responseError(response, HttpServletResponse.SC_BAD_REQUEST,
182-
new McpError("The server accepts either requests or notifications"));
194+
this.responseError(response, HttpServletResponse.SC_BAD_REQUEST, null, upgradedToSse.get(),
195+
nextId.getAndIncrement(), new McpError("The server accepts either requests or notifications"));
183196
}
184197
}
185198
catch (IllegalArgumentException | IOException e) {
186199
logger.error("Failed to deserialize message: {}", e.getMessage());
187-
this.responseError(response, HttpServletResponse.SC_BAD_REQUEST, new McpError("Invalid message format"));
200+
this.responseError(response, HttpServletResponse.SC_BAD_REQUEST, null, upgradedToSse.get(),
201+
nextId.getAndIncrement(), new McpError("Invalid message format"));
188202
}
189203
catch (Exception e) {
190204
logger.error("Unexpected error handling message: {}", e.getMessage());
191-
this.responseError(response, HttpServletResponse.SC_INTERNAL_SERVER_ERROR,
192-
new McpError("Unexpected error: " + e.getMessage()));
205+
this.responseError(response, HttpServletResponse.SC_INTERNAL_SERVER_ERROR, null, upgradedToSse.get(),
206+
nextId.getAndIncrement(), new McpError("Unexpected error: " + e.getMessage()));
193207
}
194208
}
195209

196210
/**
197211
* Sends an error response to the client.
198212
* @param response The HTTP servlet response
199213
* @param httpCode The HTTP status code
214+
* @param upgradedToSse true if the response is upgraded to SSE, false otherwise
215+
* @param eventIdIfNeeded if upgradedToSse, the event ID to use, otherwise ignored
200216
* @param mcpError The MCP error to send
201217
* @throws IOException If an I/O error occurs
202218
*/
203-
private void responseError(HttpServletResponse response, int httpCode, McpError mcpError) throws IOException {
204-
response.setContentType(APPLICATION_JSON);
205-
response.setCharacterEncoding(UTF_8);
206-
response.setStatus(httpCode);
207-
String jsonError = objectMapper.writeValueAsString(mcpError);
208-
PrintWriter writer = response.getWriter();
209-
writer.write(jsonError);
210-
writer.flush();
219+
private void responseError(HttpServletResponse response, int httpCode, Object requestId, boolean upgradedToSse,
220+
int eventIdIfNeeded, McpError mcpError) throws IOException {
221+
if (upgradedToSse) {
222+
String jsonError = objectMapper.writeValueAsString(new McpSchema.JSONRPCResponse(McpSchema.JSONRPC_VERSION,
223+
requestId, null, mcpError.getJsonRpcError()));
224+
sendEvent(response.getWriter(), jsonError, eventIdIfNeeded);
225+
}
226+
else {
227+
response.setContentType(APPLICATION_JSON);
228+
response.setCharacterEncoding(UTF_8);
229+
response.setStatus(httpCode);
230+
PrintWriter writer = response.getWriter();
231+
String jsonError = objectMapper.writeValueAsString(mcpError);
232+
writer.write(jsonError);
233+
writer.flush();
234+
}
211235
}
212236

213237
/**
@@ -303,4 +327,43 @@ public HttpServletStatelessServerTransport build() {
303327

304328
}
305329

330+
private BiConsumer<String, Object> buildNotificationHandler(HttpServletResponse response,
331+
AtomicBoolean upgradedToSse, AtomicInteger nextId) {
332+
AtomicBoolean responseInitialized = new AtomicBoolean(false);
333+
334+
return (notificationMethod, params) -> {
335+
if (responseInitialized.compareAndSet(false, true)) {
336+
response.setContentType(TEXT_EVENT_STREAM);
337+
response.setCharacterEncoding(UTF_8);
338+
response.setStatus(HttpServletResponse.SC_OK);
339+
}
340+
341+
upgradedToSse.set(true);
342+
343+
McpSchema.JSONRPCNotification notification = new McpSchema.JSONRPCNotification(McpSchema.JSONRPC_VERSION,
344+
notificationMethod, params);
345+
try {
346+
sendEvent(response.getWriter(), objectMapper.writeValueAsString(notification),
347+
nextId.getAndIncrement());
348+
}
349+
catch (IOException e) {
350+
logger.error("Failed to handle notification: {}", e.getMessage());
351+
throw new McpError(new McpSchema.JSONRPCResponse.JSONRPCError(McpSchema.ErrorCodes.INTERNAL_ERROR,
352+
e.getMessage(), null));
353+
}
354+
};
355+
}
356+
357+
private void sendEvent(PrintWriter writer, String data, int id) throws IOException {
358+
// tested with MCP inspector. Event must consist of these two fields and only
359+
// these two fields
360+
writer.write("id: " + id + "\n");
361+
writer.write("data: " + data + "\n\n");
362+
writer.flush();
363+
364+
if (writer.checkError()) {
365+
throw new IOException("Client disconnected");
366+
}
367+
}
368+
306369
}

0 commit comments

Comments
 (0)