Skip to content

Commit 41a61eb

Browse files
artembilangaryrussell
authored andcommitted
AMQP-847: Close channel in RabbitTemplate.receive
JIRA: https://jira.spring.io/browse/AMQP-847 To avoid unacked messages race condition when client timeouts, but at this moment the message becomes available in queue, physically close a receive channel on the `TimeoutException` from the `Future.get()` **Cherry-pick to 2.0.x & 1.7.x**
1 parent c20e216 commit 41a61eb

File tree

2 files changed

+13
-0
lines changed

2 files changed

+13
-0
lines changed

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/core/RabbitTemplate.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -890,6 +890,7 @@ public Message doInRabbit(Channel channel) throws Exception {
890890
}
891891
channel.basicCancel(consumer.getConsumerTag());
892892
if (delivery == null) {
893+
RabbitUtils.setPhysicalCloseRequired(channel, true);
893894
return null;
894895
}
895896
else {
@@ -1029,6 +1030,9 @@ else if (channelTransacted) {
10291030
}
10301031
receiveMessage = buildMessageFromDelivery(delivery);
10311032
}
1033+
else {
1034+
RabbitUtils.setPhysicalCloseRequired(channel, true);
1035+
}
10321036
}
10331037
if (receiveMessage != null) {
10341038
Object receive = receiveMessage;

spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/RabbitTemplateIntegrationTests.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
import java.lang.reflect.Field;
4848
import java.util.Arrays;
4949
import java.util.HashMap;
50+
import java.util.List;
5051
import java.util.Map;
5152
import java.util.UUID;
5253
import java.util.concurrent.Callable;
@@ -348,6 +349,14 @@ public void testReceiveBlockingNoTimeout() throws Exception {
348349
assertNull(this.template.receive(ROUTE));
349350
}
350351

352+
@Test
353+
public void testReceiveTimeoutRequeue() {
354+
assertNull(this.template.receiveAndConvert(ROUTE, 1));
355+
assertEquals(0,
356+
TestUtils.getPropertyValue(this.connectionFactory, "cachedChannelsNonTransactional", List.class)
357+
.size());
358+
}
359+
351360
@Test
352361
public void testReceiveBlockingTx() throws Exception {
353362
this.template.convertAndSend(ROUTE, "blockTX");

0 commit comments

Comments
 (0)