Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@
*/
package io.agentscope.core.tool.mcp;

import io.agentscope.core.tool.mcp.task.DefaultTaskManager;
import io.agentscope.core.tool.mcp.task.ListTasksResult;
import io.agentscope.core.tool.mcp.task.Task;
import io.agentscope.core.tool.mcp.task.TaskManager;
import io.modelcontextprotocol.client.McpAsyncClient;
import io.modelcontextprotocol.spec.McpSchema;
import java.util.List;
Expand All @@ -28,37 +32,66 @@
* This implementation delegates to {@link McpAsyncClient} and provides
* reactive operations that return Mono types.
*
* <p>Example usage:
* <p>
* <strong>Task Support Status:</strong>
* <ul>
* <li>Task management infrastructure is available via
* {@link #getTaskManager()}</li>
* <li>⚠️ <strong>Current Limitation:</strong> Task operations will throw
* {@link UnsupportedOperationException} because the underlying MCP SDK
* (version 0.17.0) does not yet provide native task support</li>
* <li>The task infrastructure is ready and will be automatically enabled
* when a future MCP SDK version adds task support</li>
* <li>For custom task implementations, you can extend this class and override
* {@link #createTaskManager()} to provide your own
* {@link io.agentscope.core.tool.mcp.task.DefaultTaskManager.TaskOperations}</li>
* </ul>
*
* <p>
* Example usage:
*
* <pre>{@code
* McpAsyncClient client = ... // created via McpClient.async()
* McpAsyncClientWrapper wrapper = new McpAsyncClientWrapper("my-mcp", client);
* wrapper.initialize()
* .then(wrapper.callTool("tool_name", Map.of("arg1", "value1")))
* .subscribe(result -> System.out.println(result));
*
* // Task manager is available but operations will throw UnsupportedOperationException
* TaskManager taskManager = wrapper.getTaskManager();
* // taskManager.getTask("task-id").subscribe(...); // Will throw UnsupportedOperationException
* }</pre>
*
* @see McpClientWrapper#getTaskManager()
* @see io.agentscope.core.tool.mcp.task.TaskManager
*/
public class McpAsyncClientWrapper extends McpClientWrapper {

private static final Logger logger = LoggerFactory.getLogger(McpAsyncClientWrapper.class);

private final McpAsyncClient client;
private final TaskManager taskManager;

/**
* Constructs a new asynchronous MCP client wrapper.
*
* @param name unique identifier for this client
* @param name unique identifier for this client
* @param client the underlying async MCP client
*/
public McpAsyncClientWrapper(String name, McpAsyncClient client) {
super(name);
this.client = client;
this.taskManager = createTaskManager();
}

/**
* Initializes the async MCP client connection and caches available tools.
*
* <p>This method connects to the MCP server, discovers available tools, and caches them for
* later use. If already initialized, this method returns immediately without re-initializing.
* <p>
* This method connects to the MCP server, discovers available tools, and caches
* them for
* later use. If already initialized, this method returns immediately without
* re-initializing.
*
* @return a Mono that completes when initialization is finished
*/
Expand Down Expand Up @@ -95,7 +128,9 @@ public Mono<Void> initialize() {
/**
* Lists all tools available from the MCP server.
*
* <p>This method queries the MCP server for its current list of tools. The client must be
* <p>
* This method queries the MCP server for its current list of tools. The client
* must be
* initialized before calling this method.
*
* @return a Mono emitting the list of available tools
Expand All @@ -114,10 +149,12 @@ public Mono<List<McpSchema.Tool>> listTools() {
/**
* Invokes a tool on the MCP server asynchronously.
*
* <p>This method sends a tool call request to the MCP server and returns the result
* <p>
* This method sends a tool call request to the MCP server and returns the
* result
* asynchronously. The client must be initialized before calling this method.
*
* @param toolName the name of the tool to call
* @param toolName the name of the tool to call
* @param arguments the arguments to pass to the tool
* @return a Mono emitting the tool call result (may contain error information)
* @throws IllegalStateException if the client is not initialized
Expand Down Expand Up @@ -153,11 +190,73 @@ public Mono<McpSchema.CallToolResult> callTool(String toolName, Map<String, Obje
e.getMessage()));
}

/**
* Gets the task manager for this MCP client.
*
* @return the task manager instance
*/
@Override
public TaskManager getTaskManager() {
return taskManager;
}

/**
* Creates a task manager for this client.
*
* <p>
* This method creates a DefaultTaskManager with task operations that delegate
* to the underlying MCP client. Note that task support depends on the MCP SDK
* version and server capabilities.
*
* @return a new TaskManager instance
*/
private TaskManager createTaskManager() {
DefaultTaskManager.TaskOperations operations =
new DefaultTaskManager.TaskOperations() {
@Override
public Mono<Task> getTask(String taskId) {
// TODO: Implement when MCP SDK supports tasks/get
return Mono.error(
new UnsupportedOperationException(
"Task operations not yet supported by MCP SDK"));
}

@Override
public Mono<Object> getTaskResult(String taskId) {
// TODO: Implement when MCP SDK supports tasks/result
return Mono.error(
new UnsupportedOperationException(
"Task operations not yet supported by MCP SDK"));
}

@Override
public Mono<ListTasksResult> listTasks(String cursor) {
// TODO: Implement when MCP SDK supports tasks/list
return Mono.error(
new UnsupportedOperationException(
"Task operations not yet supported by MCP SDK"));
}

@Override
public Mono<Task> cancelTask(String taskId) {
// TODO: Implement when MCP SDK supports tasks/cancel
return Mono.error(
new UnsupportedOperationException(
"Task operations not yet supported by MCP SDK"));
}
};

return new DefaultTaskManager(name, operations);
}

/**
* Closes the MCP client connection and releases all resources.
*
* <p>This method attempts to close the client gracefully, falling back to forceful closure if
* graceful closure fails. This method is idempotent and can be called multiple times safely.
* <p>
* This method attempts to close the client gracefully, falling back to forceful
* closure if
* graceful closure fails. This method is idempotent and can be called multiple
* times safely.
*/
@Override
public void close() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package io.agentscope.core.tool.mcp;

import io.agentscope.core.tool.mcp.task.TaskManager;
import io.modelcontextprotocol.spec.McpSchema;
import java.util.List;
import java.util.Map;
Expand All @@ -24,14 +25,17 @@
/**
* Abstract wrapper for MCP (Model Context Protocol) clients.
* This class manages the lifecycle of MCP client connections and provides
* a unified interface for both asynchronous and synchronous client implementations.
* a unified interface for both asynchronous and synchronous client
* implementations.
*
* <p>The wrapper handles:
* <p>
* The wrapper handles:
* <ul>
* <li>Client initialization and connection management</li>
* <li>Tool discovery and caching</li>
* <li>Tool invocation through the MCP protocol</li>
* <li>Resource cleanup on close</li>
* <li>Client initialization and connection management</li>
* <li>Tool discovery and caching</li>
* <li>Tool invocation through the MCP protocol</li>
* <li>Task management for long-running operations</li>
* <li>Resource cleanup on close</li>
* </ul>
*
* @see McpAsyncClientWrapper
Expand Down Expand Up @@ -94,7 +98,7 @@ public boolean isInitialized() {
/**
* Invokes a tool on the MCP server.
*
* @param toolName the name of the tool to call
* @param toolName the name of the tool to call
* @param arguments the arguments to pass to the tool
* @return a Mono emitting the tool call result
*/
Expand All @@ -111,6 +115,18 @@ public McpSchema.Tool getCachedTool(String toolName) {
return cachedTools.get(toolName);
}

/**
* Gets the task manager for this MCP client.
*
* <p>
* The task manager provides methods for managing long-running operations,
* including retrieving task status, getting results, listing tasks, and
* cancelling tasks.
*
* @return the task manager instance
*/
public abstract TaskManager getTaskManager();

/**
* Closes this MCP client and releases all resources.
* This method is idempotent and can be called multiple times safely.
Expand Down
Loading
Loading