Skip to content

Streamable HTTP Server abstractions and WebFlux transport provider #420

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 19 commits into from
Jul 30, 2025
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,8 @@ private Flux<McpSchema.JSONRPCMessage> extractError(ClientResponse response, Str
McpSchema.JSONRPCResponse jsonRpcResponse = objectMapper.readValue(body,
McpSchema.JSONRPCResponse.class);
jsonRpcError = jsonRpcResponse.error();
toPropagate = new McpError(jsonRpcError);
toPropagate = jsonRpcError != null ? new McpError(jsonRpcError)
: new McpError("Can't parse the jsonResponse " + jsonRpcResponse);
}
catch (IOException ex) {
toPropagate = new RuntimeException("Sending request failed", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,6 @@ public class WebFluxStreamableServerTransportProvider implements McpStreamableSe

public static final String MESSAGE_EVENT_TYPE = "message";

public static final String ENDPOINT_EVENT_TYPE = "endpoint";

public static final String DEFAULT_BASE_URL = "";

private final ObjectMapper objectMapper;
Expand Down Expand Up @@ -263,17 +261,28 @@ private Mono<ServerResponse> handlePost(ServerRequest request) {
McpStreamableServerSession.McpStreamableServerSessionInit init = this.sessionFactory
.startSession(initializeRequest);
sessions.put(init.session().getId(), init.session());
return init.initResult()
return init.initResult().map(initializeResult -> {
McpSchema.JSONRPCResponse jsonrpcResponse = new McpSchema.JSONRPCResponse(
McpSchema.JSONRPC_VERSION, jsonrpcRequest.id(), initializeResult, null);
try {
return this.objectMapper.writeValueAsString(jsonrpcResponse);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems redundant as the WebFlux would do the conversion?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suppose so! Although it's a temporary mitigation, I think we should deal with the init result conversion into JSON at an earlier layer. Need to come back to this.

}
catch (IOException e) {
logger.warn("Failed to serialize initResponse", e);
throw Exceptions.propagate(e);
}
})
.flatMap(initResult -> ServerResponse.ok()
.contentType(MediaType.APPLICATION_JSON)
.header("mcp-session-id", init.session().getId())
.bodyValue(initResult));
}

if (!request.headers().asHttpHeaders().containsKey("sessionId")) {
if (!request.headers().asHttpHeaders().containsKey("mcp-session-id")) {
return ServerResponse.badRequest().bodyValue(new McpError("Session ID missing"));
}

String sessionId = request.headers().asHttpHeaders().getFirst("sessionId");
String sessionId = request.headers().asHttpHeaders().getFirst("mcp-session-id");
McpStreamableServerSession session = sessions.get(sessionId);

if (session == null) {
Expand Down Expand Up @@ -308,7 +317,9 @@ else if (message instanceof McpSchema.JSONRPCRequest jsonrpcRequest) {
logger.error("Failed to deserialize message: {}", e.getMessage());
return ServerResponse.badRequest().bodyValue(new McpError("Invalid message format"));
}
}).contextWrite(ctx -> ctx.put(McpTransportContext.KEY, transportContext));
})
.switchIfEmpty(ServerResponse.badRequest().build())
.contextWrite(ctx -> ctx.put(McpTransportContext.KEY, transportContext));
}

private Mono<ServerResponse> handleDelete(ServerRequest request) {
Expand Down
Loading
Loading