Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,9 @@
import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.web.reactive.function.client.WebClientResponseException;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.modelcontextprotocol.spec.json.TypeRef;
import io.modelcontextprotocol.spec.json.McpJsonMapper;
import io.modelcontextprotocol.spec.json.jackson.JacksonMcpJsonMapper;

import io.modelcontextprotocol.spec.DefaultMcpTransportSession;
import io.modelcontextprotocol.spec.DefaultMcpTransportStream;
Expand Down Expand Up @@ -88,7 +89,7 @@ public class WebClientStreamableHttpTransport implements McpClientTransport {
private static final ParameterizedTypeReference<ServerSentEvent<String>> PARAMETERIZED_TYPE_REF = new ParameterizedTypeReference<>() {
};

private final ObjectMapper objectMapper;
private final McpJsonMapper jsonMapper;

private final WebClient webClient;

Expand All @@ -104,9 +105,9 @@ public class WebClientStreamableHttpTransport implements McpClientTransport {

private final AtomicReference<Consumer<Throwable>> exceptionHandler = new AtomicReference<>();

private WebClientStreamableHttpTransport(ObjectMapper objectMapper, WebClient.Builder webClientBuilder,
private WebClientStreamableHttpTransport(McpJsonMapper jsonMapper, WebClient.Builder webClientBuilder,
String endpoint, boolean resumableStreams, boolean openConnectionOnStartup) {
this.objectMapper = objectMapper;
this.jsonMapper = jsonMapper;
this.webClient = webClientBuilder.build();
this.endpoint = endpoint;
this.resumableStreams = resumableStreams;
Expand Down Expand Up @@ -366,8 +367,7 @@ private Flux<McpSchema.JSONRPCMessage> extractError(ClientResponse response, Str
McpSchema.JSONRPCResponse.JSONRPCError jsonRpcError = null;
Exception toPropagate;
try {
McpSchema.JSONRPCResponse jsonRpcResponse = objectMapper.readValue(body,
McpSchema.JSONRPCResponse.class);
McpSchema.JSONRPCResponse jsonRpcResponse = jsonMapper.readValue(body, McpSchema.JSONRPCResponse.class);
jsonRpcError = jsonRpcResponse.error();
toPropagate = jsonRpcError != null ? new McpError(jsonRpcError)
: new McpTransportException("Can't parse the jsonResponse " + jsonRpcResponse);
Expand Down Expand Up @@ -427,7 +427,7 @@ private Flux<McpSchema.JSONRPCMessage> directResponseFlux(McpSchema.JSONRPCMessa
s.complete();
}
else {
McpSchema.JSONRPCMessage jsonRpcResponse = McpSchema.deserializeJsonRpcMessage(objectMapper,
McpSchema.JSONRPCMessage jsonRpcResponse = McpSchema.deserializeJsonRpcMessage(jsonMapper,
responseMessage);
s.next(List.of(jsonRpcResponse));
}
Expand All @@ -447,16 +447,16 @@ private Flux<McpSchema.JSONRPCMessage> newEventStream(ClientResponse response, S
}

@Override
public <T> T unmarshalFrom(Object data, TypeReference<T> typeRef) {
return this.objectMapper.convertValue(data, typeRef);
public <T> T unmarshalFrom(Object data, TypeRef<T> typeRef) {
return this.jsonMapper.convertValue(data, typeRef);
}

private Tuple2<Optional<String>, Iterable<McpSchema.JSONRPCMessage>> parse(ServerSentEvent<String> event) {
if (MESSAGE_EVENT_TYPE.equals(event.event())) {
try {
// We don't support batching ATM and probably won't since the next version
// considers removing it.
McpSchema.JSONRPCMessage message = McpSchema.deserializeJsonRpcMessage(this.objectMapper, event.data());
McpSchema.JSONRPCMessage message = McpSchema.deserializeJsonRpcMessage(this.jsonMapper, event.data());
return Tuples.of(Optional.ofNullable(event.id()), List.of(message));
}
catch (IOException ioException) {
Expand All @@ -474,7 +474,7 @@ private Tuple2<Optional<String>, Iterable<McpSchema.JSONRPCMessage>> parse(Serve
*/
public static class Builder {

private ObjectMapper objectMapper;
private McpJsonMapper jsonMapper;

private WebClient.Builder webClientBuilder;

Expand All @@ -494,9 +494,20 @@ private Builder(WebClient.Builder webClientBuilder) {
* @param objectMapper instance to use
* @return the builder instance
*/
public Builder objectMapper(ObjectMapper objectMapper) {
public Builder objectMapper(com.fasterxml.jackson.databind.ObjectMapper objectMapper) {
Assert.notNull(objectMapper, "ObjectMapper must not be null");
this.objectMapper = objectMapper;
this.jsonMapper = new JacksonMcpJsonMapper(objectMapper);
return this;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
this.jsonMapper = new JacksonMcpJsonMapper(objectMapper);
return this;
return jsonMapper(new JacksonMcpJsonMapper(objectMapper));

}

/**
* Configure the {@link McpJsonMapper} to use.
* @param jsonMapper instance to use
* @return the builder instance
*/
public Builder jsonMapper(McpJsonMapper jsonMapper) {
Assert.notNull(jsonMapper, "JsonMapper must not be null");
this.jsonMapper = jsonMapper;
return this;
}

Expand Down Expand Up @@ -555,9 +566,10 @@ public Builder openConnectionOnStartup(boolean openConnectionOnStartup) {
* @return a new instance of {@link WebClientStreamableHttpTransport}
*/
public WebClientStreamableHttpTransport build() {
ObjectMapper objectMapper = this.objectMapper != null ? this.objectMapper : new ObjectMapper();
McpJsonMapper jsonMapper = this.jsonMapper != null ? this.jsonMapper
: new JacksonMcpJsonMapper(new com.fasterxml.jackson.databind.ObjectMapper());

return new WebClientStreamableHttpTransport(objectMapper, this.webClientBuilder, endpoint, resumableStreams,
return new WebClientStreamableHttpTransport(jsonMapper, this.webClientBuilder, endpoint, resumableStreams,
openConnectionOnStartup);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,9 @@
import java.util.function.BiConsumer;
import java.util.function.Function;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.modelcontextprotocol.spec.json.McpJsonMapper;
import io.modelcontextprotocol.spec.json.TypeRef;
import io.modelcontextprotocol.spec.json.jackson.JacksonMcpJsonMapper;

import io.modelcontextprotocol.spec.HttpHeaders;
import io.modelcontextprotocol.spec.McpClientTransport;
Expand Down Expand Up @@ -100,10 +101,10 @@ public class WebFluxSseClientTransport implements McpClientTransport {
private final WebClient webClient;

/**
* ObjectMapper for serializing outbound messages and deserializing inbound messages.
* JSON mapper for serializing outbound messages and deserializing inbound messages.
* Handles conversion between JSON-RPC messages and their string representation.
*/
protected ObjectMapper objectMapper;
protected McpJsonMapper jsonMapper;

/**
* Subscription for the SSE connection handling inbound messages. Used for cleanup
Expand Down Expand Up @@ -137,37 +138,36 @@ public class WebFluxSseClientTransport implements McpClientTransport {
* @throws IllegalArgumentException if webClientBuilder is null
*/
public WebFluxSseClientTransport(WebClient.Builder webClientBuilder) {
this(webClientBuilder, new ObjectMapper());
this(webClientBuilder, new JacksonMcpJsonMapper(new com.fasterxml.jackson.databind.ObjectMapper()));
}

/**
* Constructs a new SseClientTransport with the specified WebClient builder and
* ObjectMapper. Initializes both inbound and outbound message processing pipelines.
* @param webClientBuilder the WebClient.Builder to use for creating the WebClient
* instance
* @param objectMapper the ObjectMapper to use for JSON processing
* @param jsonMapper the ObjectMapper to use for JSON processing
* @throws IllegalArgumentException if either parameter is null
*/
public WebFluxSseClientTransport(WebClient.Builder webClientBuilder, ObjectMapper objectMapper) {
this(webClientBuilder, objectMapper, DEFAULT_SSE_ENDPOINT);
public WebFluxSseClientTransport(WebClient.Builder webClientBuilder, McpJsonMapper jsonMapper) {
this(webClientBuilder, jsonMapper, DEFAULT_SSE_ENDPOINT);
}

/**
* Constructs a new SseClientTransport with the specified WebClient builder and
* ObjectMapper. Initializes both inbound and outbound message processing pipelines.
* @param webClientBuilder the WebClient.Builder to use for creating the WebClient
* instance
* @param objectMapper the ObjectMapper to use for JSON processing
* @param jsonMapper the ObjectMapper to use for JSON processing
* @param sseEndpoint the SSE endpoint URI to use for establishing the connection
* @throws IllegalArgumentException if either parameter is null
*/
public WebFluxSseClientTransport(WebClient.Builder webClientBuilder, ObjectMapper objectMapper,
String sseEndpoint) {
Assert.notNull(objectMapper, "ObjectMapper must not be null");
public WebFluxSseClientTransport(WebClient.Builder webClientBuilder, McpJsonMapper jsonMapper, String sseEndpoint) {
Assert.notNull(jsonMapper, "jsonMapper must not be null");
Assert.notNull(webClientBuilder, "WebClient.Builder must not be null");
Assert.hasText(sseEndpoint, "SSE endpoint must not be null or empty");

this.objectMapper = objectMapper;
this.jsonMapper = jsonMapper;
this.webClient = webClientBuilder.build();
this.sseEndpoint = sseEndpoint;
}
Expand Down Expand Up @@ -217,7 +217,7 @@ public Mono<Void> connect(Function<Mono<JSONRPCMessage>, Mono<JSONRPCMessage>> h
}
else if (MESSAGE_EVENT_TYPE.equals(event.event())) {
try {
JSONRPCMessage message = McpSchema.deserializeJsonRpcMessage(this.objectMapper, event.data());
JSONRPCMessage message = McpSchema.deserializeJsonRpcMessage(this.jsonMapper, event.data());
s.next(message);
}
catch (IOException ioException) {
Expand Down Expand Up @@ -255,7 +255,7 @@ public Mono<Void> sendMessage(JSONRPCMessage message) {
return Mono.empty();
}
try {
String jsonText = this.objectMapper.writeValueAsString(message);
String jsonText = this.jsonMapper.writeValueAsString(message);
return webClient.post()
.uri(messageEndpointUri)
.contentType(MediaType.APPLICATION_JSON)
Expand Down Expand Up @@ -349,13 +349,13 @@ public Mono<Void> closeGracefully() { // @formatter:off
* type conversion capabilities to handle complex object structures.
* @param <T> the target type to convert the data into
* @param data the source object to convert
* @param typeRef the TypeReference describing the target type
* @param typeRef the TypeRef describing the target type
* @return the unmarshalled object of type T
* @throws IllegalArgumentException if the conversion cannot be performed
*/
@Override
public <T> T unmarshalFrom(Object data, TypeReference<T> typeRef) {
return this.objectMapper.convertValue(data, typeRef);
public <T> T unmarshalFrom(Object data, TypeRef<T> typeRef) {
return this.jsonMapper.convertValue(data, typeRef);
}

/**
Expand All @@ -377,7 +377,7 @@ public static class Builder {

private String sseEndpoint = DEFAULT_SSE_ENDPOINT;

private ObjectMapper objectMapper = new ObjectMapper();
private McpJsonMapper jsonMapper = new JacksonMcpJsonMapper(new com.fasterxml.jackson.databind.ObjectMapper());

/**
* Creates a new builder with the specified WebClient.Builder.
Expand All @@ -401,12 +401,25 @@ public Builder sseEndpoint(String sseEndpoint) {

/**
* Sets the object mapper for JSON serialization/deserialization.
* @param objectMapper the object mapper
* @param objectMapper the Jackson ObjectMapper
* @return this builder
* @deprecated Use {@link #jsonMapper(McpJsonMapper)} instead
*/
public Builder objectMapper(ObjectMapper objectMapper) {
@Deprecated(forRemoval = true)
public Builder objectMapper(com.fasterxml.jackson.databind.ObjectMapper objectMapper) {
Assert.notNull(objectMapper, "objectMapper must not be null");
this.objectMapper = objectMapper;
this.jsonMapper = new JacksonMcpJsonMapper(objectMapper);
return this;
}

/**
* Sets the JSON mapper for serialization/deserialization.
* @param jsonMapper the JsonMapper to use
* @return this builder
*/
public Builder jsonMapper(McpJsonMapper jsonMapper) {
Assert.notNull(jsonMapper, "jsonMapper must not be null");
this.jsonMapper = jsonMapper;
return this;
}

Expand All @@ -415,7 +428,7 @@ public Builder objectMapper(ObjectMapper objectMapper) {
* @return a new transport instance
*/
public WebFluxSseClientTransport build() {
return new WebFluxSseClientTransport(webClientBuilder, objectMapper, sseEndpoint);
return new WebFluxSseClientTransport(webClientBuilder, jsonMapper, sseEndpoint);
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.modelcontextprotocol.spec.json.TypeRef;
import io.modelcontextprotocol.spec.json.jackson.JacksonMcpJsonMapper;

import io.modelcontextprotocol.common.McpTransportContext;
import io.modelcontextprotocol.server.McpTransportContextExtractor;
Expand Down Expand Up @@ -452,8 +454,8 @@ public Mono<Void> sendMessage(McpSchema.JSONRPCMessage message) {
}

@Override
public <T> T unmarshalFrom(Object data, TypeReference<T> typeRef) {
return objectMapper.convertValue(data, typeRef);
public <T> T unmarshalFrom(Object data, TypeRef<T> typeRef) {
return new JacksonMcpJsonMapper(objectMapper).convertValue(data, typeRef);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
package io.modelcontextprotocol.server.transport;

import com.fasterxml.jackson.databind.ObjectMapper;
import io.modelcontextprotocol.spec.json.McpJsonMapper;
import io.modelcontextprotocol.spec.json.jackson.JacksonMcpJsonMapper;
import io.modelcontextprotocol.common.McpTransportContext;
import io.modelcontextprotocol.server.McpStatelessServerHandler;
import io.modelcontextprotocol.server.McpTransportContextExtractor;
Expand Down Expand Up @@ -34,7 +36,7 @@ public class WebFluxStatelessServerTransport implements McpStatelessServerTransp

private static final Logger logger = LoggerFactory.getLogger(WebFluxStatelessServerTransport.class);

private final ObjectMapper objectMapper;
private final McpJsonMapper jsonMapper;

private final String mcpEndpoint;

Expand All @@ -46,13 +48,13 @@ public class WebFluxStatelessServerTransport implements McpStatelessServerTransp

private volatile boolean isClosing = false;

private WebFluxStatelessServerTransport(ObjectMapper objectMapper, String mcpEndpoint,
private WebFluxStatelessServerTransport(McpJsonMapper jsonMapper, String mcpEndpoint,
McpTransportContextExtractor<ServerRequest> contextExtractor) {
Assert.notNull(objectMapper, "objectMapper must not be null");
Assert.notNull(jsonMapper, "jsonMapper must not be null");
Assert.notNull(mcpEndpoint, "mcpEndpoint must not be null");
Assert.notNull(contextExtractor, "contextExtractor must not be null");

this.objectMapper = objectMapper;
this.jsonMapper = jsonMapper;
this.mcpEndpoint = mcpEndpoint;
this.contextExtractor = contextExtractor;
this.routerFunction = RouterFunctions.route()
Expand Down Expand Up @@ -106,13 +108,20 @@ private Mono<ServerResponse> handlePost(ServerRequest request) {

return request.bodyToMono(String.class).<ServerResponse>flatMap(body -> {
try {
McpSchema.JSONRPCMessage message = McpSchema.deserializeJsonRpcMessage(objectMapper, body);
McpSchema.JSONRPCMessage message = McpSchema.deserializeJsonRpcMessage(jsonMapper, body);

if (message instanceof McpSchema.JSONRPCRequest jsonrpcRequest) {
return this.mcpHandler.handleRequest(transportContext, jsonrpcRequest)
.flatMap(jsonrpcResponse -> ServerResponse.ok()
.contentType(MediaType.APPLICATION_JSON)
.bodyValue(jsonrpcResponse));
return this.mcpHandler.handleRequest(transportContext, jsonrpcRequest).flatMap(jsonrpcResponse -> {
try {
String json = jsonMapper.writeValueAsString(jsonrpcResponse);
return ServerResponse.ok().contentType(MediaType.APPLICATION_JSON).bodyValue(json);
}
catch (IOException e) {
logger.error("Failed to serialize response: {}", e.getMessage());
return ServerResponse.status(HttpStatus.INTERNAL_SERVER_ERROR)
.bodyValue(new McpError("Failed to serialize response"));
}
});
}
else if (message instanceof McpSchema.JSONRPCNotification jsonrpcNotification) {
return this.mcpHandler.handleNotification(transportContext, jsonrpcNotification)
Expand Down Expand Up @@ -146,7 +155,7 @@ public static Builder builder() {
*/
public static class Builder {

private ObjectMapper objectMapper;
private McpJsonMapper jsonMapper;

private String mcpEndpoint = "/mcp";

Expand All @@ -166,7 +175,20 @@ private Builder() {
*/
public Builder objectMapper(ObjectMapper objectMapper) {
Assert.notNull(objectMapper, "ObjectMapper must not be null");
this.objectMapper = objectMapper;
this.jsonMapper = new JacksonMcpJsonMapper(objectMapper);
return this;
}

/**
* Sets the JsonMapper to use for JSON serialization/deserialization of MCP
* messages.
* @param jsonMapper The JsonMapper instance. Must not be null.
* @return this builder instance
* @throws IllegalArgumentException if jsonMapper is null
*/
public Builder jsonMapper(McpJsonMapper jsonMapper) {
Assert.notNull(jsonMapper, "JsonMapper must not be null");
this.jsonMapper = jsonMapper;
return this;
}

Expand Down Expand Up @@ -205,10 +227,12 @@ public Builder contextExtractor(McpTransportContextExtractor<ServerRequest> cont
* @throws IllegalStateException if required parameters are not set
*/
public WebFluxStatelessServerTransport build() {
Assert.notNull(objectMapper, "ObjectMapper must be set");
if (this.jsonMapper == null) {
throw new IllegalStateException("JsonMapper must be set");
}
Assert.notNull(mcpEndpoint, "Message endpoint must be set");

return new WebFluxStatelessServerTransport(objectMapper, mcpEndpoint, contextExtractor);
return new WebFluxStatelessServerTransport(jsonMapper, mcpEndpoint, contextExtractor);
}

}
Expand Down
Loading