Skip to content

Commit a1f57e4

Browse files
artembilangaryrussell
authored andcommitted
Fix reactive error handling for request-reply
Related to https://stackoverflow.com/questions/75109345/spring-integration-webflux-inboundgateway-replychannel-for-error-response The `MessagingGatewaySupport.doSendAndReceiveMessageReactive()` uses `MutableMessageBuilder` to build a new message for next `send` operation. Even if this is an error flow, the provided `ErrorMessage` becomes a plain `MutableMessage`. This may break some downstream logics, like `TracingChannelInterceptor` from Spring Cloud Sleuth, which checks for the `Message` class to rebuild or reuse a message content. Therefore, an error handling flow is not able to extract error info because it is just lost. * Fix `MessagingGatewaySupport.doSendAndReceiveMessageReactive()` to check for message type before choosing an `AbstractIntegrationMessageBuilder` impl for building a new message. The regular `MessageBuilder` just builds a new `ErrorMessage` for an exception payload. * Add `filter()` into a `WebFluxDslTests` error handling flow to be sure that message for error sub-flow is really an `ErrorMessage`
1 parent 2c72285 commit a1f57e4

File tree

2 files changed

+19
-13
lines changed

2 files changed

+19
-13
lines changed

spring-integration-core/src/main/java/org/springframework/integration/gateway/MessagingGatewaySupport.java

Lines changed: 16 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2022 the original author or authors.
2+
* Copyright 2002-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -43,10 +43,12 @@
4343
import org.springframework.integration.mapping.InboundMessageMapper;
4444
import org.springframework.integration.mapping.MessageMappingException;
4545
import org.springframework.integration.mapping.OutboundMessageMapper;
46+
import org.springframework.integration.support.AbstractIntegrationMessageBuilder;
4647
import org.springframework.integration.support.DefaultErrorMessageStrategy;
4748
import org.springframework.integration.support.DefaultMessageBuilderFactory;
4849
import org.springframework.integration.support.ErrorMessageStrategy;
4950
import org.springframework.integration.support.ErrorMessageUtils;
51+
import org.springframework.integration.support.MessageBuilder;
5052
import org.springframework.integration.support.MessageBuilderFactory;
5153
import org.springframework.integration.support.MutableMessageBuilder;
5254
import org.springframework.integration.support.converter.SimpleMessageConverter;
@@ -71,7 +73,6 @@
7173
import org.springframework.messaging.SubscribableChannel;
7274
import org.springframework.messaging.core.MessagePostProcessor;
7375
import org.springframework.messaging.support.ErrorMessage;
74-
import org.springframework.messaging.support.MessageBuilder;
7576
import org.springframework.util.Assert;
7677

7778
/**
@@ -263,7 +264,7 @@ public void setReplyTimeout(long replyTimeout) {
263264

264265
/**
265266
* Provide an {@link InboundMessageMapper} for creating request Messages
266-
* from any object passed in a send or sendAndReceive operation.
267+
* from any object passed in a {@code send} or {@code sendAndReceive} operation.
267268
* @param requestMapper The request mapper.
268269
*/
269270
@SuppressWarnings("unchecked")
@@ -725,7 +726,12 @@ private Mono<Message<?>> doSendAndReceiveMessageReactive(MessageChannel requestC
725726

726727
MonoReplyChannel replyChan = new MonoReplyChannel();
727728

728-
Message<?> messageToSend = MutableMessageBuilder.fromMessage(requestMessage)
729+
AbstractIntegrationMessageBuilder<?> messageBuilder =
730+
requestMessage instanceof ErrorMessage
731+
? MessageBuilder.fromMessage(requestMessage)
732+
: MutableMessageBuilder.fromMessage(requestMessage);
733+
734+
Message<?> messageToSend = messageBuilder
729735
.setReplyChannel(replyChan)
730736
.setHeader(this.messagingTemplate.getSendTimeoutHeader(), null)
731737
.setHeader(this.messagingTemplate.getReceiveTimeoutHeader(), null)
@@ -734,16 +740,15 @@ private Mono<Message<?>> doSendAndReceiveMessageReactive(MessageChannel requestC
734740

735741
sendMessageForReactiveFlow(requestChannel, messageToSend);
736742

737-
return buildReplyMono(requestMessage, replyChan.replyMono.asMono(), error, originalReplyChannelHeader,
738-
originalErrorChannelHeader);
743+
return buildReplyMono(requestMessage, replyChan.replyMono.asMono(), error,
744+
originalReplyChannelHeader, originalErrorChannelHeader);
739745
})
740746
.onErrorResume(t -> error ? Mono.error(t) : handleSendError(requestMessage, t));
741747
}
742748

743749
private void sendMessageForReactiveFlow(MessageChannel requestChannel, Message<?> requestMessage) {
744-
if (requestChannel instanceof ReactiveStreamsSubscribableChannel) {
745-
((ReactiveStreamsSubscribableChannel) requestChannel)
746-
.subscribeTo(Mono.just(requestMessage));
750+
if (requestChannel instanceof ReactiveStreamsSubscribableChannel reactiveChannel) {
751+
reactiveChannel.subscribeTo(Mono.just(requestMessage));
747752
}
748753
else {
749754
long sendTimeout = sendTimeout(requestMessage);
@@ -771,9 +776,8 @@ private Mono<Message<?>> buildReplyMono(Message<?> requestMessage, Mono<Message<
771776
captor.start().stop(sendTimer());
772777
}
773778
})
774-
.<Message<?>>map(replyMessage -> {
775-
if (!error && replyMessage instanceof ErrorMessage) {
776-
ErrorMessage em = (ErrorMessage) replyMessage;
779+
.map(replyMessage -> {
780+
if (!error && replyMessage instanceof ErrorMessage em) {
777781
if (em.getPayload() instanceof MessagingException) {
778782
throw (MessagingException) em.getPayload();
779783
}

spring-integration-webflux/src/test/java/org/springframework/integration/webflux/dsl/WebFluxDslTests.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2022 the original author or authors.
2+
* Copyright 2016-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -56,6 +56,7 @@
5656
import org.springframework.messaging.Message;
5757
import org.springframework.messaging.MessageChannel;
5858
import org.springframework.messaging.PollableChannel;
59+
import org.springframework.messaging.support.ErrorMessage;
5960
import org.springframework.security.config.annotation.web.configuration.EnableWebSecurity;
6061
import org.springframework.security.config.annotation.web.reactive.EnableWebFluxSecurity;
6162
import org.springframework.security.config.web.server.ServerHttpSecurity;
@@ -462,6 +463,7 @@ public IntegrationFlow httpReactiveInboundGatewayFlowWithErrors() {
462463
@Bean
463464
public IntegrationFlow errorFlow() {
464465
return f -> f
466+
.filter(Message.class, ErrorMessage.class::isInstance)
465467
.enrichHeaders(h -> h.header(HttpHeaders.STATUS_CODE, HttpStatus.BAD_GATEWAY));
466468
}
467469

0 commit comments

Comments
 (0)