Skip to content

Commit c2b8124

Browse files
garyrussellartembilan
authored andcommitted
GH-1406: Fix Possible Double Ack in Consumer Batch
Resolves #1406 Previously, if an MPP caused the last record in an ack to be skipped two acks for the same deliveryTag would be sent causine a channel shutdown. There is no need to ack skipped records because the entire batch is acked (with basicAck multiple=true). **cherry-pick to 2.3.x, 2.2.x** # Conflicts: # spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainerTests.java
1 parent 9800b8f commit c2b8124

File tree

2 files changed

+47
-1
lines changed

2 files changed

+47
-1
lines changed

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainer.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -937,7 +937,6 @@ private boolean doReceiveAndExecute(BlockingQueueConsumer consumer) throws Excep
937937
for (MessagePostProcessor processor : getAfterReceivePostProcessors()) {
938938
message = processor.postProcessMessage(message);
939939
if (message == null) {
940-
channel.basicAck(deliveryTag, false);
941940
if (this.logger.isDebugEnabled()) {
942941
this.logger.debug(
943942
"Message Post Processor returned 'null', discarding message " + original);

spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainerTests.java

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,15 +26,18 @@
2626
import static org.mockito.ArgumentMatchers.anyLong;
2727
import static org.mockito.ArgumentMatchers.anyMap;
2828
import static org.mockito.ArgumentMatchers.anyString;
29+
import static org.mockito.ArgumentMatchers.eq;
2930
import static org.mockito.BDDMockito.given;
3031
import static org.mockito.BDDMockito.willAnswer;
3132
import static org.mockito.BDDMockito.willReturn;
3233
import static org.mockito.BDDMockito.willThrow;
3334
import static org.mockito.Mockito.atLeastOnce;
3435
import static org.mockito.Mockito.mock;
36+
import static org.mockito.Mockito.never;
3537
import static org.mockito.Mockito.spy;
3638
import static org.mockito.Mockito.times;
3739
import static org.mockito.Mockito.verify;
40+
import static org.mockito.Mockito.verifyNoMoreInteractions;
3841

3942
import java.io.IOException;
4043
import java.net.URL;
@@ -67,6 +70,7 @@
6770
import org.springframework.amqp.ImmediateAcknowledgeAmqpException;
6871
import org.springframework.amqp.core.AcknowledgeMode;
6972
import org.springframework.amqp.core.AnonymousQueue;
73+
import org.springframework.amqp.core.BatchMessageListener;
7074
import org.springframework.amqp.core.Message;
7175
import org.springframework.amqp.core.MessageBuilder;
7276
import org.springframework.amqp.core.MessagePostProcessor;
@@ -650,6 +654,49 @@ public Message postProcessMessage(Message message) throws AmqpException {
650654
assertThat(afterReceivePostProcessors).containsExactly(mpp2, mpp3);
651655
}
652656

657+
@Test
658+
void filterMppNoDoubleAck() throws Exception {
659+
ConnectionFactory connectionFactory = mock(ConnectionFactory.class);
660+
Connection connection = mock(Connection.class);
661+
Channel channel = mock(Channel.class);
662+
given(connectionFactory.createConnection()).willReturn(connection);
663+
given(connection.createChannel(false)).willReturn(channel);
664+
final AtomicReference<Consumer> consumer = new AtomicReference<>();
665+
willAnswer(invocation -> {
666+
consumer.set(invocation.getArgument(6));
667+
consumer.get().handleConsumeOk("1");
668+
return "1";
669+
}).given(channel)
670+
.basicConsume(anyString(), anyBoolean(), anyString(), anyBoolean(), anyBoolean(), anyMap(),
671+
any(Consumer.class));
672+
final CountDownLatch latch = new CountDownLatch(1);
673+
willAnswer(invocation -> {
674+
latch.countDown();
675+
return null;
676+
}).given(channel).basicAck(anyLong(), anyBoolean());
677+
678+
final SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
679+
container.setAfterReceivePostProcessors(msg -> null);
680+
container.setQueueNames("foo");
681+
MessageListener listener = mock(BatchMessageListener.class);
682+
container.setMessageListener(listener);
683+
container.setBatchSize(2);
684+
container.setConsumerBatchEnabled(true);
685+
container.start();
686+
BasicProperties props = new BasicProperties();
687+
byte[] payload = "baz".getBytes();
688+
Envelope envelope = new Envelope(1L, false, "foo", "bar");
689+
consumer.get().handleDelivery("1", envelope, props, payload);
690+
envelope = new Envelope(2L, false, "foo", "bar");
691+
consumer.get().handleDelivery("1", envelope, props, payload);
692+
assertThat(latch.await(5, TimeUnit.SECONDS)).isTrue();
693+
verify(channel, never()).basicAck(eq(1), anyBoolean());
694+
verify(channel).basicAck(2, true);
695+
container.stop();
696+
verify(listener).containerAckMode(AcknowledgeMode.AUTO);
697+
verifyNoMoreInteractions(listener);
698+
}
699+
653700
private Answer<Object> messageToConsumer(final Channel mockChannel, final SimpleMessageListenerContainer container,
654701
final boolean cancel, final CountDownLatch latch) {
655702
return invocation -> {

0 commit comments

Comments
 (0)