Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 49 additions & 39 deletions src/main/java/com/rabbitmq/client/amqp/impl/AmqpManagement.java
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,9 @@ class AmqpManagement implements Management {
private final TopologyListener topologyListener;
private final Supplier<String> nameSupplier;
private final AtomicReference<State> state = new AtomicReference<>(CREATED);
private final AtomicBoolean initializing = new AtomicBoolean(false);
// private final AtomicBoolean initializing = new AtomicBoolean(false);
private volatile boolean initializing = false;
private final Lock initializationLock = new ReentrantLock();
private final Duration receiveLoopIdleTimeout;
private final Lock instanceLock = new ReentrantLock();

Expand Down Expand Up @@ -170,7 +172,7 @@ public UnbindSpecification unbind() {

@Override
public void close() {
if (this.initializing.get()) {
if (this.initializing) {
throw new AmqpException.AmqpResourceInvalidStateException(
"Management is initializing, retry closing later.");
}
Expand Down Expand Up @@ -203,45 +205,53 @@ public void close() {

void init() {
if (this.state() != OPEN) {
if (this.initializing.compareAndSet(false, true)) {
LOGGER.debug("Initializing management ({}).", this);
this.state(UNAVAILABLE);
if (!this.initializing) {
try {
if (this.receiveLoop != null) {
this.receiveLoop.cancel(true);
this.receiveLoop = null;
initializationLock.lock();
if (!this.initializing) {
this.initializing = true;
LOGGER.debug("Initializing management ({}).", this);
this.state(UNAVAILABLE);
try {
if (this.receiveLoop != null) {
this.receiveLoop.cancel(true);
this.receiveLoop = null;
}
LOGGER.debug("Creating management session ({}).", this);
this.session = this.connection.nativeConnection().openSession();
String linkPairName = "management-link-pair";
Map<String, Object> properties = Collections.singletonMap("paired", Boolean.TRUE);
LOGGER.debug("Creating management sender ({}).", this);
this.sender =
session.openSender(
MANAGEMENT_NODE_ADDRESS,
new SenderOptions()
.deliveryMode(DeliveryMode.AT_MOST_ONCE)
.linkName(linkPairName)
.properties(properties));

LOGGER.debug("Creating management receiver ({}).", this);
this.receiver =
session.openReceiver(
MANAGEMENT_NODE_ADDRESS,
new ReceiverOptions()
.deliveryMode(DeliveryMode.AT_MOST_ONCE)
.linkName(linkPairName)
.properties(properties)
.creditWindow(100));

this.sender.openFuture().get(this.rpcTimeout.toMillis(), MILLISECONDS);
LOGGER.debug("Management sender created ({}).", this);
this.receiver.openFuture().get(this.rpcTimeout.toMillis(), MILLISECONDS);
LOGGER.debug("Management receiver created ({}).", this);
this.state(OPEN);
this.initializing = false;
} catch (Exception e) {
throw new AmqpException(e);
}
}
LOGGER.debug("Creating management session ({}).", this);
this.session = this.connection.nativeConnection().openSession();
String linkPairName = "management-link-pair";
Map<String, Object> properties = Collections.singletonMap("paired", Boolean.TRUE);
LOGGER.debug("Creating management sender ({}).", this);
this.sender =
session.openSender(
MANAGEMENT_NODE_ADDRESS,
new SenderOptions()
.deliveryMode(DeliveryMode.AT_MOST_ONCE)
.linkName(linkPairName)
.properties(properties));

LOGGER.debug("Creating management receiver ({}).", this);
this.receiver =
session.openReceiver(
MANAGEMENT_NODE_ADDRESS,
new ReceiverOptions()
.deliveryMode(DeliveryMode.AT_MOST_ONCE)
.linkName(linkPairName)
.properties(properties)
.creditWindow(100));

this.sender.openFuture().get(this.rpcTimeout.toMillis(), MILLISECONDS);
LOGGER.debug("Management sender created ({}).", this);
this.receiver.openFuture().get(this.rpcTimeout.toMillis(), MILLISECONDS);
LOGGER.debug("Management receiver created ({}).", this);
this.state(OPEN);
this.initializing.set(false);
} catch (Exception e) {
throw new AmqpException(e);
} finally {
initializationLock.unlock();
}
}
}
Expand Down