Skip to content
This repository was archived by the owner on Feb 14, 2025. It is now read-only.

Commit 383d6a7

Browse files
committed
feat(mcp-client): Add non-blocking tools change notification support
- Add toolsChangeNotificationHandler with non-blocking consumer execution using boundedElastic scheduler - Add toolsChangeConsumer builder method to McpClient for easy configuration - Add comprehensive test coverage for tools change notifications - Update documentation with tools change notification features and examples The implementation ensures consumers are executed non-blockingly on the boundedElastic scheduler, preventing performance impact from blocking implementations while maintaining proper error handling and logging. Part of #20
1 parent 9adcc5b commit 383d6a7

File tree

6 files changed

+165
-26
lines changed

6 files changed

+165
-26
lines changed

spring-ai-mcp-core/README.md

Lines changed: 60 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,8 @@ This SDK implements the Model Context Protocol, enabling seamless integration wi
1212
- Standard MCP operations support:
1313
- Protocol version compatibility negotiation
1414
- Client-server capability exchange
15-
- Tool discovery and execution
15+
- Tool discovery and execution with change notifications
16+
- Tool list change notifications with non-blocking consumer support
1617
- Resource management with URI templates
1718
- Resource subscription system
1819
- Roots list management and notifications
@@ -185,6 +186,23 @@ var subscription = client.listResources()
185186
return client.subscribeResource(new McpSchema.SubscribeRequest("resource-uri"));
186187
});
187188

189+
// Set up tools change notification handling
190+
List<Consumer<List<McpSchema.Tool>>> toolsChangeConsumers = List.of(
191+
tools -> {
192+
// Handle tools list changes reactively
193+
tools.forEach(tool -> {
194+
System.out.println("Tool updated: " + tool.name());
195+
});
196+
}
197+
);
198+
199+
McpAsyncClient clientWithToolsNotifications = McpClient.using(transport)
200+
.toolsChangeConsumer(toolsChangeConsumers)
201+
.async();
202+
203+
// The client will now automatically handle tools/list_changed notifications
204+
// and invoke the consumers on the boundedElastic scheduler to avoid blocking
205+
188206
// Handle results reactively or block if needed
189207
McpSchema.GetPromptResult promptResult = result.block();
190208
subscription.block();
@@ -243,6 +261,47 @@ The SDK follows a layered architecture with clear separation of concerns:
243261
- Execution handling with timeout support
244262
- Result processing with error handling
245263

264+
### Tool Change Notifications
265+
266+
The SDK supports automatic handling of tool list changes through a non-blocking notification system:
267+
268+
#### Features
269+
- Register multiple consumers to handle tool list changes
270+
- Non-blocking execution using Project Reactor's boundedElastic scheduler
271+
- Automatic tools/list request handling when notifications are received
272+
- Error resilient with proper error handling and logging
273+
274+
#### Example with Tools Change Notification
275+
276+
```java
277+
// Create tool change consumers
278+
List<Consumer<List<McpSchema.Tool>>> toolsChangeConsumers = List.of(
279+
tools -> {
280+
// First consumer - e.g., update UI
281+
tools.forEach(tool -> updateToolsUI(tool));
282+
},
283+
tools -> {
284+
// Second consumer - e.g., update cache
285+
toolsCache.updateTools(tools);
286+
}
287+
);
288+
289+
// Create client with tools change notification support
290+
McpAsyncClient client = McpClient.using(transport)
291+
.toolsChangeConsumer(toolsChangeConsumers)
292+
.async();
293+
294+
// Initialize client
295+
client.initialize()
296+
.doOnSuccess(result -> {
297+
// Client will automatically handle tools/list_changed notifications
298+
// and invoke consumers non-blockingly on boundedElastic scheduler
299+
})
300+
.subscribe();
301+
```
302+
303+
The tools change notification system ensures that consumers are executed non-blockingly, preventing any potential performance impact from blocking implementations. All consumers are executed on Project Reactor's boundedElastic scheduler, making it safe to perform potentially blocking operations within the consumers.
304+
246305
## Error Handling
247306

248307
The SDK provides comprehensive error handling through the McpError class:

spring-ai-mcp-core/pom.xml

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,14 @@
9898
<scope>test</scope>
9999
</dependency>
100100

101+
<dependency>
102+
<groupId>org.awaitility</groupId>
103+
<artifactId>awaitility</artifactId>
104+
<version>4.2.0</version>
105+
<scope>test</scope>
106+
</dependency>
107+
101108
</dependencies>
102109

103110

104-
</project>
111+
</project>

spring-ai-mcp-core/src/main/java/org/springframework/ai/mcp/client/McpAsyncClient.java

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import java.util.HashMap;
2121
import java.util.List;
2222
import java.util.Map;
23+
import java.util.function.Consumer;
2324
import java.util.function.Supplier;
2425

2526
import com.fasterxml.jackson.core.type.TypeReference;
@@ -29,6 +30,7 @@
2930
import reactor.core.scheduler.Schedulers;
3031

3132
import org.springframework.ai.mcp.spec.DefaultMcpSession;
33+
import org.springframework.ai.mcp.spec.DefaultMcpSession.NotificationHandler;
3234
import org.springframework.ai.mcp.spec.DefaultMcpSession.RequestHandler;
3335
import org.springframework.ai.mcp.spec.McpError;
3436
import org.springframework.ai.mcp.spec.McpSchema;
@@ -78,7 +80,8 @@ public class McpAsyncClient {
7880
* notification.
7981
*/
8082
public McpAsyncClient(McpTransport transport, Duration requestTimeout,
81-
List<Supplier<List<Root>>> rootsListProviders, boolean rootsListChangedNotification) {
83+
List<Supplier<List<Root>>> rootsListProviders, boolean rootsListChangedNotification,
84+
List<Consumer<List<McpSchema.Tool>>> toolsChangeConsumers) {
8285

8386
Map<String, RequestHandler> requestHanlers = new HashMap<>();
8487

@@ -87,7 +90,14 @@ public McpAsyncClient(McpTransport transport, Duration requestTimeout,
8790
this.rootCapabilities = new McpSchema.ClientCapabilities.RootCapabilities(rootsListChangedNotification);
8891
}
8992

90-
this.mcpSession = new DefaultMcpSession(requestTimeout, transport, requestHanlers, Map.of());
93+
Map<String, NotificationHandler> notificationHandlers = new HashMap<>();
94+
95+
if (toolsChangeConsumers != null && !toolsChangeConsumers.isEmpty()) {
96+
notificationHandlers.put("notifications/tools/list_changed",
97+
toolsChangeNotificationHandler(toolsChangeConsumers));
98+
}
99+
100+
this.mcpSession = new DefaultMcpSession(requestTimeout, transport, requestHanlers, notificationHandlers);
91101

92102
}
93103

@@ -116,6 +126,25 @@ public Mono<Object> handle(Object params) {
116126
};
117127
};
118128

129+
private NotificationHandler toolsChangeNotificationHandler(
130+
List<Consumer<List<McpSchema.Tool>>> toolsChangeConsumers) {
131+
132+
return new NotificationHandler() {
133+
@Override
134+
public Mono<Void> handle(Object params) {
135+
// TODO: add support for cursor/pagination
136+
return listTools().flatMap(listToolsResult -> Mono.fromRunnable(() -> {
137+
for (Consumer<List<McpSchema.Tool>> toolsChangeConsumer : toolsChangeConsumers) {
138+
toolsChangeConsumer.accept(listToolsResult.tools());
139+
}
140+
}).subscribeOn(Schedulers.boundedElastic())).onErrorResume(error -> {
141+
logger.error("Error handling tools list change notification", error);
142+
return Mono.empty();
143+
}).then(); // Convert to Mono<Void>
144+
}
145+
};
146+
};
147+
119148
/**
120149
* The initialization phase MUST be the first interaction between client and server.
121150
* During this phase, the client and server:

spring-ai-mcp-core/src/main/java/org/springframework/ai/mcp/client/McpClient.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,10 @@
1919
import java.time.Duration;
2020
import java.util.ArrayList;
2121
import java.util.List;
22+
import java.util.function.Consumer;
2223
import java.util.function.Supplier;
2324

25+
import org.springframework.ai.mcp.spec.McpSchema;
2426
import org.springframework.ai.mcp.spec.McpSchema.Root;
2527
import org.springframework.ai.mcp.spec.McpTransport;
2628
import org.springframework.ai.mcp.util.Assert;
@@ -84,6 +86,8 @@ public static class Builder {
8486

8587
private List<Supplier<List<Root>>> rootsListProviders = new ArrayList<>();
8688

89+
private List<Consumer<List<McpSchema.Tool>>> toolsChangeConsumers = new ArrayList<>();
90+
8791
private Builder(McpTransport transport) {
8892
Assert.notNull(transport, "Transport must not be null");
8993
this.transport = transport;
@@ -110,6 +114,11 @@ public Builder rootsListProvider(Supplier<List<Root>> rootsListProvider) {
110114
return this;
111115
}
112116

117+
public Builder toolsChangeConsumer(Consumer<List<McpSchema.Tool>> toolsChangeConsumer) {
118+
this.toolsChangeConsumers.add(toolsChangeConsumer);
119+
return this;
120+
}
121+
113122
/**
114123
* Build a synchronous MCP client.
115124
* @return A new instance of {@link McpSyncClient}
@@ -123,7 +132,8 @@ public McpSyncClient sync() {
123132
* @return A new instance of {@link McpAsyncClient}
124133
*/
125134
public McpAsyncClient async() {
126-
return new McpAsyncClient(transport, requestTimeout, rootsListProviders, rootsListChangedNotification);
135+
return new McpAsyncClient(transport, requestTimeout, rootsListProviders, rootsListChangedNotification,
136+
toolsChangeConsumers);
127137
}
128138

129139
}

spring-ai-mcp-core/src/test/java/org/springframework/ai/mcp/client/AbstractMcpAsyncClientTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -183,7 +183,7 @@ void testInitializeWithRootsListProviders() {
183183
var transport = createMcpTransport();
184184
List<Supplier<List<Root>>> providers = List.of(() -> List.of(new Root("file:///test/path", "test-root")));
185185

186-
var client = new McpAsyncClient(transport, TIMEOUT, providers, true);
186+
var client = new McpAsyncClient(transport, TIMEOUT, providers, true, List.of());
187187

188188
assertThatCode(() -> client.initialize().block(Duration.ofSeconds(10))).doesNotThrowAnyException();
189189

spring-ai-mcp-core/src/test/java/org/springframework/ai/mcp/client/McpAsyncClientResponseHandlerTests.java

Lines changed: 54 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -16,15 +16,19 @@
1616

1717
package org.springframework.ai.mcp.client;
1818

19+
import java.time.Duration;
20+
import java.util.ArrayList;
1921
import java.util.List;
22+
import java.util.Map;
2023
import java.util.concurrent.atomic.AtomicInteger;
24+
import java.util.function.Consumer;
2125
import java.util.function.Function;
2226
import java.util.function.Supplier;
2327

2428
import com.fasterxml.jackson.core.type.TypeReference;
29+
30+
import static org.awaitility.Awaitility.await;
2531
import com.fasterxml.jackson.databind.ObjectMapper;
26-
import org.junit.jupiter.api.AfterEach;
27-
import org.junit.jupiter.api.BeforeEach;
2832
import org.junit.jupiter.api.Test;
2933
import reactor.core.publisher.Flux;
3034
import reactor.core.publisher.Mono;
@@ -41,10 +45,6 @@
4145

4246
class McpAsyncClientResponseHandlerTests {
4347

44-
private McpAsyncClient asyncMcpClient;
45-
46-
private MockMcpTransport transport;
47-
4848
@SuppressWarnings("unused")
4949
private static class MockMcpTransport implements McpTransport {
5050

@@ -103,24 +103,56 @@ public <T> T unmarshalFrom(Object data, TypeReference<T> typeRef) {
103103

104104
}
105105

106-
@BeforeEach
107-
void setUp() {
108-
transport = new MockMcpTransport();
109-
110-
Supplier<List<Root>> rootsListProvider = () -> List.of(new Root("file:///test/path", "test-root"));
111-
112-
asyncMcpClient = McpClient.using(transport).rootsListProvider(rootsListProvider).async();
113-
}
114-
115-
@AfterEach
116-
void tearDown() {
117-
if (asyncMcpClient != null) {
118-
asyncMcpClient.closeGracefully();
119-
}
106+
@Test
107+
void testToolsChangeNotificationHandling() {
108+
MockMcpTransport transport = new MockMcpTransport();
109+
110+
// Create a list to store received tools for verification
111+
List<McpSchema.Tool> receivedTools = new ArrayList<>();
112+
113+
// Create a consumer that will be called when tools change
114+
Consumer<List<McpSchema.Tool>> toolsChangeConsumer = tools -> {
115+
receivedTools.addAll(tools);
116+
};
117+
118+
// Create client with tools change consumer
119+
McpAsyncClient asyncMcpClient = McpClient.using(transport).toolsChangeConsumer(toolsChangeConsumer).async();
120+
121+
// Create a mock tools list that the server will return
122+
Map<String, Object> inputSchema = Map.of("type", "object", "properties", Map.of(), "required", List.of());
123+
McpSchema.Tool mockTool = new McpSchema.Tool("test-tool", "Test Tool Description", inputSchema);
124+
McpSchema.ListToolsResult mockToolsResult = new McpSchema.ListToolsResult(List.of(mockTool), null);
125+
126+
// Simulate server sending tools/list_changed notification
127+
McpSchema.JSONRPCNotification notification = new McpSchema.JSONRPCNotification(McpSchema.JSONRPC_VERSION,
128+
"notifications/tools/list_changed", null);
129+
transport.simulateIncomingMessage(notification);
130+
131+
// Simulate server response to tools/list request
132+
McpSchema.JSONRPCRequest toolsListRequest = transport.getLastSentMessageAsRequest();
133+
assertThat(toolsListRequest.method()).isEqualTo("tools/list");
134+
135+
McpSchema.JSONRPCResponse toolsListResponse = new McpSchema.JSONRPCResponse(McpSchema.JSONRPC_VERSION,
136+
toolsListRequest.id(), mockToolsResult, null);
137+
transport.simulateIncomingMessage(toolsListResponse);
138+
139+
// Verify the consumer received the expected tools
140+
await().atMost(Duration.ofSeconds(5)).untilAsserted(() -> {
141+
assertThat(receivedTools).hasSize(1);
142+
assertThat(receivedTools.get(0).name()).isEqualTo("test-tool");
143+
assertThat(receivedTools.get(0).description()).isEqualTo("Test Tool Description");
144+
});
145+
146+
asyncMcpClient.closeGracefully();
120147
}
121148

122149
@Test
123150
void testRootsListRequestHandling() {
151+
MockMcpTransport transport = new MockMcpTransport();
152+
153+
Supplier<List<Root>> rootsListProvider = () -> List.of(new Root("file:///test/path", "test-root"));
154+
155+
McpAsyncClient asyncMcpClient = McpClient.using(transport).rootsListProvider(rootsListProvider).async();
124156

125157
// Simulate incoming request
126158
McpSchema.JSONRPCRequest request = new McpSchema.JSONRPCRequest(McpSchema.JSONRPC_VERSION, "roots/list",
@@ -135,6 +167,8 @@ void testRootsListRequestHandling() {
135167
assertThat(response.id()).isEqualTo("test-id");
136168
assertThat(response.result()).isEqualTo(List.of(new Root("file:///test/path", "test-root")));
137169
assertThat(response.error()).isNull();
170+
171+
asyncMcpClient.closeGracefully();
138172
}
139173

140174
}

0 commit comments

Comments
 (0)