Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;

import static io.modelcontextprotocol.spec.McpSchema.Headers.LAST_EVENT_ID;
import static io.modelcontextprotocol.spec.McpSchema.Headers.MCP_SESSION_ID;

/**
* An implementation of the Streamable HTTP protocol as defined by the
* <code>2025-03-26</code> version of the MCP specification.
Expand Down Expand Up @@ -128,7 +131,7 @@ public Mono<Void> connect(Function<Mono<McpSchema.JSONRPCMessage>, Mono<McpSchem
private DefaultMcpTransportSession createTransportSession() {
Function<String, Publisher<Void>> onClose = sessionId -> sessionId == null ? Mono.empty()
: webClient.delete().uri(this.endpoint).headers(httpHeaders -> {
httpHeaders.add("mcp-session-id", sessionId);
httpHeaders.add(MCP_SESSION_ID, sessionId);
})
.retrieve()
.toBodilessEntity()
Expand Down Expand Up @@ -187,9 +190,9 @@ private Mono<Disposable> reconnect(McpTransportStream<Disposable> stream) {
.uri(this.endpoint)
.accept(MediaType.TEXT_EVENT_STREAM)
.headers(httpHeaders -> {
transportSession.sessionId().ifPresent(id -> httpHeaders.add("mcp-session-id", id));
transportSession.sessionId().ifPresent(id -> httpHeaders.add(MCP_SESSION_ID, id));
if (stream != null) {
stream.lastId().ifPresent(id -> httpHeaders.add("last-event-id", id));
stream.lastId().ifPresent(id -> httpHeaders.add(LAST_EVENT_ID, id));
}
})
.exchangeToFlux(response -> {
Expand Down Expand Up @@ -247,12 +250,11 @@ public Mono<Void> sendMessage(McpSchema.JSONRPCMessage message) {
.uri(this.endpoint)
.accept(MediaType.TEXT_EVENT_STREAM, MediaType.APPLICATION_JSON)
.headers(httpHeaders -> {
transportSession.sessionId().ifPresent(id -> httpHeaders.add("mcp-session-id", id));
transportSession.sessionId().ifPresent(id -> httpHeaders.add(MCP_SESSION_ID, id));
})
.bodyValue(message)
.exchangeToFlux(response -> {
if (transportSession
.markInitialized(response.headers().asHttpHeaders().getFirst("mcp-session-id"))) {
if (transportSession.markInitialized(response.headers().asHttpHeaders().getFirst(MCP_SESSION_ID))) {
// Once we have a session, we try to open an async stream for
// the server to send notifications and requests out-of-band.
reconnect(null).contextWrite(sink.contextView()).subscribe();
Expand Down
22 changes: 22 additions & 0 deletions mcp/src/main/java/io/modelcontextprotocol/spec/McpSchema.java
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,28 @@ public static final class ErrorCodes {

}

// ---------------------------
// JSON-RPC Protocol Headers
// ---------------------------
/**
* Standard header name used in MCP JSON-RPC request and responses.
*/
public static final class Headers {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel that this belongs around something like McpHttpTransport interface rather than in the McpSpec, which is more about the JSON-RPC layer.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The primary purpose here is to provide users with the MCP standard header name information. The MCP-Session-Id does belong to the transport category. Of course, I have omitted the last-event-id here. At present, I think defining these header names as part of the MCPSchema is the most reasonable approach because I haven't thought of any other more reasonable alternatives. These header names are indeed mentioned in the Specification. Do you have any better suggestions? @chemicL


/**
* The Model Context Protocol (MCP) session ID header name.
*/
public static final String MCP_SESSION_ID = "Mcp-Session-Id";

/**
* The Last-Event-ID HTTP request header reports an EventSource object's last
* event ID string to the server when the user agent is to reestablish the
* connection.
*/
public static final String LAST_EVENT_ID = "Last-Event-ID";

}

public sealed interface Request permits InitializeRequest, CallToolRequest, CreateMessageRequest, ElicitRequest,
CompleteRequest, GetPromptRequest {

Expand Down