Skip to content
This repository was archived by the owner on Sep 26, 2025. It is now read-only.

Commit a923eed

Browse files
committed
Emit publish nack when retry times out
Fixes #156
1 parent 35d3e9d commit a923eed

File tree

6 files changed

+112
-8
lines changed

6 files changed

+112
-8
lines changed

src/main/java/reactor/rabbitmq/ExceptionHandlers.java

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2018-2020 VMware, Inc. or its affiliates.
2+
* Copyright (c) 2018-2021 VMware, Inc. or its affiliates.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -72,7 +72,14 @@ public static class SimpleRetryTemplate {
7272

7373
private final Predicate<Throwable> predicate;
7474

75+
private boolean failOnTimeout;
76+
7577
public SimpleRetryTemplate(Duration timeout, Duration waitingTime, Predicate<Throwable> predicate) {
78+
this(timeout, waitingTime, predicate, true);
79+
}
80+
81+
public SimpleRetryTemplate(Duration timeout, Duration waitingTime, Predicate<Throwable> predicate,
82+
boolean failOnTimeout) {
7683
if (timeout == null || timeout.isNegative() || timeout.isZero()) {
7784
throw new IllegalArgumentException("Timeout must be greater than 0");
7885
}
@@ -88,11 +95,13 @@ public SimpleRetryTemplate(Duration timeout, Duration waitingTime, Predicate<Thr
8895
this.timeout = timeout.toMillis();
8996
this.waitingTime = waitingTime.toMillis();
9097
this.predicate = predicate;
98+
this.failOnTimeout = failOnTimeout;
9199
}
92100

93101
public void retry(Callable<Void> operation, Exception e) {
94102
if (predicate.test(e)) {
95103
int elapsedTime = 0;
104+
boolean callSucceeded = false;
96105
while (elapsedTime < timeout) {
97106
try {
98107
Thread.sleep(waitingTime);
@@ -102,13 +111,17 @@ public void retry(Callable<Void> operation, Exception e) {
102111
elapsedTime += waitingTime;
103112
try {
104113
operation.call();
114+
callSucceeded = true;
105115
break;
106116
} catch (Throwable sendingException) {
107117
if (!predicate.test(sendingException)) {
108118
throw new RabbitFluxException("Not retryable exception thrown during retry", sendingException);
109119
}
110120
}
111121
}
122+
if (!callSucceeded && failOnTimeout) {
123+
throw new RabbitFluxRetryTimeoutException("Retry timed out after " + this.timeout + " ms", e);
124+
}
112125
} else {
113126
throw new RabbitFluxException("Not retryable exception, cannot retry", e);
114127
}
@@ -122,7 +135,7 @@ public static class RetryAcknowledgmentExceptionHandler implements BiConsumer<Re
122135
public RetryAcknowledgmentExceptionHandler(Duration timeout, Duration waitingTime,
123136
Predicate<Throwable> predicate) {
124137
this.retryTemplate = new SimpleRetryTemplate(
125-
timeout, waitingTime, predicate
138+
timeout, waitingTime, predicate, false
126139
);
127140
}
128141

@@ -140,8 +153,13 @@ public static class RetrySendingExceptionHandler implements BiConsumer<Sender.Se
140153
private final SimpleRetryTemplate retryTemplate;
141154

142155
public RetrySendingExceptionHandler(Duration timeout, Duration waitingTime, Predicate<Throwable> predicate) {
156+
this(timeout, waitingTime, predicate, true);
157+
}
158+
159+
public RetrySendingExceptionHandler(Duration timeout, Duration waitingTime, Predicate<Throwable> predicate,
160+
boolean failOnTimeout) {
143161
this.retryTemplate = new SimpleRetryTemplate(
144-
timeout, waitingTime, predicate
162+
timeout, waitingTime, predicate, failOnTimeout
145163
);
146164
}
147165

src/main/java/reactor/rabbitmq/OutboundMessage.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2017-2020 VMware, Inc. or its affiliates.
2+
* Copyright (c) 2017-2021 VMware, Inc. or its affiliates.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
/*
2+
* Copyright (c) 2021 VMware, Inc. or its affiliates.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package reactor.rabbitmq;
18+
19+
/**
20+
*
21+
*/
22+
public class RabbitFluxRetryTimeoutException extends RabbitFluxException {
23+
24+
public RabbitFluxRetryTimeoutException(String message, Throwable cause) {
25+
super(message, cause);
26+
}
27+
28+
}

src/main/java/reactor/rabbitmq/Sender.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2017-2020 VMware, Inc. or its affiliates.
2+
* Copyright (c) 2017-2021 VMware, Inc. or its affiliates.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -1020,6 +1020,8 @@ public void onNext(OMSG message) {
10201020
unconfirmed.remove(nextPublishSeqNo);
10211021
try {
10221022
this.exceptionHandler.accept(new ConfirmSendContext<>(channel, message, this), e);
1023+
} catch (RabbitFluxRetryTimeoutException timeoutException) {
1024+
subscriber.onNext(new OutboundMessageResult<>(message, false, false));
10231025
} catch (Exception innerException) {
10241026
handleError(innerException, new OutboundMessageResult<>(message, false, false));
10251027
}

src/test/java/reactor/rabbitmq/ExceptionHandlersTests.java

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2018-2020 VMware, Inc. or its affiliates.
2+
* Copyright (c) 2018-2021 VMware, Inc. or its affiliates.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -19,6 +19,7 @@
1919
import com.rabbitmq.client.AlreadyClosedException;
2020
import com.rabbitmq.client.AuthenticationFailureException;
2121
import com.rabbitmq.client.ShutdownSignalException;
22+
import org.assertj.core.api.Assertions;
2223
import org.junit.jupiter.api.Test;
2324

2425
import java.io.IOException;
@@ -30,6 +31,7 @@
3031

3132
import static java.time.Duration.ofMillis;
3233
import static java.util.Collections.singletonMap;
34+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
3335
import static org.junit.jupiter.api.Assertions.assertEquals;
3436
import static org.junit.jupiter.api.Assertions.assertFalse;
3537
import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -83,13 +85,27 @@ public void connectionRecoveryTriggering() {
8385
@Test
8486
void retryTimeoutIsReached() {
8587
exceptionHandler = new ExceptionHandlers.RetrySendingExceptionHandler(
86-
ofMillis(100), ofMillis(10), new ExceptionHandlers.ExceptionPredicate(singletonMap(Exception.class, true))
88+
ofMillis(100), ofMillis(10),
89+
new ExceptionHandlers.ExceptionPredicate(singletonMap(Exception.class, true)), false
8790
);
8891
exceptionHandler.accept(sendContext(() -> {
8992
throw new Exception();
9093
}), new Exception());
9194
}
9295

96+
@Test
97+
void shouldThrowTimeoutExceptionWhenConfigured() {
98+
exceptionHandler = new ExceptionHandlers.RetrySendingExceptionHandler(
99+
ofMillis(100), ofMillis(10),
100+
new ExceptionHandlers.ExceptionPredicate(singletonMap(Exception.class, true)), true
101+
);
102+
assertThatThrownBy(() -> {
103+
exceptionHandler.accept(sendContext(() -> {
104+
throw new Exception();
105+
}), new Exception());
106+
}).isInstanceOf(RabbitFluxRetryTimeoutException.class);
107+
}
108+
93109
@Test
94110
void retrySucceeds() {
95111
exceptionHandler = new ExceptionHandlers.RetrySendingExceptionHandler(

src/test/java/reactor/rabbitmq/RabbitFluxTests.java

Lines changed: 41 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2017-2020 VMware, Inc. or its affiliates.
2+
* Copyright (c) 2017-2021 VMware, Inc. or its affiliates.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -873,6 +873,46 @@ public void publishConfirmsEmitNackForUnconfirmedMessagesOnConnectionFailure() t
873873
assertThat(nackLatch.await(5, TimeUnit.SECONDS)).isTrue();
874874
}
875875

876+
@Test
877+
public void publishConfirmsEmitNackOnRetryTimeout() throws Exception {
878+
ConnectionFactory mockConnectionFactory = mock(ConnectionFactory.class);
879+
Connection mockConnection = mock(Connection.class);
880+
Channel mockChannel = mock(Channel.class);
881+
when(mockConnectionFactory.newConnection()).thenReturn(mockConnection);
882+
when(mockConnection.createChannel()).thenReturn(mockChannel);
883+
when(mockConnection.isOpen()).thenReturn(true);
884+
when(mockChannel.getConnection()).thenReturn(mockConnection);
885+
886+
AtomicLong publishSequence = new AtomicLong();
887+
when(mockChannel.getNextPublishSeqNo()).thenAnswer(invocation -> publishSequence.incrementAndGet());
888+
when(mockChannel.isOpen()).thenReturn(true);
889+
890+
doAnswer(answer -> {
891+
throw new RuntimeException();
892+
})
893+
.when(mockChannel).basicPublish(anyString(), anyString(), eq(false), nullable(AMQP.BasicProperties.class), any(byte[].class));
894+
895+
Flux<OutboundMessage> msgFlux = Flux.just(new OutboundMessage("", queue, "".getBytes()));
896+
CountDownLatch nackLatch = new CountDownLatch(1);
897+
sender = createSender(new SenderOptions()
898+
.connectionFactory(mockConnectionFactory));
899+
sender
900+
.sendWithPublishConfirms(msgFlux,
901+
new SendOptions().exceptionHandler((ctx, ex) -> {
902+
throw new RabbitFluxRetryTimeoutException(null, null);
903+
}))
904+
.subscribe(
905+
outboundMessageResult -> {
906+
if (!outboundMessageResult.isAck()) {
907+
nackLatch.countDown();
908+
}
909+
},
910+
error -> {
911+
});
912+
913+
assertThat(nackLatch.await(5, TimeUnit.SECONDS)).isTrue();
914+
}
915+
876916
@Test
877917
public void publishConfirmsErrorWhilePublishing() throws Exception {
878918
ConnectionFactory mockConnectionFactory = mock(ConnectionFactory.class);

0 commit comments

Comments
 (0)