@@ -118,6 +118,11 @@ private void handleIncomingMessages(Function<Mono<JSONRPCMessage>, Mono<JSONRPCM
118118 .flatMap (message -> Mono .just (message )
119119 .transform (inboundMessageHandler )
120120 .contextWrite (ctx -> ctx .put ("observation" , "myObservation" )))
121+ .doOnComplete (() -> {
122+ this .outboundSink .tryEmitComplete ();
123+ this .inboundScheduler .dispose ();
124+ this .outboundScheduler .dispose ();
125+ })
121126 .subscribe ();
122127 }
123128
@@ -184,51 +189,51 @@ private void startInboundProcessing() {
184189 */
185190 private void startOutboundProcessing () {
186191 Function <Flux <JSONRPCMessage >, Flux <JSONRPCMessage >> outboundConsumer = messages -> messages // @formatter:off
187- .doOnSubscribe (subscription -> outboundReady .tryEmitValue (null ))
188- .publishOn (outboundScheduler )
189- .handle ((message , sink ) -> {
190- if (message != null && !isClosing ) {
191- try {
192- String jsonMessage = objectMapper .writeValueAsString (message );
193- // Escape any embedded newlines in the JSON message as per spec
194- jsonMessage = jsonMessage .replace ("\r \n " , "\\ n" ).replace ("\n " , "\\ n" ).replace ("\r " , "\\ n" );
195-
196- synchronized (outputStream ) {
197- outputStream .write (jsonMessage .getBytes (StandardCharsets .UTF_8 ));
198- outputStream .write ("\n " .getBytes (StandardCharsets .UTF_8 ));
199- outputStream .flush ();
200- }
201- sink .next (message );
202- }
203- catch (IOException e ) {
204- if (!isClosing ) {
205- logger .error ("Error writing message" , e );
206- sink .error (new RuntimeException (e ));
207- }
208- else {
209- logger .debug ("Stream closed during shutdown" , e );
210- }
211- }
212- }
213- else if (isClosing ) {
214- sink .complete ();
215- }
216- })
217- .doOnComplete (() -> {
218- isClosing = true ;
219- outboundSink .tryEmitComplete ();
220- })
221- .doOnError (e -> {
222- if (!isClosing ) {
223- logger .error ("Error in outbound processing" , e );
224- isClosing = true ;
225- outboundSink .tryEmitComplete ();
226- }
227- })
228- .map (msg -> (JSONRPCMessage ) msg );
229-
230- outboundConsumer .apply (outboundSink .asFlux ()).subscribe ();
231- } // @formatter:on
192+ .doOnSubscribe (subscription -> outboundReady .tryEmitValue (null ))
193+ .publishOn (outboundScheduler )
194+ .handle ((message , sink ) -> {
195+ if (message != null && !isClosing ) {
196+ try {
197+ String jsonMessage = objectMapper .writeValueAsString (message );
198+ // Escape any embedded newlines in the JSON message as per spec
199+ jsonMessage = jsonMessage .replace ("\r \n " , "\\ n" ).replace ("\n " , "\\ n" ).replace ("\r " , "\\ n" );
200+
201+ synchronized (outputStream ) {
202+ outputStream .write (jsonMessage .getBytes (StandardCharsets .UTF_8 ));
203+ outputStream .write ("\n " .getBytes (StandardCharsets .UTF_8 ));
204+ outputStream .flush ();
205+ }
206+ sink .next (message );
207+ }
208+ catch (IOException e ) {
209+ if (!isClosing ) {
210+ logger .error ("Error writing message" , e );
211+ sink .error (new RuntimeException (e ));
212+ }
213+ else {
214+ logger .debug ("Stream closed during shutdown" , e );
215+ }
216+ }
217+ }
218+ else if (isClosing ) {
219+ sink .complete ();
220+ }
221+ })
222+ .doOnComplete (() -> {
223+ isClosing = true ;
224+ outboundSink .tryEmitComplete ();
225+ })
226+ .doOnError (e -> {
227+ if (!isClosing ) {
228+ logger .error ("Error in outbound processing" , e );
229+ isClosing = true ;
230+ outboundSink .tryEmitComplete ();
231+ }
232+ })
233+ .map (msg -> (JSONRPCMessage ) msg );
234+
235+ outboundConsumer .apply (outboundSink .asFlux ()).subscribe ();
236+ } // @formatter:on
232237
233238 @ Override
234239 public Mono <Void > closeGracefully () {
0 commit comments