1111import reactor .core .publisher .MonoSink ;
1212import spring .ai .mcp .spec .McpAsyncTransport ;
1313import spring .ai .mcp .spec .McpTransport ;
14+
15+ import org .springframework .util .Assert ;
16+
1417import spring .ai .mcp .spec .McpSchema ;
1518
1619public class McpAsyncSession {
1720
18- private final ConcurrentHashMap <Object , MonoSink <McpSchema .JSONRPCResponse >> pendingResponses
19- = new ConcurrentHashMap <>();
21+ private final ConcurrentHashMap <Object , MonoSink <McpSchema .JSONRPCResponse >> pendingResponses = new ConcurrentHashMap <>();
2022
2123 private final Duration requestTimeout ;
2224
2325 private final ObjectMapper objectMapper ;
2426
2527 private final McpAsyncTransport transport ;
2628
27- public McpAsyncSession (Duration requestTimeout ,
28- ObjectMapper objectMapper ,
29- McpAsyncTransport transport ) {
29+ public McpAsyncSession (Duration requestTimeout , ObjectMapper objectMapper , McpAsyncTransport transport ) {
30+
31+ Assert .notNull (requestTimeout , "Request timeout must not be null" );
32+ Assert .notNull (objectMapper , "ObjectMapper must not be null" );
33+ Assert .notNull (transport , "Transport must not be null" );
34+
3035 this .requestTimeout = requestTimeout ;
3136 this .objectMapper = objectMapper ;
3237 this .transport = transport ;
@@ -37,7 +42,8 @@ public McpAsyncSession(Duration requestTimeout,
3742 var sink = pendingResponses .remove (response .id ());
3843 if (sink == null ) {
3944 System .out .println ("Unexpected response for unkown id " + response .id ());
40- } else {
45+ }
46+ else {
4147 sink .success (response );
4248 }
4349 }
@@ -60,50 +66,53 @@ public <T> Mono<T> sendRequest(String method, Object requestParams, TypeReferenc
6066
6167 return Mono .<McpSchema .JSONRPCResponse >create (sink -> {
6268 this .pendingResponses .put (requestId , sink );
63- McpSchema .JSONRPCRequest jsonrpcRequest = new McpSchema .JSONRPCRequest (McpSchema .JSONRPC_VERSION , method , requestId , requestParams );
69+ McpSchema .JSONRPCRequest jsonrpcRequest = new McpSchema .JSONRPCRequest (McpSchema .JSONRPC_VERSION , method ,
70+ requestId , requestParams );
6471 try {
6572 // TODO: This is non-blocking, but it's actually a synchronous call,
6673 // perhaps there's no need to make it return Mono?
6774 this .transport .sendMessage (jsonrpcRequest )
68- // TODO: It's most efficient to create a dedicated
69- // Subscriber here
70- .subscribe (v -> {}, e -> {
71- this .pendingResponses .remove (requestId );
72- sink .error (e );
73- });
74- } catch (Exception e ) {
75+ // TODO: It's most efficient to create a dedicated
76+ // Subscriber here
77+ .subscribe (v -> {
78+ }, e -> {
79+ this .pendingResponses .remove (requestId );
80+ sink .error (e );
81+ });
82+ }
83+ catch (Exception e ) {
7584 sink .error (e );
7685 }
77- })
78- .timeout (this .requestTimeout )
79- .handle ((jsonRpcResponse , s ) -> {
80- if (jsonRpcResponse .error () != null ) {
81- s .error (new McpError (jsonRpcResponse .error ()));
82- } else {
83- if (typeRef .getType ().getTypeName ().equals ("java.lang.Void" )) {
84- s .complete ();
85- } else {
86- s .next (this .objectMapper .convertValue (jsonRpcResponse .result (),
87- typeRef ));
88- }
89- }
90- });
86+ }).timeout (this .requestTimeout ).handle ((jsonRpcResponse , s ) -> {
87+ if (jsonRpcResponse .error () != null ) {
88+ s .error (new McpError (jsonRpcResponse .error ()));
89+ }
90+ else {
91+ if (typeRef .getType ().getTypeName ().equals ("java.lang.Void" )) {
92+ s .complete ();
93+ }
94+ else {
95+ s .next (this .objectMapper .convertValue (jsonRpcResponse .result (), typeRef ));
96+ }
97+ }
98+ });
9199 }
92100
93101 public static class McpError extends RuntimeException {
94102
95103 public McpError (Object error ) {
96104 super (error .toString ());
97105 }
106+
98107 }
99108
100109 public Mono <Void > sendNotification (String method ) {
101110 return sendNotification (method , null );
102111 }
103112
104113 public Mono <Void > sendNotification (String method , Map <String , Object > params ) {
105- McpSchema .JSONRPCNotification
106- jsonrpcNotification = new McpSchema . JSONRPCNotification ( McpSchema . JSONRPC_VERSION , method , params );
114+ McpSchema .JSONRPCNotification jsonrpcNotification = new McpSchema . JSONRPCNotification ( McpSchema . JSONRPC_VERSION ,
115+ method , params );
107116 try {
108117 // TODO: make it non-blocking
109118 this .transport .sendMessage (jsonrpcNotification );
@@ -118,4 +127,5 @@ public Mono<Void> closeGracefully(Duration timeout) {
118127 // TODO handle the timeout in transport
119128 return Mono .fromRunnable (this .transport ::close );
120129 }
130+
121131}
0 commit comments