Skip to content

Commit e9d553a

Browse files
garyrussellartembilan
authored andcommitted
AMQP-832: Async @RabbitListener Return Types
JIRA: https://jira.spring.io/browse/AMQP-832 Polishing - PR Comments - reactor optional * Polishing imports for Checkstyle rules * Use `ClassUtils.isPresent()` instead * Fix the sentence to be present only as a single line
1 parent b19ab35 commit e9d553a

File tree

5 files changed

+251
-10
lines changed

5 files changed

+251
-10
lines changed

build.gradle

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@ subprojects { subproject ->
8484
mockitoVersion = '2.18.0'
8585
rabbitmqVersion = project.hasProperty('rabbitmqVersion') ? project.rabbitmqVersion : '5.4.1'
8686
rabbitmqHttpClientVersion = '2.1.0.RELEASE'
87+
reactorVersion = '3.1.6.RELEASE'
8788

8889
springVersion = project.hasProperty('springVersion') ? project.springVersion : '5.1.0.RC2'
8990

@@ -264,6 +265,7 @@ project('spring-rabbit') {
264265
compile "org.springframework:spring-context:$springVersion"
265266
compile "org.springframework:spring-messaging:$springVersion"
266267
compile "org.springframework:spring-tx:$springVersion"
268+
compile ("io.projectreactor:reactor-core:$reactorVersion", optional)
267269

268270
compile ("ch.qos.logback:logback-classic:$logbackVersion", optional)
269271

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/adapter/AbstractAdaptableMessageListener.java

Lines changed: 69 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import java.io.IOException;
2020
import java.lang.reflect.Type;
2121
import java.util.Arrays;
22+
import java.util.function.Consumer;
2223

2324
import org.apache.commons.logging.Log;
2425
import org.apache.commons.logging.LogFactory;
@@ -48,8 +49,11 @@
4849
import org.springframework.retry.RecoveryCallback;
4950
import org.springframework.retry.support.RetryTemplate;
5051
import org.springframework.util.Assert;
52+
import org.springframework.util.ClassUtils;
53+
import org.springframework.util.concurrent.ListenableFuture;
5154

5255
import com.rabbitmq.client.Channel;
56+
import reactor.core.publisher.Mono;
5357

5458
/**
5559
* An abstract {@link MessageListener} adapter providing the necessary infrastructure
@@ -73,6 +77,9 @@ public abstract class AbstractAdaptableMessageListener implements ChannelAwareMe
7377

7478
private static final ParserContext PARSER_CONTEXT = new TemplateParserContext("!{", "}");
7579

80+
private static final boolean monoPresent =
81+
ClassUtils.isPresent("reactor.core.publisher.Mono", ChannelAwareMessageListener.class.getClassLoader());;
82+
7683
/** Logger available to subclasses. */
7784
protected final Log logger = LogFactory.getLog(getClass());
7885

@@ -301,18 +308,18 @@ protected void handleResult(InvocationResult resultArg, Message request, Channel
301308
*/
302309
protected void handleResult(InvocationResult resultArg, Message request, Channel channel, Object source) {
303310
if (channel != null) {
304-
if (this.logger.isDebugEnabled()) {
305-
this.logger.debug("Listener method returned result [" + resultArg
306-
+ "] - generating response message for it");
311+
if (resultArg.getReturnValue() instanceof ListenableFuture) {
312+
((ListenableFuture<?>) resultArg.getReturnValue()).addCallback(
313+
r -> asyncSuccess(resultArg, request, channel, source, r),
314+
t -> asyncFailure(request, channel, t));
307315
}
308-
try {
309-
Message response = buildMessage(channel, resultArg.getReturnValue(), resultArg.getReturnType());
310-
postProcessResponse(request, response);
311-
Address replyTo = getReplyToAddress(request, source, resultArg);
312-
sendResponse(channel, replyTo, response);
316+
else if (monoPresent && MonoHandler.isMono(resultArg.getReturnValue())) {
317+
MonoHandler.subscribe(resultArg.getReturnValue(),
318+
r -> asyncSuccess(resultArg, request, channel, source, r),
319+
t -> asyncFailure(request, channel, t));
313320
}
314-
catch (Exception ex) {
315-
throw new ReplyFailureException("Failed to send reply with payload '" + resultArg + "'", ex);
321+
else {
322+
doHandleResult(resultArg, request, channel, source);
316323
}
317324
}
318325
else if (this.logger.isWarnEnabled()) {
@@ -321,6 +328,43 @@ else if (this.logger.isWarnEnabled()) {
321328
}
322329
}
323330

331+
private void asyncSuccess(InvocationResult resultArg, Message request, Channel channel, Object source, Object r) {
332+
doHandleResult(new InvocationResult(r, resultArg.getSendTo(), resultArg.getReturnType()), request,
333+
channel, source);
334+
try {
335+
channel.basicAck(request.getMessageProperties().getDeliveryTag(), false);
336+
}
337+
catch (IOException e) {
338+
this.logger.error("Failed to nack message", e);
339+
}
340+
}
341+
342+
private void asyncFailure(Message request, Channel channel, Throwable t) {
343+
this.logger.error("Future was completed with an exception for " + request, t);
344+
try {
345+
channel.basicNack(request.getMessageProperties().getDeliveryTag(), false, true);
346+
}
347+
catch (IOException e) {
348+
this.logger.error("Failed to nack message", e);
349+
}
350+
}
351+
352+
protected void doHandleResult(InvocationResult resultArg, Message request, Channel channel, Object source) {
353+
if (this.logger.isDebugEnabled()) {
354+
this.logger.debug("Listener method returned result [" + resultArg
355+
+ "] - generating response message for it");
356+
}
357+
try {
358+
Message response = buildMessage(channel, resultArg.getReturnValue(), resultArg.getReturnType());
359+
postProcessResponse(request, response);
360+
Address replyTo = getReplyToAddress(request, source, resultArg);
361+
sendResponse(channel, replyTo, response);
362+
}
363+
catch (Exception ex) {
364+
throw new ReplyFailureException("Failed to send reply with payload '" + resultArg + "'", ex);
365+
}
366+
}
367+
324368
protected String getReceivedExchange(Message request) {
325369
return request.getMessageProperties().getReceivedExchange();
326370
}
@@ -517,4 +561,19 @@ public Object getResult() {
517561

518562
}
519563

564+
private static class MonoHandler {
565+
566+
static boolean isMono(Object result) {
567+
return result instanceof Mono;
568+
}
569+
570+
@SuppressWarnings("unchecked")
571+
static void subscribe(Object returnValue, Consumer<? super Object> success,
572+
Consumer<? super Throwable> failure) {
573+
574+
((Mono<? super Object>) returnValue).subscribe(success, failure);
575+
}
576+
577+
}
578+
520579
}
Lines changed: 167 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,167 @@
1+
/*
2+
* Copyright 2018 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.amqp.rabbit.annotation;
18+
19+
import static org.junit.Assert.assertEquals;
20+
21+
import java.util.concurrent.TimeUnit;
22+
import java.util.concurrent.atomic.AtomicBoolean;
23+
24+
import org.junit.Rule;
25+
import org.junit.Test;
26+
import org.junit.runner.RunWith;
27+
28+
import org.springframework.amqp.core.AcknowledgeMode;
29+
import org.springframework.amqp.core.AnonymousQueue;
30+
import org.springframework.amqp.core.Queue;
31+
import org.springframework.amqp.rabbit.AsyncRabbitTemplate;
32+
import org.springframework.amqp.rabbit.AsyncRabbitTemplate.RabbitConverterFuture;
33+
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
34+
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
35+
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
36+
import org.springframework.amqp.rabbit.core.RabbitAdmin;
37+
import org.springframework.amqp.rabbit.core.RabbitTemplate;
38+
import org.springframework.amqp.rabbit.junit.BrokerRunning;
39+
import org.springframework.beans.factory.annotation.Autowired;
40+
import org.springframework.context.annotation.Bean;
41+
import org.springframework.context.annotation.Configuration;
42+
import org.springframework.stereotype.Component;
43+
import org.springframework.test.annotation.DirtiesContext;
44+
import org.springframework.test.context.ContextConfiguration;
45+
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
46+
import org.springframework.util.concurrent.ListenableFuture;
47+
import org.springframework.util.concurrent.SettableListenableFuture;
48+
49+
import reactor.core.publisher.Mono;
50+
51+
/**
52+
* @author Gary Russell
53+
* @since 2.1
54+
*
55+
*/
56+
@ContextConfiguration
57+
@RunWith(SpringJUnit4ClassRunner.class)
58+
@DirtiesContext
59+
public class AsyncListenerTests {
60+
61+
@Rule
62+
public BrokerRunning brokerRunning = BrokerRunning.isRunning();
63+
64+
@Autowired
65+
private RabbitTemplate rabbitTemplate;
66+
67+
@Autowired
68+
private AsyncRabbitTemplate asyncTemplate;
69+
70+
@Autowired
71+
private Queue queue1;
72+
73+
@Autowired
74+
private Queue queue2;
75+
76+
@Test
77+
public void testAsyncListener() throws Exception {
78+
assertEquals("FOO", this.rabbitTemplate.convertSendAndReceive(this.queue1.getName(), "foo"));
79+
RabbitConverterFuture<Object> future = this.asyncTemplate.convertSendAndReceive(this.queue1.getName(), "foo");
80+
assertEquals("FOO", future.get(10, TimeUnit.SECONDS));
81+
assertEquals("FOO", this.rabbitTemplate.convertSendAndReceive(this.queue2.getName(), "foo"));
82+
}
83+
84+
@Configuration
85+
@EnableRabbit
86+
public static class EnableRabbitConfig {
87+
88+
@Bean
89+
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
90+
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
91+
factory.setConnectionFactory(rabbitConnectionFactory());
92+
factory.setMismatchedQueuesFatal(true);
93+
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
94+
return factory;
95+
}
96+
97+
@Bean
98+
public ConnectionFactory rabbitConnectionFactory() {
99+
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
100+
connectionFactory.setHost("localhost");
101+
return connectionFactory;
102+
}
103+
104+
@Bean
105+
public RabbitTemplate rabbitTemplate() {
106+
return new RabbitTemplate(rabbitConnectionFactory());
107+
}
108+
109+
@Bean
110+
public AsyncRabbitTemplate asyncTemplate() {
111+
return new AsyncRabbitTemplate(rabbitTemplate());
112+
}
113+
114+
@Bean
115+
public RabbitAdmin rabbitAdmin() {
116+
return new RabbitAdmin(rabbitConnectionFactory());
117+
}
118+
119+
@Bean
120+
public Queue queue1() {
121+
return new AnonymousQueue();
122+
}
123+
124+
@Bean
125+
public Queue queue2() {
126+
return new AnonymousQueue();
127+
}
128+
129+
@Bean
130+
public Listener listener() {
131+
return new Listener();
132+
}
133+
134+
}
135+
136+
@Component
137+
public static class Listener {
138+
139+
private final AtomicBoolean fooFirst = new AtomicBoolean(true);
140+
141+
private final AtomicBoolean barFirst = new AtomicBoolean(true);
142+
143+
@RabbitListener(id = "foo", queues = "#{queue1.name}")
144+
public ListenableFuture<String> listen1(String foo) {
145+
SettableListenableFuture<String> future = new SettableListenableFuture<>();
146+
if (fooFirst.getAndSet(false)) {
147+
future.setException(new RuntimeException("Future.exception"));
148+
}
149+
else {
150+
future.set(foo.toUpperCase());
151+
}
152+
return future;
153+
}
154+
155+
@RabbitListener(id = "bar", queues = "#{queue2.name}")
156+
public Mono<String> listen2(String foo) {
157+
if (barFirst.getAndSet(false)) {
158+
return Mono.error(new RuntimeException("Mono.error()"));
159+
}
160+
else {
161+
return Mono.just(foo.toUpperCase());
162+
}
163+
}
164+
165+
}
166+
167+
}

src/reference/asciidoc/amqp.adoc

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2555,6 +2555,14 @@ These techniques are useful if you wish to create several containers with simila
25552555

25562556
IMPORTANT: Containers created this way are normal `@Bean` s and are not registered in the `RabbitListenerEndpointRegistry`.
25572557

2558+
[[async-returns]]
2559+
===== Asynchronous @RabbitListener Return Types
2560+
2561+
Starting with version 2.1, `@RabbitListener` (and `@RabbitHandler`) methods can be specified with asynchronous return types `ListenableFuture<?>` and `Mono<?>`, allowing the reply to be sent asynchronously.
2562+
2563+
IMPORTANT: The listener container factory must be configured with `AcknowledgeMode.MANUAL` so that the consumer thread will not ack the message; instead, the asynchronous completion will ack or nack (requeue) the message when the async operation completes.
2564+
If some exception occurs within the listener method that prevents creation of the async result object, you MUST catch that exception and return an appropriate return object that will cause the message to be acknowledged or requeued.
2565+
25582566
[[threading]]
25592567
===== Threading and Asynchronous Consumers
25602568

src/reference/asciidoc/whats-new.adoc

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,11 @@ See <<management-rest-api>> for more information.
6666
The listener container factory can now be configured with a `RetryTemplate` and, optionally, a `RecoveryCallback` used when sending replies.
6767
See <<async-annotation-driven-enable>> for more information.
6868

69+
===== Async @RabbitListener Return
70+
71+
`@RabbitListener` methods can now return `ListenableFuture<?>` or `Mono<?>`.
72+
See <<async-return>> for more information.
73+
6974
===== Connection Factory Bean Changes
7075

7176
The `RabbitConnectionFactoryBean` now calls `enableHostnameVerification()` by default; to revert to the previous behavior, set the `enabaleHostnameVerification` property to `false`.

0 commit comments

Comments
 (0)