Skip to content

Commit d5663f4

Browse files
garyrussellartembilan
authored andcommitted
AMQP-828: AutoRecovery with template.receive()
JIRA: https://jira.spring.io/browse/AMQP-828 Close auto-recoving channels during recovery since the consumer is no longer there. This was previously fixed for the `BlockingQueueConsumer`, but not for `template.receive()` operations. * Polishing - PR Comments
1 parent dfcfda1 commit d5663f4

File tree

6 files changed

+176
-40
lines changed

6 files changed

+176
-40
lines changed

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/CachingConnectionFactory.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
import org.springframework.amqp.AmqpApplicationContextClosedException;
5151
import org.springframework.amqp.AmqpException;
5252
import org.springframework.amqp.AmqpTimeoutException;
53+
import org.springframework.amqp.rabbit.support.ClosingRecoveryListener;
5354
import org.springframework.amqp.rabbit.support.PublisherCallbackChannel;
5455
import org.springframework.amqp.rabbit.support.PublisherCallbackChannelImpl;
5556
import org.springframework.amqp.support.ConditionalExceptionLogger;
@@ -66,6 +67,7 @@
6667
import com.rabbitmq.client.Channel;
6768
import com.rabbitmq.client.ShutdownListener;
6869
import com.rabbitmq.client.ShutdownSignalException;
70+
import com.rabbitmq.client.impl.recovery.AutorecoveringChannel;
6971

7072
/**
7173
* A {@link ConnectionFactory} implementation that (when the cache mode is {@link CacheMode#CHANNEL} (default)
@@ -1209,6 +1211,9 @@ private void physicalClose() throws Exception {
12091211
}
12101212
else {
12111213
this.target.close();
1214+
if (this.target instanceof AutorecoveringChannel) {
1215+
ClosingRecoveryListener.removeChannel((AutorecoveringChannel) this.target);
1216+
}
12121217
}
12131218
}
12141219
catch (AlreadyClosedException e) {

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/core/RabbitTemplate.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@
6060
import org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer;
6161
import org.springframework.amqp.rabbit.listener.DirectReplyToMessageListenerContainer;
6262
import org.springframework.amqp.rabbit.listener.DirectReplyToMessageListenerContainer.ChannelHolder;
63+
import org.springframework.amqp.rabbit.support.ClosingRecoveryListener;
6364
import org.springframework.amqp.rabbit.support.ConsumerCancelledException;
6465
import org.springframework.amqp.rabbit.support.CorrelationData;
6566
import org.springframework.amqp.rabbit.support.DefaultMessagePropertiesConverter;
@@ -100,6 +101,7 @@
100101
import com.rabbitmq.client.DefaultConsumer;
101102
import com.rabbitmq.client.Envelope;
102103
import com.rabbitmq.client.GetResponse;
104+
import com.rabbitmq.client.ShutdownListener;
103105

104106
/**
105107
* <p>
@@ -1203,6 +1205,9 @@ private Delivery consumeDelivery(Channel channel, String queueName, long timeout
12031205
Delivery delivery = null;
12041206
RuntimeException exception = null;
12051207
CompletableFuture<Delivery> future = new CompletableFuture<>();
1208+
ShutdownListener shutdownListener = c -> future.completeExceptionally(c);
1209+
channel.addShutdownListener(shutdownListener);
1210+
ClosingRecoveryListener.addRecoveryListenerIfNecessary(channel);
12061211
DefaultConsumer consumer = createConsumer(queueName, channel, future,
12071212
timeoutMillis < 0 ? DEFAULT_CONSUME_TIMEOUT : timeoutMillis);
12081213
try {
@@ -1226,9 +1231,11 @@ private Delivery consumeDelivery(Channel channel, String queueName, long timeout
12261231
// no result in time
12271232
}
12281233
finally {
1229-
if (exception == null || !(exception instanceof ConsumerCancelledException)) {
1234+
if ((exception == null || !(exception instanceof ConsumerCancelledException))
1235+
&& channel.isOpen()) {
12301236
cancelConsumerQuietly(channel, consumer);
12311237
}
1238+
channel.removeShutdownListener(shutdownListener);
12321239
}
12331240
return delivery;
12341241
}
@@ -1590,6 +1597,7 @@ public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProp
15901597
}
15911598

15921599
};
1600+
ClosingRecoveryListener.addRecoveryListenerIfNecessary(channel);
15931601
channel.basicConsume(replyTo, true, consumerTag, this.noLocalReplyConsumer, true, null, consumer);
15941602
Message reply = null;
15951603
try {
@@ -1598,7 +1606,9 @@ public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProp
15981606
}
15991607
finally {
16001608
this.replyHolder.remove(messageTag);
1601-
cancelConsumerQuietly(channel, consumer);
1609+
if (channel.isOpen()) {
1610+
cancelConsumerQuietly(channel, consumer);
1611+
}
16021612
}
16031613
return reply;
16041614
}, obtainTargetConnectionFactory(this.sendConnectionFactorySelectorExpression, message));

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/BlockingQueueConsumer.java

Lines changed: 3 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@
5353
import org.springframework.amqp.rabbit.connection.RabbitResourceHolder;
5454
import org.springframework.amqp.rabbit.connection.RabbitUtils;
5555
import org.springframework.amqp.rabbit.listener.exception.FatalListenerStartupException;
56+
import org.springframework.amqp.rabbit.support.ClosingRecoveryListener;
5657
import org.springframework.amqp.rabbit.support.ConsumerCancelledException;
5758
import org.springframework.amqp.rabbit.support.Delivery;
5859
import org.springframework.amqp.rabbit.support.MessagePropertiesConverter;
@@ -68,10 +69,7 @@
6869
import com.rabbitmq.client.Channel;
6970
import com.rabbitmq.client.DefaultConsumer;
7071
import com.rabbitmq.client.Envelope;
71-
import com.rabbitmq.client.Recoverable;
72-
import com.rabbitmq.client.RecoveryListener;
7372
import com.rabbitmq.client.ShutdownSignalException;
74-
import com.rabbitmq.client.impl.recovery.AutorecoveringChannel;
7573
import com.rabbitmq.utility.Utility;
7674

7775
/**
@@ -86,7 +84,7 @@
8684
* @author Alex Panchenko
8785
* @author Johno Crawford
8886
*/
89-
public class BlockingQueueConsumer implements RecoveryListener {
87+
public class BlockingQueueConsumer {
9088

9189
private static Log logger = LogFactory.getLog(BlockingQueueConsumer.class);
9290

@@ -571,7 +569,7 @@ public void start() throws AmqpException {
571569
this.resourceHolder = ConnectionFactoryUtils.getTransactionalResourceHolder(this.connectionFactory,
572570
this.transactional);
573571
this.channel = this.resourceHolder.getChannel();
574-
addRecoveryListener();
572+
ClosingRecoveryListener.addRecoveryListenerIfNecessary(this.channel);
575573
}
576574
catch (AmqpAuthenticationException e) {
577575
throw new FatalListenerStartupException("Authentication failure", e);
@@ -654,19 +652,6 @@ else if (e.getFailedQueues().size() < this.queues.length) {
654652
}
655653
}
656654

657-
/**
658-
* Add a listener if necessary so we can immediately close an autorecovered
659-
* channel if necessary since the async consumer will no longer exist.
660-
*/
661-
private void addRecoveryListener() {
662-
if (this.channel instanceof ChannelProxy) {
663-
if (((ChannelProxy) this.channel).getTargetChannel() instanceof AutorecoveringChannel) {
664-
((AutorecoveringChannel) ((ChannelProxy) this.channel).getTargetChannel())
665-
.addRecoveryListener(this);
666-
}
667-
}
668-
}
669-
670655
private void consumeFromQueue(String queue) throws IOException {
671656
InternalConsumer consumer = new InternalConsumer(this.channel, queue);
672657
String consumerTag = this.channel.basicConsume(queue, this.acknowledgeMode.isAutoAck(),
@@ -824,25 +809,6 @@ public boolean commitIfNecessary(boolean locallyTransacted) throws IOException {
824809

825810
}
826811

827-
@Override
828-
public void handleRecovery(Recoverable recoverable) {
829-
// should never get here
830-
handleRecoveryStarted(recoverable);
831-
}
832-
833-
@Override
834-
public void handleRecoveryStarted(Recoverable recoverable) {
835-
if (logger.isDebugEnabled()) {
836-
logger.debug("Closing an autorecovered channel: " + recoverable);
837-
}
838-
try {
839-
((Channel) recoverable).close();
840-
}
841-
catch (IOException | TimeoutException e) {
842-
logger.debug("Error closing an autorecovered channel");
843-
}
844-
}
845-
846812
@Override
847813
public String toString() {
848814
return "Consumer@" + ObjectUtils.getIdentityHexString(this) + ": "
Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
/*
2+
* Copyright 2018 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.amqp.rabbit.support;
18+
19+
import java.io.IOException;
20+
import java.util.concurrent.ConcurrentHashMap;
21+
import java.util.concurrent.ConcurrentMap;
22+
import java.util.concurrent.TimeoutException;
23+
24+
import org.apache.commons.logging.Log;
25+
import org.apache.commons.logging.LogFactory;
26+
27+
import org.springframework.amqp.rabbit.connection.ChannelProxy;
28+
29+
import com.rabbitmq.client.Channel;
30+
import com.rabbitmq.client.Recoverable;
31+
import com.rabbitmq.client.RecoveryListener;
32+
import com.rabbitmq.client.impl.recovery.AutorecoveringChannel;
33+
34+
/**
35+
* A {@link RecoveryListener} that closes the recovered channel, to avoid
36+
* orphaned consumers.
37+
*
38+
* @author Gary Russell
39+
* @since 1.7.10
40+
*
41+
*/
42+
public final class ClosingRecoveryListener implements RecoveryListener {
43+
44+
private static final Log logger = LogFactory.getLog(ClosingRecoveryListener.class);
45+
46+
private static final RecoveryListener INSTANCE = new ClosingRecoveryListener();
47+
48+
private static final ConcurrentMap<AutorecoveringChannel, Boolean> hasListener = new ConcurrentHashMap<>();
49+
50+
private ClosingRecoveryListener() {
51+
super();
52+
}
53+
54+
@Override
55+
public void handleRecovery(Recoverable recoverable) {
56+
// should never get here
57+
handleRecoveryStarted(recoverable);
58+
}
59+
60+
@Override
61+
public void handleRecoveryStarted(Recoverable recoverable) {
62+
if (logger.isDebugEnabled()) {
63+
logger.debug("Closing an autorecovered channel: " + recoverable);
64+
}
65+
try {
66+
((Channel) recoverable).close();
67+
}
68+
catch (IOException | TimeoutException e) {
69+
logger.error("Error closing an autorecovered channel", e);
70+
}
71+
finally {
72+
hasListener.remove(recoverable);
73+
}
74+
}
75+
76+
/**
77+
* Add a listener if necessary so we can immediately close an autorecovered
78+
* channel if necessary since the actual consumer will no longer exist.
79+
* Idempotent operation.
80+
* @param channel the channel.
81+
*/
82+
public static void addRecoveryListenerIfNecessary(Channel channel) {
83+
AutorecoveringChannel autorecoveringChannel = null;
84+
if (channel instanceof ChannelProxy) {
85+
if (((ChannelProxy) channel).getTargetChannel() instanceof AutorecoveringChannel) {
86+
autorecoveringChannel = (AutorecoveringChannel) ((ChannelProxy) channel)
87+
.getTargetChannel();
88+
}
89+
}
90+
else if (channel instanceof AutorecoveringChannel) {
91+
autorecoveringChannel = (AutorecoveringChannel) channel;
92+
}
93+
if (autorecoveringChannel != null
94+
&& hasListener.putIfAbsent(autorecoveringChannel, Boolean.TRUE) == null) {
95+
autorecoveringChannel.addRecoveryListener(INSTANCE);
96+
}
97+
}
98+
99+
/**
100+
* Remove the channel from the set used to ensure that
101+
* {@link #addRecoveryListenerIfNecessary(Channel)} is idempotent.
102+
* @param channel the channel to remove.
103+
*/
104+
public static void removeChannel(AutorecoveringChannel channel) {
105+
hasListener.remove(channel);
106+
}
107+
108+
}

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/support/PublisherCallbackChannelImpl.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@
7575
import com.rabbitmq.client.ReturnListener;
7676
import com.rabbitmq.client.ShutdownListener;
7777
import com.rabbitmq.client.ShutdownSignalException;
78+
import com.rabbitmq.client.impl.recovery.AutorecoveringChannel;
7879

7980
/**
8081
* Channel wrapper to allow a single listener able to handle
@@ -171,6 +172,9 @@ public Connection getConnection() {
171172
@Override
172173
public void close(int closeCode, String closeMessage) throws IOException, TimeoutException {
173174
this.delegate.close(closeCode, closeMessage);
175+
if (this.delegate instanceof AutorecoveringChannel) {
176+
ClosingRecoveryListener.removeChannel((AutorecoveringChannel) this.delegate);
177+
}
174178
}
175179

176180
@Override

spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/RabbitTemplateIntegrationTests.java

Lines changed: 44 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
import java.util.Collection;
5050
import java.util.HashMap;
5151
import java.util.Map;
52+
import java.util.Properties;
5253
import java.util.UUID;
5354
import java.util.concurrent.CountDownLatch;
5455
import java.util.concurrent.ExecutorService;
@@ -64,6 +65,7 @@
6465
import org.apache.logging.log4j.Level;
6566
import org.junit.After;
6667
import org.junit.Before;
68+
import org.junit.Ignore;
6769
import org.junit.Rule;
6870
import org.junit.Test;
6971
import org.junit.rules.TestName;
@@ -95,6 +97,7 @@
9597
import org.springframework.amqp.rabbit.junit.BrokerTestUtils;
9698
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
9799
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
100+
import org.springframework.amqp.rabbit.support.ClosingRecoveryListener;
98101
import org.springframework.amqp.rabbit.support.ConsumerCancelledException;
99102
import org.springframework.amqp.rabbit.support.DefaultMessagePropertiesConverter;
100103
import org.springframework.amqp.rabbit.support.MessagePropertiesConverter;
@@ -163,7 +166,7 @@ public class RabbitTemplateIntegrationTests {
163166

164167
@Rule
165168
public LogLevelAdjuster logAdjuster = new LogLevelAdjuster(Level.DEBUG, RabbitTemplate.class,
166-
RabbitAdmin.class, RabbitTemplateIntegrationTests.class, BrokerRunning.class);
169+
RabbitAdmin.class, RabbitTemplateIntegrationTests.class, BrokerRunning.class, ClosingRecoveryListener.class);
167170

168171
@Rule
169172
public TestName testName = new TestName();
@@ -1604,6 +1607,46 @@ public void waitForConfirms() {
16041607
assertTrue(result);
16051608
}
16061609

1610+
@Test
1611+
@Ignore("Not an automated test - requires broker restart")
1612+
public void testReceiveNoAutoRecovery() throws Exception {
1613+
CachingConnectionFactory ccf = new CachingConnectionFactory("localhost");
1614+
ccf.getRabbitConnectionFactory().setAutomaticRecoveryEnabled(true);
1615+
RabbitAdmin admin = new RabbitAdmin(ccf);
1616+
ExecutorService exec = Executors.newSingleThreadExecutor();
1617+
final RabbitTemplate template = new RabbitTemplate(ccf);
1618+
template.setReceiveTimeout(30_000);
1619+
exec.execute(() -> {
1620+
while (true) {
1621+
try {
1622+
Thread.sleep(2000);
1623+
template.receive(ROUTE);
1624+
}
1625+
catch (AmqpException e) {
1626+
e.printStackTrace();
1627+
if (e.getCause() != null
1628+
&& e.getCause().getClass().equals(InterruptedException.class)) {
1629+
Thread.currentThread().interrupt();
1630+
return;
1631+
}
1632+
}
1633+
catch (InterruptedException e) {
1634+
Thread.currentThread().interrupt();
1635+
return;
1636+
}
1637+
}
1638+
});
1639+
System .out .println("Wait for consumer; then bounce broker; then enter after it's back up");
1640+
System.in.read();
1641+
for (int i = 0; i < 20; i++) {
1642+
Properties queueProperties = admin.getQueueProperties(ROUTE);
1643+
System .out .println(queueProperties);
1644+
Thread.sleep(1000);
1645+
}
1646+
exec.shutdownNow();
1647+
ccf.destroy();
1648+
}
1649+
16071650
private Collection<String> getMessagesToSend() {
16081651
return Arrays.asList("foo", "bar");
16091652
}

0 commit comments

Comments
 (0)