Skip to content

Commit f50af4b

Browse files
committed
refactor: Add StdioServerTransportProvider with reactive streams
Add StdioServerTransportProvider implementation that uses reactive streams for both inbound and outbound message processing. - Using Flux for asynchronous message handling - Implementing separate inbound and outbound processing pipelines - Improving error handling with proper propagation Signed-off-by: Christian Tzolov <[email protected]>
1 parent e58fcf4 commit f50af4b

24 files changed

+2784
-348
lines changed
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
/*
2+
* Copyright 2024-2024 the original author or authors.
3+
*/
4+
5+
package io.modelcontextprotocol.server;
6+
7+
import com.fasterxml.jackson.databind.ObjectMapper;
8+
import io.modelcontextprotocol.server.transport.WebFluxSseServerTransport;
9+
import io.modelcontextprotocol.spec.ServerMcpTransport;
10+
import org.junit.jupiter.api.Timeout;
11+
import reactor.netty.DisposableServer;
12+
import reactor.netty.http.server.HttpServer;
13+
14+
import org.springframework.http.server.reactive.HttpHandler;
15+
import org.springframework.http.server.reactive.ReactorHttpHandlerAdapter;
16+
import org.springframework.web.reactive.function.server.RouterFunctions;
17+
18+
/**
19+
* Tests for {@link McpAsyncServer} using {@link WebFluxSseServerTransport}.
20+
*
21+
* @author Christian Tzolov
22+
*/
23+
@Deprecated
24+
@Timeout(15) // Giving extra time beyond the client timeout
25+
class WebFluxSseMcpAsyncServerDeprecatedTests extends AbstractMcpAsyncServerDeprecatedTests {
26+
27+
private static final int PORT = 8181;
28+
29+
private static final String MESSAGE_ENDPOINT = "/mcp/message";
30+
31+
private DisposableServer httpServer;
32+
33+
@Override
34+
protected ServerMcpTransport createMcpTransport() {
35+
var transport = new WebFluxSseServerTransport(new ObjectMapper(), MESSAGE_ENDPOINT);
36+
37+
HttpHandler httpHandler = RouterFunctions.toHttpHandler(transport.getRouterFunction());
38+
ReactorHttpHandlerAdapter adapter = new ReactorHttpHandlerAdapter(httpHandler);
39+
httpServer = HttpServer.create().port(PORT).handle(adapter).bindNow();
40+
41+
return transport;
42+
}
43+
44+
@Override
45+
protected void onStart() {
46+
}
47+
48+
@Override
49+
protected void onClose() {
50+
if (httpServer != null) {
51+
httpServer.disposeNow();
52+
}
53+
}
54+
55+
}

mcp-spring/mcp-spring-webflux/src/test/java/io/modelcontextprotocol/server/WebFluxSseMcpAsyncServerTests.java

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,8 @@
55
package io.modelcontextprotocol.server;
66

77
import com.fasterxml.jackson.databind.ObjectMapper;
8-
import io.modelcontextprotocol.server.transport.WebFluxSseServerTransport;
9-
import io.modelcontextprotocol.spec.ServerMcpTransport;
8+
import io.modelcontextprotocol.server.transport.WebFluxSseServerTransportProvider;
9+
import io.modelcontextprotocol.spec.McpServerTransportProvider;
1010
import org.junit.jupiter.api.Timeout;
1111
import reactor.netty.DisposableServer;
1212
import reactor.netty.http.server.HttpServer;
@@ -16,7 +16,7 @@
1616
import org.springframework.web.reactive.function.server.RouterFunctions;
1717

1818
/**
19-
* Tests for {@link McpAsyncServer} using {@link WebFluxSseServerTransport}.
19+
* Tests for {@link McpAsyncServer} using {@link WebFluxSseServerTransportProvider}.
2020
*
2121
* @author Christian Tzolov
2222
*/
@@ -30,14 +30,13 @@ class WebFluxSseMcpAsyncServerTests extends AbstractMcpAsyncServerTests {
3030
private DisposableServer httpServer;
3131

3232
@Override
33-
protected ServerMcpTransport createMcpTransport() {
34-
var transport = new WebFluxSseServerTransport(new ObjectMapper(), MESSAGE_ENDPOINT);
33+
protected McpServerTransportProvider createMcpTransportProvider() {
34+
var transportProvider = new WebFluxSseServerTransportProvider(new ObjectMapper(), MESSAGE_ENDPOINT);
3535

36-
HttpHandler httpHandler = RouterFunctions.toHttpHandler(transport.getRouterFunction());
36+
HttpHandler httpHandler = RouterFunctions.toHttpHandler(transportProvider.getRouterFunction());
3737
ReactorHttpHandlerAdapter adapter = new ReactorHttpHandlerAdapter(httpHandler);
3838
httpServer = HttpServer.create().port(PORT).handle(adapter).bindNow();
39-
40-
return transport;
39+
return transportProvider;
4140
}
4241

4342
@Override
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
/*
2+
* Copyright 2024-2024 the original author or authors.
3+
*/
4+
5+
package io.modelcontextprotocol.server;
6+
7+
import com.fasterxml.jackson.databind.ObjectMapper;
8+
import io.modelcontextprotocol.server.transport.WebFluxSseServerTransport;
9+
import io.modelcontextprotocol.spec.ServerMcpTransport;
10+
import org.junit.jupiter.api.Timeout;
11+
import reactor.netty.DisposableServer;
12+
import reactor.netty.http.server.HttpServer;
13+
14+
import org.springframework.http.server.reactive.HttpHandler;
15+
import org.springframework.http.server.reactive.ReactorHttpHandlerAdapter;
16+
import org.springframework.web.reactive.function.server.RouterFunctions;
17+
18+
/**
19+
* Tests for {@link McpSyncServer} using {@link WebFluxSseServerTransport}.
20+
*
21+
* @author Christian Tzolov
22+
*/
23+
@Deprecated
24+
@Timeout(15) // Giving extra time beyond the client timeout
25+
class WebFluxSseMcpSyncServerDeprecatecTests extends AbstractMcpSyncServerDeprecatedTests {
26+
27+
private static final int PORT = 8182;
28+
29+
private static final String MESSAGE_ENDPOINT = "/mcp/message";
30+
31+
private DisposableServer httpServer;
32+
33+
private WebFluxSseServerTransport transport;
34+
35+
@Override
36+
protected ServerMcpTransport createMcpTransport() {
37+
transport = new WebFluxSseServerTransport(new ObjectMapper(), MESSAGE_ENDPOINT);
38+
return transport;
39+
}
40+
41+
@Override
42+
protected void onStart() {
43+
HttpHandler httpHandler = RouterFunctions.toHttpHandler(transport.getRouterFunction());
44+
ReactorHttpHandlerAdapter adapter = new ReactorHttpHandlerAdapter(httpHandler);
45+
httpServer = HttpServer.create().port(PORT).handle(adapter).bindNow();
46+
}
47+
48+
@Override
49+
protected void onClose() {
50+
if (httpServer != null) {
51+
httpServer.disposeNow();
52+
}
53+
}
54+
55+
}

mcp-spring/mcp-spring-webflux/src/test/java/io/modelcontextprotocol/server/WebFluxSseMcpSyncServerTests.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,8 @@
55
package io.modelcontextprotocol.server;
66

77
import com.fasterxml.jackson.databind.ObjectMapper;
8-
import io.modelcontextprotocol.server.transport.WebFluxSseServerTransport;
9-
import io.modelcontextprotocol.spec.ServerMcpTransport;
8+
import io.modelcontextprotocol.server.transport.WebFluxSseServerTransportProvider;
9+
import io.modelcontextprotocol.spec.McpServerTransportProvider;
1010
import org.junit.jupiter.api.Timeout;
1111
import reactor.netty.DisposableServer;
1212
import reactor.netty.http.server.HttpServer;
@@ -16,7 +16,7 @@
1616
import org.springframework.web.reactive.function.server.RouterFunctions;
1717

1818
/**
19-
* Tests for {@link McpSyncServer} using {@link WebFluxSseServerTransport}.
19+
* Tests for {@link McpSyncServer} using {@link WebFluxSseServerTransportProvider}.
2020
*
2121
* @author Christian Tzolov
2222
*/
@@ -29,17 +29,17 @@ class WebFluxSseMcpSyncServerTests extends AbstractMcpSyncServerTests {
2929

3030
private DisposableServer httpServer;
3131

32-
private WebFluxSseServerTransport transport;
32+
private WebFluxSseServerTransportProvider transportProvider;
3333

3434
@Override
35-
protected ServerMcpTransport createMcpTransport() {
36-
transport = new WebFluxSseServerTransport(new ObjectMapper(), MESSAGE_ENDPOINT);
37-
return transport;
35+
protected McpServerTransportProvider createMcpTransportProvider() {
36+
transportProvider = new WebFluxSseServerTransportProvider(new ObjectMapper(), MESSAGE_ENDPOINT);
37+
return transportProvider;
3838
}
3939

4040
@Override
4141
protected void onStart() {
42-
HttpHandler httpHandler = RouterFunctions.toHttpHandler(transport.getRouterFunction());
42+
HttpHandler httpHandler = RouterFunctions.toHttpHandler(transportProvider.getRouterFunction());
4343
ReactorHttpHandlerAdapter adapter = new ReactorHttpHandlerAdapter(httpHandler);
4444
httpServer = HttpServer.create().port(PORT).handle(adapter).bindNow();
4545
}

mcp-spring/mcp-spring-webmvc/src/test/java/io/modelcontextprotocol/server/WebMvcSseAsyncServerTransportTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
import org.springframework.web.servlet.function.ServerResponse;
2222

2323
@Timeout(15)
24-
class WebMvcSseAsyncServerTransportTests extends AbstractMcpAsyncServerTests {
24+
class WebMvcSseAsyncServerTransportTests extends AbstractMcpAsyncServerDeprecatedTests {
2525

2626
private static final String MESSAGE_ENDPOINT = "/mcp/message";
2727

mcp-spring/mcp-spring-webmvc/src/test/java/io/modelcontextprotocol/server/WebMvcSseSyncServerTransportTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
import org.springframework.web.servlet.function.ServerResponse;
2222

2323
@Timeout(15)
24-
class WebMvcSseSyncServerTransportTests extends AbstractMcpSyncServerTests {
24+
class WebMvcSseSyncServerTransportTests extends AbstractMcpSyncServerDeprecatedTests {
2525

2626
private static final String MESSAGE_ENDPOINT = "/mcp/message";
2727

0 commit comments

Comments
 (0)