@@ -155,8 +155,12 @@ public class Client implements AutoCloseable {
155155 final ConcurrentMap <Integer , OutstandingRequest <?>> outstandingRequests =
156156 new ConcurrentHashMap <>();
157157 final List <SubscriptionOffset > subscriptionOffsets = new CopyOnWriteArrayList <>();
158+ // dispatches broker frames, except for delivery frames
158159 final ExecutorService executorService ;
160+ private final Consumer <ExecutorService > closeExecutorService ;
161+ // dispatches delivery frames only
159162 final ExecutorService dispatchingExecutorService ;
163+ private final Consumer <ExecutorService > closeDispatchingExecutorService ;
160164 final TuneState tuneState ;
161165 final AtomicBoolean closing = new AtomicBoolean (false );
162166 final AtomicBoolean shuttingDownDispatching = new AtomicBoolean (false );
@@ -174,7 +178,6 @@ public long applyAsLong(Object value) {
174178 }
175179 };
176180 private final AtomicInteger correlationSequence = new AtomicInteger (0 );
177- private final Runnable executorServiceClosing ;
178181 private final SaslConfiguration saslConfiguration ;
179182 private final CredentialsProvider credentialsProvider ;
180183 private final Runnable nettyClosing ;
@@ -331,44 +334,58 @@ public void initChannel(SocketChannel ch) {
331334 this .channel = f .channel ();
332335 ExecutorServiceFactory executorServiceFactory = parameters .executorServiceFactory ;
333336 if (executorServiceFactory == null ) {
337+ this .closeExecutorService =
338+ Utils .makeIdempotent (
339+ es -> {
340+ if (es != null ) {
341+ es .shutdownNow ();
342+ }
343+ });
334344 this .executorService =
335345 Executors .newSingleThreadExecutor (threadFactory (clientConnectionName + "-" ));
336346 } else {
347+ this .closeExecutorService =
348+ Utils .makeIdempotent (
349+ es -> {
350+ if (es != null ) {
351+ executorServiceFactory .clientClosed (es );
352+ }
353+ });
337354 this .executorService = executorServiceFactory .get ();
338355 }
339356 ExecutorServiceFactory dispatchingExecutorServiceFactory =
340357 parameters .dispatchingExecutorServiceFactory ;
341358 if (dispatchingExecutorServiceFactory == null ) {
359+ this .closeDispatchingExecutorService =
360+ Utils .makeIdempotent (
361+ es -> {
362+ if (es != null ) {
363+ List <Runnable > outstandingTasks = es .shutdownNow ();
364+ this .shuttingDownDispatching .set (true );
365+ for (Runnable outstandingTask : outstandingTasks ) {
366+ try {
367+ outstandingTask .run ();
368+ } catch (Exception e ) {
369+ LOGGER .info (
370+ "Error while releasing buffer in outstanding connection tasks: {}" ,
371+ e .getMessage ());
372+ }
373+ }
374+ }
375+ });
342376 this .dispatchingExecutorService =
343377 Executors .newSingleThreadExecutor (
344378 threadFactory ("dispatching-" + clientConnectionName + "-" ));
345379 } else {
380+ this .closeDispatchingExecutorService =
381+ Utils .makeIdempotent (
382+ es -> {
383+ if (es != null ) {
384+ dispatchingExecutorServiceFactory .clientClosed (es );
385+ }
386+ });
346387 this .dispatchingExecutorService = dispatchingExecutorServiceFactory .get ();
347388 }
348- this .executorServiceClosing =
349- Utils .makeIdempotent (
350- () -> {
351- if (dispatchingExecutorServiceFactory == null ) {
352- List <Runnable > outstandingTasks = this .dispatchingExecutorService .shutdownNow ();
353- this .shuttingDownDispatching .set (true );
354- for (Runnable outstandingTask : outstandingTasks ) {
355- try {
356- outstandingTask .run ();
357- } catch (Exception e ) {
358- LOGGER .info (
359- "Error while releasing buffer in outstanding connection tasks: {}" ,
360- e .getMessage ());
361- }
362- }
363- } else {
364- dispatchingExecutorServiceFactory .clientClosed (this .dispatchingExecutorService );
365- }
366- if (executorServiceFactory == null ) {
367- this .executorService .shutdownNow ();
368- } else {
369- executorServiceFactory .clientClosed (this .executorService );
370- }
371- });
372389 try {
373390 this .tuneState =
374391 new TuneState (
@@ -1451,7 +1468,12 @@ void closingSequence(ShutdownContext.ShutdownReason reason) {
14511468 this .shutdownListenerCallback .accept (reason );
14521469 }
14531470 this .nettyClosing .run ();
1454- this .executorServiceClosing .run ();
1471+ if (this .closeDispatchingExecutorService != null ) {
1472+ this .closeDispatchingExecutorService .accept (this .dispatchingExecutorService );
1473+ }
1474+ if (this .closeExecutorService != null ) {
1475+ this .closeExecutorService .accept (this .executorService );
1476+ }
14551477 }
14561478
14571479 private void closeNetty () {
0 commit comments