Skip to content

Commit 5680cde

Browse files
committed
Restart connection recovery if no running stream member for consumer
1 parent 771aa64 commit 5680cde

File tree

5 files changed

+105
-16
lines changed

5 files changed

+105
-16
lines changed

src/main/java/com/rabbitmq/client/amqp/impl/AmqpConnection.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -512,6 +512,24 @@ private void recoverConsumers() throws InterruptedException {
512512
consumer.queue(),
513513
ex);
514514
throw ex;
515+
} catch (AmqpException.AmqpResourceClosedException ex) {
516+
if (ExceptionUtils.noRunningStreamMemberOnNode(ex)) {
517+
LOGGER.warn(
518+
"Could not recover consumer {} (queue '{}') because there is "
519+
+ "running stream member on the node, restarting recovery",
520+
consumer.id(),
521+
consumer.queue(),
522+
ex);
523+
throw new AmqpException.AmqpConnectionException(
524+
"No running stream member on the node", ex);
525+
} else {
526+
LOGGER.warn(
527+
"Error while trying to recover consumer {} (queue '{}')",
528+
consumer.id(),
529+
consumer.queue(),
530+
ex);
531+
failedConsumers.add(consumer);
532+
}
515533
} catch (Exception ex) {
516534
LOGGER.warn(
517535
"Error while trying to recover consumer {} (queue '{}')",

src/main/java/com/rabbitmq/client/amqp/impl/ExceptionUtils.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,17 @@ static boolean unauthorizedAccess(ClientResourceRemotelyClosedException e) {
129129
return isUnauthorizedAccess(e.getErrorCondition());
130130
}
131131

132+
static boolean noRunningStreamMemberOnNode(AmqpException e) {
133+
if (e instanceof AmqpException.AmqpResourceClosedException) {
134+
String message = e.getMessage();
135+
return message != null
136+
&& message.contains("stream queue")
137+
&& message.contains("does not have a running replica on the local node");
138+
} else {
139+
return false;
140+
}
141+
}
142+
132143
private static boolean isUnauthorizedAccess(ErrorCondition errorCondition) {
133144
return errorConditionEquals(errorCondition, ERROR_UNAUTHORIZED_ACCESS);
134145
}

src/test/java/com/rabbitmq/client/amqp/impl/Cli.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -99,10 +99,6 @@ static ProcessState rabbitmqStreams(String command) {
9999
return executeCommand(rabbitmqStreamsCommand() + " " + command);
100100
}
101101

102-
static ProcessState rabbitmqUpgrade(String command) {
103-
return executeCommand(rabbitmqUpgradeCommand() + " " + command);
104-
}
105-
106102
static ProcessState rabbitmqctlIgnoreError(String command) {
107103
return executeCommand(rabbitmqctlCommand() + " " + command, true);
108104
}

src/test/java/com/rabbitmq/client/amqp/impl/ClusterTest.java

Lines changed: 57 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,22 +20,26 @@
2020
import static com.rabbitmq.client.amqp.ConnectionSettings.Affinity.Operation.CONSUME;
2121
import static com.rabbitmq.client.amqp.ConnectionSettings.Affinity.Operation.PUBLISH;
2222
import static com.rabbitmq.client.amqp.impl.Assertions.assertThat;
23+
import static com.rabbitmq.client.amqp.impl.ExceptionUtils.noRunningStreamMemberOnNode;
2324
import static com.rabbitmq.client.amqp.impl.TestUtils.*;
2425
import static java.time.Duration.ofMillis;
2526
import static java.time.Duration.ofSeconds;
27+
import static java.util.Arrays.stream;
2628
import static java.util.stream.Collectors.toList;
2729
import static java.util.stream.IntStream.range;
2830
import static org.assertj.core.api.Assertions.assertThat;
31+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
2932

3033
import com.rabbitmq.client.amqp.*;
34+
import com.rabbitmq.client.amqp.AmqpException.AmqpResourceClosedException;
3135
import com.rabbitmq.client.amqp.impl.TestUtils.Sync;
32-
import java.util.Arrays;
3336
import java.util.List;
3437
import java.util.Set;
3538
import java.util.concurrent.ConcurrentHashMap;
3639
import java.util.concurrent.atomic.AtomicInteger;
3740
import java.util.function.Consumer;
3841
import java.util.stream.Collectors;
42+
import org.assertj.core.api.Condition;
3943
import org.junit.jupiter.api.*;
4044
import org.junit.jupiter.params.ParameterizedTest;
4145
import org.junit.jupiter.params.provider.EnumSource;
@@ -440,7 +444,7 @@ void connectionShouldBeOnOwningNodeWhenAffinityIsActivatedForClassicQueues(TestI
440444
List<String> names = range(0, URIS.length).mapToObj(ignored -> name(info)).collect(toList());
441445
try {
442446
List<Connection> connections =
443-
Arrays.stream(URIS)
447+
stream(URIS)
444448
.map(
445449
uri ->
446450
connection(
@@ -479,6 +483,57 @@ void connectionShouldBeOnOwningNodeWhenAffinityIsActivatedForClassicQueues(TestI
479483
}
480484
}
481485

486+
@Test
487+
void consumerOnNodeWithoutStreamMemberShouldThrow() {
488+
List<AmqpConnection> connections = List.of();
489+
try {
490+
int memberCount = URIS.length - 1;
491+
this.management.queue(this.name).stream().initialMemberCount(memberCount).queue().declare();
492+
waitAtMost(() -> this.management.queueInfo(this.name).members().size() == memberCount);
493+
494+
List<String> members = this.management.queueInfo(this.name).members();
495+
496+
connections =
497+
stream(URIS)
498+
.map(uri -> (AmqpConnection) this.environment.connectionBuilder().uri(uri).build())
499+
.collect(toList());
500+
501+
Connection cWithMember =
502+
connections.stream()
503+
.filter(c -> members.contains(c.connectionNodename()))
504+
.findAny()
505+
.get();
506+
cWithMember
507+
.consumerBuilder()
508+
.queue(this.name)
509+
.messageHandler((ctx, msg) -> ctx.accept())
510+
.build();
511+
512+
Connection cWithNoMember =
513+
connections.stream()
514+
.filter(c -> !members.contains(c.connectionNodename()))
515+
.findAny()
516+
.get();
517+
518+
assertThatThrownBy(
519+
() ->
520+
cWithNoMember
521+
.consumerBuilder()
522+
.queue(this.name)
523+
.messageHandler((ctx, msg) -> ctx.accept())
524+
.build())
525+
.isInstanceOf(AmqpResourceClosedException.class)
526+
.is(
527+
new Condition<>(
528+
e -> noRunningStreamMemberOnNode((AmqpException) e),
529+
"detected as a no-running-stream-member-on-connection-node exception"));
530+
531+
} finally {
532+
this.connection.management().queueDeletion().delete(this.name);
533+
connections.forEach(Connection::close);
534+
}
535+
}
536+
482537
String moveQqLeader() {
483538
String initialLeader = deleteQqLeader();
484539
addQqMember(initialLeader);
@@ -491,10 +546,6 @@ String deleteQqLeader() {
491546
return deleteLeader(this::deleteQqMember);
492547
}
493548

494-
String deleteStreamLeader() {
495-
return deleteLeader(leader -> Cli.deleteStreamMember(q, leader));
496-
}
497-
498549
String deleteLeader(Consumer<String> deleteMemberOperation) {
499550
Management.QueueInfo info = queueInfo();
500551
String initialLeader = info.leader();

src/test/java/com/rabbitmq/client/amqp/impl/ExceptionUtilsTest.java

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121
import static org.assertj.core.api.Assertions.assertThat;
2222

2323
import com.rabbitmq.client.amqp.AmqpException;
24+
import com.rabbitmq.client.amqp.AmqpException.AmqpConnectionException;
25+
import com.rabbitmq.client.amqp.AmqpException.AmqpResourceClosedException;
2426
import javax.net.ssl.SSLException;
2527
import org.apache.qpid.protonj2.client.ErrorCondition;
2628
import org.apache.qpid.protonj2.client.exceptions.*;
@@ -39,23 +41,23 @@ void convertTest() {
3941
convert(new ClientSessionRemotelyClosedException("", errorCondition(ERROR_NOT_FOUND))))
4042
.isInstanceOf(AmqpException.AmqpEntityDoesNotExistException.class);
4143
assertThat(convert(new ClientSessionRemotelyClosedException("")))
42-
.isInstanceOf(AmqpException.AmqpResourceClosedException.class);
44+
.isInstanceOf(AmqpResourceClosedException.class);
4345
assertThat(convert(new ClientLinkRemotelyClosedException("", errorCondition(ERROR_NOT_FOUND))))
4446
.isInstanceOf(AmqpException.AmqpEntityDoesNotExistException.class);
4547
assertThat(
4648
convert(
4749
new ClientLinkRemotelyClosedException("", errorCondition(ERROR_RESOURCE_DELETED))))
4850
.isInstanceOf(AmqpException.AmqpEntityDoesNotExistException.class);
4951
assertThat(convert(new ClientLinkRemotelyClosedException("")))
50-
.isInstanceOf(AmqpException.AmqpResourceClosedException.class);
52+
.isInstanceOf(AmqpResourceClosedException.class);
5153
assertThat(convert(new ClientConnectionRemotelyClosedException("connection reset")))
52-
.isInstanceOf(AmqpException.AmqpConnectionException.class);
54+
.isInstanceOf(AmqpConnectionException.class);
5355
assertThat(convert(new ClientConnectionRemotelyClosedException("connection refused")))
54-
.isInstanceOf(AmqpException.AmqpConnectionException.class);
56+
.isInstanceOf(AmqpConnectionException.class);
5557
assertThat(convert(new ClientConnectionRemotelyClosedException("connection forced")))
56-
.isInstanceOf(AmqpException.AmqpConnectionException.class);
58+
.isInstanceOf(AmqpConnectionException.class);
5759
assertThat(convert(new ClientConnectionRemotelyClosedException("", new RuntimeException())))
58-
.isInstanceOf(AmqpException.AmqpConnectionException.class)
60+
.isInstanceOf(AmqpConnectionException.class)
5961
.hasCauseInstanceOf(ClientConnectionRemotelyClosedException.class);
6062
assertThat(convert(new ClientConnectionRemotelyClosedException("", new SSLException(""))))
6163
.isInstanceOf(AmqpException.AmqpSecurityException.class)
@@ -74,6 +76,17 @@ void convertTest() {
7476
.hasCauseInstanceOf(ClientLinkRemotelyClosedException.class);
7577
}
7678

79+
@Test
80+
void testNoRunningStreamMemberOnNode() {
81+
assertThat(
82+
noRunningStreamMemberOnNode(
83+
new AmqpResourceClosedException(
84+
"stream queue 'stream-RecoveryClusterTest_clusterRestart-a69d-db752afee52a' in vhost '/' does not have a running replica on the local node [condition = amqp:internal-error]")))
85+
.isTrue();
86+
assertThat(noRunningStreamMemberOnNode(new AmqpResourceClosedException("foo"))).isFalse();
87+
assertThat(noRunningStreamMemberOnNode(new AmqpConnectionException("foo", null))).isFalse();
88+
}
89+
7790
ErrorCondition errorCondition(String condition) {
7891
return ErrorCondition.create(condition, null);
7992
}

0 commit comments

Comments
 (0)