Skip to content

Commit b59687d

Browse files
committed
feat: add integration tests and post-processor for handling Flux return types in MCP tools
Signed-off-by: liugddx <[email protected]>
1 parent cb4e631 commit b59687d

File tree

3 files changed

+299
-84
lines changed

3 files changed

+299
-84
lines changed

auto-configurations/mcp/spring-ai-autoconfigure-mcp-server-common/src/main/java/org/springframework/ai/mcp/server/common/autoconfigure/annotations/FluxToolSpecificationPostProcessor.java

Lines changed: 172 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -16,116 +16,230 @@
1616

1717
package org.springframework.ai.mcp.server.common.autoconfigure.annotations;
1818

19+
import java.lang.reflect.Method;
1920
import java.util.ArrayList;
2021
import java.util.List;
22+
import java.util.Map;
2123
import java.util.function.BiFunction;
2224

25+
import com.fasterxml.jackson.databind.ObjectMapper;
2326
import io.modelcontextprotocol.server.McpStatelessServerFeatures;
2427
import io.modelcontextprotocol.spec.McpSchema;
2528
import io.modelcontextprotocol.spec.McpTransportContext;
26-
import org.reactivestreams.Publisher;
29+
import org.slf4j.Logger;
30+
import org.slf4j.LoggerFactory;
31+
import org.springaicommunity.mcp.annotation.McpTool;
32+
import org.springaicommunity.mcp.annotation.McpToolParam;
2733
import reactor.core.publisher.Flux;
2834
import reactor.core.publisher.Mono;
2935

36+
import org.springframework.util.ReflectionUtils;
37+
3038
/**
31-
* Post-processor that attempts to wrap AsyncToolSpecifications to handle Flux return
32-
* types properly by collecting all elements.
33-
*
39+
* Post-processor that wraps AsyncToolSpecifications to handle Flux return types properly
40+
* by collecting all elements before serialization.
41+
*
3442
* <p>
35-
* <strong>NOTE:</strong> This class demonstrates the intended fix for Issue #4542 where
36-
* Flux-returning tool methods only return the first element. However, the actual fix
37-
* cannot be applied at this level because the Flux has already been improperly consumed
38-
* by the time the AsyncToolSpecification's callHandler is invoked.
39-
*
43+
* <strong>Background:</strong> This class fixes Issue #4542 where Flux-returning @McpTool
44+
* methods only return the first element. The root cause is in the external {@code
45+
* org.springaicommunity.mcp.provider.tool.AsyncStatelessMcpToolProvider} library, which
46+
* treats Flux as a single-value Publisher and only takes the first element.
47+
*
4048
* <p>
41-
* The real fix needs to be in the {@code org.springaicommunity.mcp.provider.tool.AsyncStatelessMcpToolProvider}
42-
* class (from the external {@code mcp-annotations} library), where the annotated method
43-
* is invoked and its return value is processed. That code needs to check if the return
44-
* value is a {@code Flux<T>} and call {@code .collectList()} before converting it to a
45-
* {@code CallToolResult}.
46-
*
49+
* <strong>Solution:</strong> This post-processor intercepts tool specifications and wraps
50+
* their call handlers. When a method returns a Flux, it collects all elements into a list
51+
* before passing the result to the MCP serialization layer.
52+
*
4753
* <p>
48-
* <strong>Workaround:</strong> Users should return {@code Mono<List<T>>} instead of
49-
* {@code Flux<T>} from their {@code @McpTool} methods. See
50-
* {@code FLUX-RETURN-TYPE-WORKAROUND.md} for details.
51-
*
54+
* <strong>Note:</strong> Users can also work around this issue by returning {@code
55+
* Mono<List<T>>} instead of {@code Flux<T>} from their {@code @McpTool} methods.
56+
*
57+
* @author liugddx
5258
* @since 1.1.0
5359
* @see <a href="https://github.com/spring-projects/spring-ai/issues/4542">Issue #4542</a>
5460
*/
55-
public class FluxToolSpecificationPostProcessor {
61+
public final class FluxToolSpecificationPostProcessor {
62+
63+
private static final Logger logger = LoggerFactory.getLogger(FluxToolSpecificationPostProcessor.class);
64+
65+
private static final ObjectMapper objectMapper = new ObjectMapper();
66+
67+
private FluxToolSpecificationPostProcessor() {
68+
// Utility class - no instances allowed
69+
}
5670

5771
/**
5872
* Wraps tool specifications to properly handle Flux return types by collecting all
5973
* elements into a list.
6074
* @param originalSpecs the original tool specifications from the annotation provider
75+
* @param toolBeans the bean objects containing @McpTool annotated methods
6176
* @return wrapped tool specifications that properly collect Flux elements
6277
*/
6378
public static List<McpStatelessServerFeatures.AsyncToolSpecification> processToolSpecifications(
64-
List<McpStatelessServerFeatures.AsyncToolSpecification> originalSpecs) {
79+
List<McpStatelessServerFeatures.AsyncToolSpecification> originalSpecs, List<Object> toolBeans) {
6580

66-
List<McpStatelessServerFeatures.AsyncToolSpecification> wrappedSpecs = new ArrayList<>();
81+
List<McpStatelessServerFeatures.AsyncToolSpecification> processedSpecs = new ArrayList<>();
6782

6883
for (McpStatelessServerFeatures.AsyncToolSpecification spec : originalSpecs) {
69-
McpStatelessServerFeatures.AsyncToolSpecification wrappedSpec = wrapToolSpecification(spec);
70-
wrappedSpecs.add(wrappedSpec);
84+
ToolMethodInfo methodInfo = findToolMethod(toolBeans, spec.tool().name());
85+
if (methodInfo != null && methodInfo.returnsFlux()) {
86+
logger.info("Detected Flux return type for MCP tool '{}', applying collection wrapper",
87+
spec.tool().name());
88+
McpStatelessServerFeatures.AsyncToolSpecification wrappedSpec = wrapToolSpecificationForFlux(spec,
89+
methodInfo);
90+
processedSpecs.add(wrappedSpec);
91+
}
92+
else {
93+
processedSpecs.add(spec);
94+
}
7195
}
7296

73-
return wrappedSpecs;
97+
return processedSpecs;
7498
}
7599

76-
private static McpStatelessServerFeatures.AsyncToolSpecification wrapToolSpecification(
77-
McpStatelessServerFeatures.AsyncToolSpecification original) {
100+
/**
101+
* Finds the method annotated with @McpTool that matches the given tool name.
102+
* @param toolBeans the bean objects containing @McpTool annotated methods
103+
* @param toolName the name of the tool to find
104+
* @return the ToolMethodInfo object, or null if not found
105+
*/
106+
private static ToolMethodInfo findToolMethod(List<Object> toolBeans, String toolName) {
107+
for (Object bean : toolBeans) {
108+
Class<?> clazz = bean.getClass();
109+
Method[] methods = ReflectionUtils.getAllDeclaredMethods(clazz);
110+
for (Method method : methods) {
111+
McpTool annotation = method.getAnnotation(McpTool.class);
112+
if (annotation != null && annotation.name().equals(toolName)) {
113+
return new ToolMethodInfo(bean, method);
114+
}
115+
}
116+
}
117+
return null;
118+
}
119+
120+
/**
121+
* Wraps a tool specification to collect all Flux elements before serialization.
122+
* @param original the original tool specification
123+
* @param methodInfo the method information including bean and method
124+
* @return the wrapped tool specification
125+
*/
126+
private static McpStatelessServerFeatures.AsyncToolSpecification wrapToolSpecificationForFlux(
127+
McpStatelessServerFeatures.AsyncToolSpecification original, ToolMethodInfo methodInfo) {
78128

79129
BiFunction<McpTransportContext, McpSchema.CallToolRequest, Mono<McpSchema.CallToolResult>> originalHandler = original
80130
.callHandler();
81131

82132
BiFunction<McpTransportContext, McpSchema.CallToolRequest, Mono<McpSchema.CallToolResult>> wrappedHandler = (
83133
context, request) -> {
84-
Mono<McpSchema.CallToolResult> result = originalHandler.apply(context, request);
85-
86-
// The issue is that if the underlying implementation returns a Flux but
87-
// only emits the first element, we need to ensure all elements are
88-
// collected.
89-
// However, at this level, we receive a Mono<CallToolResult>, so the damage
90-
// is already done.
91-
// The fix needs to be in the org.springaicommunity.mcp library itself.
92-
93-
return result;
134+
try {
135+
// Invoke the method directly to get access to the Flux
136+
Object[] args = buildMethodArguments(methodInfo.method(), request.arguments());
137+
Object result = ReflectionUtils.invokeMethod(methodInfo.method(), methodInfo.bean(), args);
138+
139+
if (result instanceof Flux) {
140+
// Collect all Flux elements into a list
141+
Flux<?> flux = (Flux<?>) result;
142+
return flux.collectList().flatMap(list -> {
143+
// Serialize the list to JSON
144+
try {
145+
String jsonContent = objectMapper.writeValueAsString(list);
146+
return Mono.just(new McpSchema.CallToolResult(
147+
List.of(new McpSchema.TextContent(jsonContent)), false));
148+
}
149+
catch (Exception e) {
150+
logger.error("Failed to serialize Flux result for tool '{}'", original.tool().name(), e);
151+
return Mono.just(new McpSchema.CallToolResult(
152+
List.of(new McpSchema.TextContent("Error: " + e.getMessage())), true));
153+
}
154+
});
155+
}
156+
else {
157+
// Fall back to original handler for non-Flux results
158+
return originalHandler.apply(context, request);
159+
}
160+
}
161+
catch (Exception e) {
162+
logger.error("Failed to invoke tool method '{}'", original.tool().name(), e);
163+
return Mono.just(
164+
new McpSchema.CallToolResult(List.of(new McpSchema.TextContent("Error: " + e.getMessage())),
165+
true));
166+
}
94167
};
95168

96169
return new McpStatelessServerFeatures.AsyncToolSpecification(original.tool(), wrappedHandler);
97170
}
98171

99172
/**
100-
* Helper method to detect if a Publisher might be a Flux that needs special
101-
* handling.
102-
* @param publisher the publisher to check
103-
* @return true if the publisher is a Flux (multi-element stream)
173+
* Builds method arguments from the request arguments map.
174+
* @param method the method to invoke
175+
* @param requestArgs the arguments from the CallToolRequest
176+
* @return array of method arguments
104177
*/
105-
private static boolean isFlux(Publisher<?> publisher) {
106-
return publisher instanceof Flux;
178+
private static Object[] buildMethodArguments(Method method, Map<String, Object> requestArgs) {
179+
java.lang.reflect.Parameter[] parameters = method.getParameters();
180+
Object[] args = new Object[parameters.length];
181+
182+
for (int i = 0; i < parameters.length; i++) {
183+
java.lang.reflect.Parameter param = parameters[i];
184+
McpToolParam paramAnnotation = param.getAnnotation(McpToolParam.class);
185+
186+
if (paramAnnotation != null) {
187+
String paramName = paramAnnotation.name().isEmpty() ? param.getName() : paramAnnotation.name();
188+
Object value = requestArgs.get(paramName);
189+
190+
// Type conversion if needed
191+
if (value != null) {
192+
args[i] = objectMapper.convertValue(value, param.getType());
193+
}
194+
else if (!paramAnnotation.required()) {
195+
args[i] = null;
196+
}
197+
else {
198+
throw new IllegalArgumentException("Required parameter '" + paramName + "' is missing");
199+
}
200+
}
201+
else {
202+
// Try to match by parameter name
203+
Object value = requestArgs.get(param.getName());
204+
if (value != null) {
205+
args[i] = objectMapper.convertValue(value, param.getType());
206+
}
207+
else {
208+
args[i] = null;
209+
}
210+
}
211+
}
212+
213+
return args;
107214
}
108215

109216
/**
110-
* Attempts to collect all elements from a Flux into a list. If the publisher is
111-
* already a Mono, returns it as-is.
112-
* @param publisher the publisher
113-
* @return a Mono containing either the single value or a list of all values
217+
* Holds information about a tool method.
114218
*/
115-
@SuppressWarnings("unchecked")
116-
private static <T> Mono<T> collectIfFlux(Publisher<T> publisher) {
117-
if (publisher instanceof Flux) {
118-
// Collect all elements into a list
119-
return ((Flux<T>) publisher).collectList().map(list -> (T) list);
219+
private static class ToolMethodInfo {
220+
221+
private final Object bean;
222+
223+
private final Method method;
224+
225+
ToolMethodInfo(Object bean, Method method) {
226+
this.bean = bean;
227+
this.method = method;
228+
ReflectionUtils.makeAccessible(method);
120229
}
121-
else if (publisher instanceof Mono) {
122-
return (Mono<T>) publisher;
230+
231+
Object bean() {
232+
return this.bean;
123233
}
124-
else {
125-
// Generic Publisher - convert to Flux and collect
126-
return Flux.from(publisher).collectList().map(list -> (T) list);
234+
235+
Method method() {
236+
return this.method;
127237
}
238+
239+
boolean returnsFlux() {
240+
return Flux.class.isAssignableFrom(this.method.getReturnType());
241+
}
242+
128243
}
129244

130245
}
131-

auto-configurations/mcp/spring-ai-autoconfigure-mcp-server-common/src/main/java/org/springframework/ai/mcp/server/common/autoconfigure/annotations/StatelessServerSpecificationFactoryAutoConfiguration.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -127,8 +127,12 @@ public List<McpStatelessServerFeatures.AsyncCompletionSpecification> completionS
127127
@Bean
128128
public List<McpStatelessServerFeatures.AsyncToolSpecification> toolSpecs(
129129
ServerMcpAnnotatedBeans beansWithMcpMethodAnnotations) {
130-
return AsyncMcpAnnotationProviders
131-
.statelessToolSpecifications(beansWithMcpMethodAnnotations.getBeansByAnnotation(McpTool.class));
130+
List<Object> toolBeans = beansWithMcpMethodAnnotations.getBeansByAnnotation(McpTool.class);
131+
List<McpStatelessServerFeatures.AsyncToolSpecification> originalSpecs = AsyncMcpAnnotationProviders
132+
.statelessToolSpecifications(toolBeans);
133+
134+
// Apply post-processing to handle Flux return types (Issue #4542)
135+
return FluxToolSpecificationPostProcessor.processToolSpecifications(originalSpecs, toolBeans);
132136
}
133137

134138
}

0 commit comments

Comments
 (0)