@@ -122,42 +122,38 @@ public McpClientSession(Duration requestTimeout, McpClientTransport transport,
122122 // Observation associated with the individual message - it can be used to
123123 // create child Observation and emit it together with the message to the
124124 // consumer
125- this .connection = this .transport .connect (mono -> mono .doOnNext (message -> handle (message ).subscribe ()))
126- .subscribe ();
125+ this .connection = this .transport .connect (mono -> mono .doOnNext (this ::handle )).subscribe ();
127126 }
128127
129- public Mono <Void > handle (McpSchema .JSONRPCMessage message ) {
130- return Mono .defer (() -> {
131- if (message instanceof McpSchema .JSONRPCResponse response ) {
132- logger .debug ("Received Response: {}" , response );
133- var sink = pendingResponses .remove (response .id ());
134- if (sink == null ) {
135- logger .warn ("Unexpected response for unknown id {}" , response .id ());
136- }
137- else {
138- sink .success (response );
139- }
140- return Mono .empty ();
141- }
142- else if (message instanceof McpSchema .JSONRPCRequest request ) {
143- logger .debug ("Received request: {}" , request );
144- return handleIncomingRequest (request ).onErrorResume (error -> {
145- var errorResponse = new McpSchema .JSONRPCResponse (McpSchema .JSONRPC_VERSION , request .id (), null ,
146- new McpSchema .JSONRPCResponse .JSONRPCError (McpSchema .ErrorCodes .INTERNAL_ERROR ,
147- error .getMessage (), null ));
148- return this .transport .sendMessage (errorResponse ).then (Mono .empty ());
149- }).flatMap (this .transport ::sendMessage );
150- }
151- else if (message instanceof McpSchema .JSONRPCNotification notification ) {
152- logger .debug ("Received notification: {}" , notification );
153- return handleIncomingNotification (notification )
154- .doOnError (error -> logger .error ("Error handling notification: {}" , error .getMessage ()));
128+ private void handle (McpSchema .JSONRPCMessage message ) {
129+ if (message instanceof McpSchema .JSONRPCResponse response ) {
130+ logger .debug ("Received Response: {}" , response );
131+ var sink = pendingResponses .remove (response .id ());
132+ if (sink == null ) {
133+ logger .warn ("Unexpected response for unknown id {}" , response .id ());
155134 }
156135 else {
157- logger .warn ("Received unknown message type: {}" , message );
158- return Mono .empty ();
136+ sink .success (response );
159137 }
160- });
138+ }
139+ else if (message instanceof McpSchema .JSONRPCRequest request ) {
140+ logger .debug ("Received request: {}" , request );
141+ handleIncomingRequest (request ).onErrorResume (error -> {
142+ var errorResponse = new McpSchema .JSONRPCResponse (McpSchema .JSONRPC_VERSION , request .id (), null ,
143+ new McpSchema .JSONRPCResponse .JSONRPCError (McpSchema .ErrorCodes .INTERNAL_ERROR ,
144+ error .getMessage (), null ));
145+ return this .transport .sendMessage (errorResponse ).then (Mono .empty ());
146+ }).flatMap (this .transport ::sendMessage ).subscribe ();
147+ }
148+ else if (message instanceof McpSchema .JSONRPCNotification notification ) {
149+ logger .debug ("Received notification: {}" , notification );
150+ handleIncomingNotification (notification )
151+ .doOnError (error -> logger .error ("Error handling notification: {}" , error .getMessage ()))
152+ .subscribe ();
153+ }
154+ else {
155+ logger .warn ("Received unknown message type: {}" , message );
156+ }
161157 }
162158
163159 /**
0 commit comments