Skip to content

Streamable HTTP Server abstractions and WebFlux transport provider #420

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 19 commits into from
Jul 30, 2025
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.BiFunction;

import io.modelcontextprotocol.spec.DefaultMcpStreamableServerSessionFactory;
import io.modelcontextprotocol.spec.McpServerTransportProviderBase;
import io.modelcontextprotocol.spec.McpStreamableServerTransportProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -86,7 +89,7 @@ public class McpAsyncServer {

private static final Logger logger = LoggerFactory.getLogger(McpAsyncServer.class);

private final McpServerTransportProvider mcpTransportProvider;
private final McpServerTransportProviderBase mcpTransportProvider;

private final ObjectMapper objectMapper;

Expand Down Expand Up @@ -139,7 +142,57 @@ public class McpAsyncServer {
this.uriTemplateManagerFactory = uriTemplateManagerFactory;
this.jsonSchemaValidator = jsonSchemaValidator;

Map<String, McpServerSession.RequestHandler<?>> requestHandlers = new HashMap<>();
Map<String, McpRequestHandler<?>> requestHandlers = prepareRequestHandlers();
Map<String, McpNotificationHandler> notificationHandlers = prepareNotificationHandlers(features);

mcpTransportProvider.setSessionFactory(
transport -> new McpServerSession(UUID.randomUUID().toString(), requestTimeout, transport,
this::asyncInitializeRequestHandler, Mono::empty, requestHandlers, notificationHandlers));
}

McpAsyncServer(McpStreamableServerTransportProvider mcpTransportProvider, ObjectMapper objectMapper,
McpServerFeatures.Async features, Duration requestTimeout,
McpUriTemplateManagerFactory uriTemplateManagerFactory, JsonSchemaValidator jsonSchemaValidator) {
this.mcpTransportProvider = mcpTransportProvider;
this.objectMapper = objectMapper;
this.serverInfo = features.serverInfo();
this.serverCapabilities = features.serverCapabilities();
this.instructions = features.instructions();
this.tools.addAll(withStructuredOutputHandling(jsonSchemaValidator, features.tools()));
this.resources.putAll(features.resources());
this.resourceTemplates.addAll(features.resourceTemplates());
this.prompts.putAll(features.prompts());
this.completions.putAll(features.completions());
this.uriTemplateManagerFactory = uriTemplateManagerFactory;
this.jsonSchemaValidator = jsonSchemaValidator;

Map<String, McpRequestHandler<?>> requestHandlers = prepareRequestHandlers();
Map<String, McpNotificationHandler> notificationHandlers = prepareNotificationHandlers(features);

mcpTransportProvider.setSessionFactory(new DefaultMcpStreamableServerSessionFactory(requestTimeout,
this::asyncInitializeRequestHandler, requestHandlers, notificationHandlers));
}

private Map<String, McpNotificationHandler> prepareNotificationHandlers(McpServerFeatures.Async features) {
Map<String, McpNotificationHandler> notificationHandlers = new HashMap<>();

notificationHandlers.put(McpSchema.METHOD_NOTIFICATION_INITIALIZED, (exchange, params) -> Mono.empty());

List<BiFunction<McpAsyncServerExchange, List<McpSchema.Root>, Mono<Void>>> rootsChangeConsumers = features
.rootsChangeConsumers();

if (Utils.isEmpty(rootsChangeConsumers)) {
rootsChangeConsumers = List.of((exchange, roots) -> Mono.fromRunnable(() -> logger
.warn("Roots list changed notification, but no consumers provided. Roots list changed: {}", roots)));
}

notificationHandlers.put(McpSchema.METHOD_NOTIFICATION_ROOTS_LIST_CHANGED,
asyncRootsListChangedNotificationHandler(rootsChangeConsumers));
return notificationHandlers;
}

private Map<String, McpRequestHandler<?>> prepareRequestHandlers() {
Map<String, McpRequestHandler<?>> requestHandlers = new HashMap<>();

// Initialize request handlers for standard MCP methods

Expand Down Expand Up @@ -174,25 +227,7 @@ public class McpAsyncServer {
if (this.serverCapabilities.completions() != null) {
requestHandlers.put(McpSchema.METHOD_COMPLETION_COMPLETE, completionCompleteRequestHandler());
}

Map<String, McpServerSession.NotificationHandler> notificationHandlers = new HashMap<>();

notificationHandlers.put(McpSchema.METHOD_NOTIFICATION_INITIALIZED, (exchange, params) -> Mono.empty());

List<BiFunction<McpAsyncServerExchange, List<McpSchema.Root>, Mono<Void>>> rootsChangeConsumers = features
.rootsChangeConsumers();

if (Utils.isEmpty(rootsChangeConsumers)) {
rootsChangeConsumers = List.of((exchange, roots) -> Mono.fromRunnable(() -> logger
.warn("Roots list changed notification, but no consumers provided. Roots list changed: {}", roots)));
}

notificationHandlers.put(McpSchema.METHOD_NOTIFICATION_ROOTS_LIST_CHANGED,
asyncRootsListChangedNotificationHandler(rootsChangeConsumers));

mcpTransportProvider.setSessionFactory(
transport -> new McpServerSession(UUID.randomUUID().toString(), requestTimeout, transport,
this::asyncInitializeRequestHandler, Mono::empty, requestHandlers, notificationHandlers));
return requestHandlers;
}

// ---------------------------------------
Expand Down Expand Up @@ -258,7 +293,7 @@ public void close() {
this.mcpTransportProvider.close();
}

private McpServerSession.NotificationHandler asyncRootsListChangedNotificationHandler(
private McpNotificationHandler asyncRootsListChangedNotificationHandler(
List<BiFunction<McpAsyncServerExchange, List<McpSchema.Root>, Mono<Void>>> rootsChangeConsumers) {
return (exchange, params) -> exchange.listRoots()
.flatMap(listRootsResult -> Flux.fromIterable(rootsChangeConsumers)
Expand Down Expand Up @@ -450,15 +485,15 @@ public Mono<Void> notifyToolsListChanged() {
return this.mcpTransportProvider.notifyClients(McpSchema.METHOD_NOTIFICATION_TOOLS_LIST_CHANGED, null);
}

private McpServerSession.RequestHandler<McpSchema.ListToolsResult> toolsListRequestHandler() {
private McpRequestHandler<McpSchema.ListToolsResult> toolsListRequestHandler() {
return (exchange, params) -> {
List<Tool> tools = this.tools.stream().map(McpServerFeatures.AsyncToolSpecification::tool).toList();

return Mono.just(new McpSchema.ListToolsResult(tools, null));
};
}

private McpServerSession.RequestHandler<CallToolResult> toolsCallRequestHandler() {
private McpRequestHandler<CallToolResult> toolsCallRequestHandler() {
return (exchange, params) -> {
McpSchema.CallToolRequest callToolRequest = objectMapper.convertValue(params,
new TypeReference<McpSchema.CallToolRequest>() {
Expand Down Expand Up @@ -551,7 +586,7 @@ public Mono<Void> notifyResourcesUpdated(McpSchema.ResourcesUpdatedNotification
resourcesUpdatedNotification);
}

private McpServerSession.RequestHandler<McpSchema.ListResourcesResult> resourcesListRequestHandler() {
private McpRequestHandler<McpSchema.ListResourcesResult> resourcesListRequestHandler() {
return (exchange, params) -> {
var resourceList = this.resources.values()
.stream()
Expand All @@ -561,7 +596,7 @@ private McpServerSession.RequestHandler<McpSchema.ListResourcesResult> resources
};
}

private McpServerSession.RequestHandler<McpSchema.ListResourceTemplatesResult> resourceTemplateListRequestHandler() {
private McpRequestHandler<McpSchema.ListResourceTemplatesResult> resourceTemplateListRequestHandler() {
return (exchange, params) -> Mono
.just(new McpSchema.ListResourceTemplatesResult(this.getResourceTemplates(), null));

Expand All @@ -585,7 +620,7 @@ private List<McpSchema.ResourceTemplate> getResourceTemplates() {
return list;
}

private McpServerSession.RequestHandler<McpSchema.ReadResourceResult> resourcesReadRequestHandler() {
private McpRequestHandler<McpSchema.ReadResourceResult> resourcesReadRequestHandler() {
return (exchange, params) -> {
McpSchema.ReadResourceRequest resourceRequest = objectMapper.convertValue(params,
new TypeReference<McpSchema.ReadResourceRequest>() {
Expand Down Expand Up @@ -678,7 +713,7 @@ public Mono<Void> notifyPromptsListChanged() {
return this.mcpTransportProvider.notifyClients(McpSchema.METHOD_NOTIFICATION_PROMPTS_LIST_CHANGED, null);
}

private McpServerSession.RequestHandler<McpSchema.ListPromptsResult> promptsListRequestHandler() {
private McpRequestHandler<McpSchema.ListPromptsResult> promptsListRequestHandler() {
return (exchange, params) -> {
// TODO: Implement pagination
// McpSchema.PaginatedRequest request = objectMapper.convertValue(params,
Expand All @@ -694,7 +729,7 @@ private McpServerSession.RequestHandler<McpSchema.ListPromptsResult> promptsList
};
}

private McpServerSession.RequestHandler<McpSchema.GetPromptResult> promptsGetRequestHandler() {
private McpRequestHandler<McpSchema.GetPromptResult> promptsGetRequestHandler() {
return (exchange, params) -> {
McpSchema.GetPromptRequest promptRequest = objectMapper.convertValue(params,
new TypeReference<McpSchema.GetPromptRequest>() {
Expand Down Expand Up @@ -740,7 +775,7 @@ public Mono<Void> loggingNotification(LoggingMessageNotification loggingMessageN
loggingMessageNotification);
}

private McpServerSession.RequestHandler<Object> setLoggerRequestHandler() {
private McpRequestHandler<Object> setLoggerRequestHandler() {
return (exchange, params) -> {
return Mono.defer(() -> {

Expand All @@ -759,7 +794,7 @@ private McpServerSession.RequestHandler<Object> setLoggerRequestHandler() {
};
}

private McpServerSession.RequestHandler<McpSchema.CompleteResult> completionCompleteRequestHandler() {
private McpRequestHandler<McpSchema.CompleteResult> completionCompleteRequestHandler() {
return (exchange, params) -> {
McpSchema.CompleteRequest request = parseCompletionParams(params);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,14 @@
import java.util.Collections;

import com.fasterxml.jackson.core.type.TypeReference;
import io.modelcontextprotocol.spec.DefaultMcpTransportContext;
import io.modelcontextprotocol.spec.McpError;
import io.modelcontextprotocol.spec.McpSchema;
import io.modelcontextprotocol.spec.McpSchema.LoggingLevel;
import io.modelcontextprotocol.spec.McpSchema.LoggingMessageNotification;
import io.modelcontextprotocol.spec.McpServerSession;
import io.modelcontextprotocol.spec.McpSession;
import io.modelcontextprotocol.spec.McpTransportContext;
import io.modelcontextprotocol.util.Assert;
import reactor.core.publisher.Mono;

Expand All @@ -25,12 +28,16 @@
*/
public class McpAsyncServerExchange {

private final McpServerSession session;
private final String sessionId;

private final McpSession session;

private final McpSchema.ClientCapabilities clientCapabilities;

private final McpSchema.Implementation clientInfo;

private final McpTransportContext transportContext;

private volatile LoggingLevel minLoggingLevel = LoggingLevel.INFO;

private static final TypeReference<McpSchema.CreateMessageResult> CREATE_MESSAGE_RESULT_TYPE_REF = new TypeReference<>() {
Expand All @@ -51,12 +58,35 @@ public class McpAsyncServerExchange {
* @param clientCapabilities The client capabilities that define the supported
* features and functionality.
* @param clientInfo The client implementation information.
* @deprecated Use
* {@link #McpAsyncServerExchange(String, McpSession, McpSchema.ClientCapabilities, McpSchema.Implementation, McpTransportContext)}
*/
public McpAsyncServerExchange(McpServerSession session, McpSchema.ClientCapabilities clientCapabilities,
@Deprecated
public McpAsyncServerExchange(McpSession session, McpSchema.ClientCapabilities clientCapabilities,
McpSchema.Implementation clientInfo) {
this.sessionId = null;
this.session = session;
this.clientCapabilities = clientCapabilities;
this.clientInfo = clientInfo;
this.transportContext = McpTransportContext.EMPTY;
}

/**
* Create a new asynchronous exchange with the client.
* @param session The server session representing a 1-1 interaction.
* @param clientCapabilities The client capabilities that define the supported
* features and functionality.
* @param transportContext context associated with the client as extracted from the
* transport
* @param clientInfo The client implementation information.
*/
public McpAsyncServerExchange(String sessionId, McpSession session, McpSchema.ClientCapabilities clientCapabilities,
McpSchema.Implementation clientInfo, McpTransportContext transportContext) {
this.sessionId = sessionId;
this.session = session;
this.clientCapabilities = clientCapabilities;
this.clientInfo = clientInfo;
this.transportContext = transportContext;
}

/**
Expand All @@ -75,6 +105,14 @@ public McpSchema.Implementation getClientInfo() {
return this.clientInfo;
}

public McpTransportContext transportContext() {
return this.transportContext;
}

public String sessionId() {
return this.sessionId;
}

/**
* Create a new message using the sampling capabilities of the client. The Model
* Context Protocol (MCP) provides a standardized way for servers to request LLM
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package io.modelcontextprotocol.server;

import io.modelcontextprotocol.spec.McpSchema;
import reactor.core.publisher.Mono;

/**
* Request handler for the initialization request.
*/
public interface McpInitRequestHandler {

/**
* Handles the initialization request.
* @param initializeRequest the initialization request by the client
* @return a Mono that will emit the result of the initialization
*/
Mono<McpSchema.InitializeResult> handle(McpSchema.InitializeRequest initializeRequest);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package io.modelcontextprotocol.server;

import reactor.core.publisher.Mono;

/**
* A handler for client-initiated notifications.
*/
public interface McpNotificationHandler {

/**
* Handles a notification from the client.
* @param exchange the exchange associated with the client that allows calling back to
* the connected client or inspecting its capabilities.
* @param params the parameters of the notification.
* @return a Mono that completes once the notification is handled.
*/
Mono<Void> handle(McpAsyncServerExchange exchange, Object params);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package io.modelcontextprotocol.server;

import reactor.core.publisher.Mono;

/**
* A handler for client-initiated requests.
*
* @param <T> the type of the response that is expected as a result of handling the
* request.
*/
public interface McpRequestHandler<T> {

/**
* Handles a request from the client.
* @param exchange the exchange associated with the client that allows calling back to
* the connected client or inspecting its capabilities.
* @param params the parameters of the request.
* @return a Mono that will emit the response to the request.
*/
Mono<T> handle(McpAsyncServerExchange exchange, Object params);

}
Loading