@@ -171,9 +171,18 @@ private void startInboundProcessing() {
171171 }
172172 }
173173 catch (IOException e ) {
174- if (!isClosing ) {
174+ // Check isClosing before the error occurs to properly categorize it
175+ boolean wasClosing = isClosing ;
176+ isClosing = true ;
177+ if (!wasClosing && e .getMessage ().equals ("Pipe closed" )) {
178+ logger .debug ("Stream closed during shutdown" , e );
179+ }
180+ else if (!wasClosing ) {
175181 logger .error ("Error reading from stdin" , e );
176182 }
183+ else {
184+ logger .debug ("Stream error during shutdown" , e );
185+ }
177186 }
178187 finally {
179188 isClosing = true ;
@@ -209,6 +218,9 @@ private void startOutboundProcessing() {
209218 logger .error ("Error writing message" , e );
210219 sink .error (new RuntimeException (e ));
211220 }
221+ else {
222+ logger .debug ("Stream closed during shutdown" , e );
223+ }
212224 }
213225 }
214226 else if (isClosing ) {
@@ -236,35 +248,46 @@ public Mono<Void> closeGracefully() {
236248 return Mono .fromRunnable (() -> {
237249 isClosing = true ;
238250 logger .debug ("Initiating graceful shutdown" );
239- })
240- // .then(Mono.defer(() -> {
241- // inboundSink.tryEmitComplete();
242- // outboundSink.tryEmitComplete();
243- // return Mono.delay(Duration.ofMillis(100));
244- // }))
245-
246- .then (Mono .fromRunnable (() -> {
247- try {
248-
249- inboundScheduler .dispose ();
250- outboundScheduler .dispose ();
251+ }).then (Mono .defer (() -> {
252+ // First complete the sinks to stop processing
253+ inboundSink .tryEmitComplete ();
254+ outboundSink .tryEmitComplete ();
255+ return Mono .delay (Duration .ofMillis (100 ));
256+ })).then (Mono .fromRunnable (() -> {
257+ try {
258+ // Dispose schedulers first
259+ inboundScheduler .dispose ();
260+ outboundScheduler .dispose ();
261+
262+ // Wait for schedulers to terminate
263+ if (!inboundScheduler .isDisposed ()) {
264+ inboundScheduler .disposeGracefully ().block (Duration .ofSeconds (5 ));
265+ }
266+ if (!outboundScheduler .isDisposed ()) {
267+ outboundScheduler .disposeGracefully ().block (Duration .ofSeconds (5 ));
268+ }
251269
252- // Wait for schedulers to terminate
253- if (!inboundScheduler .isDisposed ()) {
254- inboundScheduler .disposeGracefully ().block (Duration .ofSeconds (5 ));
270+ // Only after schedulers are disposed, close the streams
271+ try {
272+ if (inputStream != System .in ) {
273+ inputStream .close ();
255274 }
256- if (!outboundScheduler .isDisposed ()) {
257- outboundScheduler .disposeGracefully ().block (Duration .ofSeconds (5 ));
275+ if (outputStream != System .out ) {
276+ outputStream .flush ();
277+ outputStream .close ();
258278 }
259-
260- logger .info ("Graceful shutdown completed" );
261279 }
262- catch (Exception e ) {
263- logger .error ("Error during graceful shutdown" , e );
280+ catch (IOException e ) {
281+ // Log but don't throw since we're shutting down
282+ logger .debug ("Error closing streams during shutdown" , e );
264283 }
265- }))
266- .then ()
267- .subscribeOn (Schedulers .boundedElastic ());
284+
285+ logger .info ("Graceful shutdown completed" );
286+ }
287+ catch (Exception e ) {
288+ logger .error ("Error during graceful shutdown" , e );
289+ }
290+ })).then ().subscribeOn (Schedulers .boundedElastic ());
268291 }
269292
270293 @ Override
0 commit comments