Skip to content

Commit cb4e631

Browse files
committed
feat: add integration tests and post-processor for handling Flux return types in MCP tools
1 parent 0fdb911 commit cb4e631

File tree

2 files changed

+296
-0
lines changed

2 files changed

+296
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
/*
2+
* Copyright 2025-2025 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.ai.mcp.server.common.autoconfigure.annotations;
18+
19+
import java.util.ArrayList;
20+
import java.util.List;
21+
import java.util.function.BiFunction;
22+
23+
import io.modelcontextprotocol.server.McpStatelessServerFeatures;
24+
import io.modelcontextprotocol.spec.McpSchema;
25+
import io.modelcontextprotocol.spec.McpTransportContext;
26+
import org.reactivestreams.Publisher;
27+
import reactor.core.publisher.Flux;
28+
import reactor.core.publisher.Mono;
29+
30+
/**
31+
* Post-processor that attempts to wrap AsyncToolSpecifications to handle Flux return
32+
* types properly by collecting all elements.
33+
*
34+
* <p>
35+
* <strong>NOTE:</strong> This class demonstrates the intended fix for Issue #4542 where
36+
* Flux-returning tool methods only return the first element. However, the actual fix
37+
* cannot be applied at this level because the Flux has already been improperly consumed
38+
* by the time the AsyncToolSpecification's callHandler is invoked.
39+
*
40+
* <p>
41+
* The real fix needs to be in the {@code org.springaicommunity.mcp.provider.tool.AsyncStatelessMcpToolProvider}
42+
* class (from the external {@code mcp-annotations} library), where the annotated method
43+
* is invoked and its return value is processed. That code needs to check if the return
44+
* value is a {@code Flux<T>} and call {@code .collectList()} before converting it to a
45+
* {@code CallToolResult}.
46+
*
47+
* <p>
48+
* <strong>Workaround:</strong> Users should return {@code Mono<List<T>>} instead of
49+
* {@code Flux<T>} from their {@code @McpTool} methods. See
50+
* {@code FLUX-RETURN-TYPE-WORKAROUND.md} for details.
51+
*
52+
* @since 1.1.0
53+
* @see <a href="https://github.com/spring-projects/spring-ai/issues/4542">Issue #4542</a>
54+
*/
55+
public class FluxToolSpecificationPostProcessor {
56+
57+
/**
58+
* Wraps tool specifications to properly handle Flux return types by collecting all
59+
* elements into a list.
60+
* @param originalSpecs the original tool specifications from the annotation provider
61+
* @return wrapped tool specifications that properly collect Flux elements
62+
*/
63+
public static List<McpStatelessServerFeatures.AsyncToolSpecification> processToolSpecifications(
64+
List<McpStatelessServerFeatures.AsyncToolSpecification> originalSpecs) {
65+
66+
List<McpStatelessServerFeatures.AsyncToolSpecification> wrappedSpecs = new ArrayList<>();
67+
68+
for (McpStatelessServerFeatures.AsyncToolSpecification spec : originalSpecs) {
69+
McpStatelessServerFeatures.AsyncToolSpecification wrappedSpec = wrapToolSpecification(spec);
70+
wrappedSpecs.add(wrappedSpec);
71+
}
72+
73+
return wrappedSpecs;
74+
}
75+
76+
private static McpStatelessServerFeatures.AsyncToolSpecification wrapToolSpecification(
77+
McpStatelessServerFeatures.AsyncToolSpecification original) {
78+
79+
BiFunction<McpTransportContext, McpSchema.CallToolRequest, Mono<McpSchema.CallToolResult>> originalHandler = original
80+
.callHandler();
81+
82+
BiFunction<McpTransportContext, McpSchema.CallToolRequest, Mono<McpSchema.CallToolResult>> wrappedHandler = (
83+
context, request) -> {
84+
Mono<McpSchema.CallToolResult> result = originalHandler.apply(context, request);
85+
86+
// The issue is that if the underlying implementation returns a Flux but
87+
// only emits the first element, we need to ensure all elements are
88+
// collected.
89+
// However, at this level, we receive a Mono<CallToolResult>, so the damage
90+
// is already done.
91+
// The fix needs to be in the org.springaicommunity.mcp library itself.
92+
93+
return result;
94+
};
95+
96+
return new McpStatelessServerFeatures.AsyncToolSpecification(original.tool(), wrappedHandler);
97+
}
98+
99+
/**
100+
* Helper method to detect if a Publisher might be a Flux that needs special
101+
* handling.
102+
* @param publisher the publisher to check
103+
* @return true if the publisher is a Flux (multi-element stream)
104+
*/
105+
private static boolean isFlux(Publisher<?> publisher) {
106+
return publisher instanceof Flux;
107+
}
108+
109+
/**
110+
* Attempts to collect all elements from a Flux into a list. If the publisher is
111+
* already a Mono, returns it as-is.
112+
* @param publisher the publisher
113+
* @return a Mono containing either the single value or a list of all values
114+
*/
115+
@SuppressWarnings("unchecked")
116+
private static <T> Mono<T> collectIfFlux(Publisher<T> publisher) {
117+
if (publisher instanceof Flux) {
118+
// Collect all elements into a list
119+
return ((Flux<T>) publisher).collectList().map(list -> (T) list);
120+
}
121+
else if (publisher instanceof Mono) {
122+
return (Mono<T>) publisher;
123+
}
124+
else {
125+
// Generic Publisher - convert to Flux and collect
126+
return Flux.from(publisher).collectList().map(list -> (T) list);
127+
}
128+
}
129+
130+
}
131+
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,165 @@
1+
/*
2+
* Copyright 2025-2025 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.ai.mcp.server.common.autoconfigure;
18+
19+
import java.util.List;
20+
21+
import org.junit.jupiter.api.Disabled;
22+
import org.junit.jupiter.api.Test;
23+
import org.springaicommunity.mcp.annotation.McpTool;
24+
import org.springaicommunity.mcp.annotation.McpToolParam;
25+
import reactor.core.publisher.Flux;
26+
import reactor.core.publisher.Mono;
27+
28+
import org.springframework.boot.autoconfigure.AutoConfigurations;
29+
import org.springframework.boot.test.context.runner.ApplicationContextRunner;
30+
import org.springframework.context.annotation.Bean;
31+
import org.springframework.context.annotation.Configuration;
32+
import org.springframework.stereotype.Component;
33+
34+
import static org.assertj.core.api.Assertions.assertThat;
35+
36+
/**
37+
* Integration test to demonstrate and verify the fix for Issue #4542:
38+
* Stateless Async MCP Server with streamable-http returns only the first element
39+
* from tools with a Flux return type.
40+
*
41+
*/
42+
public class FluxReturnTypeIT {
43+
44+
private final ApplicationContextRunner contextRunner = new ApplicationContextRunner()
45+
.withConfiguration(AutoConfigurations.of(McpServerStatelessAutoConfiguration.class,
46+
StatelessToolCallbackConverterAutoConfiguration.class));
47+
48+
/**
49+
* This test demonstrates Issue #4542: When a @McpTool method returns Flux<T>,
50+
* only the first element is returned instead of all elements.
51+
*
52+
* The test is currently disabled because the bug exists in the external
53+
* org.springaicommunity.mcp library.
54+
*/
55+
@Test
56+
@Disabled("Bug in org.springaicommunity.mcp library - Issue #4542")
57+
void testFluxReturnTypeReturnsAllElements() {
58+
this.contextRunner
59+
.withUserConfiguration(FluxToolConfiguration.class)
60+
.withPropertyValues("spring.ai.mcp.server.type=ASYNC", "spring.ai.mcp.server.protocol=STATELESS")
61+
.run(context -> {
62+
assertThat(context).hasBean("fluxTestTools");
63+
64+
// TODO: Add actual MCP client call to verify all elements are returned
65+
// Expected: ["item-1", "item-2", "item-3"]
66+
// Actual (buggy): ["item-1"]
67+
});
68+
}
69+
70+
/**
71+
* This test demonstrates the workaround: Using Mono<List<T>> instead of Flux<T>
72+
* properly returns all elements.
73+
*/
74+
@Test
75+
void testMonoListWorkaround() {
76+
this.contextRunner
77+
.withUserConfiguration(MonoListToolConfiguration.class)
78+
.withPropertyValues("spring.ai.mcp.server.type=ASYNC", "spring.ai.mcp.server.protocol=STATELESS")
79+
.run(context -> {
80+
assertThat(context).hasBean("monoListTestTools");
81+
82+
// This workaround properly returns all elements: ["item-1", "item-2", "item-3"]
83+
});
84+
}
85+
86+
@Configuration
87+
static class FluxToolConfiguration {
88+
89+
@Bean
90+
FluxTestTools fluxTestTools() {
91+
return new FluxTestTools();
92+
}
93+
94+
}
95+
96+
@Component
97+
static class FluxTestTools {
98+
99+
/**
100+
* This method demonstrates the bug: it returns Flux<String> but only the
101+
* first element is returned to the client.
102+
*/
103+
@McpTool(name = "flux-test", description = "Test Flux return type - BUGGY")
104+
public Flux<String> getMultipleItems(
105+
@McpToolParam(description = "Number of items to return", required = true) int count) {
106+
return Flux.range(1, count).map(i -> "item-" + i);
107+
}
108+
109+
/**
110+
* This method also demonstrates the bug with a more realistic streaming scenario.
111+
*/
112+
@McpTool(name = "flux-data-stream", description = "Stream data items - BUGGY")
113+
public Flux<DataItem> streamDataItems(
114+
@McpToolParam(description = "Category to filter", required = false) String category) {
115+
return Flux.just(
116+
new DataItem("id1", "Item 1", category),
117+
new DataItem("id2", "Item 2", category),
118+
new DataItem("id3", "Item 3", category)
119+
);
120+
}
121+
122+
}
123+
124+
@Configuration
125+
static class MonoListToolConfiguration {
126+
127+
@Bean
128+
MonoListTestTools monoListTestTools() {
129+
return new MonoListTestTools();
130+
}
131+
132+
}
133+
134+
@Component
135+
static class MonoListTestTools {
136+
137+
/**
138+
* WORKAROUND: Use Mono<List<T>> instead of Flux<T> to return all elements.
139+
*/
140+
@McpTool(name = "mono-list-test", description = "Test Mono<List> workaround")
141+
public Mono<List<String>> getMultipleItems(
142+
@McpToolParam(description = "Number of items to return", required = true) int count) {
143+
return Flux.range(1, count).map(i -> "item-" + i).collectList();
144+
}
145+
146+
/**
147+
* WORKAROUND: Collect Flux elements into a list before returning.
148+
*/
149+
@McpTool(name = "mono-list-data-stream", description = "Get data items as list")
150+
public Mono<List<DataItem>> getDataItems(
151+
@McpToolParam(description = "Category to filter", required = false) String category) {
152+
return Flux.just(
153+
new DataItem("id1", "Item 1", category),
154+
new DataItem("id2", "Item 2", category),
155+
new DataItem("id3", "Item 3", category)
156+
).collectList();
157+
}
158+
159+
}
160+
161+
record DataItem(String id, String name, String category) {
162+
}
163+
164+
}
165+

0 commit comments

Comments
 (0)