Skip to content

Commit b5a198f

Browse files
committed
Add the "noproc" case to detect the no-local-stream-member error
1 parent 5680cde commit b5a198f

File tree

2 files changed

+4
-7
lines changed

2 files changed

+4
-7
lines changed

src/main/java/com/rabbitmq/client/amqp/impl/AmqpConsumer.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -307,11 +307,7 @@ void recoverAfterConnectionFailure() {
307307
this.nativeHandler,
308308
this.nativeCloseHandler),
309309
e -> {
310-
boolean shouldRetry =
311-
e instanceof AmqpException.AmqpResourceClosedException
312-
&& e.getMessage().contains("stream queue")
313-
&& e.getMessage()
314-
.contains("does not have a running replica on the local node");
310+
boolean shouldRetry = ExceptionUtils.noRunningStreamMemberOnNode(e);
315311
LOGGER.debug("Retrying receiver creation on consumer recovery: {}", shouldRetry);
316312
return shouldRetry;
317313
},

src/main/java/com/rabbitmq/client/amqp/impl/ExceptionUtils.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -129,12 +129,13 @@ static boolean unauthorizedAccess(ClientResourceRemotelyClosedException e) {
129129
return isUnauthorizedAccess(e.getErrorCondition());
130130
}
131131

132-
static boolean noRunningStreamMemberOnNode(AmqpException e) {
132+
static boolean noRunningStreamMemberOnNode(Exception e) {
133133
if (e instanceof AmqpException.AmqpResourceClosedException) {
134134
String message = e.getMessage();
135135
return message != null
136136
&& message.contains("stream queue")
137-
&& message.contains("does not have a running replica on the local node");
137+
&& message.contains("does not have a running replica on the local node")
138+
&& message.contains("noproc");
138139
} else {
139140
return false;
140141
}

0 commit comments

Comments
 (0)