Skip to content

Commit 9082c12

Browse files
artembilangaryrussell
authored andcommitted
Fix MessagingGatewaySupport for reactive error (#3319)
* Fix MessagingGatewaySupport for reactive error The `onErrorResume()` was in a wrong place for the `doSendAndReceiveMessageReactive()`: we have to catch all the exceptions from the top level `Mono`, not only a reply one as it was before. Ensure in HTTP and WebFlux test that behavior is fixed **Cherry-pick to `5.3.x` & `5.2.x`** * * Remove unused imports Co-authored-by: Artem Bilan <[email protected]>
1 parent 9cd8a4a commit 9082c12

File tree

3 files changed

+99
-36
lines changed

3 files changed

+99
-36
lines changed

spring-integration-core/src/main/java/org/springframework/integration/gateway/MessagingGatewaySupport.java

Lines changed: 20 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -613,39 +613,40 @@ protected Mono<Message<?>> sendAndReceiveMessageReactive(Object object) {
613613

614614
private Mono<Message<?>> doSendAndReceiveMessageReactive(MessageChannel requestChannel, Object object,
615615
boolean error) {
616+
final Message<?> requestMessage;
617+
try {
618+
Message<?> message =
619+
object instanceof Message<?>
620+
? (Message<?>) object
621+
: this.requestMapper.toMessage(object);
616622

617-
return Mono.defer(() -> {
618-
Message<?> message;
619-
try {
620-
message = object instanceof Message<?>
621-
? (Message<?>) object
622-
: this.requestMapper.toMessage(object);
623-
624-
message = this.historyWritingPostProcessor.postProcessMessage(message);
623+
message = this.historyWritingPostProcessor.postProcessMessage(message);
624+
requestMessage = message;
625+
}
626+
catch (Exception e) {
627+
throw new MessageMappingException("Cannot map to message: " + object, e);
628+
}
625629

626-
}
627-
catch (Exception e) {
628-
throw new MessageMappingException("Cannot map to message: " + object, e);
629-
}
630+
return Mono.defer(() -> {
630631

631-
Object originalReplyChannelHeader = message.getHeaders().getReplyChannel();
632-
Object originalErrorChannelHeader = message.getHeaders().getErrorChannel();
632+
Object originalReplyChannelHeader = requestMessage.getHeaders().getReplyChannel();
633+
Object originalErrorChannelHeader = requestMessage.getHeaders().getErrorChannel();
633634

634635
MonoReplyChannel replyChan = new MonoReplyChannel();
635636

636-
Message<?> requestMessage = MutableMessageBuilder.fromMessage(message)
637+
Message<?> messageToSend = MutableMessageBuilder.fromMessage(requestMessage)
637638
.setReplyChannel(replyChan)
638639
.setHeader(this.messagingTemplate.getSendTimeoutHeader(), null)
639640
.setHeader(this.messagingTemplate.getReceiveTimeoutHeader(), null)
640641
.setErrorChannel(replyChan)
641642
.build();
642643

643-
sendMessageForReactiveFlow(requestChannel, requestMessage);
644+
sendMessageForReactiveFlow(requestChannel, messageToSend);
644645

645646
return buildReplyMono(requestMessage, replyChan.replyMono, error, originalReplyChannelHeader,
646-
originalErrorChannelHeader)
647-
.onErrorResume(t -> error ? Mono.error(t) : handleSendError(requestMessage, t));
648-
});
647+
originalErrorChannelHeader);
648+
})
649+
.onErrorResume(t -> error ? Mono.error(t) : handleSendError(requestMessage, t));
649650
}
650651

651652
private void sendMessageForReactiveFlow(MessageChannel requestChannel, Message<?> requestMessage) {

spring-integration-http/src/test/java/org/springframework/integration/http/dsl/HttpDslTests.java

Lines changed: 38 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -31,9 +31,8 @@
3131
import java.util.List;
3232
import java.util.Map;
3333

34-
import org.junit.Before;
35-
import org.junit.Test;
36-
import org.junit.runner.RunWith;
34+
import org.junit.jupiter.api.BeforeEach;
35+
import org.junit.jupiter.api.Test;
3736

3837
import org.springframework.beans.factory.annotation.Autowired;
3938
import org.springframework.beans.factory.annotation.Qualifier;
@@ -45,10 +44,12 @@
4544
import org.springframework.http.client.ClientHttpRequestFactory;
4645
import org.springframework.http.client.ClientHttpResponse;
4746
import org.springframework.integration.channel.DirectChannel;
47+
import org.springframework.integration.channel.FixedSubscriberChannel;
4848
import org.springframework.integration.config.EnableIntegration;
4949
import org.springframework.integration.dsl.IntegrationFlow;
5050
import org.springframework.integration.dsl.IntegrationFlows;
5151
import org.springframework.integration.dsl.context.IntegrationFlowContext;
52+
import org.springframework.integration.handler.AbstractReplyProducingMessageHandler;
5253
import org.springframework.integration.http.multipart.UploadedMultipartFile;
5354
import org.springframework.integration.http.outbound.HttpRequestExecutingMessageHandler;
5455
import org.springframework.integration.security.channel.ChannelSecurityInterceptor;
@@ -68,8 +69,7 @@
6869
import org.springframework.security.crypto.factory.PasswordEncoderFactories;
6970
import org.springframework.security.provisioning.InMemoryUserDetailsManager;
7071
import org.springframework.test.annotation.DirtiesContext;
71-
import org.springframework.test.context.junit4.SpringRunner;
72-
import org.springframework.test.context.web.WebAppConfiguration;
72+
import org.springframework.test.context.junit.jupiter.web.SpringJUnitWebConfig;
7373
import org.springframework.test.web.client.MockMvcClientHttpRequestFactory;
7474
import org.springframework.test.web.servlet.MockMvc;
7575
import org.springframework.test.web.servlet.setup.MockMvcBuilders;
@@ -91,8 +91,7 @@
9191
*
9292
* @since 5.0
9393
*/
94-
@RunWith(SpringRunner.class)
95-
@WebAppConfiguration
94+
@SpringJUnitWebConfig
9695
@DirtiesContext
9796
public class HttpDslTests {
9897

@@ -107,7 +106,7 @@ public class HttpDslTests {
107106

108107
private MockMvc mockMvc;
109108

110-
@Before
109+
@BeforeEach
111110
public void setup() {
112111
this.mockMvc =
113112
MockMvcBuilders.webAppContextSetup(this.wac)
@@ -236,6 +235,37 @@ public void testValidation() throws Exception {
236235
}
237236

238237

238+
@Test
239+
public void testErrorChannelFlow() throws Exception {
240+
IntegrationFlow flow =
241+
IntegrationFlows.from(
242+
Http.inboundGateway("/errorFlow")
243+
.errorChannel(new FixedSubscriberChannel(
244+
new AbstractReplyProducingMessageHandler() {
245+
246+
@Override
247+
protected Object handleRequestMessage(Message<?> requestMessage) {
248+
return "Error Response";
249+
}
250+
251+
})))
252+
.transform((payload) -> {
253+
throw new RuntimeException("Error!");
254+
})
255+
.get();
256+
257+
IntegrationFlowContext.IntegrationFlowRegistration flowRegistration =
258+
this.integrationFlowContext.registration(flow).register();
259+
260+
this.mockMvc.perform(
261+
get("/errorFlow")
262+
.with(httpBasic("user", "user")))
263+
.andExpect(status().isOk())
264+
.andExpect(content().string("Error Response"));
265+
266+
flowRegistration.destroy();
267+
}
268+
239269
@Configuration
240270
@EnableWebSecurity
241271
@EnableIntegration

spring-integration-webflux/src/test/java/org/springframework/integration/webflux/dsl/WebFluxDslTests.java

Lines changed: 41 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,8 @@
2828
import javax.annotation.Resource;
2929

3030
import org.hamcrest.Matchers;
31-
import org.junit.Before;
32-
import org.junit.Test;
33-
import org.junit.runner.RunWith;
31+
import org.junit.jupiter.api.BeforeEach;
32+
import org.junit.jupiter.api.Test;
3433
import org.reactivestreams.Publisher;
3534

3635
import org.springframework.beans.DirectFieldAccessor;
@@ -44,12 +43,14 @@
4443
import org.springframework.http.HttpStatus;
4544
import org.springframework.http.MediaType;
4645
import org.springframework.http.client.reactive.ClientHttpConnector;
46+
import org.springframework.integration.channel.FixedSubscriberChannel;
4747
import org.springframework.integration.channel.QueueChannel;
4848
import org.springframework.integration.config.EnableIntegration;
4949
import org.springframework.integration.dsl.IntegrationFlow;
5050
import org.springframework.integration.dsl.IntegrationFlows;
5151
import org.springframework.integration.dsl.MessageChannels;
5252
import org.springframework.integration.dsl.context.IntegrationFlowContext;
53+
import org.springframework.integration.handler.AbstractReplyProducingMessageHandler;
5354
import org.springframework.integration.http.HttpHeaders;
5455
import org.springframework.integration.http.dsl.Http;
5556
import org.springframework.integration.support.MessageBuilder;
@@ -76,8 +77,7 @@
7677
import org.springframework.security.test.web.servlet.setup.SecurityMockMvcConfigurers;
7778
import org.springframework.security.web.server.SecurityWebFilterChain;
7879
import org.springframework.test.annotation.DirtiesContext;
79-
import org.springframework.test.context.junit4.SpringRunner;
80-
import org.springframework.test.context.web.WebAppConfiguration;
80+
import org.springframework.test.context.junit.jupiter.web.SpringJUnitWebConfig;
8181
import org.springframework.test.web.reactive.server.HttpHandlerConnector;
8282
import org.springframework.test.web.reactive.server.WebTestClient;
8383
import org.springframework.test.web.servlet.MockMvc;
@@ -104,8 +104,7 @@
104104
*
105105
* @since 5.0
106106
*/
107-
@RunWith(SpringRunner.class)
108-
@WebAppConfiguration
107+
@SpringJUnitWebConfig
109108
@DirtiesContext
110109
public class WebFluxDslTests {
111110

@@ -130,7 +129,7 @@ public class WebFluxDslTests {
130129

131130
private WebTestClient webTestClient;
132131

133-
@Before
132+
@BeforeEach
134133
public void setup() {
135134
this.mockMvc =
136135
MockMvcBuilders.webAppContextSetup(this.wac)
@@ -320,6 +319,39 @@ public void testValidation() {
320319
flowRegistration.destroy();
321320
}
322321

322+
@Test
323+
public void testErrorChannelFlow() {
324+
IntegrationFlow flow =
325+
IntegrationFlows.from(
326+
WebFlux.inboundGateway("/errorFlow")
327+
.errorChannel(new FixedSubscriberChannel(
328+
new AbstractReplyProducingMessageHandler() {
329+
330+
@Override
331+
protected Object handleRequestMessage(Message<?> requestMessage) {
332+
return "Error Response";
333+
}
334+
335+
})))
336+
.channel(MessageChannels.flux())
337+
.transform((payload) -> {
338+
throw new RuntimeException("Error!");
339+
})
340+
.get();
341+
342+
IntegrationFlowContext.IntegrationFlowRegistration flowRegistration =
343+
this.integrationFlowContext.registration(flow).register();
344+
345+
this.webTestClient.get().uri("/errorFlow")
346+
.headers(headers -> headers.setBasicAuth("guest", "guest"))
347+
.exchange()
348+
.expectStatus()
349+
.isOk()
350+
.expectBody(String.class)
351+
.isEqualTo("Error Response");
352+
353+
flowRegistration.destroy();
354+
}
323355

324356
@Configuration
325357
@EnableWebFlux
@@ -450,7 +482,7 @@ public IntegrationFlow sseFlow() {
450482
.from(WebFlux.inboundGateway("/sse")
451483
.requestMapping(m -> m.produces(MediaType.TEXT_EVENT_STREAM_VALUE))
452484
.mappedResponseHeaders("*"))
453-
.enrichHeaders(Collections.singletonMap("aHeader", new String[] { "foo", "bar", "baz" }))
485+
.enrichHeaders(Collections.singletonMap("aHeader", new String[]{"foo", "bar", "baz"}))
454486
.handle((p, h) -> Flux.fromArray(h.get("aHeader", String[].class)))
455487
.get();
456488
}

0 commit comments

Comments
 (0)