@@ -204,122 +204,136 @@ class StreamEnvironment implements Environment {
204204 .collect (toList ());
205205 this .locators = List .copyOf (lctrs );
206206
207- this .executorServiceFactory =
208- new DefaultExecutorServiceFactory (
209- this .addresses .size (), 1 , "rabbitmq-stream-locator-connection-" );
210-
211- if (clientParametersPrototype .eventLoopGroup == null ) {
212- this .eventLoopGroup = Utils .eventLoopGroup ();
213- this .clientParametersPrototype =
214- clientParametersPrototype .duplicate ().eventLoopGroup (this .eventLoopGroup );
215- } else {
216- this .eventLoopGroup = null ;
217- this .clientParametersPrototype =
218- clientParametersPrototype
219- .duplicate ()
220- .eventLoopGroup (clientParametersPrototype .eventLoopGroup );
221- }
222- ScheduledExecutorService executorService ;
223- if (scheduledExecutorService == null ) {
224- int threads = AVAILABLE_PROCESSORS ;
225- LOGGER .debug ("Creating scheduled executor service with {} thread(s)" , threads );
226- ThreadFactory threadFactory = threadFactory ("rabbitmq-stream-environment-scheduler-" );
227- executorService = Executors .newScheduledThreadPool (threads , threadFactory );
228- this .privateScheduleExecutorService = true ;
229- } else {
230- executorService = scheduledExecutorService ;
231- this .privateScheduleExecutorService = false ;
232- }
233- this .scheduledExecutorService = executorService ;
234-
235- this .producersCoordinator =
236- new ProducersCoordinator (
237- this ,
238- maxProducersByConnection ,
239- maxTrackingConsumersByConnection ,
240- connectionNamingStrategy ,
241- coordinatorClientFactory (this , producerNodeRetryDelay ),
242- forceLeaderForProducers );
243- this .consumersCoordinator =
244- new ConsumersCoordinator (
245- this ,
246- maxConsumersByConnection ,
247- connectionNamingStrategy ,
248- coordinatorClientFactory (this , consumerNodeRetryDelay ),
249- forceReplicaForConsumers ,
250- Utils .brokerPicker ());
251- this .offsetTrackingCoordinator = new OffsetTrackingCoordinator (this );
252-
253- ThreadFactory threadFactory = threadFactory ("rabbitmq-stream-environment-locator-scheduler-" );
254- this .locatorReconnectionScheduledExecutorService =
255- Executors .newScheduledThreadPool (this .locators .size (), threadFactory );
256-
257- ClientParameters clientParametersForInit = locatorParametersCopy ();
258- Runnable locatorInitSequence =
259- () -> {
260- RuntimeException lastException = null ;
261- for (int i = 0 ; i < locators .size (); i ++) {
262- Address address = addresses .get (i % addresses .size ());
263- Locator locator = locator (i );
264- address = addressResolver .resolve (address );
265- String connectionName = connectionNamingStrategy .apply (ClientConnectionType .LOCATOR );
266- Client .ClientParameters locatorParameters =
267- clientParametersForInit
268- .duplicate ()
269- .host (address .host ())
270- .port (address .port ())
271- .clientProperty ("connection_name" , connectionName )
272- .shutdownListener (
273- shutdownListener (locator , connectionNamingStrategy , clientFactory ));
274- try {
275- Client client = clientFactory .apply (locatorParameters );
276- locator .client (client );
277- LOGGER .debug ("Created locator connection '{}'" , connectionName );
278- LOGGER .debug ("Locator connected to {}" , address );
279- } catch (RuntimeException e ) {
280- LOGGER .debug ("Error while try to connect to {}: {}" , address , e .getMessage ());
281- lastException = e ;
207+ ShutdownService shutdownService = new ShutdownService ();
208+ try {
209+ this .executorServiceFactory =
210+ new DefaultExecutorServiceFactory (
211+ this .addresses .size (), 1 , "rabbitmq-stream-locator-connection-" );
212+ shutdownService .wrap (this .executorServiceFactory ::close );
213+
214+ if (clientParametersPrototype .eventLoopGroup == null ) {
215+ this .eventLoopGroup = Utils .eventLoopGroup ();
216+ shutdownService .wrap (() -> closeEventLoopGroup (this .eventLoopGroup ));
217+ this .clientParametersPrototype =
218+ clientParametersPrototype .duplicate ().eventLoopGroup (this .eventLoopGroup );
219+ } else {
220+ this .eventLoopGroup = null ;
221+ this .clientParametersPrototype =
222+ clientParametersPrototype
223+ .duplicate ()
224+ .eventLoopGroup (clientParametersPrototype .eventLoopGroup );
225+ }
226+ ScheduledExecutorService executorService ;
227+ if (scheduledExecutorService == null ) {
228+ int threads = AVAILABLE_PROCESSORS ;
229+ LOGGER .debug ("Creating scheduled executor service with {} thread(s)" , threads );
230+ ThreadFactory threadFactory = threadFactory ("rabbitmq-stream-environment-scheduler-" );
231+ executorService = Executors .newScheduledThreadPool (threads , threadFactory );
232+ shutdownService .wrap (executorService ::shutdownNow );
233+ this .privateScheduleExecutorService = true ;
234+ } else {
235+ executorService = scheduledExecutorService ;
236+ this .privateScheduleExecutorService = false ;
237+ }
238+ this .scheduledExecutorService = executorService ;
239+
240+ this .producersCoordinator =
241+ new ProducersCoordinator (
242+ this ,
243+ maxProducersByConnection ,
244+ maxTrackingConsumersByConnection ,
245+ connectionNamingStrategy ,
246+ coordinatorClientFactory (this , producerNodeRetryDelay ),
247+ forceLeaderForProducers );
248+ shutdownService .wrap (this .producersCoordinator ::close );
249+ this .consumersCoordinator =
250+ new ConsumersCoordinator (
251+ this ,
252+ maxConsumersByConnection ,
253+ connectionNamingStrategy ,
254+ coordinatorClientFactory (this , consumerNodeRetryDelay ),
255+ forceReplicaForConsumers ,
256+ Utils .brokerPicker ());
257+ shutdownService .wrap (this .consumersCoordinator ::close );
258+ this .offsetTrackingCoordinator = new OffsetTrackingCoordinator (this );
259+ shutdownService .wrap (this .offsetTrackingCoordinator ::close );
260+
261+ ThreadFactory threadFactory = threadFactory ("rabbitmq-stream-environment-locator-scheduler-" );
262+ this .locatorReconnectionScheduledExecutorService =
263+ Executors .newScheduledThreadPool (this .locators .size (), threadFactory );
264+ shutdownService .wrap (this .locatorReconnectionScheduledExecutorService ::shutdownNow );
265+
266+ ClientParameters clientParametersForInit = locatorParametersCopy ();
267+ Runnable locatorInitSequence =
268+ () -> {
269+ RuntimeException lastException = null ;
270+ for (int i = 0 ; i < locators .size (); i ++) {
271+ Address address = addresses .get (i % addresses .size ());
272+ Locator locator = locator (i );
273+ address = addressResolver .resolve (address );
274+ String connectionName = connectionNamingStrategy .apply (ClientConnectionType .LOCATOR );
275+ Client .ClientParameters locatorParameters =
276+ clientParametersForInit
277+ .duplicate ()
278+ .host (address .host ())
279+ .port (address .port ())
280+ .clientProperty ("connection_name" , connectionName )
281+ .shutdownListener (
282+ shutdownListener (locator , connectionNamingStrategy , clientFactory ));
283+ try {
284+ Client client = clientFactory .apply (locatorParameters );
285+ locator .client (client );
286+ LOGGER .debug ("Created locator connection '{}'" , connectionName );
287+ LOGGER .debug ("Locator connected to {}" , address );
288+ } catch (RuntimeException e ) {
289+ LOGGER .debug ("Error while try to connect to {}: {}" , address , e .getMessage ());
290+ lastException = e ;
291+ }
282292 }
283- }
284- if (this .locators .stream ().allMatch (Locator ::isNotSet )) {
285- throw lastException == null
286- ? new StreamException ("Not locator available" )
287- : lastException ;
288- } else {
289- this .locators .forEach (
290- l -> {
291- if (l .isNotSet ()) {
292- ShutdownListener shutdownListener =
293- shutdownListener (l , connectionNamingStrategy , clientFactory );
294- Client .ClientParameters newLocatorParameters =
295- this .locatorParametersCopy ().shutdownListener (shutdownListener );
296- scheduleLocatorConnection (
297- newLocatorParameters ,
298- this .addressResolver ,
299- l ,
300- connectionNamingStrategy ,
301- clientFactory ,
302- this .locatorReconnectionScheduledExecutorService ,
303- this .recoveryBackOffDelayPolicy ,
304- l .label ());
305- }
306- });
307- }
308- };
309- if (lazyInit ) {
310- this .locatorInitializationSequence = locatorInitSequence ;
311- } else {
312- locatorInitSequence .run ();
313- locatorsInitialized .set (true );
314- this .locatorInitializationSequence = () -> {};
293+ if (this .locators .stream ().allMatch (Locator ::isNotSet )) {
294+ throw lastException == null
295+ ? new StreamException ("Not locator available" )
296+ : lastException ;
297+ } else {
298+ this .locators .forEach (
299+ l -> {
300+ if (l .isNotSet ()) {
301+ ShutdownListener shutdownListener =
302+ shutdownListener (l , connectionNamingStrategy , clientFactory );
303+ Client .ClientParameters newLocatorParameters =
304+ this .locatorParametersCopy ().shutdownListener (shutdownListener );
305+ scheduleLocatorConnection (
306+ newLocatorParameters ,
307+ this .addressResolver ,
308+ l ,
309+ connectionNamingStrategy ,
310+ clientFactory ,
311+ this .locatorReconnectionScheduledExecutorService ,
312+ this .recoveryBackOffDelayPolicy ,
313+ l .label ());
314+ }
315+ });
316+ }
317+ };
318+ if (lazyInit ) {
319+ this .locatorInitializationSequence = locatorInitSequence ;
320+ } else {
321+ locatorInitSequence .run ();
322+ locatorsInitialized .set (true );
323+ this .locatorInitializationSequence = () -> {};
324+ }
325+ this .codec =
326+ clientParametersPrototype .codec () == null
327+ ? Codecs .DEFAULT
328+ : clientParametersPrototype .codec ();
329+ this .clockRefreshFuture =
330+ this .scheduledExecutorService .scheduleAtFixedRate (
331+ namedRunnable (this .clock ::refresh , "Background clock refresh" ), 1 , 1 , SECONDS );
332+ shutdownService .wrap (() -> this .clockRefreshFuture .cancel (false ));
333+ } catch (RuntimeException e ) {
334+ shutdownService .close ();
335+ throw e ;
315336 }
316- this .codec =
317- clientParametersPrototype .codec () == null
318- ? Codecs .DEFAULT
319- : clientParametersPrototype .codec ();
320- this .clockRefreshFuture =
321- this .scheduledExecutorService .scheduleAtFixedRate (
322- namedRunnable (this .clock ::refresh , "Background clock refresh" ), 1 , 1 , SECONDS );
323337 }
324338
325339 private ShutdownListener shutdownListener (
@@ -717,20 +731,24 @@ public void close() {
717731 if (this .locatorReconnectionScheduledExecutorService != null ) {
718732 this .locatorReconnectionScheduledExecutorService .shutdownNow ();
719733 }
720- try {
721- if (this .eventLoopGroup != null
722- && (!this .eventLoopGroup .isShuttingDown () || !this .eventLoopGroup .isShutdown ())) {
723- LOGGER .debug ("Closing Netty event loop group" );
724- this .eventLoopGroup .shutdownGracefully (1 , 10 , SECONDS ).get (10 , SECONDS );
725- }
726- } catch (InterruptedException e ) {
727- LOGGER .info ("Event loop group closing has been interrupted" );
728- Thread .currentThread ().interrupt ();
729- } catch (ExecutionException e ) {
730- LOGGER .info ("Event loop group closing failed" , e );
731- } catch (TimeoutException e ) {
732- LOGGER .info ("Could not close event loop group in 10 seconds" );
734+ closeEventLoopGroup (this .eventLoopGroup );
735+ }
736+ }
737+
738+ private static void closeEventLoopGroup (EventLoopGroup eventLoopGroup ) {
739+ try {
740+ if (eventLoopGroup != null
741+ && (!eventLoopGroup .isShuttingDown () || !eventLoopGroup .isShutdown ())) {
742+ LOGGER .debug ("Closing Netty event loop group" );
743+ eventLoopGroup .shutdownGracefully (1 , 10 , SECONDS ).get (10 , SECONDS );
733744 }
745+ } catch (InterruptedException e ) {
746+ LOGGER .info ("Event loop group closing has been interrupted" );
747+ Thread .currentThread ().interrupt ();
748+ } catch (ExecutionException e ) {
749+ LOGGER .info ("Event loop group closing failed" , e );
750+ } catch (TimeoutException e ) {
751+ LOGGER .info ("Could not close event loop group in 10 seconds" );
734752 }
735753 }
736754
0 commit comments