Skip to content

Commit 4609b8e

Browse files
author
wenhaozhao
committed
feat: support AsyncMcpTool
1 parent 4d40166 commit 4609b8e

File tree

3 files changed

+167
-165
lines changed

3 files changed

+167
-165
lines changed

core/src/main/java/com/google/adk/tools/mcp/McpAsyncTool.java

Lines changed: 95 additions & 93 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616

1717
package com.google.adk.tools.mcp;
1818

19+
import static java.util.concurrent.TimeUnit.MILLISECONDS;
20+
1921
import com.fasterxml.jackson.databind.ObjectMapper;
2022
import com.google.adk.JsonBaseModel;
2123
import com.google.adk.tools.BaseTool;
@@ -29,12 +31,9 @@
2931
import io.modelcontextprotocol.spec.McpSchema.Tool;
3032
import io.reactivex.rxjava3.core.Maybe;
3133
import io.reactivex.rxjava3.core.Single;
32-
3334
import java.util.Map;
3435
import java.util.Optional;
3536

36-
import static java.util.concurrent.TimeUnit.MILLISECONDS;
37-
3837
// TODO(b/413489523): Add support for auth. This is a TODO for Python as well.
3938

4039
/**
@@ -45,103 +44,106 @@
4544
*/
4645
public final class McpAsyncTool extends BaseTool {
4746

48-
Tool mcpTool;
49-
Single<McpAsyncClient> mcpSession;
50-
McpSessionManager mcpSessionManager;
51-
ObjectMapper objectMapper;
47+
Tool mcpTool;
48+
Single<McpAsyncClient> mcpSession;
49+
McpSessionManager mcpSessionManager;
50+
ObjectMapper objectMapper;
5251

53-
/**
54-
* Creates a new McpTool with the default ObjectMapper.
55-
*
56-
* @param mcpTool The MCP tool to wrap.
57-
* @param mcpSession The MCP session to use to call the tool.
58-
* @param mcpSessionManager The MCP session manager to use to create new sessions.
59-
* @throws IllegalArgumentException If mcpTool or mcpSession are null.
60-
*/
61-
public McpAsyncTool(Tool mcpTool, Single<McpAsyncClient> mcpSession, McpSessionManager mcpSessionManager) {
62-
this(mcpTool, mcpSession, mcpSessionManager, JsonBaseModel.getMapper());
63-
}
52+
/**
53+
* Creates a new McpTool with the default ObjectMapper.
54+
*
55+
* @param mcpTool The MCP tool to wrap.
56+
* @param mcpSession The MCP session to use to call the tool.
57+
* @param mcpSessionManager The MCP session manager to use to create new sessions.
58+
* @throws IllegalArgumentException If mcpTool or mcpSession are null.
59+
*/
60+
public McpAsyncTool(
61+
Tool mcpTool, Single<McpAsyncClient> mcpSession, McpSessionManager mcpSessionManager) {
62+
this(mcpTool, mcpSession, mcpSessionManager, JsonBaseModel.getMapper());
63+
}
6464

65-
/**
66-
* Creates a new McpTool with the default ObjectMapper.
67-
*
68-
* @param mcpTool The MCP tool to wrap.
69-
* @param mcpSession The MCP session to use to call the tool.
70-
* @param mcpSessionManager The MCP session manager to use to create new sessions.
71-
* @param objectMapper The ObjectMapper to use to convert JSON schemas.
72-
* @throws IllegalArgumentException If mcpTool or mcpSession are null.
73-
*/
74-
public McpAsyncTool(
75-
Tool mcpTool,
76-
Single<McpAsyncClient> mcpSession,
77-
McpSessionManager mcpSessionManager,
78-
ObjectMapper objectMapper) {
79-
super(
80-
mcpTool == null ? "" : mcpTool.name(),
81-
mcpTool == null ? "" : (mcpTool.description().isEmpty() ? "" : mcpTool.description()));
65+
/**
66+
* Creates a new McpTool with the default ObjectMapper.
67+
*
68+
* @param mcpTool The MCP tool to wrap.
69+
* @param mcpSession The MCP session to use to call the tool.
70+
* @param mcpSessionManager The MCP session manager to use to create new sessions.
71+
* @param objectMapper The ObjectMapper to use to convert JSON schemas.
72+
* @throws IllegalArgumentException If mcpTool or mcpSession are null.
73+
*/
74+
public McpAsyncTool(
75+
Tool mcpTool,
76+
Single<McpAsyncClient> mcpSession,
77+
McpSessionManager mcpSessionManager,
78+
ObjectMapper objectMapper) {
79+
super(
80+
mcpTool == null ? "" : mcpTool.name(),
81+
mcpTool == null ? "" : (mcpTool.description().isEmpty() ? "" : mcpTool.description()));
8282

83-
if (mcpTool == null) {
84-
throw new IllegalArgumentException("mcpTool cannot be null");
85-
}
86-
if (mcpSession == null) {
87-
throw new IllegalArgumentException("mcpSession cannot be null");
88-
}
89-
if (objectMapper == null) {
90-
throw new IllegalArgumentException("objectMapper cannot be null");
91-
}
92-
this.mcpTool = mcpTool;
93-
this.mcpSession = mcpSession;
94-
this.mcpSessionManager = mcpSessionManager;
95-
this.objectMapper = objectMapper;
83+
if (mcpTool == null) {
84+
throw new IllegalArgumentException("mcpTool cannot be null");
9685
}
97-
98-
public Single<McpAsyncClient> getMcpSession() {
99-
return this.mcpSession;
86+
if (mcpSession == null) {
87+
throw new IllegalArgumentException("mcpSession cannot be null");
10088
}
101-
102-
public Schema toGeminiSchema(JsonSchema openApiSchema) {
103-
return Schema.fromJson(objectMapper.valueToTree(openApiSchema).toString());
89+
if (objectMapper == null) {
90+
throw new IllegalArgumentException("objectMapper cannot be null");
10491
}
92+
this.mcpTool = mcpTool;
93+
this.mcpSession = mcpSession;
94+
this.mcpSessionManager = mcpSessionManager;
95+
this.objectMapper = objectMapper;
96+
}
10597

106-
private void reintializeSession() {
107-
this.mcpSession = this.mcpSessionManager.createAsyncSession();
108-
}
98+
public Single<McpAsyncClient> getMcpSession() {
99+
return this.mcpSession;
100+
}
109101

110-
@Override
111-
public Optional<FunctionDeclaration> declaration() {
112-
return Optional.of(
113-
FunctionDeclaration.builder()
114-
.name(this.name())
115-
.description(this.description())
116-
.parameters(toGeminiSchema(this.mcpTool.inputSchema()))
117-
.build());
118-
}
102+
public Schema toGeminiSchema(JsonSchema openApiSchema) {
103+
return Schema.fromJson(objectMapper.valueToTree(openApiSchema).toString());
104+
}
119105

120-
@Override
121-
public Single<Map<String, Object>> runAsync(Map<String, Object> args, ToolContext toolContext) {
122-
return Single.defer(() ->
123-
this.mcpSession.flatMapMaybe(client ->
124-
Maybe.fromCompletionStage(
125-
client.callTool(new CallToolRequest(this.name(), ImmutableMap.copyOf(args)))
126-
.toFuture()
127-
)
128-
).map(callResult -> McpTool.wrapCallResult(
129-
this.objectMapper, this.name(), callResult)
130-
).switchIfEmpty(
131-
Single.fromCallable(
132-
() -> McpTool.wrapCallResult(this.objectMapper, this.name(), null)
133-
)
134-
)
135-
)
136-
.retryWhen(
137-
errors ->
138-
errors
139-
.delay(100, MILLISECONDS)
140-
.take(3)
141-
.doOnNext(
142-
error -> {
143-
System.err.println("Retrying callTool due to: " + error);
144-
reintializeSession();
145-
}));
146-
}
106+
private void reintializeSession() {
107+
this.mcpSession = this.mcpSessionManager.createAsyncSession();
108+
}
109+
110+
@Override
111+
public Optional<FunctionDeclaration> declaration() {
112+
return Optional.of(
113+
FunctionDeclaration.builder()
114+
.name(this.name())
115+
.description(this.description())
116+
.parameters(toGeminiSchema(this.mcpTool.inputSchema()))
117+
.build());
118+
}
119+
120+
@Override
121+
public Single<Map<String, Object>> runAsync(Map<String, Object> args, ToolContext toolContext) {
122+
return Single.defer(
123+
() ->
124+
this.mcpSession
125+
.flatMapMaybe(
126+
client ->
127+
Maybe.fromCompletionStage(
128+
client
129+
.callTool(
130+
new CallToolRequest(this.name(), ImmutableMap.copyOf(args)))
131+
.toFuture()))
132+
.map(
133+
callResult ->
134+
McpTool.wrapCallResult(this.objectMapper, this.name(), callResult))
135+
.switchIfEmpty(
136+
Single.fromCallable(
137+
() -> McpTool.wrapCallResult(this.objectMapper, this.name(), null))))
138+
.retryWhen(
139+
errors ->
140+
errors
141+
.delay(100, MILLISECONDS)
142+
.take(3)
143+
.doOnNext(
144+
error -> {
145+
System.err.println("Retrying callTool due to: " + error);
146+
reintializeSession();
147+
}));
148+
}
147149
}

core/src/main/java/com/google/adk/tools/mcp/McpSessionManager.java

Lines changed: 22 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,9 @@
2222
import io.modelcontextprotocol.spec.McpClientTransport;
2323
import io.modelcontextprotocol.spec.McpSchema.ClientCapabilities;
2424
import io.modelcontextprotocol.spec.McpSchema.InitializeResult;
25+
import io.reactivex.rxjava3.core.Single;
2526
import java.time.Duration;
2627
import java.util.Optional;
27-
28-
import io.reactivex.rxjava3.core.Single;
2928
import org.slf4j.Logger;
3029
import org.slf4j.LoggerFactory;
3130

@@ -64,7 +63,8 @@ public static McpSyncClient initializeSession(
6463
}
6564
McpSyncClient client =
6665
McpClient.sync(transport)
67-
.initializationTimeout(Optional.ofNullable(initializationTimeout).orElse(Duration.ofSeconds(10)))
66+
.initializationTimeout(
67+
Optional.ofNullable(initializationTimeout).orElse(Duration.ofSeconds(10)))
6868
.requestTimeout(Optional.ofNullable(requestTimeout).orElse(Duration.ofSeconds(10)))
6969
.capabilities(ClientCapabilities.builder().build())
7070
.build();
@@ -82,7 +82,7 @@ public static Single<McpAsyncClient> initializeAsyncSession(Object connectionPar
8282
}
8383

8484
public static Single<McpAsyncClient> initializeAsyncSession(
85-
Object connectionParams, McpTransportBuilder transportBuilder) {
85+
Object connectionParams, McpTransportBuilder transportBuilder) {
8686
Duration initializationTimeout = null;
8787
Duration requestTimeout = null;
8888
McpClientTransport transport = transportBuilder.build(connectionParams);
@@ -91,21 +91,24 @@ public static Single<McpAsyncClient> initializeAsyncSession(
9191
requestTimeout = sseServerParams.sseReadTimeout();
9292
}
9393
McpAsyncClient client =
94-
McpClient.async(transport)
95-
.initializationTimeout(initializationTimeout == null ? Duration.ofSeconds(10) : initializationTimeout)
96-
.requestTimeout(requestTimeout == null ? Duration.ofSeconds(10) : requestTimeout)
97-
.capabilities(ClientCapabilities.builder().build())
98-
.build();
94+
McpClient.async(transport)
95+
.initializationTimeout(
96+
initializationTimeout == null ? Duration.ofSeconds(10) : initializationTimeout)
97+
.requestTimeout(requestTimeout == null ? Duration.ofSeconds(10) : requestTimeout)
98+
.capabilities(ClientCapabilities.builder().build())
99+
.build();
99100
return Single.fromCompletionStage(
100-
client.initialize()
101-
.doOnSuccess(initResult -> {
102-
logger.debug("Initialize McpAsyncClient Result: {}", initResult);
103-
})
104-
.doOnError(e -> {
105-
logger.error("Initialize McpAsyncClient Failed: {}", e.getMessage(), e);
106-
})
107-
.map(_initResult -> client)
108-
.toFuture()
109-
);
101+
client
102+
.initialize()
103+
.doOnSuccess(
104+
initResult -> {
105+
logger.debug("Initialize McpAsyncClient Result: {}", initResult);
106+
})
107+
.doOnError(
108+
e -> {
109+
logger.error("Initialize McpAsyncClient Failed: {}", e.getMessage(), e);
110+
})
111+
.map(_initResult -> client)
112+
.toFuture());
110113
}
111114
}

0 commit comments

Comments
 (0)