Skip to content

Commit 0f1f5e8

Browse files
committed
Recover also on resource invalid state exceptions
References #118
1 parent 96f7a84 commit 0f1f5e8

File tree

3 files changed

+21
-5
lines changed

3 files changed

+21
-5
lines changed

src/main/java/com/rabbitmq/client/amqp/impl/AmqpConnection.java

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,8 +47,24 @@
4747

4848
final class AmqpConnection extends ResourceBase implements Connection {
4949

50+
/** Connection-related issues */
51+
private static final Predicate<Exception> CONNECTION_EXCEPTION_PREDICATE =
52+
e -> e instanceof AmqpException.AmqpConnectionException;
53+
54+
/**
55+
* Issues related to underlying resources.
56+
*
57+
* <p>E.g. the connection used for enforcing affinity gets closed, the management is marked as
58+
* unavailable and throws an invalid state exception when it is called. The recovery process
59+
* should restart.
60+
*/
61+
private static final Predicate<Exception> RESOURCE_INVALID_STATE_PREDICATE =
62+
e ->
63+
e instanceof AmqpException.AmqpResourceInvalidStateException
64+
&& !(e instanceof AmqpException.AmqpResourceClosedException);
65+
5066
private static final Predicate<Exception> RECOVERY_PREDICATE =
51-
t -> t instanceof AmqpException.AmqpConnectionException;
67+
CONNECTION_EXCEPTION_PREDICATE.or(RESOURCE_INVALID_STATE_PREDICATE);
5268

5369
private static final AtomicLong ID_SEQUENCE = new AtomicLong(0);
5470

@@ -399,7 +415,7 @@ private void recoverAfterConnectionFailure(
399415
ex.getMessage());
400416
if (RECOVERY_PREDICATE.test(ex)) {
401417
LOGGER.debug(
402-
"Error during topology recoverable, queueing recovery task for '{}', error is {}",
418+
"Error during topology recovery, queueing recovery task for '{}', error is {}",
403419
this.name(),
404420
ex.getMessage());
405421
this.environment

src/main/java/com/rabbitmq/client/amqp/impl/AmqpManagement.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -233,7 +233,7 @@ void init() {
233233
if (!this.initializing && this.state() != OPEN) {
234234
this.initializing = true;
235235
LOGGER.debug("Initializing management ({}).", this);
236-
this.state(UNAVAILABLE);
236+
this.markUnavailable();
237237
try {
238238
if (this.receiveLoop != null) {
239239
this.receiveLoop.cancel(true);
@@ -313,7 +313,7 @@ private Runnable receiveTask() {
313313
} catch (ClientConnectionRemotelyClosedException | ClientLinkRemotelyClosedException e) {
314314
// receiver is closed
315315
} catch (ClientSessionRemotelyClosedException e) {
316-
this.state(UNAVAILABLE);
316+
this.markUnavailable();
317317
LOGGER.info("Management session closed in receive loop: {} ({})", e.getMessage(), this);
318318
AmqpException exception = ExceptionUtils.convert(e);
319319
this.failRequests(exception);

src/test/resources/logback-test.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
<logger name="com.rabbitmq.client.amqp.impl.AsyncRetry" level="warn" />
1919
<logger name="com.rabbitmq.client.amqp.impl.EntityRecovery" level="warn" />
2020
<logger name="com.rabbitmq.client.amqp.impl.EventLoop" level="warn" />
21-
<logger name="com.rabbitmq.client.amqp.impl.AmqpManagement" level="warn" />
21+
<logger name="com.rabbitmq.client.amqp.impl.AmqpManagement" level="info" />
2222
<logger name="com.rabbitmq.client.amqp.impl.AmqpPublisher" level="warn" />
2323
<logger name="com.rabbitmq.client.amqp.impl.AmqpConsumer" level="warn" />
2424
<logger name="com.rabbitmq.client.amqp.impl.RetryUtils" level="warn" />

0 commit comments

Comments
 (0)