@@ -122,7 +122,12 @@ public McpClientSession(Duration requestTimeout, McpClientTransport transport,
122122 // Observation associated with the individual message - it can be used to
123123 // create child Observation and emit it together with the message to the
124124 // consumer
125- this .connection = this .transport .connect (mono -> mono .doOnNext (message -> {
125+ this .connection = this .transport .connect (mono -> mono .doOnNext (message -> handle (message ).subscribe ()))
126+ .subscribe ();
127+ }
128+
129+ public Mono <Void > handle (McpSchema .JSONRPCMessage message ) {
130+ return Mono .defer (() -> {
126131 if (message instanceof McpSchema .JSONRPCResponse response ) {
127132 logger .debug ("Received Response: {}" , response );
128133 var sink = pendingResponses .remove (response .id ());
@@ -132,23 +137,27 @@ public McpClientSession(Duration requestTimeout, McpClientTransport transport,
132137 else {
133138 sink .success (response );
134139 }
140+ return Mono .empty ();
135141 }
136142 else if (message instanceof McpSchema .JSONRPCRequest request ) {
137143 logger .debug ("Received request: {}" , request );
138- handleIncomingRequest (request ).subscribe (response -> transport .sendMessage (response ).subscribe (),
139- error -> {
140- var errorResponse = new McpSchema .JSONRPCResponse (McpSchema .JSONRPC_VERSION , request .id (),
141- null , new McpSchema .JSONRPCResponse .JSONRPCError (
142- McpSchema .ErrorCodes .INTERNAL_ERROR , error .getMessage (), null ));
143- transport .sendMessage (errorResponse ).subscribe ();
144- });
144+ return handleIncomingRequest (request ).onErrorResume (error -> {
145+ var errorResponse = new McpSchema .JSONRPCResponse (McpSchema .JSONRPC_VERSION , request .id (), null ,
146+ new McpSchema .JSONRPCResponse .JSONRPCError (McpSchema .ErrorCodes .INTERNAL_ERROR ,
147+ error .getMessage (), null ));
148+ return this .transport .sendMessage (errorResponse ).then (Mono .empty ());
149+ }).flatMap (this .transport ::sendMessage );
145150 }
146151 else if (message instanceof McpSchema .JSONRPCNotification notification ) {
147152 logger .debug ("Received notification: {}" , notification );
148- handleIncomingNotification (notification ). subscribe ( null ,
149- error -> logger .error ("Error handling notification: {}" , error .getMessage ()));
153+ return handleIncomingNotification (notification )
154+ . doOnError ( error -> logger .error ("Error handling notification: {}" , error .getMessage ()));
150155 }
151- })).subscribe ();
156+ else {
157+ logger .warn ("Received unknown message type: {}" , message );
158+ return Mono .empty ();
159+ }
160+ });
152161 }
153162
154163 /**
0 commit comments