Skip to content
This repository was archived by the owner on Sep 26, 2025. It is now read-only.

Commit b9544f2

Browse files
authored
Merge pull request #56 from pmackowski/retry-ack
Retry ack
2 parents 2463ec1 + d5a41ea commit b9544f2

File tree

7 files changed

+110
-47
lines changed

7 files changed

+110
-47
lines changed

src/docs/asciidoc/api-guide.adoc

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -385,7 +385,11 @@ include::{test-examples}/ApiGuideReceiver.java[tag=auto-ack-retry-settings]
385385
<1> Retry acknowledgment for 20 seconds every 500 milliseconds on connection failure
386386

387387
When using `Receiver#consumeManualAck`, acknowledgment is handled by the developer, who
388-
can do pretty anything they want on acknowledgment failure. It is possible to benefit from Reactor RabbitMQ
388+
can do pretty anything they want on acknowledgment failure.
389+
390+
`AcknowledgableDelivery#ack` and `AcknowledgableDelivery#nack` methods handle retry internally
391+
based on `BiConsumer<Receiver.AcknowledgmentContext, Exception> exceptionHandler` in the `ConsumeOptions`.
392+
Developer does not have to execute retry explicitly on acknowledgment failure and benefits from Reactor RabbitMQ
389393
retry support when acknowledging a message:
390394

391395
[source,java,indent=0]
@@ -395,7 +399,6 @@ include::{test-examples}/ApiGuideReceiver.java[tag=manual-ack-retry]
395399
<1> Configure retry logic when exception occurs
396400
<2> Process message
397401
<3> Send acknowledgment after business processing
398-
<4> Execute retry on acknowledgment failure
399402

400403
Note the exception handler is a `BiConsumer<Receiver.AcknowledgmentContext, Exception>`. This means
401404
acknowledgment failure can be handled in any way, here we choose to retry the acknowledgment.

src/main/java/reactor/rabbitmq/AcknowledgableDelivery.java

Lines changed: 45 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,16 @@
2121

2222
import java.io.IOException;
2323
import java.util.concurrent.atomic.AtomicBoolean;
24+
import java.util.function.BiConsumer;
25+
import java.util.function.Consumer;
2426

2527
/**
2628
* A RabbitMQ {@link Delivery} that can be manually acknowledged or rejected.
2729
*/
2830
public class AcknowledgableDelivery extends Delivery {
2931

3032
private final Channel channel;
33+
private final BiConsumer<Receiver.AcknowledgmentContext, Exception> exceptionHandler;
3134

3235
private final AtomicBoolean notAckedOrNacked = new AtomicBoolean(true);
3336

@@ -37,29 +40,28 @@ public class AcknowledgableDelivery extends Delivery {
3740
*
3841
* @param delivery
3942
* @param channel
43+
* @param exceptionHandler
4044
*/
41-
public AcknowledgableDelivery(Delivery delivery, Channel channel) {
45+
public AcknowledgableDelivery(Delivery delivery, Channel channel, BiConsumer<Receiver.AcknowledgmentContext, Exception> exceptionHandler) {
4246
super(delivery.getEnvelope(), delivery.getProperties(), delivery.getBody());
4347
this.channel = channel;
48+
this.exceptionHandler = exceptionHandler;
4449
}
4550

4651
/**
4752
* Acknowledges this message if it has not been previously acked or nacked.
4853
* Subsequent calls to the method for previously acknowledged message will not produce errors and will simply
4954
* return instantly.
55+
* In case of connection failure, {@link AcknowledgableDelivery#exceptionHandler} is executed.
5056
* @param multiple Defines whether all messages up to and including the supplied delivery tag should be
5157
* acknowledged or not.
5258
*/
5359
public void ack(boolean multiple) {
5460
if (notAckedOrNacked.getAndSet(false)) {
5561
try {
56-
channel.basicAck(getEnvelope().getDeliveryTag(), multiple);
57-
} catch (RuntimeException e) {
58-
notAckedOrNacked.set(true);
59-
throw e;
60-
} catch (IOException e) {
61-
notAckedOrNacked.set(true);
62-
throw new RabbitFluxException(e);
62+
basicAck(multiple);
63+
} catch (Exception e) {
64+
retry(e, (delivery) -> delivery.basicAck(multiple));
6365
}
6466
}
6567
}
@@ -68,6 +70,7 @@ public void ack(boolean multiple) {
6870
* Acknowledges this message if it has not been previously acked or nacked.
6971
* Subsequent calls to the method for previously acknowledged message will not produce errors and will simply
7072
* return instantly.
73+
* In case of connection failure, {@link AcknowledgableDelivery#exceptionHandler} is executed.
7174
*/
7275
public void ack() {
7376
ack(false);
@@ -77,20 +80,17 @@ public void ack() {
7780
* Rejects this message if it has not been previously acked or nacked.
7881
* Subsequent calls to the method for previously acknowledged or rejected message will not produce errors and
7982
* will simply return instantly.
83+
* In case of connection failure, {@link AcknowledgableDelivery#exceptionHandler} is executed.
8084
* @param multiple Defines whether all messages up to and including the supplied delivery tag should be
8185
* rejected or not.
8286
* @param requeue Defines if the message should be added to the queue again instead of being discarded.
8387
*/
8488
public void nack(boolean multiple, boolean requeue) {
8589
if (notAckedOrNacked.getAndSet(false)) {
8690
try {
87-
channel.basicNack(getEnvelope().getDeliveryTag(), multiple, requeue);
88-
} catch (RuntimeException e) {
89-
notAckedOrNacked.set(true);
90-
throw e;
91-
} catch (IOException e) {
92-
notAckedOrNacked.set(true);
93-
throw new RabbitFluxException(e);
91+
basicNack(multiple, requeue);
92+
} catch (Exception e) {
93+
retry(e, (delivery) -> delivery.basicNack(multiple, requeue));
9494
}
9595
}
9696
}
@@ -99,8 +99,38 @@ public void nack(boolean multiple, boolean requeue) {
9999
* Rejects this message if it has not been previously acked or nacked.
100100
* Subsequent calls to the method for previously acknowledged or rejected message will not produce errors and
101101
* will simply return instantly.
102+
* In case of connection failure, {@link AcknowledgableDelivery#exceptionHandler} is executed.
102103
*/
103104
public void nack(boolean requeue) {
104105
nack(false, requeue);
105106
}
107+
108+
private void basicAck(boolean multiple) {
109+
try {
110+
channel.basicAck(getEnvelope().getDeliveryTag(), multiple);
111+
} catch (RuntimeException e) {
112+
throw e;
113+
} catch (IOException e) {
114+
throw new RabbitFluxException(e);
115+
}
116+
}
117+
118+
private void basicNack(boolean multiple, boolean requeue) {
119+
try {
120+
channel.basicNack(getEnvelope().getDeliveryTag(), multiple, requeue);
121+
} catch (RuntimeException e) {
122+
throw e;
123+
} catch (IOException e) {
124+
throw new RabbitFluxException(e);
125+
}
126+
}
127+
128+
private void retry(Exception e, Consumer<AcknowledgableDelivery> consumer) {
129+
try {
130+
exceptionHandler.accept(new Receiver.AcknowledgmentContext(this, consumer), e);
131+
} catch (Exception e2) {
132+
notAckedOrNacked.set(true);
133+
throw e2;
134+
}
135+
}
106136
}

src/main/java/reactor/rabbitmq/ExceptionHandlers.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ public RetryAcknowledgmentExceptionHandler(Duration timeout, Duration waitingTim
128128
@Override
129129
public void accept(Receiver.AcknowledgmentContext acknowledgmentContext, Exception e) {
130130
retryTemplate.retry(() -> {
131-
acknowledgmentContext.getDelivery().ack();
131+
acknowledgmentContext.ackOrNack();
132132
return null;
133133
}, e);
134134
}

src/main/java/reactor/rabbitmq/Receiver.java

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import java.io.IOException;
3535
import java.util.concurrent.TimeoutException;
3636
import java.util.concurrent.atomic.AtomicBoolean;
37+
import java.util.function.Consumer;
3738
import java.util.function.Function;
3839

3940
/**
@@ -134,13 +135,9 @@ public Flux<Delivery> consumeAutoAck(final String queue) {
134135

135136
public Flux<Delivery> consumeAutoAck(final String queue, ConsumeOptions options) {
136137
// TODO why acking here and not just after emitter.next()?
137-
return consumeManualAck(queue, options).doOnNext(msg -> {
138-
try {
139-
msg.ack();
140-
} catch (Exception e) {
141-
options.getExceptionHandler().accept(new AcknowledgmentContext(msg), e);
142-
}
143-
}).map(ackableMsg -> ackableMsg);
138+
return consumeManualAck(queue, options)
139+
.doOnNext(AcknowledgableDelivery::ack)
140+
.map(ackableMsg -> ackableMsg);
144141
}
145142

146143
public Flux<AcknowledgableDelivery> consumeManualAck(final String queue) {
@@ -157,7 +154,7 @@ public Flux<AcknowledgableDelivery> consumeManualAck(final String queue, Consume
157154
}
158155

159156
DeliverCallback deliverCallback = (consumerTag, message) -> {
160-
AcknowledgableDelivery delivery = new AcknowledgableDelivery(message, channel);
157+
AcknowledgableDelivery delivery = new AcknowledgableDelivery(message, channel, options.getExceptionHandler());
161158
if (options.getHookBeforeEmitBiFunction().apply(emitter.requestedFromDownstream(), delivery)) {
162159
emitter.next(delivery);
163160
}
@@ -215,13 +212,15 @@ public void close() {
215212
public static class AcknowledgmentContext {
216213

217214
private final AcknowledgableDelivery delivery;
215+
private final Consumer<AcknowledgableDelivery> consumer;
218216

219-
public AcknowledgmentContext(AcknowledgableDelivery delivery) {
217+
public AcknowledgmentContext(AcknowledgableDelivery delivery, Consumer<AcknowledgableDelivery> consumer) {
220218
this.delivery = delivery;
219+
this.consumer = consumer;
221220
}
222221

223-
public AcknowledgableDelivery getDelivery() {
224-
return delivery;
222+
public void ackOrNack() {
223+
consumer.accept(delivery);
225224
}
226225
}
227226

src/test/java/reactor/rabbitmq/ConnectionRecoveryTests.java

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -260,17 +260,12 @@ void consumeManualAckRetryOnAck() throws Exception {
260260
BiConsumer<Receiver.AcknowledgmentContext, Exception> exceptionHandler = new ExceptionHandlers.RetryAcknowledgmentExceptionHandler(
261261
ofSeconds(5), ofMillis(100), ExceptionHandlers.CONNECTION_RECOVERY_PREDICATE
262262
);
263-
receiver.consumeManualAck("whatever")
263+
receiver.consumeManualAck("whatever", new ConsumeOptions().exceptionHandler(exceptionHandler))
264264
.subscribe(msg -> {
265265
// do business stuff
266266
// ...
267-
try {
268-
// trying to ack
269-
msg.ack();
270-
} catch (Exception e) {
271-
// when ack-ing fail, retry-ing
272-
exceptionHandler.accept(new Receiver.AcknowledgmentContext(msg), e);
273-
}
267+
// trying to ack
268+
msg.ack();
274269
ackedMessages.incrementAndGet();
275270
});
276271

src/test/java/reactor/rabbitmq/RabbitFluxTests.java

Lines changed: 40 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,11 +47,13 @@
4747
import java.util.concurrent.atomic.AtomicInteger;
4848
import java.util.concurrent.atomic.AtomicLong;
4949
import java.util.concurrent.atomic.AtomicReference;
50+
import java.util.function.BiConsumer;
5051
import java.util.function.BiFunction;
5152
import java.util.function.Function;
5253
import java.util.stream.IntStream;
5354
import java.util.stream.Stream;
5455

56+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
5557
import static org.junit.jupiter.api.Assertions.*;
5658
import static org.junit.jupiter.params.provider.Arguments.of;
5759
import static org.mockito.ArgumentMatchers.anyBoolean;
@@ -148,10 +150,11 @@ public void receiverCloseIsIdempotent() throws Exception {
148150
@Test
149151
public void acknowledgableDeliveryAckNackIsIdempotent() throws Exception {
150152
Channel channel = mock(Channel.class);
153+
BiConsumer<Receiver.AcknowledgmentContext, Exception> exceptionHandler = mock(ExceptionHandlers.RetryAcknowledgmentExceptionHandler.class);
151154
doNothing().doThrow(new IOException()).when(channel).basicAck(anyLong(), anyBoolean());
152155
doThrow(new IOException()).when(channel).basicNack(anyLong(), anyBoolean(), anyBoolean());
153156
AcknowledgableDelivery msg = new AcknowledgableDelivery(
154-
new Delivery(new Envelope(0, true, null, null), null, null), channel
157+
new Delivery(new Envelope(0, true, null, null), null, null), channel, exceptionHandler
155158
);
156159

157160
msg.ack();
@@ -161,8 +164,44 @@ public void acknowledgableDeliveryAckNackIsIdempotent() throws Exception {
161164

162165
verify(channel, times(1)).basicAck(anyLong(), anyBoolean());
163166
verify(channel, never()).basicNack(anyLong(), anyBoolean(), anyBoolean());
167+
verify(exceptionHandler, never()).accept(any(Receiver.AcknowledgmentContext.class), any(Exception.class));
164168
}
165169

170+
@Test
171+
public void acknowledgableDeliveryWithSuccessfulRetry() throws Exception {
172+
Channel channel = mock(Channel.class);
173+
BiConsumer<Receiver.AcknowledgmentContext, Exception> exceptionHandler =
174+
(acknowledgmentContext, e) -> acknowledgmentContext.ackOrNack();
175+
176+
doThrow(new IOException()).doNothing().when(channel).basicAck(anyLong(), anyBoolean());
177+
178+
AcknowledgableDelivery msg = new AcknowledgableDelivery(
179+
new Delivery(new Envelope(0, true, null, null), null, null), channel, exceptionHandler
180+
);
181+
182+
msg.ack();
183+
184+
verify(channel, times(2)).basicAck(anyLong(), anyBoolean());
185+
}
186+
187+
@Test
188+
public void acknowledgableDeliveryWithUnsuccessfulRetry() throws Exception {
189+
Channel channel = mock(Channel.class);
190+
BiConsumer<Receiver.AcknowledgmentContext, Exception> exceptionHandler =
191+
(acknowledgmentContext, e) -> acknowledgmentContext.ackOrNack();
192+
193+
doThrow(new RuntimeException("exc")).when(channel).basicAck(anyLong(), anyBoolean());
194+
195+
AcknowledgableDelivery msg = new AcknowledgableDelivery(
196+
new Delivery(new Envelope(0, true, null, null), null, null), channel, exceptionHandler
197+
);
198+
199+
assertThatThrownBy(msg::ack).hasMessage("exc");
200+
201+
verify(channel, times(2)).basicAck(anyLong(), anyBoolean());
202+
}
203+
204+
166205
@Test
167206
public void receiverConsumeNoAck() throws Exception {
168207
Channel channel = connection.createChannel();

src/test/java/reactor/rabbitmq/docs/ApiGuideReceiver.java

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -91,15 +91,12 @@ void manualAckRetry() {
9191
Duration.ofSeconds(20), Duration.ofMillis(500),
9292
ExceptionHandlers.CONNECTION_RECOVERY_PREDICATE
9393
);
94-
receiver.consumeManualAck("queue")
95-
.subscribe(msg -> {
96-
// ... // <2>
97-
try {
98-
msg.ack(); // <3>
99-
} catch (Exception e) {
100-
exceptionHandler.accept(new Receiver.AcknowledgmentContext(msg), e); // <4>
101-
}
102-
});
94+
receiver.consumeManualAck("queue",
95+
new ConsumeOptions().exceptionHandler(exceptionHandler))
96+
.subscribe(msg -> {
97+
// ... // <2>
98+
msg.ack(); // <3>
99+
});
103100
// end::manual-ack-retry[]
104101
}
105102

0 commit comments

Comments
 (0)