8787import java .util .concurrent .atomic .AtomicBoolean ;
8888import java .util .concurrent .atomic .AtomicReference ;
8989import java .util .function .BiFunction ;
90+ import java .util .function .BooleanSupplier ;
9091import java .util .function .Consumer ;
9192import java .util .function .Function ;
9293import java .util .function .LongConsumer ;
@@ -103,6 +104,7 @@ class StreamEnvironment implements Environment {
103104 private static final Logger LOGGER = LoggerFactory .getLogger (StreamEnvironment .class );
104105
105106 private final EventLoopGroup eventLoopGroup ;
107+ private final boolean privateEventLoopGroup ;
106108 private final ScheduledExecutorService scheduledExecutorService ;
107109 private final ScheduledExecutorService locatorReconnectionScheduledExecutorService ;
108110 private final boolean privateScheduleExecutorService ;
@@ -272,17 +274,16 @@ class StreamEnvironment implements Environment {
272274
273275 if (clientParametersPrototype .eventLoopGroup == null ) {
274276 this .eventLoopGroup = Utils .eventLoopGroup ();
277+ this .privateEventLoopGroup = true ;
275278 shutdownService .wrap (() -> closeEventLoopGroup (this .eventLoopGroup ));
276- this .clientParametersPrototype =
277- clientParametersPrototype .duplicate ().eventLoopGroup (this .eventLoopGroup );
278279 } else {
279- this .eventLoopGroup = null ;
280- this .clientParametersPrototype =
281- clientParametersPrototype
282- .duplicate ()
283- .eventLoopGroup (clientParametersPrototype .eventLoopGroup );
280+ this .eventLoopGroup = clientParametersPrototype .eventLoopGroup ;
281+ this .privateEventLoopGroup = false ;
284282 }
285283
284+ this .clientParametersPrototype =
285+ clientParametersPrototype .duplicate ().eventLoopGroup (this .eventLoopGroup );
286+
286287 this .producersCoordinator =
287288 new ProducersCoordinator (
288289 this ,
@@ -357,15 +358,15 @@ class StreamEnvironment implements Environment {
357358 this .locatorReconnectionScheduledExecutorService ,
358359 this .recoveryBackOffDelayPolicy ,
359360 l .label (),
360- this . closed );
361+ this :: shouldTryLocatorConnection );
361362 }
362363 });
363364 }
364365 };
365366 if (lazyInit ) {
366367 this .locatorInitializationSequence =
367368 () -> {
368- if (! this .closed . get ()) {
369+ if (this .shouldTryLocatorConnection ()) {
369370 locatorInitSequence .run ();
370371 }
371372 };
@@ -397,7 +398,7 @@ private ShutdownListener shutdownListener(
397398 shutdownContext -> {
398399 String label = locator .label ();
399400 LOGGER .debug ("Locator {} disconnected" , label );
400- if (shutdownContext .isShutdownUnexpected () && ! this . closed . get () ) {
401+ if (shutdownContext .isShutdownUnexpected ()) {
401402 locator .client (null );
402403 BackOffDelayPolicy delayPolicy = recoveryBackOffDelayPolicy ;
403404 LOGGER .debug (
@@ -415,7 +416,7 @@ private ShutdownListener shutdownListener(
415416 this .locatorReconnectionScheduledExecutorService ,
416417 delayPolicy ,
417418 label ,
418- this . closed );
419+ this :: shouldTryLocatorConnection );
419420 } else {
420421 LOGGER .debug ("Locator connection '{}' closing normally" , label );
421422 }
@@ -433,7 +434,7 @@ private static void scheduleLocatorConnection(
433434 ScheduledExecutorService scheduler ,
434435 BackOffDelayPolicy delayPolicy ,
435436 String locatorLabel ,
436- AtomicBoolean closed ) {
437+ BooleanSupplier shouldRetry ) {
437438 LOGGER .debug (
438439 "Scheduling locator '{}' connection with delay policy {}" , locatorLabel , delayPolicy );
439440 try {
@@ -460,7 +461,7 @@ private static void scheduleLocatorConnection(
460461 .description ("Locator '%s' connection" , locatorLabel )
461462 .scheduler (scheduler )
462463 .delayPolicy (delayPolicy )
463- .retry (ignored -> ! closed . get ())
464+ .retry (ignored -> shouldRetry . getAsBoolean ())
464465 .build ()
465466 .thenAccept (locator ::client )
466467 .exceptionally (
@@ -786,14 +787,19 @@ public void close() {
786787 if (this .locatorReconnectionScheduledExecutorService != null ) {
787788 this .locatorReconnectionScheduledExecutorService .shutdownNow ();
788789 }
789- closeEventLoopGroup (this .eventLoopGroup );
790+ if (this .privateEventLoopGroup ) {
791+ closeEventLoopGroup (this .eventLoopGroup );
792+ }
790793 }
791794 }
792795
796+ private boolean shouldTryLocatorConnection () {
797+ return !this .closed .get () && !this .eventLoopGroup .isShuttingDown ();
798+ }
799+
793800 private static void closeEventLoopGroup (EventLoopGroup eventLoopGroup ) {
794801 try {
795- if (eventLoopGroup != null
796- && (!eventLoopGroup .isShuttingDown () || !eventLoopGroup .isShutdown ())) {
802+ if (!eventLoopGroup .isShuttingDown ()) {
797803 LOGGER .debug ("Closing Netty event loop group" );
798804 eventLoopGroup .shutdownGracefully (1 , 10 , SECONDS ).get (10 , SECONDS );
799805 }
0 commit comments