Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,18 @@
import io.modelcontextprotocol.spec.McpSchema;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;

/**
* Wrapper for asynchronous MCP clients using Project Reactor.
* This implementation delegates to {@link McpAsyncClient} and provides
* reactive operations that return Mono types.
* Wrapper for asynchronous MCP clients using Project Reactor. This implementation delegates to
* {@link McpAsyncClient} and provides reactive operations that return Mono types.
*
* <p>Example usage:
* <pre>{@code
* <p>Example usage: <pre>{@code
* McpAsyncClient client = ... // created via McpClient.async()
* McpAsyncClientWrapper wrapper = new McpAsyncClientWrapper("my-mcp", client);
* wrapper.initialize()
Expand All @@ -41,7 +42,7 @@ public class McpAsyncClientWrapper extends McpClientWrapper {

private static final Logger logger = LoggerFactory.getLogger(McpAsyncClientWrapper.class);

private final McpAsyncClient client;
private final AtomicReference<McpAsyncClient> clientRef;

/**
* Constructs a new asynchronous MCP client wrapper.
Expand All @@ -51,7 +52,34 @@ public class McpAsyncClientWrapper extends McpClientWrapper {
*/
public McpAsyncClientWrapper(String name, McpAsyncClient client) {
super(name);
this.client = client;
this.clientRef = new AtomicReference<>(client);
}

/**
* Sets the underlying MCP async client. This is called by McpClientBuilder after the client
* is created with notification handlers.
*
* @param client the MCP async client
*/
void setClient(McpAsyncClient client) {
this.clientRef.set(client);
}

/**
* Updates the cached tools map with new tools from the server. This method is called when the
* server sends a tools/list_changed notification. This method is thread-safe and can be
* called concurrently from notification handlers.
*
* @param tools the new list of tools from the server (empty list clears cache)
*/
void updateCachedTools(List<McpSchema.Tool> tools) {
if (tools != null) {
// Build new map first, then atomically replace via volatile assignment
Map<String, McpSchema.Tool> newTools =
tools.stream().collect(Collectors.toMap(McpSchema.Tool::name, t -> t));
cachedTools = new ConcurrentHashMap<>(newTools);
logger.info("[MCP-{}] Updated cached tools, total: {}", name, tools.size());
}
}

/**
Expand All @@ -68,6 +96,11 @@ public Mono<Void> initialize() {
return Mono.empty();
}

McpAsyncClient client = clientRef.get();
if (client == null) {
return Mono.error(new IllegalStateException("MCP client '" + name + "' not available"));
}

logger.info("Initializing MCP async client: {}", name);

return client.initialize()
Expand All @@ -84,8 +117,12 @@ public Mono<Void> initialize() {
"MCP client '{}' discovered {} tools",
name,
result.tools().size());
// Cache all tools
result.tools().forEach(tool -> cachedTools.put(tool.name(), tool));
// Cache all tools - build new map then atomically replace
Map<String, McpSchema.Tool> newTools =
result.tools().stream()
.collect(
Collectors.toMap(McpSchema.Tool::name, t -> t));
cachedTools = new ConcurrentHashMap<>(newTools);
})
.doOnSuccess(v -> initialized = true)
.doOnError(e -> logger.error("Failed to initialize MCP client: {}", name, e))
Expand All @@ -99,7 +136,6 @@ public Mono<Void> initialize() {
* initialized before calling this method.
*
* @return a Mono emitting the list of available tools
* @throws IllegalStateException if the client is not initialized
*/
@Override
public Mono<List<McpSchema.Tool>> listTools() {
Expand All @@ -108,6 +144,11 @@ public Mono<List<McpSchema.Tool>> listTools() {
new IllegalStateException("MCP client '" + name + "' not initialized"));
}

McpAsyncClient client = clientRef.get();
if (client == null) {
return Mono.error(new IllegalStateException("MCP client '" + name + "' not available"));
}

return client.listTools().map(McpSchema.ListToolsResult::tools);
}

Expand All @@ -120,7 +161,6 @@ public Mono<List<McpSchema.Tool>> listTools() {
* @param toolName the name of the tool to call
* @param arguments the arguments to pass to the tool
* @return a Mono emitting the tool call result (may contain error information)
* @throws IllegalStateException if the client is not initialized
*/
@Override
public Mono<McpSchema.CallToolResult> callTool(String toolName, Map<String, Object> arguments) {
Expand All @@ -129,6 +169,11 @@ public Mono<McpSchema.CallToolResult> callTool(String toolName, Map<String, Obje
new IllegalStateException("MCP client '" + name + "' not initialized"));
}

McpAsyncClient client = clientRef.get();
if (client == null) {
return Mono.error(new IllegalStateException("MCP client '" + name + "' not available"));
}

logger.debug("Calling MCP tool '{}' on client '{}'", toolName, name);

McpSchema.CallToolRequest request = new McpSchema.CallToolRequest(toolName, arguments);
Expand Down Expand Up @@ -161,19 +206,20 @@ public Mono<McpSchema.CallToolResult> callTool(String toolName, Map<String, Obje
*/
@Override
public void close() {
if (client != null) {
McpAsyncClient toClose = clientRef.getAndSet(null);
if (toClose != null) {
logger.info("Closing MCP async client: {}", name);
try {
client.closeGracefully()
toClose.closeGracefully()
.doOnSuccess(v -> logger.debug("MCP client '{}' closed", name))
.doOnError(e -> logger.error("Error closing MCP client '{}'", name, e))
.block();
} catch (Exception e) {
logger.error("Exception during MCP client close", e);
client.close();
toClose.close();
}
}
initialized = false;
cachedTools.clear();
cachedTools = new ConcurrentHashMap<>();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@
import java.util.Map;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;

/**
Expand Down Expand Up @@ -78,6 +80,7 @@ public class McpClientBuilder {

private static final Duration DEFAULT_REQUEST_TIMEOUT = Duration.ofSeconds(120);
private static final Duration DEFAULT_INIT_TIMEOUT = Duration.ofSeconds(30);
private static final Logger logger = LoggerFactory.getLogger(McpClientBuilder.class);

private final String name;
private TransportConfig transportConfig;
Expand Down Expand Up @@ -283,6 +286,10 @@ public McpClientBuilder initializationTimeout(Duration timeout) {
/**
* Builds an asynchronous MCP client wrapper.
*
* <p>This method uses a two-phase build pattern to support notification handlers.
* The wrapper is created first, then the MCP client is built with notification consumers
* that can reference the wrapper.
*
* @return Mono emitting the async client wrapper
*/
public Mono<McpClientWrapper> buildAsync() {
Expand All @@ -303,21 +310,86 @@ public Mono<McpClientWrapper> buildAsync() {
McpSchema.ClientCapabilities clientCapabilities =
McpSchema.ClientCapabilities.builder().build();

// ========== Phase 1: Create wrapper (client is temporarily null) ==========
McpAsyncClientWrapper wrapper = new McpAsyncClientWrapper(name, null);

// ========== Phase 2: Build client (can reference wrapper) ==========
McpAsyncClient mcpClient =
McpClient.async(transport)
.requestTimeout(requestTimeout)
.initializationTimeout(initializationTimeout)
.clientInfo(clientInfo)
.capabilities(clientCapabilities)

// ----- Log notification Consumer -----
.loggingConsumer(
notification -> {
// Parse notification content
String level =
notification.level() != null
? notification.level().toString()
: "info";
String loggerName =
notification.logger() != null
? notification.logger()
: "mcp";
String data =
notification.data() != null
? notification.data()
: "";

// Log to SLF4J by level
switch (level.toLowerCase()) {
case "error" ->
logger.error(
"[MCP-{}] [{}] {}",
name,
loggerName,
data);
case "warning" ->
logger.warn(
"[MCP-{}] [{}] {}",
name,
loggerName,
data);
case "debug" ->
logger.debug(
"[MCP-{}] [{}] {}",
name,
loggerName,
data);
default ->
logger.info(
"[MCP-{}] [{}] {}",
name,
Comment on lines +341 to +364
Copy link

Copilot AI Jan 15, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The buildSync() method does not implement notification handlers like buildAsync() does. For consistency and feature parity, synchronous clients should also support logging and tools change notifications. Consider implementing a similar two-phase build pattern for buildSync() with notification consumers.

Copilot uses AI. Check for mistakes.
loggerName,
data);
}
return Mono.empty();
})

// ----- Tools change notification Consumer -----
.toolsChangeConsumer(
tools -> {
// Call wrapper method to update cache
wrapper.updateCachedTools(tools);
return Mono.empty();
})
.build();

return new McpAsyncClientWrapper(name, mcpClient);
// ========== Phase 3: Link MCP client to wrapper ==========
wrapper.setClient(mcpClient);
return wrapper;
});
}

/**
* Builds a synchronous MCP client wrapper (blocking operations).
*
* <p>This method uses a two-phase build pattern to support notification handlers. The wrapper
* is created first, then the MCP client is built with notification consumers that can
* reference the wrapper.
*
* @return synchronous client wrapper
*/
public McpClientWrapper buildSync() {
Expand All @@ -334,15 +406,59 @@ public McpClientWrapper buildSync() {
McpSchema.ClientCapabilities clientCapabilities =
McpSchema.ClientCapabilities.builder().build();

// ========== Phase 1: Create wrapper (client is temporarily null) ==========
McpSyncClientWrapper wrapper = new McpSyncClientWrapper(name, null);

// ========== Phase 2: Build client (can reference wrapper) ==========
McpSyncClient mcpClient =
McpClient.sync(transport)
.requestTimeout(requestTimeout)
.initializationTimeout(initializationTimeout)
.clientInfo(clientInfo)
.capabilities(clientCapabilities)
// ----- Log notification Consumer -----
.loggingConsumer(
notification -> {
// Parse notification content
String level =
notification.level() != null
? notification.level().toString()
: "info";
String loggerName =
notification.logger() != null
? notification.logger()
: "mcp";
String data =
notification.data() != null ? notification.data() : "";

// Log to SLF4J by level
switch (level.toLowerCase()) {
case "error" ->
logger.error(
"[MCP-{}] [{}] {}", name, loggerName, data);
case "warning" ->
logger.warn(
"[MCP-{}] [{}] {}", name, loggerName, data);
case "debug" ->
logger.debug(
"[MCP-{}] [{}] {}", name, loggerName, data);
default ->
logger.info(
"[MCP-{}] [{}] {}", name, loggerName, data);
}
})

// ----- Tools change notification Consumer -----
.toolsChangeConsumer(
tools -> {
// Call wrapper method to update cache
wrapper.updateCachedTools(tools);
})
.build();

return new McpSyncClientWrapper(name, mcpClient);
// ========== Phase 3: Link MCP client to wrapper ==========
wrapper.setClient(mcpClient);
return wrapper;
}

// ==================== Internal Transport Configuration Classes ====================
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public abstract class McpClientWrapper implements AutoCloseable {
protected final String name;

/** Cache of tools available from this MCP server */
protected final Map<String, McpSchema.Tool> cachedTools;
protected volatile Map<String, McpSchema.Tool> cachedTools;

/** Flag indicating whether the client has been initialized */
protected volatile boolean initialized = false;
Expand Down
Loading
Loading