Skip to content

Commit c65a4bc

Browse files
committed
GH-2478 Handle conversion exception in AsyncRabbitTemplate
Previously, conversion errors in AsyncRabbitTemplate lead to AmqpReplyTimeoutException
1 parent d139c98 commit c65a4bc

File tree

2 files changed

+32
-2
lines changed

2 files changed

+32
-2
lines changed

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/AsyncRabbitTemplate.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
import org.springframework.amqp.rabbit.listener.DirectReplyToMessageListenerContainer.ChannelHolder;
5050
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
5151
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
52+
import org.springframework.amqp.support.converter.MessageConversionException;
5253
import org.springframework.amqp.support.converter.MessageConverter;
5354
import org.springframework.amqp.support.converter.SmartMessageConverter;
5455
import org.springframework.amqp.utils.JavaUtils;
@@ -604,12 +605,17 @@ public void onMessage(Message message, Channel channel) {
604605
if (future instanceof RabbitConverterFuture) {
605606
MessageConverter messageConverter = this.template.getMessageConverter();
606607
RabbitConverterFuture<Object> rabbitFuture = (RabbitConverterFuture<Object>) future;
607-
Object converted = rabbitFuture.getReturnType() != null
608+
try {
609+
Object converted = rabbitFuture.getReturnType() != null
608610
&& messageConverter instanceof SmartMessageConverter smart
609611
? smart.fromMessage(message,
610612
rabbitFuture.getReturnType())
611613
: messageConverter.fromMessage(message);
612-
rabbitFuture.complete(converted);
614+
rabbitFuture.complete(converted);
615+
}
616+
catch (MessageConversionException e) {
617+
rabbitFuture.completeExceptionally(e);
618+
}
613619
}
614620
else {
615621
((RabbitMessageFuture) future).complete(message);

spring-rabbit/src/test/java/org/springframework/amqp/rabbit/AsyncRabbitTemplateTests.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@
5757
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
5858
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
5959
import org.springframework.amqp.rabbit.listener.adapter.ReplyingMessageListener;
60+
import org.springframework.amqp.support.converter.MessageConversionException;
6061
import org.springframework.amqp.support.converter.SimpleMessageConverter;
6162
import org.springframework.amqp.support.postprocessor.GUnzipPostProcessor;
6263
import org.springframework.amqp.support.postprocessor.GZipPostProcessor;
@@ -394,6 +395,29 @@ public void testStopCancelled() throws Exception {
394395
assertThat(callback.result).isNull();
395396
}
396397

398+
@Test
399+
@DirtiesContext
400+
public void testConversionException() throws InterruptedException {
401+
this.asyncTemplate.getRabbitTemplate().setMessageConverter(new SimpleMessageConverter() {
402+
@Override
403+
public Object fromMessage(Message message) throws MessageConversionException {
404+
throw new MessageConversionException("Failed to convert message");
405+
}
406+
});
407+
408+
RabbitConverterFuture<String> replyFuture = this.asyncTemplate.convertSendAndReceive("conversionException");
409+
410+
final CountDownLatch cdl = new CountDownLatch(1);
411+
final AtomicReference<Object> resultRef = new AtomicReference<>();
412+
replyFuture.whenComplete((result, ex) -> {
413+
resultRef.set(result);
414+
cdl.countDown();
415+
});
416+
assertThat(cdl.await(10, TimeUnit.SECONDS)).isTrue();
417+
assertThat(replyFuture).isCompletedExceptionally();
418+
assertThat(resultRef.get()).isNull();
419+
}
420+
397421
@Test
398422
void ctorCoverage() {
399423
AsyncRabbitTemplate template = new AsyncRabbitTemplate(mock(ConnectionFactory.class), "ex", "rk");

0 commit comments

Comments
 (0)