Skip to content

Commit 7017e21

Browse files
committed
GH-2478 Handle conversion exception in AsyncRabbitTemplate
Previously, conversion errors in AsyncRabbitTemplate lead to AmqpReplyTimeoutException
1 parent d139c98 commit 7017e21

File tree

2 files changed

+34
-2
lines changed

2 files changed

+34
-2
lines changed

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/AsyncRabbitTemplate.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -604,12 +604,16 @@ public void onMessage(Message message, Channel channel) {
604604
if (future instanceof RabbitConverterFuture) {
605605
MessageConverter messageConverter = this.template.getMessageConverter();
606606
RabbitConverterFuture<Object> rabbitFuture = (RabbitConverterFuture<Object>) future;
607-
Object converted = rabbitFuture.getReturnType() != null
607+
try {
608+
Object converted = rabbitFuture.getReturnType() != null
608609
&& messageConverter instanceof SmartMessageConverter smart
609610
? smart.fromMessage(message,
610611
rabbitFuture.getReturnType())
611612
: messageConverter.fromMessage(message);
612-
rabbitFuture.complete(converted);
613+
rabbitFuture.complete(converted);
614+
} catch (MessageConversionException e) {
615+
rabbitFuture.completeExceptionally(e);
616+
}
613617
}
614618
else {
615619
((RabbitMessageFuture) future).complete(message);

spring-rabbit/src/test/java/org/springframework/amqp/rabbit/AsyncRabbitTemplateTests.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@
5757
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
5858
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
5959
import org.springframework.amqp.rabbit.listener.adapter.ReplyingMessageListener;
60+
import org.springframework.amqp.support.converter.MessageConversionException;
6061
import org.springframework.amqp.support.converter.SimpleMessageConverter;
6162
import org.springframework.amqp.support.postprocessor.GUnzipPostProcessor;
6263
import org.springframework.amqp.support.postprocessor.GZipPostProcessor;
@@ -394,6 +395,33 @@ public void testStopCancelled() throws Exception {
394395
assertThat(callback.result).isNull();
395396
}
396397

398+
@Test
399+
public void testConversionException() throws InterruptedException {
400+
CachingConnectionFactory connectionFactory = new CachingConnectionFactory("localhost");
401+
connectionFactory.setChannelCacheSize(1);
402+
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
403+
rabbitTemplate.setMessageConverter(new SimpleMessageConverter(){
404+
@Override
405+
public Object fromMessage(Message message) throws MessageConversionException {
406+
throw new MessageConversionException("Failed to convert message");
407+
}
408+
});
409+
AsyncRabbitTemplate asyncRabbitTemplate = new AsyncRabbitTemplate(rabbitTemplate);
410+
asyncRabbitTemplate.start();
411+
412+
RabbitConverterFuture<String> replyFuture = asyncRabbitTemplate.convertSendAndReceive("conversionException");
413+
414+
CountDownLatch cdl = new CountDownLatch(1);
415+
replyFuture.whenComplete((result, ex) -> {
416+
cdl.countDown();
417+
});
418+
assertThat(cdl.await(10, TimeUnit.SECONDS)).isTrue();
419+
assertThat(replyFuture).isCompletedExceptionally();
420+
421+
asyncRabbitTemplate.stop();
422+
connectionFactory.destroy();
423+
}
424+
397425
@Test
398426
void ctorCoverage() {
399427
AsyncRabbitTemplate template = new AsyncRabbitTemplate(mock(ConnectionFactory.class), "ex", "rk");

0 commit comments

Comments
 (0)