@@ -62,8 +62,6 @@ public class McpClientSession implements McpSession {
6262 /** Atomic counter for generating unique request IDs */
6363 private final AtomicLong requestCounter = new AtomicLong (0 );
6464
65- private final Disposable connection ;
66-
6765 /**
6866 * Functional interface for handling incoming JSON-RPC requests. Implementations
6967 * should process the request parameters and return a response.
@@ -108,7 +106,7 @@ public interface NotificationHandler {
108106 public McpClientSession (Duration requestTimeout , McpClientTransport transport ,
109107 Map <String , RequestHandler <?>> requestHandlers , Map <String , NotificationHandler > notificationHandlers ) {
110108
111- Assert .notNull (requestTimeout , "The requstTimeout can not be null" );
109+ Assert .notNull (requestTimeout , "The requestTimeout can not be null" );
112110 Assert .notNull (transport , "The transport can not be null" );
113111 Assert .notNull (requestHandlers , "The requestHandlers can not be null" );
114112 Assert .notNull (notificationHandlers , "The notificationHandlers can not be null" );
@@ -123,33 +121,41 @@ public McpClientSession(Duration requestTimeout, McpClientTransport transport,
123121 // Observation associated with the individual message - it can be used to
124122 // create child Observation and emit it together with the message to the
125123 // consumer
126- this .connection = this .transport .connect (mono -> mono .doOnNext (message -> {
124+ this .transport .connect (mono -> mono .doOnNext (this ::handle ));
125+ }
126+
127+ public Mono <Void > handle (McpSchema .JSONRPCMessage message ) {
128+ return Mono .defer (() -> {
127129 if (message instanceof McpSchema .JSONRPCResponse response ) {
128130 logger .debug ("Received Response: {}" , response );
129131 var sink = pendingResponses .remove (response .id ());
130132 if (sink == null ) {
131- logger .warn ("Unexpected response for unkown id {}" , response .id ());
133+ logger .warn ("Unexpected response for unknown id {}" , response .id ());
132134 }
133135 else {
134136 sink .success (response );
135137 }
138+ return Mono .empty ();
136139 }
137140 else if (message instanceof McpSchema .JSONRPCRequest request ) {
138141 logger .debug ("Received request: {}" , request );
139- handleIncomingRequest (request ). flatMap ( transport :: sendMessage ).onErrorResume (error -> {
142+ return handleIncomingRequest (request ).onErrorResume (error -> {
140143 var errorResponse = new McpSchema .JSONRPCResponse (McpSchema .JSONRPC_VERSION , request .id (), null ,
141144 new McpSchema .JSONRPCResponse .JSONRPCError (McpSchema .ErrorCodes .INTERNAL_ERROR ,
142145 error .getMessage (), null ));
143- return transport .sendMessage (errorResponse );
144- }).subscribe ();
145-
146+ return this .transport .sendMessage (errorResponse ).then (Mono .empty ());
147+ }).flatMap (this .transport ::sendMessage );
146148 }
147149 else if (message instanceof McpSchema .JSONRPCNotification notification ) {
148150 logger .debug ("Received notification: {}" , notification );
149- handleIncomingNotification (notification ). subscribe ( null ,
150- error -> logger .error ("Error handling notification: {}" , error .getMessage ()));
151+ return handleIncomingNotification (notification )
152+ . doOnError ( error -> logger .error ("Error handling notification: {}" , error .getMessage ()));
151153 }
152- })).subscribe ();
154+ else {
155+ logger .warn ("Received unknown message type: {}" , message );
156+ return Mono .empty ();
157+ }
158+ });
153159 }
154160
155161 /**
@@ -271,18 +277,14 @@ public Mono<Void> sendNotification(String method, Map<String, Object> params) {
271277 */
272278 @ Override
273279 public Mono <Void > closeGracefully () {
274- return Mono .defer (() -> {
275- this .connection .dispose ();
276- return transport .closeGracefully ();
277- });
280+ return Mono .defer (transport ::closeGracefully );
278281 }
279282
280283 /**
281284 * Closes the session immediately, potentially interrupting pending operations.
282285 */
283286 @ Override
284287 public void close () {
285- this .connection .dispose ();
286288 transport .close ();
287289 }
288290
0 commit comments