Skip to content

Commit 6d46590

Browse files
committed
Avoid concurrent recovery attempts
1 parent 680fc69 commit 6d46590

File tree

4 files changed

+31
-19
lines changed

4 files changed

+31
-19
lines changed

src/main/java/com/rabbitmq/client/amqp/impl/AmqpConnection.java

Lines changed: 19 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -322,14 +322,15 @@ TopologyListener createTopologyListener(AmqpConnectionBuilder builder) {
322322
// nothing to do in this listener
323323
return;
324324
}
325+
AmqpException exception = ExceptionUtils.convert(event.failureCause());
326+
LOGGER.debug("Converted native exception to {}", exception.getClass().getSimpleName());
325327
if (this.recoveringConnection.get()) {
326328
LOGGER.debug(
327329
"Filtering recovery task scheduling, connection recovery of '{}' already in progress",
328330
this.name());
331+
this.releaseManagementResources(exception);
329332
return;
330333
}
331-
AmqpException exception = ExceptionUtils.convert(event.failureCause());
332-
LOGGER.debug("Converted native exception to {}", exception.getClass().getSimpleName());
333334

334335
if (RECOVERY_PREDICATE.test(exception) && this.state() != OPENING) {
335336
LOGGER.debug(
@@ -343,6 +344,8 @@ TopologyListener createTopologyListener(AmqpConnectionBuilder builder) {
343344
if (!this.recoveringConnection.get()) {
344345
recoverAfterConnectionFailure(
345346
recoveryConfiguration, name, exception, resultReference);
347+
} else {
348+
this.releaseManagementResources(exception);
346349
}
347350
});
348351
} else {
@@ -361,11 +364,11 @@ TopologyListener createTopologyListener(AmqpConnectionBuilder builder) {
361364
private void recoverAfterConnectionFailure(
362365
AmqpConnectionBuilder.AmqpRecoveryConfiguration recoveryConfiguration,
363366
String connectionName,
364-
Exception failureCause,
367+
AmqpException failureCause,
365368
AtomicReference<BiConsumer<org.apache.qpid.protonj2.client.Connection, DisconnectionEvent>>
366369
disconnectedHandlerReference) {
367370
LOGGER.info(
368-
"Connection '{}' to '{}' has been disconnected, trying to recover.",
371+
"Connection '{}' to '{}' has been disconnected, initializing recovery.",
369372
this.name(),
370373
this.currentConnectionLabel());
371374
LOGGER.debug("Notifying listeners of connection '{}'.", this.name());
@@ -376,11 +379,11 @@ private void recoverAfterConnectionFailure(
376379
this.nativeSession = null;
377380
this.connectionAddress = null;
378381
LOGGER.debug("Releasing management resource of connection '{}'.", this.name());
379-
this.releaseManagementResources();
382+
this.releaseManagementResources(failureCause);
380383
CompletableFuture<NativeConnectionWrapper> ncwFuture;
381384
if (this.recoveringConnection.compareAndSet(false, true)) {
382385
this.recoveringConnection.set(true);
383-
LOGGER.debug("Connection attempt for '{}'.", this.name());
386+
LOGGER.debug("Scheduling connection attempt for '{}'.", this.name());
384387
ncwFuture =
385388
recoverNativeConnection(
386389
recoveryConfiguration, connectionName, disconnectedHandlerReference);
@@ -394,7 +397,6 @@ private void recoverAfterConnectionFailure(
394397
ncw -> {
395398
this.sync(ncw);
396399
LOGGER.debug("Reconnected '{}' to {}", this.name(), this.currentConnectionLabel());
397-
this.recoveringConnection.set(false);
398400
try {
399401
if (recoveryConfiguration.topology()) {
400402
this.management.init();
@@ -407,13 +409,15 @@ private void recoverAfterConnectionFailure(
407409
LOGGER.info(
408410
"Recovered connection '{}' to {}", this.name(), this.currentConnectionLabel());
409411
this.state(OPEN);
412+
this.recoveringConnection.set(false);
410413
} catch (Exception ex) {
411414
// likely InterruptedException or IO exception
412415
LOGGER.warn(
413416
"Error while trying to recover topology for connection '{}': {}",
414417
this.name(),
415418
ex.getMessage());
416-
if (RECOVERY_PREDICATE.test(ex)) {
419+
AmqpException amqpException = ExceptionUtils.convert(ex);
420+
if (RECOVERY_PREDICATE.test(amqpException)) {
417421
LOGGER.debug(
418422
"Error during topology recovery, queueing recovery task for '{}', error is {}",
419423
this.name(),
@@ -424,7 +428,10 @@ private void recoverAfterConnectionFailure(
424428
() -> {
425429
if (!this.recoveringConnection.get()) {
426430
recoverAfterConnectionFailure(
427-
recoveryConfiguration, name, ex, disconnectedHandlerReference);
431+
recoveryConfiguration,
432+
name,
433+
amqpException,
434+
disconnectedHandlerReference);
428435
}
429436
});
430437
}
@@ -478,7 +485,7 @@ public <T> T maybeRetry(Supplier<T> task) {
478485
}
479486
},
480487
connectionName))
481-
.description("Trying to recover native connection for '%s'.", connectionName)
488+
.description("Recovering native connection for '%s'.", connectionName)
482489
.delayPolicy(recoveryConfiguration.backOffDelayPolicy())
483490
.retry(RECOVERY_PREDICATE)
484491
.scheduler(this.scheduledExecutorService())
@@ -582,9 +589,9 @@ private void closeManagement() {
582589
this.management.close();
583590
}
584591

585-
private void releaseManagementResources() {
592+
private void releaseManagementResources(AmqpException e) {
586593
if (this.management != null) {
587-
this.management.releaseResources();
594+
this.management.releaseResources(e);
588595
}
589596
}
590597

src/main/java/com/rabbitmq/client/amqp/impl/AmqpManagement.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -202,7 +202,7 @@ public void close() {
202202
}
203203
if (this.closed.compareAndSet(false, true)) {
204204
this.state(CLOSED);
205-
this.releaseResources();
205+
this.releaseResources(null);
206206
if (this.receiver != null) {
207207
try {
208208
this.receiver.close();
@@ -361,12 +361,13 @@ private void failRequests(AmqpException exception) {
361361
}
362362
}
363363

364-
void releaseResources() {
364+
void releaseResources(AmqpException e) {
365365
this.markUnavailable();
366366
if (this.receiveLoop != null) {
367367
this.receiveLoop.cancel(true);
368368
this.receiveLoop = null;
369369
}
370+
this.failRequests(e);
370371
}
371372

372373
QueueInfo declareQueue(String name, Map<String, Object> body) {
@@ -599,6 +600,7 @@ private OutstandingRequest(Duration timeout) {
599600

600601
void block() {
601602
boolean completed;
603+
long start = System.nanoTime();
602604
try {
603605
completed = this.latch.await(timeout.toMillis(), MILLISECONDS);
604606
} catch (InterruptedException e) {
@@ -609,7 +611,8 @@ void block() {
609611
throw this.exception.get();
610612
}
611613
if (!completed) {
612-
throw new AmqpException("Could not get management response in %d ms", timeout.toMillis());
614+
Duration duration = Duration.ofNanos(System.nanoTime() - start);
615+
throw new AmqpException("Could not get management response in %d ms", duration.toMillis());
613616
}
614617
}
615618

src/main/java/com/rabbitmq/client/amqp/impl/ConnectionUtils.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@ static AmqpConnection.NativeConnectionWrapper enforceAffinity(
115115
pickedConnection = connectionWrapper;
116116
} else {
117117
LOGGER.debug("Affinity no longer valid, retrying.");
118-
management.releaseResources();
118+
management.releaseResources(null);
119119
connectionWrapper.connection().close();
120120
}
121121
}
@@ -143,13 +143,13 @@ static AmqpConnection.NativeConnectionWrapper enforceAffinity(
143143
queueInfoRefreshed = true;
144144
}
145145
}
146-
management.releaseResources();
146+
management.releaseResources(null);
147147
connectionWrapper.connection().close();
148148
}
149149
}
150150
return pickedConnection;
151151
} catch (AmqpException.AmqpConnectionException e) {
152-
management.releaseResources();
152+
management.releaseResources(e);
153153
try {
154154
if (connectionWrapper != null) {
155155
connectionWrapper.connection().close();

src/main/java/com/rabbitmq/client/amqp/impl/ExceptionUtils.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,9 @@ static <T> T wrapGet(Future<T> future) throws ClientException {
4949
}
5050

5151
static AmqpException convert(Exception e) {
52-
if (e instanceof ClientException) {
52+
if (e instanceof AmqpException) {
53+
return (AmqpException) e;
54+
} else if (e instanceof ClientException) {
5355
return convert((ClientException) e);
5456
} else {
5557
return new AmqpException(e);

0 commit comments

Comments
 (0)