@@ -82,7 +82,9 @@ class AmqpManagement implements Management {
8282 private final TopologyListener topologyListener ;
8383 private final Supplier <String > nameSupplier ;
8484 private final AtomicReference <State > state = new AtomicReference <>(CREATED );
85- private final AtomicBoolean initializing = new AtomicBoolean (false );
85+ // private final AtomicBoolean initializing = new AtomicBoolean(false);
86+ private volatile boolean initializing = false ;
87+ private final Lock initializationLock = new ReentrantLock ();
8688 private final Duration receiveLoopIdleTimeout ;
8789 private final Lock instanceLock = new ReentrantLock ();
8890
@@ -170,7 +172,7 @@ public UnbindSpecification unbind() {
170172
171173 @ Override
172174 public void close () {
173- if (this .initializing . get () ) {
175+ if (this .initializing ) {
174176 throw new AmqpException .AmqpResourceInvalidStateException (
175177 "Management is initializing, retry closing later." );
176178 }
@@ -203,45 +205,53 @@ public void close() {
203205
204206 void init () {
205207 if (this .state () != OPEN ) {
206- if (this .initializing .compareAndSet (false , true )) {
207- LOGGER .debug ("Initializing management ({})." , this );
208- this .state (UNAVAILABLE );
208+ if (!this .initializing ) {
209209 try {
210- if (this .receiveLoop != null ) {
211- this .receiveLoop .cancel (true );
212- this .receiveLoop = null ;
210+ initializationLock .lock ();
211+ if (!this .initializing ) {
212+ this .initializing = true ;
213+ LOGGER .debug ("Initializing management ({})." , this );
214+ this .state (UNAVAILABLE );
215+ try {
216+ if (this .receiveLoop != null ) {
217+ this .receiveLoop .cancel (true );
218+ this .receiveLoop = null ;
219+ }
220+ LOGGER .debug ("Creating management session ({})." , this );
221+ this .session = this .connection .nativeConnection ().openSession ();
222+ String linkPairName = "management-link-pair" ;
223+ Map <String , Object > properties = Collections .singletonMap ("paired" , Boolean .TRUE );
224+ LOGGER .debug ("Creating management sender ({})." , this );
225+ this .sender =
226+ session .openSender (
227+ MANAGEMENT_NODE_ADDRESS ,
228+ new SenderOptions ()
229+ .deliveryMode (DeliveryMode .AT_MOST_ONCE )
230+ .linkName (linkPairName )
231+ .properties (properties ));
232+
233+ LOGGER .debug ("Creating management receiver ({})." , this );
234+ this .receiver =
235+ session .openReceiver (
236+ MANAGEMENT_NODE_ADDRESS ,
237+ new ReceiverOptions ()
238+ .deliveryMode (DeliveryMode .AT_MOST_ONCE )
239+ .linkName (linkPairName )
240+ .properties (properties )
241+ .creditWindow (100 ));
242+
243+ this .sender .openFuture ().get (this .rpcTimeout .toMillis (), MILLISECONDS );
244+ LOGGER .debug ("Management sender created ({})." , this );
245+ this .receiver .openFuture ().get (this .rpcTimeout .toMillis (), MILLISECONDS );
246+ LOGGER .debug ("Management receiver created ({})." , this );
247+ this .state (OPEN );
248+ this .initializing = false ;
249+ } catch (Exception e ) {
250+ throw new AmqpException (e );
251+ }
213252 }
214- LOGGER .debug ("Creating management session ({})." , this );
215- this .session = this .connection .nativeConnection ().openSession ();
216- String linkPairName = "management-link-pair" ;
217- Map <String , Object > properties = Collections .singletonMap ("paired" , Boolean .TRUE );
218- LOGGER .debug ("Creating management sender ({})." , this );
219- this .sender =
220- session .openSender (
221- MANAGEMENT_NODE_ADDRESS ,
222- new SenderOptions ()
223- .deliveryMode (DeliveryMode .AT_MOST_ONCE )
224- .linkName (linkPairName )
225- .properties (properties ));
226-
227- LOGGER .debug ("Creating management receiver ({})." , this );
228- this .receiver =
229- session .openReceiver (
230- MANAGEMENT_NODE_ADDRESS ,
231- new ReceiverOptions ()
232- .deliveryMode (DeliveryMode .AT_MOST_ONCE )
233- .linkName (linkPairName )
234- .properties (properties )
235- .creditWindow (100 ));
236-
237- this .sender .openFuture ().get (this .rpcTimeout .toMillis (), MILLISECONDS );
238- LOGGER .debug ("Management sender created ({})." , this );
239- this .receiver .openFuture ().get (this .rpcTimeout .toMillis (), MILLISECONDS );
240- LOGGER .debug ("Management receiver created ({})." , this );
241- this .state (OPEN );
242- this .initializing .set (false );
243- } catch (Exception e ) {
244- throw new AmqpException (e );
253+ } finally {
254+ initializationLock .unlock ();
245255 }
246256 }
247257 }
0 commit comments