Skip to content
This repository was archived by the owner on Feb 14, 2025. It is now read-only.

Commit 7c9cf46

Browse files
committed
Factor out the McpSession into an inteface with DefaultMcpSession impl.
1 parent 7abf118 commit 7c9cf46

File tree

8 files changed

+215
-339
lines changed

8 files changed

+215
-339
lines changed

mcp/README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ The SDK follows a layered architecture:
102102
- **McpAsyncClient**: Async implementation using Project Reactor
103103
- **McpSyncClient**: Synchronous wrapper around the async client
104104
- **McpTransport**: Transport layer interface
105-
- **DefaultMcpTransport**: Base transport implementation
105+
- **AbstracttMcpTransport**: Abstract transport implementation
106106
- **StdioServerTransport**: Stdio-based server communication
107107

108108
### Key Interactions

mcp/docs/mcp-class-diagram.puml

Lines changed: 0 additions & 118 deletions
This file was deleted.

mcp/docs/mcp-sequence-diagram.puml

Lines changed: 0 additions & 105 deletions
This file was deleted.

mcp/src/main/java/spring/ai/experimental/mcp/client/McpAsyncClient.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,15 +5,16 @@
55
import com.fasterxml.jackson.core.type.TypeReference;
66
import com.fasterxml.jackson.databind.ObjectMapper;
77
import reactor.core.publisher.Mono;
8-
import spring.ai.experimental.mcp.spec.McpSession;
8+
import spring.ai.experimental.mcp.spec.DefaultMcpSession;
99
import spring.ai.experimental.mcp.spec.McpTransport;
10+
import spring.ai.experimental.mcp.spec.McpError;
1011
import spring.ai.experimental.mcp.spec.McpSchema;
1112

1213
/**
1314
* @author Dariusz Jędrzejczyk
1415
* @author Christian Tzolov
1516
*/
16-
public class McpAsyncClient extends McpSession {
17+
public class McpAsyncClient extends DefaultMcpSession {
1718

1819
public McpAsyncClient(McpTransport transport) {
1920
this(transport, Duration.ofSeconds(10), new ObjectMapper());
Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
package spring.ai.experimental.mcp.spec;
2+
3+
import java.time.Duration;
4+
import java.util.Map;
5+
import java.util.UUID;
6+
import java.util.concurrent.ConcurrentHashMap;
7+
8+
import com.fasterxml.jackson.core.type.TypeReference;
9+
import com.fasterxml.jackson.databind.ObjectMapper;
10+
import reactor.core.publisher.Mono;
11+
import reactor.core.publisher.MonoSink;
12+
import spring.ai.experimental.mcp.client.util.Assert;
13+
14+
/**
15+
* Implementation of the MCP client session.
16+
*
17+
* @author Christian Tzolov
18+
* @author Dariusz Jędrzejczyk
19+
*/
20+
public class DefaultMcpSession implements McpSession {
21+
22+
private final ConcurrentHashMap<Object, MonoSink<McpSchema.JSONRPCResponse>> pendingResponses = new ConcurrentHashMap<>();
23+
24+
private final Duration requestTimeout;
25+
26+
private final ObjectMapper objectMapper;
27+
28+
private final McpTransport transport;
29+
30+
public DefaultMcpSession(Duration requestTimeout, ObjectMapper objectMapper, McpTransport transport) {
31+
32+
Assert.notNull(objectMapper, "The ObjectMapper can not be null");
33+
Assert.notNull(requestTimeout, "The requstTimeout can not be null");
34+
Assert.notNull(transport, "The transport can not be null");
35+
36+
this.requestTimeout = requestTimeout;
37+
this.objectMapper = objectMapper;
38+
this.transport = transport;
39+
40+
this.transport.setInboudMessageHandler(message -> {
41+
switch (message) {
42+
case McpSchema.JSONRPCResponse response -> {
43+
var sink = pendingResponses.remove(response.id());
44+
if (sink == null) {
45+
System.out.println("Unexpected response for unkown id " + response.id());
46+
} else {
47+
sink.success(response);
48+
}
49+
}
50+
case McpSchema.JSONRPCRequest request -> {
51+
System.out.println("Client does not yet support server requests");
52+
}
53+
case McpSchema.JSONRPCNotification notification -> {
54+
System.out.println("Notifications not yet supported");
55+
}
56+
}
57+
});
58+
59+
this.transport.setInboundErrorHandler(error -> System.out.println("Error received " + error));
60+
61+
this.transport.start();
62+
}
63+
64+
@Override
65+
public <T> Mono<T> sendRequest(String method, Object requestParams, TypeReference<T> typeRef) {
66+
String requestId = UUID.randomUUID().toString();
67+
68+
return Mono.<McpSchema.JSONRPCResponse>create(sink -> {
69+
this.pendingResponses.put(requestId, sink);
70+
McpSchema.JSONRPCRequest jsonrpcRequest = new McpSchema.JSONRPCRequest(McpSchema.JSONRPC_VERSION, method,
71+
requestId, requestParams);
72+
try {
73+
// TODO: This is non-blocking, but it's actually a synchronous call,
74+
// perhaps there's no need to make it return Mono?
75+
this.transport.sendMessage(jsonrpcRequest)
76+
// TODO: It's most efficient to create a dedicated
77+
// Subscriber here
78+
.subscribe(v -> {
79+
}, e -> {
80+
this.pendingResponses.remove(requestId);
81+
sink.error(e);
82+
});
83+
} catch (Exception e) {
84+
sink.error(e);
85+
}
86+
}).timeout(this.requestTimeout).handle((jsonRpcResponse, s) -> {
87+
if (jsonRpcResponse.error() != null) {
88+
s.error(new McpError(jsonRpcResponse.error()));
89+
} else {
90+
if (typeRef.getType().getTypeName().equals("java.lang.Void")) {
91+
s.complete();
92+
} else {
93+
s.next(this.objectMapper.convertValue(jsonRpcResponse.result(), typeRef));
94+
}
95+
}
96+
});
97+
}
98+
99+
@Override
100+
public Mono<Void> sendNotification(String method, Map<String, Object> params) {
101+
McpSchema.JSONRPCNotification jsonrpcNotification = new McpSchema.JSONRPCNotification(McpSchema.JSONRPC_VERSION,
102+
method, params);
103+
try {
104+
this.transport.sendMessage(jsonrpcNotification);
105+
} catch (Exception e) {
106+
return Mono.error(new McpError(e));
107+
}
108+
return Mono.empty();
109+
}
110+
111+
@Override
112+
public Mono<Void> closeGracefully(Duration timeout) {
113+
// TODO handle the timeout in transport
114+
return Mono.fromRunnable(this.transport::close);
115+
}
116+
117+
}
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
/*
2+
* Copyright 2024 - 2024 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package spring.ai.experimental.mcp.spec;
17+
18+
public class McpError extends RuntimeException {
19+
20+
public McpError(Object error) {
21+
super(error.toString());
22+
}
23+
24+
}

0 commit comments

Comments
 (0)