-
Notifications
You must be signed in to change notification settings - Fork 242
feat(mcp): add notification handlers support for async client #568
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 2 commits
3ef7d5d
5b9582d
ae06223
b966d99
378b7b1
9a23888
e9df304
fa1e155
3e95f85
2d086ef
7c97c55
e99b485
e34c3fd
5d37bda
3839079
5733e57
a8292de
e72373d
20724d9
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||
|---|---|---|---|---|---|---|---|---|
|
|
@@ -19,17 +19,16 @@ | |||||||
| import io.modelcontextprotocol.spec.McpSchema; | ||||||||
| import java.util.List; | ||||||||
| import java.util.Map; | ||||||||
| import java.util.concurrent.atomic.AtomicReference; | ||||||||
| import org.slf4j.Logger; | ||||||||
| import org.slf4j.LoggerFactory; | ||||||||
| import reactor.core.publisher.Mono; | ||||||||
|
|
||||||||
| /** | ||||||||
| * Wrapper for asynchronous MCP clients using Project Reactor. | ||||||||
| * This implementation delegates to {@link McpAsyncClient} and provides | ||||||||
| * reactive operations that return Mono types. | ||||||||
| * Wrapper for asynchronous MCP clients using Project Reactor. This implementation delegates to | ||||||||
| * {@link McpAsyncClient} and provides reactive operations that return Mono types. | ||||||||
| * | ||||||||
| * <p>Example usage: | ||||||||
| * <pre>{@code | ||||||||
| * <p>Example usage: <pre>{@code | ||||||||
| * McpAsyncClient client = ... // created via McpClient.async() | ||||||||
| * McpAsyncClientWrapper wrapper = new McpAsyncClientWrapper("my-mcp", client); | ||||||||
| * wrapper.initialize() | ||||||||
|
|
@@ -41,7 +40,7 @@ public class McpAsyncClientWrapper extends McpClientWrapper { | |||||||
|
|
||||||||
| private static final Logger logger = LoggerFactory.getLogger(McpAsyncClientWrapper.class); | ||||||||
|
|
||||||||
| private final McpAsyncClient client; | ||||||||
| private final AtomicReference<McpAsyncClient> clientRef; | ||||||||
|
|
||||||||
| /** | ||||||||
| * Constructs a new asynchronous MCP client wrapper. | ||||||||
|
|
@@ -51,7 +50,32 @@ public class McpAsyncClientWrapper extends McpClientWrapper { | |||||||
| */ | ||||||||
| public McpAsyncClientWrapper(String name, McpAsyncClient client) { | ||||||||
| super(name); | ||||||||
| this.client = client; | ||||||||
| this.clientRef = new AtomicReference<>(client); | ||||||||
| } | ||||||||
|
|
||||||||
| /** | ||||||||
| * Sets the underlying MCP async client. This is called by McpClientBuilder after the client | ||||||||
| * is created with notification handlers. | ||||||||
| * | ||||||||
| * @param client the MCP async client | ||||||||
| */ | ||||||||
| void setClient(McpAsyncClient client) { | ||||||||
| this.clientRef.set(client); | ||||||||
| } | ||||||||
|
|
||||||||
| /** | ||||||||
| * Updates the cached tools map with new tools from the server. This method is called when the | ||||||||
| * server sends a tools/list_changed notification. | ||||||||
| * | ||||||||
| * @param tools the new list of tools from the server (empty list clears cache) | ||||||||
| */ | ||||||||
| void updateCachedTools(List<McpSchema.Tool> tools) { | ||||||||
| if (tools != null) { | ||||||||
| // Clear and rebuild cache | ||||||||
| cachedTools.clear(); | ||||||||
| tools.forEach(tool -> cachedTools.put(tool.name(), tool)); | ||||||||
| logger.info("[MCP-{}] Updated cached tools, total: {}", name, tools.size()); | ||||||||
| } | ||||||||
| } | ||||||||
|
|
||||||||
| /** | ||||||||
|
|
@@ -68,6 +92,12 @@ public Mono<Void> initialize() { | |||||||
| return Mono.empty(); | ||||||||
| } | ||||||||
|
|
||||||||
| McpAsyncClient client = clientRef.get(); | ||||||||
| if (client == null) { | ||||||||
| return Mono.error( | ||||||||
| new IllegalStateException("McpAsyncClient not set. Call setClient() first.")); | ||||||||
|
||||||||
| return Mono.error( | |
| new IllegalStateException("McpAsyncClient not set. Call setClient() first.")); | |
| return Mono.error(new IllegalStateException("MCP client not available")); |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -38,6 +38,8 @@ | |
| import java.util.Map; | ||
| import java.util.function.Consumer; | ||
| import java.util.stream.Collectors; | ||
| import org.slf4j.Logger; | ||
| import org.slf4j.LoggerFactory; | ||
| import reactor.core.publisher.Mono; | ||
|
|
||
| /** | ||
|
|
@@ -78,6 +80,7 @@ public class McpClientBuilder { | |
|
|
||
| private static final Duration DEFAULT_REQUEST_TIMEOUT = Duration.ofSeconds(120); | ||
| private static final Duration DEFAULT_INIT_TIMEOUT = Duration.ofSeconds(30); | ||
| private static final Logger logger = LoggerFactory.getLogger(McpClientBuilder.class); | ||
|
|
||
| private final String name; | ||
| private TransportConfig transportConfig; | ||
|
|
@@ -283,6 +286,10 @@ public McpClientBuilder initializationTimeout(Duration timeout) { | |
| /** | ||
| * Builds an asynchronous MCP client wrapper. | ||
| * | ||
| * <p>This method uses a two-phase build pattern to support notification handlers. | ||
| * The wrapper is created first, then the MCP client is built with notification consumers | ||
| * that can reference the wrapper. | ||
| * | ||
| * @return Mono emitting the async client wrapper | ||
| */ | ||
| public Mono<McpClientWrapper> buildAsync() { | ||
|
|
@@ -303,15 +310,76 @@ public Mono<McpClientWrapper> buildAsync() { | |
| McpSchema.ClientCapabilities clientCapabilities = | ||
| McpSchema.ClientCapabilities.builder().build(); | ||
|
|
||
| // ========== Phase 1: Create wrapper (client is temporarily null) ========== | ||
| McpAsyncClientWrapper wrapper = new McpAsyncClientWrapper(name, null); | ||
|
|
||
| // ========== Phase 2: Build client (can reference wrapper) ========== | ||
| McpAsyncClient mcpClient = | ||
| McpClient.async(transport) | ||
| .requestTimeout(requestTimeout) | ||
| .initializationTimeout(initializationTimeout) | ||
| .clientInfo(clientInfo) | ||
| .capabilities(clientCapabilities) | ||
|
|
||
| // ----- Log notification Consumer ----- | ||
| .loggingConsumer( | ||
| notification -> { | ||
| // Parse notification content | ||
| String level = | ||
| notification.level() != null | ||
| ? notification.level().toString() | ||
| : "info"; | ||
| String loggerName = | ||
| notification.logger() != null | ||
| ? notification.logger() | ||
| : "mcp"; | ||
| String data = | ||
| notification.data() != null | ||
| ? notification.data() | ||
| : ""; | ||
|
|
||
| // Log to SLF4J by level | ||
| switch (level.toLowerCase()) { | ||
| case "error" -> | ||
| logger.error( | ||
| "[MCP-{}] [{}] {}", | ||
| name, | ||
| loggerName, | ||
| data); | ||
| case "warning" -> | ||
| logger.warn( | ||
| "[MCP-{}] [{}] {}", | ||
| name, | ||
| loggerName, | ||
| data); | ||
| case "debug" -> | ||
| logger.debug( | ||
| "[MCP-{}] [{}] {}", | ||
| name, | ||
| loggerName, | ||
| data); | ||
| default -> | ||
| logger.info( | ||
| "[MCP-{}] [{}] {}", | ||
| name, | ||
|
||
| loggerName, | ||
| data); | ||
| } | ||
| return Mono.empty(); | ||
| }) | ||
|
|
||
| // ----- Tools change notification Consumer ----- | ||
| .toolsChangeConsumer( | ||
| tools -> { | ||
| // Call wrapper method to update cache | ||
| wrapper.updateCachedTools(tools); | ||
| return Mono.empty(); | ||
| }) | ||
| .build(); | ||
|
|
||
| return new McpAsyncClientWrapper(name, mcpClient); | ||
| // ========== Phase 3: Link MCP client to wrapper ========== | ||
| wrapper.setClient(mcpClient); | ||
| return wrapper; | ||
| }); | ||
| } | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change | ||||||
|---|---|---|---|---|---|---|---|---|
|
|
@@ -19,18 +19,18 @@ | |||||||
| import io.modelcontextprotocol.spec.McpSchema; | ||||||||
| import java.util.List; | ||||||||
| import java.util.Map; | ||||||||
| import java.util.concurrent.atomic.AtomicReference; | ||||||||
| import org.slf4j.Logger; | ||||||||
| import org.slf4j.LoggerFactory; | ||||||||
| import reactor.core.publisher.Mono; | ||||||||
| import reactor.core.scheduler.Schedulers; | ||||||||
|
|
||||||||
| /** | ||||||||
| * Wrapper for synchronous MCP clients that converts blocking calls to reactive Mono types. | ||||||||
| * This implementation delegates to {@link McpSyncClient} and wraps blocking operations | ||||||||
| * in Reactor's boundedElastic scheduler to avoid blocking the event loop. | ||||||||
| * Wrapper for synchronous MCP clients that converts blocking calls to reactive Mono types. This | ||||||||
| * implementation delegates to {@link McpSyncClient} and wraps blocking operations in Reactor's | ||||||||
| * boundedElastic scheduler to avoid blocking the event loop. | ||||||||
| * | ||||||||
| * <p>Example usage: | ||||||||
| * <pre>{@code | ||||||||
| * <p>Example usage: <pre>{@code | ||||||||
| * McpSyncClient client = ... // created via McpClient.sync() | ||||||||
| * McpSyncClientWrapper wrapper = new McpSyncClientWrapper("my-mcp", client); | ||||||||
| * wrapper.initialize() | ||||||||
|
|
@@ -42,7 +42,7 @@ public class McpSyncClientWrapper extends McpClientWrapper { | |||||||
|
|
||||||||
| private static final Logger logger = LoggerFactory.getLogger(McpSyncClientWrapper.class); | ||||||||
|
|
||||||||
| private final McpSyncClient client; | ||||||||
| private final AtomicReference<McpSyncClient> clientRef; | ||||||||
|
|
||||||||
| /** | ||||||||
| * Constructs a new synchronous MCP client wrapper. | ||||||||
|
|
@@ -52,7 +52,7 @@ public class McpSyncClientWrapper extends McpClientWrapper { | |||||||
| */ | ||||||||
| public McpSyncClientWrapper(String name, McpSyncClient client) { | ||||||||
| super(name); | ||||||||
| this.client = client; | ||||||||
| this.clientRef = new AtomicReference<>(client); | ||||||||
| } | ||||||||
|
|
||||||||
| /** | ||||||||
|
|
@@ -70,10 +70,16 @@ public Mono<Void> initialize() { | |||||||
| return Mono.empty(); | ||||||||
| } | ||||||||
|
|
||||||||
| logger.info("Initializing MCP sync client: {}", name); | ||||||||
|
|
||||||||
| return Mono.fromCallable( | ||||||||
| () -> { | ||||||||
| McpSyncClient client = clientRef.get(); | ||||||||
| if (client == null) { | ||||||||
| throw new IllegalStateException( | ||||||||
| "McpSyncClient not set. Call setClient() first."); | ||||||||
|
||||||||
| throw new IllegalStateException( | |
| "McpSyncClient not set. Call setClient() first."); | |
| throw new IllegalStateException("MCP client not available"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The
updateCachedTools()method performs a non-atomic clear-and-rebuild operation oncachedTools. Since this can be called asynchronously via notification handlers while other threads may be reading from the cache (e.g., viagetCachedTool()or during initialization), this creates a race condition. WhileConcurrentHashMapis thread-safe for individual operations, the clear-then-add pattern is not atomic. Consider synchronizing this method or using other concurrency patterns to ensure atomicity.