Skip to content
This repository was archived by the owner on Feb 14, 2025. It is now read-only.

Commit edbd784

Browse files
committed
refactor(transport): Replace SseEmitter with queue-based SSE implementation in WebMvc transport
Replaces Spring's SseEmitter with a custom BlockingQueue-based SSE implementation in WebMvcSseServerTransport for improved event delivery control and connection management. The new implementation: - Uses BlockingQueue for reliable event queuing and delivery - Adds proper session management with dedicated SSEEvent record - Improves error handling and timeout management - Adds comprehensive integration tests Resolves #57
1 parent 5058b08 commit edbd784

File tree

13 files changed

+726
-104
lines changed

13 files changed

+726
-104
lines changed

mcp-transport/mcp-webflux-sse-transport/pom.xml

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,14 @@
108108
<scope>test</scope>
109109
</dependency>
110110

111+
<dependency>
112+
<groupId>org.junit.jupiter</groupId>
113+
<artifactId>junit-jupiter-params</artifactId>
114+
<version>${junit-jupiter.version}</version>
115+
<scope>test</scope>
116+
</dependency>
117+
111118
</dependencies>
112119

113120

114-
</project>
121+
</project>
Lines changed: 58 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -13,24 +13,29 @@
1313
* See the License for the specific language governing permissions and
1414
* limitations under the License.
1515
*/
16-
package org.springframework.ai.mcp.server;
16+
package org.springframework.ai.mcp;
1717

1818
import java.time.Duration;
1919
import java.util.List;
2020
import java.util.Map;
21+
import java.util.concurrent.ConcurrentHashMap;
2122
import java.util.concurrent.atomic.AtomicReference;
2223
import java.util.function.Function;
2324

2425
import com.fasterxml.jackson.databind.ObjectMapper;
2526
import org.junit.jupiter.api.AfterEach;
2627
import org.junit.jupiter.api.BeforeEach;
2728
import org.junit.jupiter.api.Test;
29+
import org.junit.jupiter.params.ParameterizedTest;
30+
import org.junit.jupiter.params.provider.ValueSource;
2831
import reactor.netty.DisposableServer;
2932
import reactor.netty.http.server.HttpServer;
3033
import reactor.test.StepVerifier;
3134

3235
import org.springframework.ai.mcp.client.McpClient;
36+
import org.springframework.ai.mcp.client.transport.HttpClientSseClientTransport;
3337
import org.springframework.ai.mcp.client.transport.WebFluxSseClientTransport;
38+
import org.springframework.ai.mcp.server.McpServer;
3439
import org.springframework.ai.mcp.server.McpServer.ToolRegistration;
3540
import org.springframework.ai.mcp.server.transport.WebFluxSseServerTransport;
3641
import org.springframework.ai.mcp.spec.McpError;
@@ -54,7 +59,7 @@
5459
import static org.assertj.core.api.Assertions.assertThatThrownBy;
5560
import static org.awaitility.Awaitility.await;
5661

57-
public class WebFluxSseAsyncIntegrationTests {
62+
public class WebFluxSseIntegrationTests {
5863

5964
private static final int PORT = 8182;
6065

@@ -64,18 +69,21 @@ public class WebFluxSseAsyncIntegrationTests {
6469

6570
private WebFluxSseServerTransport mcpServerTransport;
6671

67-
McpClient.Builder clientBuilder;
72+
ConcurrentHashMap<String, McpClient.Builder> clientBulders = new ConcurrentHashMap<>();
6873

6974
@BeforeEach
7075
public void before() {
76+
7177
this.mcpServerTransport = new WebFluxSseServerTransport(new ObjectMapper(), MESSAGE_ENDPOINT);
7278

7379
HttpHandler httpHandler = RouterFunctions.toHttpHandler(mcpServerTransport.getRouterFunction());
7480
ReactorHttpHandlerAdapter adapter = new ReactorHttpHandlerAdapter(httpHandler);
7581
this.httpServer = HttpServer.create().port(PORT).handle(adapter).bindNow();
7682

77-
this.clientBuilder = McpClient
78-
.using(new WebFluxSseClientTransport(WebClient.builder().baseUrl("http://localhost:" + PORT)));
83+
clientBulders.put("httpclient", McpClient.using(new HttpClientSseClientTransport("http://localhost:" + PORT)));
84+
clientBulders.put("webflux", McpClient
85+
.using(new WebFluxSseClientTransport(WebClient.builder().baseUrl("http://localhost:" + PORT))));
86+
7987
}
8088

8189
@AfterEach
@@ -105,11 +113,14 @@ void testCreateMessageWithoutInitialization() {
105113
});
106114
}
107115

108-
@Test
109-
void testCreateMessageWithoutSamplingCapabilities() {
116+
@ParameterizedTest(name = "{0} : {displayName} ")
117+
@ValueSource(strings = { "httpclient", "webflux" })
118+
void testCreateMessageWithoutSamplingCapabilities(String clientType) {
110119

111120
var mcpAsyncServer = McpServer.using(mcpServerTransport).serverInfo("test-server", "1.0.0").async();
112121

122+
var clientBuilder = clientBulders.get(clientType);
123+
113124
var client = clientBuilder.clientInfo(new McpSchema.Implementation("Sample client", "0.0.0")).sync();
114125

115126
InitializeResult initResult = client.initialize();
@@ -128,8 +139,11 @@ void testCreateMessageWithoutSamplingCapabilities() {
128139
});
129140
}
130141

131-
@Test
132-
void testCreateMessageSuccess() throws InterruptedException {
142+
@ParameterizedTest(name = "{0} : {displayName} ")
143+
@ValueSource(strings = { "httpclient", "webflux" })
144+
void testCreateMessageSuccess(String clientType) throws InterruptedException {
145+
146+
var clientBuilder = clientBulders.get(clientType);
133147

134148
var mcpAsyncServer = McpServer.using(mcpServerTransport).serverInfo("test-server", "1.0.0").async();
135149

@@ -169,8 +183,11 @@ void testCreateMessageSuccess() throws InterruptedException {
169183
// ---------------------------------------
170184
// Roots Tests
171185
// ---------------------------------------
172-
@Test
173-
void testRootsSuccess() {
186+
@ParameterizedTest(name = "{0} : {displayName} ")
187+
@ValueSource(strings = { "httpclient", "webflux" })
188+
void testRootsSuccess(String clientType) {
189+
var clientBuilder = clientBulders.get(clientType);
190+
174191
List<Root> roots = List.of(new Root("uri1://", "root1"), new Root("uri2://", "root2"));
175192

176193
AtomicReference<List<Root>> rootsRef = new AtomicReference<>();
@@ -214,8 +231,11 @@ void testRootsSuccess() {
214231
mcpServer.close();
215232
}
216233

217-
@Test
218-
void testRootsWithoutCapability() {
234+
@ParameterizedTest(name = "{0} : {displayName} ")
235+
@ValueSource(strings = { "httpclient", "webflux" })
236+
void testRootsWithoutCapability(String clientType) {
237+
var clientBuilder = clientBulders.get(clientType);
238+
219239
var mcpServer = McpServer.using(mcpServerTransport).rootsChangeConsumer(rootsUpdate -> {
220240
}).sync();
221241

@@ -236,8 +256,11 @@ void testRootsWithoutCapability() {
236256
mcpServer.close();
237257
}
238258

239-
@Test
240-
void testRootsWithEmptyRootsList() {
259+
@ParameterizedTest(name = "{0} : {displayName} ")
260+
@ValueSource(strings = { "httpclient", "webflux" })
261+
void testRootsWithEmptyRootsList(String clientType) {
262+
var clientBuilder = clientBulders.get(clientType);
263+
241264
AtomicReference<List<Root>> rootsRef = new AtomicReference<>();
242265
var mcpServer = McpServer.using(mcpServerTransport)
243266
.rootsChangeConsumer(rootsUpdate -> rootsRef.set(rootsUpdate))
@@ -260,8 +283,11 @@ void testRootsWithEmptyRootsList() {
260283
mcpServer.close();
261284
}
262285

263-
@Test
264-
void testRootsWithMultipleConsumers() {
286+
@ParameterizedTest(name = "{0} : {displayName} ")
287+
@ValueSource(strings = { "httpclient", "webflux" })
288+
void testRootsWithMultipleConsumers(String clientType) {
289+
var clientBuilder = clientBulders.get(clientType);
290+
265291
List<Root> roots = List.of(new Root("uri1://", "root1"));
266292

267293
AtomicReference<List<Root>> rootsRef1 = new AtomicReference<>();
@@ -290,8 +316,12 @@ void testRootsWithMultipleConsumers() {
290316
mcpServer.close();
291317
}
292318

293-
@Test
294-
void testRootsServerCloseWithActiveSubscription() {
319+
@ParameterizedTest(name = "{0} : {displayName} ")
320+
@ValueSource(strings = { "httpclient", "webflux" })
321+
void testRootsServerCloseWithActiveSubscription(String clientType) {
322+
323+
var clientBuilder = clientBulders.get(clientType);
324+
295325
List<Root> roots = List.of(new Root("uri1://", "root1"));
296326

297327
AtomicReference<List<Root>> rootsRef = new AtomicReference<>();
@@ -331,9 +361,11 @@ void testRootsServerCloseWithActiveSubscription() {
331361
}
332362
""";
333363

334-
@Test
364+
@ParameterizedTest(name = "{0} : {displayName} ")
365+
@ValueSource(strings = { "httpclient", "webflux" })
366+
void testToolCallSuccess(String clientType) {
335367

336-
void testToolCallSuccess() {
368+
var clientBuilder = clientBulders.get(clientType);
337369

338370
var callResponse = new McpSchema.CallToolResult(List.of(new McpSchema.TextContent("CALL RESPONSE")), null);
339371
ToolRegistration tool1 = new ToolRegistration(new McpSchema.Tool("tool1", "tool1 description", emptyJsonSchema),
@@ -369,8 +401,11 @@ void testToolCallSuccess() {
369401
mcpServer.close();
370402
}
371403

372-
@Test
373-
void testToolListChangeHandlingSuccess() {
404+
@ParameterizedTest(name = "{0} : {displayName} ")
405+
@ValueSource(strings = { "httpclient", "webflux" })
406+
void testToolListChangeHandlingSuccess(String clientType) {
407+
408+
var clientBuilder = clientBulders.get(clientType);
374409

375410
var callResponse = new McpSchema.CallToolResult(List.of(new McpSchema.TextContent("CALL RESPONSE")), null);
376411
ToolRegistration tool1 = new ToolRegistration(new McpSchema.Tool("tool1", "tool1 description", emptyJsonSchema),

0 commit comments

Comments
 (0)