Skip to content

Commit f1546f1

Browse files
committed
Release management if connection gets closed in affinity lookup
1 parent eb0f5e3 commit f1546f1

File tree

4 files changed

+35
-8
lines changed

4 files changed

+35
-8
lines changed

src/main/java/com/rabbitmq/client/amqp/impl/AmqpConnection.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -468,7 +468,10 @@ private CompletableFuture<NativeConnectionWrapper> recoverNativeConnection(
468468
public <T> T maybeRetry(Supplier<T> task) {
469469
return RetryUtils.callAndMaybeRetry(
470470
task::get,
471-
e -> true,
471+
// no need to retry if the connection is closed
472+
// the affinity task will fail and AsyncRetry will take care
473+
// of retrying later
474+
e -> RECOVERY_PREDICATE.negate().test(e),
472475
Duration.ofMillis(10),
473476
5,
474477
"Connection affinity operation");

src/main/java/com/rabbitmq/client/amqp/impl/ConnectionUtils.java

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ static AmqpConnection.NativeConnectionWrapper enforceAffinity(
6060
// no affinity asked, we create a connection and return it
6161
return retryStrategy.maybeRetry(() -> connectionFactory.apply(null));
6262
}
63+
AmqpConnection.NativeConnectionWrapper connectionWrapper = null;
6364
try {
6465
AmqpConnection.NativeConnectionWrapper pickedConnection = null;
6566
int attemptCount = 0;
@@ -68,7 +69,7 @@ static AmqpConnection.NativeConnectionWrapper enforceAffinity(
6869
Management.QueueInfo info = affinityCache.queueInfo(context.queue());
6970
while (pickedConnection == null) {
7071
attemptCount++;
71-
AmqpConnection.NativeConnectionWrapper connectionWrapper = null;
72+
connectionWrapper = null;
7273
if (info == null) {
7374
connectionWrapper = retryStrategy.maybeRetry(() -> connectionFactory.apply(null));
7475
info = lookUpQueueInfo(management, context, affinityCache, retryStrategy);
@@ -147,6 +148,23 @@ static AmqpConnection.NativeConnectionWrapper enforceAffinity(
147148
}
148149
}
149150
return pickedConnection;
151+
} catch (AmqpException.AmqpConnectionException e) {
152+
management.releaseResources();
153+
try {
154+
if (connectionWrapper != null) {
155+
connectionWrapper.connection().close();
156+
}
157+
} catch (Exception ex) {
158+
LOGGER.debug(
159+
"Error while closing native connection while enforcing affinity: {}", ex.getMessage());
160+
}
161+
management.markUnavailable();
162+
LOGGER.warn(
163+
"Cannot enforce affinity {} of '{}' because connection has been closed",
164+
context,
165+
connectionName,
166+
e);
167+
throw e;
150168
} catch (RuntimeException e) {
151169
LOGGER.warn(
152170
"Cannot enforce affinity {} of '{}' error when looking up queue",

src/test/java/com/rabbitmq/client/amqp/impl/AmqpConnectionAffinityUnitTest.java

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import static com.rabbitmq.client.amqp.Management.QueueType.QUORUM;
2121
import static com.rabbitmq.client.amqp.Management.QueueType.STREAM;
22+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
2223
import static org.assertj.core.api.Assertions.fail;
2324
import static org.mockito.ArgumentMatchers.anyList;
2425
import static org.mockito.Mockito.*;
@@ -201,6 +202,16 @@ void useAddressReturnedByCache() {
201202
.hasMapping(LEADER_NODENAME, LEADER_ADDRESS);
202203
}
203204

205+
@Test
206+
void connectionClosedExceptionDuringQueueInfoLookupShouldMarkManagementAsUnavailable() {
207+
when(management.queueInfo(Q)).thenThrow(new AmqpException.AmqpConnectionException("", null));
208+
when(cf.apply(null)).thenReturn(follower1Connection());
209+
assertThatThrownBy(() -> enforceAffinity(cf, management, affinity(), cache))
210+
.isInstanceOf(AmqpException.AmqpConnectionException.class);
211+
verify(management, times(1)).markUnavailable();
212+
verify(nativeConnection, times(1)).close();
213+
}
214+
204215
AmqpConnection.NativeConnectionWrapper leaderConnection() {
205216
return new AmqpConnection.NativeConnectionWrapper(
206217
this.nativeConnection, LEADER_NODENAME, LEADER_ADDRESS);
@@ -216,11 +227,6 @@ AmqpConnection.NativeConnectionWrapper follower2Connection() {
216227
this.nativeConnection, FOLLOWER2_NODENAME, FOLLOWER2_ADDRESS);
217228
}
218229

219-
AmqpConnection.NativeConnectionWrapper connection(String nodename) {
220-
return new AmqpConnection.NativeConnectionWrapper(
221-
this.nativeConnection, nodename, NODES.get(nodename));
222-
}
223-
224230
static ConnectionUtils.AffinityContext affinity() {
225231
return new ConnectionUtils.AffinityContext(Q, ConnectionSettings.Affinity.Operation.PUBLISH);
226232
}

src/test/resources/logback-test.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
<logger name="com.rabbitmq.client.amqp.impl.AmqpManagement" level="debug" />
2222
<logger name="com.rabbitmq.client.amqp.impl.AmqpPublisher" level="warn" />
2323
<logger name="com.rabbitmq.client.amqp.impl.AmqpConsumer" level="warn" />
24-
<logger name="com.rabbitmq.client.amqp.impl.RetryUtils" level="warn" />
24+
<logger name="com.rabbitmq.client.amqp.impl.RetryUtils" level="debug" />
2525

2626
<root level="warn">
2727
<appender-ref ref="STDOUT" />

0 commit comments

Comments
 (0)