3030import java .util .Map ;
3131import java .util .Set ;
3232import java .util .concurrent .*;
33+ import java .util .concurrent .atomic .AtomicBoolean ;
3334
3435/**
3536 * Manages a set of channels, indexed by channel number (<code><b>1.._channelMax</b></code>).
@@ -38,16 +39,15 @@ public class ChannelManager {
3839
3940 private static final Logger LOGGER = LoggerFactory .getLogger (ChannelManager .class );
4041
42+ private final AtomicBoolean closed = new AtomicBoolean (false );
4143 /** Monitor for <code>_channelMap</code> and <code>channelNumberAllocator</code> */
4244 private final Object monitor = new Object ();
4345 /** Mapping from <code><b>1.._channelMax</b></code> to {@link ChannelN} instance */
44- private final Map <Integer , ChannelN > _channelMap = new HashMap <Integer , ChannelN >();
46+ private final Map <Integer , ChannelN > _channelMap = new HashMap <>();
4547 private final IntAllocator channelNumberAllocator ;
4648
4749 private final ConsumerWorkService workService ;
4850
49- private final Set <CountDownLatch > shutdownSet = new HashSet <CountDownLatch >();
50-
5151 /** Maximum channel number available on this connection. */
5252 private final int _channelMax ;
5353 private ExecutorService shutdownExecutor ;
@@ -109,61 +109,54 @@ public ChannelN getChannel(int channelNumber) {
109109 * @param signal reason for shutdown
110110 */
111111 public void handleSignal (final ShutdownSignalException signal ) {
112- Set <ChannelN > channels ;
113- synchronized (this .monitor ) {
114- channels = new HashSet <ChannelN >(_channelMap .values ());
115- }
116-
117- for (final ChannelN channel : channels ) {
118- releaseChannelNumber (channel );
119- // async shutdown if possible
120- // see https://github.com/rabbitmq/rabbitmq-java-client/issues/194
121- Runnable channelShutdownRunnable = new Runnable () {
122- @ Override
123- public void run () {
124- channel .processShutdownSignal (signal , true , true );
125- }
126- };
127- if (this .shutdownExecutor == null ) {
128- channelShutdownRunnable .run ();
129- } else {
130- Future <?> channelShutdownTask = this .shutdownExecutor .submit (channelShutdownRunnable );
131- try {
132- channelShutdownTask .get (channelShutdownTimeout , TimeUnit .MILLISECONDS );
133- } catch (Exception e ) {
134- LOGGER .warn ("Couldn't properly close channel {} on shutdown after waiting for {} ms" , channel .getChannelNumber (), channelShutdownTimeout );
135- channelShutdownTask .cancel (true );
112+ if (this .closed .compareAndSet (false , true )) {
113+ Set <ChannelN > channels ;
114+ synchronized (this .monitor ) {
115+ channels = new HashSet <>(_channelMap .values ());
116+ }
117+ Set <CountDownLatch > shutdownSet = new HashSet <>();
118+ for (final ChannelN channel : channels ) {
119+ releaseChannelNumber (channel );
120+ // async shutdown if possible
121+ // see https://github.com/rabbitmq/rabbitmq-java-client/issues/194
122+ Runnable channelShutdownRunnable = () -> channel .processShutdownSignal (signal , true , true );
123+ if (this .shutdownExecutor == null ) {
124+ channelShutdownRunnable .run ();
125+ } else {
126+ Future <?> channelShutdownTask = this .shutdownExecutor .submit (channelShutdownRunnable );
127+ try {
128+ channelShutdownTask .get (channelShutdownTimeout , TimeUnit .MILLISECONDS );
129+ } catch (Exception e ) {
130+ LOGGER .warn ("Couldn't properly close channel {} on shutdown after waiting for {} ms" , channel .getChannelNumber (), channelShutdownTimeout );
131+ channelShutdownTask .cancel (true );
132+ }
136133 }
134+ shutdownSet .add (channel .getShutdownLatch ());
135+ channel .notifyListeners ();
137136 }
138- shutdownSet .add (channel .getShutdownLatch ());
139- channel .notifyListeners ();
137+ scheduleShutdownProcessing (shutdownSet );
140138 }
141- scheduleShutdownProcessing ();
142139 }
143140
144- private void scheduleShutdownProcessing () {
145- final Set <CountDownLatch > sdSet = new HashSet <CountDownLatch >(shutdownSet );
141+ private void scheduleShutdownProcessing (Set <CountDownLatch > shutdownSet ) {
146142 final ConsumerWorkService ssWorkService = workService ;
147- Runnable target = new Runnable () {
148- @ Override
149- public void run () {
150- for (CountDownLatch latch : sdSet ) {
151- try {
152- int shutdownTimeout = ssWorkService .getShutdownTimeout ();
153- if (shutdownTimeout == 0 ) {
154- latch .await ();
155- } else {
156- boolean completed = latch .await (shutdownTimeout , TimeUnit .MILLISECONDS );
157- if (!completed ) {
158- LOGGER .warn ("Consumer dispatcher for channel didn't shutdown after waiting for {} ms" , shutdownTimeout );
159- }
143+ Runnable target = () -> {
144+ for (CountDownLatch latch : shutdownSet ) {
145+ try {
146+ int shutdownTimeout = ssWorkService .getShutdownTimeout ();
147+ if (shutdownTimeout == 0 ) {
148+ latch .await ();
149+ } else {
150+ boolean completed = latch .await (shutdownTimeout , TimeUnit .MILLISECONDS );
151+ if (!completed ) {
152+ LOGGER .warn ("Consumer dispatcher for channel didn't shutdown after waiting for {} ms" , shutdownTimeout );
160153 }
161- } catch (Throwable e ) {
162- /*ignored*/
163154 }
155+ } catch (Throwable e ) {
156+ /*ignored*/
164157 }
165- ssWorkService .shutdown ();
166158 }
159+ ssWorkService .shutdown ();
167160 };
168161 if (this .shutdownExecutor != null ) {
169162 shutdownExecutor .execute (target );
0 commit comments