Skip to content
This repository was archived by the owner on Feb 14, 2025. It is now read-only.

Commit 975578b

Browse files
committed
feat: Add resources and prompts change notification support
- Add resourcesChangeNotificationHandler and promptsChangeNotificationHandler with non-blocking execution - Add builder methods for resources and prompts change consumers - Add test coverage for all notification handlers - Update documentation to cover all three notification types (tools, resources, prompts) - Consolidate notification documentation under unified Change Notifications section - Ensure consistent non-blocking behavior using boundedElastic scheduler All notification handlers follow the same pattern of non-blocking execution and error handling, providing a consistent approach to change notifications across the system. Resolves #20
1 parent 383d6a7 commit 975578b

File tree

5 files changed

+197
-12
lines changed

5 files changed

+197
-12
lines changed

spring-ai-mcp-core/README.md

Lines changed: 38 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -261,17 +261,17 @@ The SDK follows a layered architecture with clear separation of concerns:
261261
- Execution handling with timeout support
262262
- Result processing with error handling
263263

264-
### Tool Change Notifications
264+
### Change Notifications
265265

266-
The SDK supports automatic handling of tool list changes through a non-blocking notification system:
266+
The SDK supports automatic handling of changes through a non-blocking notification system for tools, resources, and prompts:
267267

268268
#### Features
269-
- Register multiple consumers to handle tool list changes
269+
- Register multiple consumers for tools, resources, and prompts changes
270270
- Non-blocking execution using Project Reactor's boundedElastic scheduler
271-
- Automatic tools/list request handling when notifications are received
271+
- Automatic list request handling when notifications are received
272272
- Error resilient with proper error handling and logging
273273

274-
#### Example with Tools Change Notification
274+
#### Example with Change Notifications
275275

276276
```java
277277
// Create tool change consumers
@@ -286,21 +286,50 @@ List<Consumer<List<McpSchema.Tool>>> toolsChangeConsumers = List.of(
286286
}
287287
);
288288

289-
// Create client with tools change notification support
289+
// Create resource change consumers
290+
List<Consumer<List<McpSchema.Resource>>> resourcesChangeConsumers = List.of(
291+
resources -> {
292+
// Handle resource changes
293+
resources.forEach(resource -> {
294+
System.out.println("Resource updated: " + resource.uri());
295+
updateResourcesUI(resource);
296+
});
297+
}
298+
);
299+
300+
// Create prompt change consumers
301+
List<Consumer<List<McpSchema.Prompt>>> promptsChangeConsumers = List.of(
302+
prompts -> {
303+
// Handle prompt changes
304+
prompts.forEach(prompt -> {
305+
System.out.println("Prompt updated: " + prompt.name());
306+
updatePromptsCache(prompt);
307+
});
308+
}
309+
);
310+
311+
// Create client with change notification support
290312
McpAsyncClient client = McpClient.using(transport)
291-
.toolsChangeConsumer(toolsChangeConsumers)
313+
.toolsChangeConsumer(toolsChangeConsumer)
314+
.resourcesChangeConsumer(resourcesChangeConsumer)
315+
.promptsChangeConsumer(promptsChangeConsumer)
292316
.async();
293317

294318
// Initialize client
295319
client.initialize()
296320
.doOnSuccess(result -> {
297-
// Client will automatically handle tools/list_changed notifications
321+
// Client will automatically handle all change notifications
298322
// and invoke consumers non-blockingly on boundedElastic scheduler
299323
})
300324
.subscribe();
301325
```
302326

303-
The tools change notification system ensures that consumers are executed non-blockingly, preventing any potential performance impact from blocking implementations. All consumers are executed on Project Reactor's boundedElastic scheduler, making it safe to perform potentially blocking operations within the consumers.
327+
The change notification system ensures that all consumers are executed non-blockingly, preventing any potential performance impact from blocking implementations. All consumers are executed on Project Reactor's boundedElastic scheduler, making it safe to perform potentially blocking operations within the consumers.
328+
329+
Each type of notification handler operates independently:
330+
- Tools: Handles notifications when available tools change
331+
- Resources: Handles notifications when available resources change
332+
- Prompts: Handles notifications when available prompts change
304333

305334
## Error Handling
306335

spring-ai-mcp-core/src/main/java/org/springframework/ai/mcp/client/McpAsyncClient.java

Lines changed: 51 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,9 @@ public class McpAsyncClient {
8181
*/
8282
public McpAsyncClient(McpTransport transport, Duration requestTimeout,
8383
List<Supplier<List<Root>>> rootsListProviders, boolean rootsListChangedNotification,
84-
List<Consumer<List<McpSchema.Tool>>> toolsChangeConsumers) {
84+
List<Consumer<List<McpSchema.Tool>>> toolsChangeConsumers,
85+
List<Consumer<List<McpSchema.Resource>>> resourcesChangeConsumers,
86+
List<Consumer<List<McpSchema.Prompt>>> promptsChangeConsumers) {
8587

8688
Map<String, RequestHandler> requestHanlers = new HashMap<>();
8789

@@ -97,6 +99,16 @@ public McpAsyncClient(McpTransport transport, Duration requestTimeout,
9799
toolsChangeNotificationHandler(toolsChangeConsumers));
98100
}
99101

102+
if (resourcesChangeConsumers != null && !resourcesChangeConsumers.isEmpty()) {
103+
notificationHandlers.put("notifications/resources/list_changed",
104+
resourcesChangeNotificationHandler(resourcesChangeConsumers));
105+
}
106+
107+
if (promptsChangeConsumers != null && !promptsChangeConsumers.isEmpty()) {
108+
notificationHandlers.put("notifications/prompts/list_changed",
109+
promptsChangeNotificationHandler(promptsChangeConsumers));
110+
}
111+
100112
this.mcpSession = new DefaultMcpSession(requestTimeout, transport, requestHanlers, notificationHandlers);
101113

102114
}
@@ -145,6 +157,44 @@ public Mono<Void> handle(Object params) {
145157
};
146158
};
147159

160+
private NotificationHandler resourcesChangeNotificationHandler(
161+
List<Consumer<List<McpSchema.Resource>>> resourcesChangeConsumers) {
162+
163+
return new NotificationHandler() {
164+
@Override
165+
public Mono<Void> handle(Object params) {
166+
// TODO: add support for cursor/pagination
167+
return listResources().flatMap(listResourcesResult -> Mono.fromRunnable(() -> {
168+
for (Consumer<List<McpSchema.Resource>> resourceChangeConsumer : resourcesChangeConsumers) {
169+
resourceChangeConsumer.accept(listResourcesResult.resources());
170+
}
171+
}).subscribeOn(Schedulers.boundedElastic())).onErrorResume(error -> {
172+
logger.error("Error handling resources list change notification", error);
173+
return Mono.empty();
174+
}).then(); // Convert to Mono<Void>
175+
}
176+
};
177+
};
178+
179+
private NotificationHandler promptsChangeNotificationHandler(
180+
List<Consumer<List<McpSchema.Prompt>>> promptsChangeConsumers) {
181+
182+
return new NotificationHandler() {
183+
@Override
184+
public Mono<Void> handle(Object params) {
185+
// TODO: add support for cursor/pagination
186+
return listPrompts().flatMap(listPromptsResult -> Mono.fromRunnable(() -> {
187+
for (Consumer<List<McpSchema.Prompt>> promptChangeConsumer : promptsChangeConsumers) {
188+
promptChangeConsumer.accept(listPromptsResult.prompts());
189+
}
190+
}).subscribeOn(Schedulers.boundedElastic())).onErrorResume(error -> {
191+
logger.error("Error handling prompts list change notification", error);
192+
return Mono.empty();
193+
}).then(); // Convert to Mono<Void>
194+
}
195+
};
196+
};
197+
148198
/**
149199
* The initialization phase MUST be the first interaction between client and server.
150200
* During this phase, the client and server:

spring-ai-mcp-core/src/main/java/org/springframework/ai/mcp/client/McpClient.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,10 @@ public static class Builder {
8888

8989
private List<Consumer<List<McpSchema.Tool>>> toolsChangeConsumers = new ArrayList<>();
9090

91+
private List<Consumer<List<McpSchema.Resource>>> resourcesChangeConsumers = new ArrayList<>();
92+
93+
private List<Consumer<List<McpSchema.Prompt>>> promptsChangeConsumers = new ArrayList<>();
94+
9195
private Builder(McpTransport transport) {
9296
Assert.notNull(transport, "Transport must not be null");
9397
this.transport = transport;
@@ -119,6 +123,16 @@ public Builder toolsChangeConsumer(Consumer<List<McpSchema.Tool>> toolsChangeCon
119123
return this;
120124
}
121125

126+
public Builder resourcesChangeConsumer(Consumer<List<McpSchema.Resource>> resourcesChangeConsumer) {
127+
this.resourcesChangeConsumers.add(resourcesChangeConsumer);
128+
return this;
129+
}
130+
131+
public Builder promptsChangeConsumer(Consumer<List<McpSchema.Prompt>> promptsChangeConsumer) {
132+
this.promptsChangeConsumers.add(promptsChangeConsumer);
133+
return this;
134+
}
135+
122136
/**
123137
* Build a synchronous MCP client.
124138
* @return A new instance of {@link McpSyncClient}
@@ -133,7 +147,7 @@ public McpSyncClient sync() {
133147
*/
134148
public McpAsyncClient async() {
135149
return new McpAsyncClient(transport, requestTimeout, rootsListProviders, rootsListChangedNotification,
136-
toolsChangeConsumers);
150+
toolsChangeConsumers, resourcesChangeConsumers, promptsChangeConsumers);
137151
}
138152

139153
}

spring-ai-mcp-core/src/test/java/org/springframework/ai/mcp/client/AbstractMcpAsyncClientTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -183,7 +183,7 @@ void testInitializeWithRootsListProviders() {
183183
var transport = createMcpTransport();
184184
List<Supplier<List<Root>>> providers = List.of(() -> List.of(new Root("file:///test/path", "test-root")));
185185

186-
var client = new McpAsyncClient(transport, TIMEOUT, providers, true, List.of());
186+
var client = McpClient.using(transport).requestTimeout(TIMEOUT).rootsListProvider(providers.get(0)).async();
187187

188188
assertThatCode(() -> client.initialize().block(Duration.ofSeconds(10))).doesNotThrowAnyException();
189189

spring-ai-mcp-core/src/test/java/org/springframework/ai/mcp/client/McpAsyncClientResponseHandlerTests.java

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -171,4 +171,96 @@ void testRootsListRequestHandling() {
171171
asyncMcpClient.closeGracefully();
172172
}
173173

174+
@Test
175+
void testResourcesChangeNotificationHandling() {
176+
MockMcpTransport transport = new MockMcpTransport();
177+
178+
// Create a list to store received resources for verification
179+
List<McpSchema.Resource> receivedResources = new ArrayList<>();
180+
181+
// Create a consumer that will be called when resources change
182+
Consumer<List<McpSchema.Resource>> resourcesChangeConsumer = resources -> {
183+
receivedResources.addAll(resources);
184+
};
185+
186+
// Create client with resources change consumer
187+
McpAsyncClient asyncMcpClient = McpClient.using(transport)
188+
.resourcesChangeConsumer(resourcesChangeConsumer)
189+
.async();
190+
191+
// Create a mock resources list that the server will return
192+
McpSchema.Resource mockResource = new McpSchema.Resource("test://resource", "Test Resource", "A test resource",
193+
"text/plain", null);
194+
McpSchema.ListResourcesResult mockResourcesResult = new McpSchema.ListResourcesResult(List.of(mockResource),
195+
null);
196+
197+
// Simulate server sending resources/list_changed notification
198+
McpSchema.JSONRPCNotification notification = new McpSchema.JSONRPCNotification(McpSchema.JSONRPC_VERSION,
199+
"notifications/resources/list_changed", null);
200+
transport.simulateIncomingMessage(notification);
201+
202+
// Simulate server response to resources/list request
203+
McpSchema.JSONRPCRequest resourcesListRequest = transport.getLastSentMessageAsRequest();
204+
assertThat(resourcesListRequest.method()).isEqualTo("resources/list");
205+
206+
McpSchema.JSONRPCResponse resourcesListResponse = new McpSchema.JSONRPCResponse(McpSchema.JSONRPC_VERSION,
207+
resourcesListRequest.id(), mockResourcesResult, null);
208+
transport.simulateIncomingMessage(resourcesListResponse);
209+
210+
// Verify the consumer received the expected resources
211+
await().atMost(Duration.ofSeconds(5)).untilAsserted(() -> {
212+
assertThat(receivedResources).hasSize(1);
213+
assertThat(receivedResources.get(0).uri()).isEqualTo("test://resource");
214+
assertThat(receivedResources.get(0).name()).isEqualTo("Test Resource");
215+
assertThat(receivedResources.get(0).description()).isEqualTo("A test resource");
216+
});
217+
218+
asyncMcpClient.closeGracefully();
219+
}
220+
221+
@Test
222+
void testPromptsChangeNotificationHandling() {
223+
MockMcpTransport transport = new MockMcpTransport();
224+
225+
// Create a list to store received prompts for verification
226+
List<McpSchema.Prompt> receivedPrompts = new ArrayList<>();
227+
228+
// Create a consumer that will be called when prompts change
229+
Consumer<List<McpSchema.Prompt>> promptsChangeConsumer = prompts -> {
230+
receivedPrompts.addAll(prompts);
231+
};
232+
233+
// Create client with prompts change consumer
234+
McpAsyncClient asyncMcpClient = McpClient.using(transport).promptsChangeConsumer(promptsChangeConsumer).async();
235+
236+
// Create a mock prompts list that the server will return
237+
McpSchema.Prompt mockPrompt = new McpSchema.Prompt("test-prompt", "Test Prompt Description",
238+
List.of(new McpSchema.PromptArgument("arg1", "Test argument", true)));
239+
McpSchema.ListPromptsResult mockPromptsResult = new McpSchema.ListPromptsResult(List.of(mockPrompt), null);
240+
241+
// Simulate server sending prompts/list_changed notification
242+
McpSchema.JSONRPCNotification notification = new McpSchema.JSONRPCNotification(McpSchema.JSONRPC_VERSION,
243+
"notifications/prompts/list_changed", null);
244+
transport.simulateIncomingMessage(notification);
245+
246+
// Simulate server response to prompts/list request
247+
McpSchema.JSONRPCRequest promptsListRequest = transport.getLastSentMessageAsRequest();
248+
assertThat(promptsListRequest.method()).isEqualTo("prompts/list");
249+
250+
McpSchema.JSONRPCResponse promptsListResponse = new McpSchema.JSONRPCResponse(McpSchema.JSONRPC_VERSION,
251+
promptsListRequest.id(), mockPromptsResult, null);
252+
transport.simulateIncomingMessage(promptsListResponse);
253+
254+
// Verify the consumer received the expected prompts
255+
await().atMost(Duration.ofSeconds(5)).untilAsserted(() -> {
256+
assertThat(receivedPrompts).hasSize(1);
257+
assertThat(receivedPrompts.get(0).name()).isEqualTo("test-prompt");
258+
assertThat(receivedPrompts.get(0).description()).isEqualTo("Test Prompt Description");
259+
assertThat(receivedPrompts.get(0).arguments()).hasSize(1);
260+
assertThat(receivedPrompts.get(0).arguments().get(0).name()).isEqualTo("arg1");
261+
});
262+
263+
asyncMcpClient.closeGracefully();
264+
}
265+
174266
}

0 commit comments

Comments
 (0)