Skip to content

Commit 1ced9ab

Browse files
garyrussellartembilan
authored andcommitted
AMQP-814: Add retry to RabbitAdmin
JIRA: https://jira.spring.io/browse/AMQP-814 Add retry to avoid race conditions with auto-delete, exclusive queues. **cherry-pick to 2.0.x** **back port to 1.7.x, without lambda in RabbitAdmin** # Conflicts: # spring-rabbit/src/main/java/org/springframework/amqp/rabbit/core/RabbitAdmin.java # spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/RabbitAdminTests.java
1 parent daa78ff commit 1ced9ab

File tree

2 files changed

+106
-9
lines changed

2 files changed

+106
-9
lines changed

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/core/RabbitAdmin.java

Lines changed: 69 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,11 @@
4949
import org.springframework.context.ApplicationEventPublisher;
5050
import org.springframework.context.ApplicationEventPublisherAware;
5151
import org.springframework.jmx.export.annotation.ManagedOperation;
52+
import org.springframework.retry.RetryCallback;
53+
import org.springframework.retry.RetryContext;
54+
import org.springframework.retry.backoff.ExponentialBackOffPolicy;
55+
import org.springframework.retry.policy.SimpleRetryPolicy;
56+
import org.springframework.retry.support.RetryTemplate;
5257
import org.springframework.util.Assert;
5358

5459
import com.rabbitmq.client.AMQP.Queue.DeclareOk;
@@ -97,13 +102,17 @@ public class RabbitAdmin implements AmqpAdmin, ApplicationContextAware, Applicat
97102

98103
private final RabbitTemplate rabbitTemplate;
99104

105+
private RetryTemplate retryTemplate;
106+
107+
private boolean retryDisabled;
108+
100109
private volatile boolean running = false;
101110

102-
private volatile boolean autoStartup = true;
111+
private boolean autoStartup = true;
103112

104-
private volatile ApplicationContext applicationContext;
113+
private ApplicationContext applicationContext;
105114

106-
private volatile boolean ignoreDeclarationExceptions;
115+
private boolean ignoreDeclarationExceptions;
107116

108117
private final Object lifecycleMonitor = new Object();
109118

@@ -169,6 +178,7 @@ public RabbitTemplate getRabbitTemplate() {
169178
public void declareExchange(final Exchange exchange) {
170179
try {
171180
this.rabbitTemplate.execute(new ChannelCallback<Object>() {
181+
172182
@Override
173183
public Object doInRabbit(Channel channel) throws Exception {
174184
declareExchanges(channel, exchange);
@@ -185,6 +195,7 @@ public Object doInRabbit(Channel channel) throws Exception {
185195
@ManagedOperation
186196
public boolean deleteExchange(final String exchangeName) {
187197
return this.rabbitTemplate.execute(new ChannelCallback<Boolean>() {
198+
188199
@Override
189200
public Boolean doInRabbit(Channel channel) throws Exception {
190201
if (isDeletingDefaultExchange(exchangeName)) {
@@ -219,6 +230,7 @@ public Boolean doInRabbit(Channel channel) throws Exception {
219230
public String declareQueue(final Queue queue) {
220231
try {
221232
return this.rabbitTemplate.execute(new ChannelCallback<String>() {
233+
222234
@Override
223235
public String doInRabbit(Channel channel) throws Exception {
224236
DeclareOk[] declared = declareQueues(channel, queue);
@@ -244,6 +256,7 @@ public String doInRabbit(Channel channel) throws Exception {
244256
public Queue declareQueue() {
245257
try {
246258
DeclareOk declareOk = this.rabbitTemplate.execute(new ChannelCallback<DeclareOk>() {
259+
247260
@Override
248261
public DeclareOk doInRabbit(Channel channel) throws Exception {
249262
return channel.queueDeclare();
@@ -262,6 +275,7 @@ public DeclareOk doInRabbit(Channel channel) throws Exception {
262275
@ManagedOperation
263276
public boolean deleteQueue(final String queueName) {
264277
return this.rabbitTemplate.execute(new ChannelCallback<Boolean>() {
278+
265279
@Override
266280
public Boolean doInRabbit(Channel channel) throws Exception {
267281
try {
@@ -279,6 +293,7 @@ public Boolean doInRabbit(Channel channel) throws Exception {
279293
@ManagedOperation
280294
public void deleteQueue(final String queueName, final boolean unused, final boolean empty) {
281295
this.rabbitTemplate.execute(new ChannelCallback<Object>() {
296+
282297
@Override
283298
public Object doInRabbit(Channel channel) throws Exception {
284299
channel.queueDelete(queueName, unused, empty);
@@ -291,6 +306,7 @@ public Object doInRabbit(Channel channel) throws Exception {
291306
@ManagedOperation
292307
public void purgeQueue(final String queueName, final boolean noWait) {
293308
this.rabbitTemplate.execute(new ChannelCallback<Object>() {
309+
294310
@Override
295311
public Object doInRabbit(Channel channel) throws Exception {
296312
channel.queuePurge(queueName);
@@ -305,6 +321,7 @@ public Object doInRabbit(Channel channel) throws Exception {
305321
public void declareBinding(final Binding binding) {
306322
try {
307323
this.rabbitTemplate.execute(new ChannelCallback<Object>() {
324+
308325
@Override
309326
public Object doInRabbit(Channel channel) throws Exception {
310327
declareBindings(channel, binding);
@@ -321,6 +338,7 @@ public Object doInRabbit(Channel channel) throws Exception {
321338
@ManagedOperation
322339
public void removeBinding(final Binding binding) {
323340
this.rabbitTemplate.execute(new ChannelCallback<Object>() {
341+
324342
@Override
325343
public Object doInRabbit(Channel channel) throws Exception {
326344
if (binding.isDestinationQueue()) {
@@ -348,6 +366,7 @@ public Object doInRabbit(Channel channel) throws Exception {
348366
public Properties getQueueProperties(final String queueName) {
349367
Assert.hasText(queueName, "'queueName' cannot be null or empty");
350368
return this.rabbitTemplate.execute(new ChannelCallback<Properties>() {
369+
351370
@Override
352371
public Properties doInRabbit(Channel channel) throws Exception {
353372
try {
@@ -382,6 +401,25 @@ public Properties doInRabbit(Channel channel) throws Exception {
382401
});
383402
}
384403

404+
/**
405+
* Set a retry template for auto declarations. There is a race condition with
406+
* auto-delete, exclusive queues in that the queue might still exist for a short time,
407+
* preventing the redeclaration. The default retry configuration will try 5 times with
408+
* an exponential backOff starting at 1 second a multiplier of 2.0 and a max interval
409+
* of 5 seconds. To disable retry, set the argument to {@code null}. Note that this
410+
* retry is at the macro level - all declarations will be retried within the scope of
411+
* this template. If you supplied a {@link RabbitTemplate} that is configured with a
412+
* {@link RetryTemplate}, its template will retry each individual declaration.
413+
* @param retryTemplate the retry template.
414+
* @since 1.7.8
415+
*/
416+
public void setRetryTemplate(RetryTemplate retryTemplate) {
417+
this.retryTemplate = retryTemplate;
418+
if (retryTemplate == null) {
419+
this.retryDisabled = true;
420+
}
421+
}
422+
385423
// Lifecycle implementation
386424

387425
public boolean isAutoStartup() {
@@ -406,6 +444,15 @@ public void afterPropertiesSet() {
406444
return;
407445
}
408446

447+
if (this.retryTemplate == null && !this.retryDisabled) {
448+
this.retryTemplate = new RetryTemplate();
449+
this.retryTemplate.setRetryPolicy(new SimpleRetryPolicy(5));
450+
ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
451+
backOffPolicy.setInitialInterval(1000);
452+
backOffPolicy.setMultiplier(2.0);
453+
backOffPolicy.setMaxInterval(5000);
454+
this.retryTemplate.setBackOffPolicy(backOffPolicy);
455+
}
409456
if (this.connectionFactory instanceof CachingConnectionFactory &&
410457
((CachingConnectionFactory) this.connectionFactory).getCacheMode() == CacheMode.CONNECTION) {
411458
this.logger.warn("RabbitAdmin auto declaration is not supported with CacheMode.CONNECTION");
@@ -430,7 +477,20 @@ public void onCreate(Connection connection) {
430477
* chatter). In fact it might even be a good thing: exclusive queues only make sense if they are
431478
* declared for every connection. If anyone has a problem with it: use auto-startup="false".
432479
*/
433-
initialize();
480+
if (RabbitAdmin.this.retryTemplate != null) {
481+
RabbitAdmin.this.retryTemplate.execute(
482+
new RetryCallback<Object, RuntimeException>() {
483+
484+
@Override
485+
public Object doWithRetry(RetryContext c) throws RuntimeException {
486+
initialize();
487+
return null;
488+
}
489+
});
490+
}
491+
else {
492+
initialize();
493+
}
434494
}
435495
finally {
436496
initializing.compareAndSet(true, false);
@@ -469,8 +529,8 @@ public void initialize() {
469529

470530
@SuppressWarnings("rawtypes")
471531
Collection<Collection> collections = this.declareCollections
472-
? this.applicationContext.getBeansOfType(Collection.class, false, false).values()
473-
: Collections.<Collection>emptyList();
532+
? this.applicationContext.getBeansOfType(Collection.class, false, false).values()
533+
: Collections.<Collection>emptyList();
474534
for (Collection<?> collection : collections) {
475535
if (collection.size() > 0 && collection.iterator().next() instanceof Declarable) {
476536
for (Object declarable : collection) {
@@ -492,7 +552,7 @@ else if (declarable instanceof Binding) {
492552
final Collection<Binding> bindings = filterDeclarables(contextBindings);
493553

494554
for (Exchange exchange : exchanges) {
495-
if ((!exchange.isDurable() || exchange.isAutoDelete()) && this.logger.isInfoEnabled()) {
555+
if ((!exchange.isDurable() || exchange.isAutoDelete()) && this.logger.isInfoEnabled()) {
496556
this.logger.info("Auto-declaring a non-durable or auto-delete Exchange ("
497557
+ exchange.getName()
498558
+ ") durable:" + exchange.isDurable() + ", auto-delete:" + exchange.isAutoDelete() + ". "
@@ -517,6 +577,7 @@ else if (declarable instanceof Binding) {
517577
return;
518578
}
519579
this.rabbitTemplate.execute(new ChannelCallback<Object>() {
580+
520581
@Override
521582
public Object doInRabbit(Channel channel) throws Exception {
522583
declareExchanges(channel, exchanges.toArray(new Exchange[exchanges.size()]));
@@ -540,7 +601,7 @@ private <T extends Declarable> Collection<T> filterDeclarables(Collection<T> dec
540601
for (T declarable : declarables) {
541602
Collection<?> adminsWithWhichToDeclare = declarable.getDeclaringAdmins();
542603
if (declarable.shouldDeclare() &&
543-
(adminsWithWhichToDeclare.isEmpty() || adminsWithWhichToDeclare.contains(this))) {
604+
(adminsWithWhichToDeclare.isEmpty() || adminsWithWhichToDeclare.contains(this))) {
544605
filtered.add(declarable);
545606
}
546607
}

spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/RabbitAdminTests.java

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,12 @@
2626
import static org.junit.Assert.assertThat;
2727
import static org.junit.Assert.assertTrue;
2828
import static org.junit.Assert.fail;
29+
import static org.mockito.BDDMockito.given;
30+
import static org.mockito.BDDMockito.willThrow;
2931
import static org.mockito.Matchers.any;
32+
import static org.mockito.Matchers.anyBoolean;
3033
import static org.mockito.Matchers.anyString;
34+
import static org.mockito.Matchers.isNull;
3135
import static org.mockito.Mockito.doAnswer;
3236
import static org.mockito.Mockito.doReturn;
3337
import static org.mockito.Mockito.doThrow;
@@ -50,12 +54,14 @@
5054

5155
import org.apache.commons.logging.Log;
5256
import org.hamcrest.Matchers;
57+
import org.junit.Ignore;
5358
import org.junit.Rule;
5459
import org.junit.Test;
5560
import org.junit.rules.ExpectedException;
5661
import org.mockito.ArgumentCaptor;
5762
import org.mockito.internal.stubbing.answers.DoesNothing;
5863

64+
import org.springframework.amqp.UncategorizedAmqpException;
5965
import org.springframework.amqp.core.AnonymousQueue;
6066
import org.springframework.amqp.core.Binding;
6167
import org.springframework.amqp.core.Binding.DestinationType;
@@ -270,9 +276,10 @@ public void testAvoidHangAMQP_508() {
270276
String longName = new String(new byte[300]).replace('\u0000', 'x');
271277
try {
272278
admin.declareQueue(new Queue(longName));
279+
fail("expected exception");
273280
}
274281
catch (Exception e) {
275-
e.printStackTrace();
282+
// NOSONAR
276283
}
277284
String goodName = "foobar";
278285
admin.declareQueue(new Queue(goodName));
@@ -311,6 +318,35 @@ public void testIgnoreDeclarationExeptionsTimeout() throws Exception {
311318
assertSame(events.get(3), admin.getLastDeclarationExceptionEvent());
312319
}
313320

321+
@Test
322+
@Ignore // too long; not much value
323+
public void testRetry() throws Exception {
324+
com.rabbitmq.client.ConnectionFactory rabbitConnectionFactory = mock(com.rabbitmq.client.ConnectionFactory.class);
325+
com.rabbitmq.client.Connection connection = mock(com.rabbitmq.client.Connection.class);
326+
given(rabbitConnectionFactory.newConnection((ExecutorService) isNull(), anyString())).willReturn(connection);
327+
Channel channel = mock(Channel.class);
328+
given(connection.createChannel()).willReturn(channel);
329+
given(channel.isOpen()).willReturn(true);
330+
willThrow(new RuntimeException()).given(channel)
331+
.queueDeclare(anyString(), anyBoolean(), anyBoolean(), anyBoolean(), any());
332+
CachingConnectionFactory ccf = new CachingConnectionFactory(rabbitConnectionFactory);
333+
RabbitAdmin admin = new RabbitAdmin(ccf);
334+
GenericApplicationContext ctx = new GenericApplicationContext();
335+
ctx.getBeanFactory().registerSingleton("foo", new AnonymousQueue());
336+
ctx.getBeanFactory().registerSingleton("admin", admin);
337+
admin.setApplicationContext(ctx);
338+
ctx.getBeanFactory().initializeBean(admin, "admin");
339+
ctx.refresh();
340+
try {
341+
ccf.createConnection();
342+
fail("expected exception");
343+
}
344+
catch (UncategorizedAmqpException e) {
345+
// NOSONAR
346+
}
347+
ctx.close();
348+
}
349+
314350
@Configuration
315351
public static class Config {
316352

0 commit comments

Comments
 (0)