Skip to content

Commit f088e75

Browse files
committed
Merge remote-tracking branch 'upstream/main' into output-schema
2 parents aaf7add + 2f94434 commit f088e75

File tree

40 files changed

+2605
-271
lines changed

40 files changed

+2605
-271
lines changed

mcp-spring/mcp-spring-webflux/pom.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,12 @@
9999
<version>${testcontainers.version}</version>
100100
<scope>test</scope>
101101
</dependency>
102+
<dependency>
103+
<groupId>org.testcontainers</groupId>
104+
<artifactId>toxiproxy</artifactId>
105+
<version>${toxiproxy.version}</version>
106+
<scope>test</scope>
107+
</dependency>
102108

103109
<dependency>
104110
<groupId>org.awaitility</groupId>

mcp-spring/mcp-spring-webflux/src/main/java/io/modelcontextprotocol/client/transport/WebClientStreamableHttpTransport.java

Lines changed: 520 additions & 0 deletions
Large diffs are not rendered by default.

mcp-spring/mcp-spring-webflux/src/main/java/io/modelcontextprotocol/client/transport/WebFluxSseClientTransport.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -190,6 +190,9 @@ public WebFluxSseClientTransport(WebClient.Builder webClientBuilder, ObjectMappe
190190
*/
191191
@Override
192192
public Mono<Void> connect(Function<Mono<JSONRPCMessage>, Mono<JSONRPCMessage>> handler) {
193+
// TODO: Avoid eager connection opening and enable resilience
194+
// -> upon disconnects, re-establish connection
195+
// -> allow optimizing for eager connection start using a constructor flag
193196
Flux<ServerSentEvent<String>> events = eventStream();
194197
this.inboundSubscription = events.concatMap(event -> Mono.just(event).<JSONRPCMessage>handle((e, s) -> {
195198
if (ENDPOINT_EVENT_TYPE.equals(event.event())) {

mcp-spring/mcp-spring-webflux/src/test/java/io/modelcontextprotocol/WebFluxSseIntegrationTests.java

Lines changed: 222 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
package io.modelcontextprotocol;
55

66
import java.time.Duration;
7-
import java.util.ArrayList;
87
import java.util.List;
98
import java.util.Map;
109
import java.util.concurrent.ConcurrentHashMap;
@@ -28,11 +27,11 @@
2827
import io.modelcontextprotocol.spec.McpError;
2928
import io.modelcontextprotocol.spec.McpSchema;
3029
import io.modelcontextprotocol.spec.McpSchema.*;
31-
import io.modelcontextprotocol.spec.McpSchema.ServerCapabilities.CompletionCapabilities;
3230
import org.junit.jupiter.api.AfterEach;
3331
import org.junit.jupiter.api.BeforeEach;
3432
import org.junit.jupiter.params.ParameterizedTest;
3533
import org.junit.jupiter.params.provider.ValueSource;
34+
import reactor.core.publisher.Mono;
3635
import reactor.netty.DisposableServer;
3736
import reactor.netty.http.server.HttpServer;
3837

@@ -41,6 +40,7 @@
4140
import org.springframework.web.client.RestClient;
4241
import org.springframework.web.reactive.function.client.WebClient;
4342
import org.springframework.web.reactive.function.server.RouterFunctions;
43+
import reactor.test.StepVerifier;
4444

4545
import static org.assertj.core.api.Assertions.assertThat;
4646
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
@@ -331,6 +331,226 @@ void testCreateMessageWithRequestTimeoutFail(String clientType) throws Interrupt
331331
mcpServer.closeGracefully().block();
332332
}
333333

334+
// ---------------------------------------
335+
// Elicitation Tests
336+
// ---------------------------------------
337+
@ParameterizedTest(name = "{0} : {displayName} ")
338+
@ValueSource(strings = { "httpclient", "webflux" })
339+
void testCreateElicitationWithoutElicitationCapabilities(String clientType) {
340+
341+
var clientBuilder = clientBuilders.get(clientType);
342+
343+
McpServerFeatures.AsyncToolSpecification tool = new McpServerFeatures.AsyncToolSpecification(
344+
new McpSchema.Tool("tool1", "tool1 description", emptyJsonSchema), (exchange, request) -> {
345+
346+
exchange.createElicitation(mock(ElicitRequest.class)).block();
347+
348+
return Mono.just(mock(CallToolResult.class));
349+
});
350+
351+
var server = McpServer.async(mcpServerTransportProvider).serverInfo("test-server", "1.0.0").tools(tool).build();
352+
353+
try (
354+
// Create client without elicitation capabilities
355+
var client = clientBuilder.clientInfo(new McpSchema.Implementation("Sample client", "0.0.0")).build()) {
356+
357+
assertThat(client.initialize()).isNotNull();
358+
359+
try {
360+
client.callTool(new McpSchema.CallToolRequest("tool1", Map.of()));
361+
}
362+
catch (McpError e) {
363+
assertThat(e).isInstanceOf(McpError.class)
364+
.hasMessage("Client must be configured with elicitation capabilities");
365+
}
366+
}
367+
server.closeGracefully().block();
368+
}
369+
370+
@ParameterizedTest(name = "{0} : {displayName} ")
371+
@ValueSource(strings = { "httpclient", "webflux" })
372+
void testCreateElicitationSuccess(String clientType) {
373+
374+
var clientBuilder = clientBuilders.get(clientType);
375+
376+
Function<ElicitRequest, ElicitResult> elicitationHandler = request -> {
377+
assertThat(request.message()).isNotEmpty();
378+
assertThat(request.requestedSchema()).isNotNull();
379+
380+
return new ElicitResult(ElicitResult.Action.ACCEPT, Map.of("message", request.message()));
381+
};
382+
383+
CallToolResult callResponse = new McpSchema.CallToolResult(List.of(new McpSchema.TextContent("CALL RESPONSE")),
384+
null);
385+
386+
McpServerFeatures.AsyncToolSpecification tool = new McpServerFeatures.AsyncToolSpecification(
387+
new McpSchema.Tool("tool1", "tool1 description", emptyJsonSchema), (exchange, request) -> {
388+
389+
var elicitationRequest = ElicitRequest.builder()
390+
.message("Test message")
391+
.requestedSchema(
392+
Map.of("type", "object", "properties", Map.of("message", Map.of("type", "string"))))
393+
.build();
394+
395+
StepVerifier.create(exchange.createElicitation(elicitationRequest)).consumeNextWith(result -> {
396+
assertThat(result).isNotNull();
397+
assertThat(result.action()).isEqualTo(ElicitResult.Action.ACCEPT);
398+
assertThat(result.content().get("message")).isEqualTo("Test message");
399+
}).verifyComplete();
400+
401+
return Mono.just(callResponse);
402+
});
403+
404+
var mcpServer = McpServer.async(mcpServerTransportProvider)
405+
.serverInfo("test-server", "1.0.0")
406+
.tools(tool)
407+
.build();
408+
409+
try (var mcpClient = clientBuilder.clientInfo(new McpSchema.Implementation("Sample client", "0.0.0"))
410+
.capabilities(ClientCapabilities.builder().elicitation().build())
411+
.elicitation(elicitationHandler)
412+
.build()) {
413+
414+
InitializeResult initResult = mcpClient.initialize();
415+
assertThat(initResult).isNotNull();
416+
417+
CallToolResult response = mcpClient.callTool(new McpSchema.CallToolRequest("tool1", Map.of()));
418+
419+
assertThat(response).isNotNull();
420+
assertThat(response).isEqualTo(callResponse);
421+
}
422+
mcpServer.closeGracefully().block();
423+
}
424+
425+
@ParameterizedTest(name = "{0} : {displayName} ")
426+
@ValueSource(strings = { "httpclient", "webflux" })
427+
void testCreateElicitationWithRequestTimeoutSuccess(String clientType) {
428+
429+
// Client
430+
var clientBuilder = clientBuilders.get(clientType);
431+
432+
Function<ElicitRequest, ElicitResult> elicitationHandler = request -> {
433+
assertThat(request.message()).isNotEmpty();
434+
assertThat(request.requestedSchema()).isNotNull();
435+
try {
436+
TimeUnit.SECONDS.sleep(2);
437+
}
438+
catch (InterruptedException e) {
439+
throw new RuntimeException(e);
440+
}
441+
return new ElicitResult(ElicitResult.Action.ACCEPT, Map.of("message", request.message()));
442+
};
443+
444+
var mcpClient = clientBuilder.clientInfo(new McpSchema.Implementation("Sample client", "0.0.0"))
445+
.capabilities(ClientCapabilities.builder().elicitation().build())
446+
.elicitation(elicitationHandler)
447+
.build();
448+
449+
// Server
450+
451+
CallToolResult callResponse = new McpSchema.CallToolResult(List.of(new McpSchema.TextContent("CALL RESPONSE")),
452+
null);
453+
454+
McpServerFeatures.AsyncToolSpecification tool = new McpServerFeatures.AsyncToolSpecification(
455+
new McpSchema.Tool("tool1", "tool1 description", emptyJsonSchema), (exchange, request) -> {
456+
457+
var elicitationRequest = ElicitRequest.builder()
458+
.message("Test message")
459+
.requestedSchema(
460+
Map.of("type", "object", "properties", Map.of("message", Map.of("type", "string"))))
461+
.build();
462+
463+
StepVerifier.create(exchange.createElicitation(elicitationRequest)).consumeNextWith(result -> {
464+
assertThat(result).isNotNull();
465+
assertThat(result.action()).isEqualTo(ElicitResult.Action.ACCEPT);
466+
assertThat(result.content().get("message")).isEqualTo("Test message");
467+
}).verifyComplete();
468+
469+
return Mono.just(callResponse);
470+
});
471+
472+
var mcpServer = McpServer.async(mcpServerTransportProvider)
473+
.serverInfo("test-server", "1.0.0")
474+
.requestTimeout(Duration.ofSeconds(3))
475+
.tools(tool)
476+
.build();
477+
478+
InitializeResult initResult = mcpClient.initialize();
479+
assertThat(initResult).isNotNull();
480+
481+
CallToolResult response = mcpClient.callTool(new McpSchema.CallToolRequest("tool1", Map.of()));
482+
483+
assertThat(response).isNotNull();
484+
assertThat(response).isEqualTo(callResponse);
485+
486+
mcpClient.closeGracefully();
487+
mcpServer.closeGracefully().block();
488+
}
489+
490+
@ParameterizedTest(name = "{0} : {displayName} ")
491+
@ValueSource(strings = { "httpclient", "webflux" })
492+
void testCreateElicitationWithRequestTimeoutFail(String clientType) {
493+
494+
// Client
495+
var clientBuilder = clientBuilders.get(clientType);
496+
497+
Function<ElicitRequest, ElicitResult> elicitationHandler = request -> {
498+
assertThat(request.message()).isNotEmpty();
499+
assertThat(request.requestedSchema()).isNotNull();
500+
try {
501+
TimeUnit.SECONDS.sleep(2);
502+
}
503+
catch (InterruptedException e) {
504+
throw new RuntimeException(e);
505+
}
506+
return new ElicitResult(ElicitResult.Action.ACCEPT, Map.of("message", request.message()));
507+
};
508+
509+
var mcpClient = clientBuilder.clientInfo(new McpSchema.Implementation("Sample client", "0.0.0"))
510+
.capabilities(ClientCapabilities.builder().elicitation().build())
511+
.elicitation(elicitationHandler)
512+
.build();
513+
514+
// Server
515+
516+
CallToolResult callResponse = new McpSchema.CallToolResult(List.of(new McpSchema.TextContent("CALL RESPONSE")),
517+
null);
518+
519+
McpServerFeatures.AsyncToolSpecification tool = new McpServerFeatures.AsyncToolSpecification(
520+
new McpSchema.Tool("tool1", "tool1 description", emptyJsonSchema), (exchange, request) -> {
521+
522+
var elicitationRequest = ElicitRequest.builder()
523+
.message("Test message")
524+
.requestedSchema(
525+
Map.of("type", "object", "properties", Map.of("message", Map.of("type", "string"))))
526+
.build();
527+
528+
StepVerifier.create(exchange.createElicitation(elicitationRequest)).consumeNextWith(result -> {
529+
assertThat(result).isNotNull();
530+
assertThat(result.action()).isEqualTo(ElicitResult.Action.ACCEPT);
531+
assertThat(result.content().get("message")).isEqualTo("Test message");
532+
}).verifyComplete();
533+
534+
return Mono.just(callResponse);
535+
});
536+
537+
var mcpServer = McpServer.async(mcpServerTransportProvider)
538+
.serverInfo("test-server", "1.0.0")
539+
.requestTimeout(Duration.ofSeconds(1))
540+
.tools(tool)
541+
.build();
542+
543+
InitializeResult initResult = mcpClient.initialize();
544+
assertThat(initResult).isNotNull();
545+
546+
assertThatExceptionOfType(McpError.class).isThrownBy(() -> {
547+
mcpClient.callTool(new McpSchema.CallToolRequest("tool1", Map.of()));
548+
}).withMessageContaining("within 1000ms");
549+
550+
mcpClient.closeGracefully();
551+
mcpServer.closeGracefully().block();
552+
}
553+
334554
// ---------------------------------------
335555
// Roots Tests
336556
// ---------------------------------------
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
package io.modelcontextprotocol.client;
2+
3+
import com.fasterxml.jackson.databind.ObjectMapper;
4+
import io.modelcontextprotocol.client.transport.WebClientStreamableHttpTransport;
5+
import io.modelcontextprotocol.spec.McpClientTransport;
6+
import org.junit.jupiter.api.Timeout;
7+
import org.springframework.web.reactive.function.client.WebClient;
8+
9+
@Timeout(15)
10+
public class WebClientStreamableHttpAsyncClientResiliencyTests extends AbstractMcpAsyncClientResiliencyTests {
11+
12+
@Override
13+
protected McpClientTransport createMcpTransport() {
14+
return WebClientStreamableHttpTransport.builder(WebClient.builder().baseUrl(host)).build();
15+
}
16+
17+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
package io.modelcontextprotocol.client;
2+
3+
import com.fasterxml.jackson.databind.ObjectMapper;
4+
import io.modelcontextprotocol.client.transport.WebClientStreamableHttpTransport;
5+
import io.modelcontextprotocol.spec.McpClientTransport;
6+
import org.junit.jupiter.api.Timeout;
7+
import org.springframework.web.reactive.function.client.WebClient;
8+
import org.testcontainers.containers.GenericContainer;
9+
import org.testcontainers.containers.wait.strategy.Wait;
10+
import org.testcontainers.images.builder.ImageFromDockerfile;
11+
12+
@Timeout(15)
13+
public class WebClientStreamableHttpAsyncClientTests extends AbstractMcpAsyncClientTests {
14+
15+
static String host = "http://localhost:3001";
16+
17+
// Uses the https://github.com/tzolov/mcp-everything-server-docker-image
18+
@SuppressWarnings("resource")
19+
GenericContainer<?> container = new GenericContainer<>("docker.io/tzolov/mcp-everything-server:v2")
20+
.withCommand("node dist/index.js streamableHttp")
21+
.withLogConsumer(outputFrame -> System.out.println(outputFrame.getUtf8String()))
22+
.withExposedPorts(3001)
23+
.waitingFor(Wait.forHttp("/").forStatusCode(404));
24+
25+
@Override
26+
protected McpClientTransport createMcpTransport() {
27+
return WebClientStreamableHttpTransport.builder(WebClient.builder().baseUrl(host)).build();
28+
}
29+
30+
@Override
31+
protected void onStart() {
32+
container.start();
33+
int port = container.getMappedPort(3001);
34+
host = "http://" + container.getHost() + ":" + port;
35+
}
36+
37+
@Override
38+
public void onClose() {
39+
container.stop();
40+
}
41+
42+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
package io.modelcontextprotocol.client;
2+
3+
import com.fasterxml.jackson.databind.ObjectMapper;
4+
import io.modelcontextprotocol.client.transport.WebClientStreamableHttpTransport;
5+
import io.modelcontextprotocol.spec.McpClientTransport;
6+
import org.junit.jupiter.api.Timeout;
7+
import org.springframework.web.reactive.function.client.WebClient;
8+
import org.testcontainers.containers.GenericContainer;
9+
import org.testcontainers.containers.wait.strategy.Wait;
10+
11+
@Timeout(15)
12+
public class WebClientStreamableHttpSyncClientTests extends AbstractMcpSyncClientTests {
13+
14+
static String host = "http://localhost:3001";
15+
16+
// Uses the https://github.com/tzolov/mcp-everything-server-docker-image
17+
@SuppressWarnings("resource")
18+
GenericContainer<?> container = new GenericContainer<>("docker.io/tzolov/mcp-everything-server:v2")
19+
.withCommand("node dist/index.js streamableHttp")
20+
.withLogConsumer(outputFrame -> System.out.println(outputFrame.getUtf8String()))
21+
.withExposedPorts(3001)
22+
.waitingFor(Wait.forHttp("/").forStatusCode(404));
23+
24+
@Override
25+
protected McpClientTransport createMcpTransport() {
26+
return WebClientStreamableHttpTransport.builder(WebClient.builder().baseUrl(host)).build();
27+
}
28+
29+
@Override
30+
protected void onStart() {
31+
container.start();
32+
int port = container.getMappedPort(3001);
33+
host = "http://" + container.getHost() + ":" + port;
34+
}
35+
36+
@Override
37+
public void onClose() {
38+
container.stop();
39+
}
40+
41+
}

mcp-spring/mcp-spring-webflux/src/test/java/io/modelcontextprotocol/client/WebFluxSseMcpAsyncClientTests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,8 @@ class WebFluxSseMcpAsyncClientTests extends AbstractMcpAsyncClientTests {
2626

2727
// Uses the https://github.com/tzolov/mcp-everything-server-docker-image
2828
@SuppressWarnings("resource")
29-
GenericContainer<?> container = new GenericContainer<>("docker.io/tzolov/mcp-everything-server:v1")
29+
GenericContainer<?> container = new GenericContainer<>("docker.io/tzolov/mcp-everything-server:v2")
30+
.withCommand("node dist/index.js sse")
3031
.withLogConsumer(outputFrame -> System.out.println(outputFrame.getUtf8String()))
3132
.withExposedPorts(3001)
3233
.waitingFor(Wait.forHttp("/").forStatusCode(404));

0 commit comments

Comments
 (0)