Skip to content

Commit 7403e36

Browse files
garyrussellartembilan
authored andcommitted
INT-4541: Fix Reactive MessagingGateway Errors
JIRA: https://jira.spring.io/browse/INT-4541 Add test case to reproduce. The gateway correctly sets up the `errorChannel` header so that a downstream `MPEH` will send exceptions back to the gateway, but the `map()` function did not check for an error message. Check for an error message and throw the payload so that the `onErrorResume` will route to the error channel. # Conflicts: # spring-integration-webflux/src/test/java/org/springframework/integration/webflux/dsl/WebFluxDslTests.java
1 parent 7faeee8 commit 7403e36

File tree

2 files changed

+47
-6
lines changed

2 files changed

+47
-6
lines changed

spring-integration-core/src/main/java/org/springframework/integration/gateway/MessagingGatewaySupport.java

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -623,12 +623,23 @@ private Mono<Message<?>> doSendAndReceiveMessageReactive(MessageChannel requestC
623623
this.messageCount.incrementAndGet();
624624
}
625625
})
626-
.<Message<?>>map(replyMessage ->
627-
MessageBuilder.fromMessage(replyMessage)
628-
.setHeader(MessageHeaders.REPLY_CHANNEL, originalReplyChannelHeader)
629-
.setHeader(MessageHeaders.ERROR_CHANNEL, originalErrorChannelHeader)
630-
.build())
631-
626+
.<Message<?>>map(replyMessage -> {
627+
if (!error && replyMessage instanceof ErrorMessage) {
628+
ErrorMessage em = (ErrorMessage) replyMessage;
629+
if (em.getPayload() instanceof MessagingException) {
630+
throw (MessagingException) em.getPayload();
631+
}
632+
else {
633+
throw new MessagingException(requestMessage, em.getPayload());
634+
}
635+
}
636+
else {
637+
return MessageBuilder.fromMessage(replyMessage)
638+
.setHeader(MessageHeaders.REPLY_CHANNEL, originalReplyChannelHeader)
639+
.setHeader(MessageHeaders.ERROR_CHANNEL, originalErrorChannelHeader)
640+
.build();
641+
}
642+
})
632643
.onErrorResume(t -> error ? Mono.error(t) : handleSendError(requestMessage, t));
633644
});
634645
}

spring-integration-webflux/src/test/java/org/springframework/integration/webflux/dsl/WebFluxDslTests.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@
5151
import org.springframework.integration.config.EnableIntegration;
5252
import org.springframework.integration.dsl.IntegrationFlow;
5353
import org.springframework.integration.dsl.IntegrationFlows;
54+
import org.springframework.integration.dsl.channel.MessageChannels;
5455
import org.springframework.integration.http.HttpHeaders;
5556
import org.springframework.integration.http.dsl.Http;
5657
import org.springframework.integration.support.MessageBuilder;
@@ -231,6 +232,15 @@ public void testHttpReactivePost() {
231232

232233
}
233234

235+
@Test
236+
public void testHttpReactivePostWithError() {
237+
this.webTestClient.post().uri("/reactivePostErrors")
238+
.attributes(basicAuthenticationCredentials("guest", "guest"))
239+
.body(Mono.just("foo\nbar\nbaz"), String.class)
240+
.exchange()
241+
.expectStatus().isEqualTo(HttpStatus.BAD_GATEWAY);
242+
}
243+
234244
@Test
235245
public void testSse() {
236246
Flux<String> responseBody =
@@ -332,6 +342,26 @@ public IntegrationFlow httpReactiveInboundChannelAdapterFlow() {
332342
.get();
333343
}
334344

345+
@Bean
346+
public IntegrationFlow httpReactiveInboundGatewayFlowWithErrors() {
347+
return IntegrationFlows
348+
.from(WebFlux.inboundGateway("/reactivePostErrors")
349+
.requestMapping(m -> m.methods(HttpMethod.POST))
350+
.requestPayloadType(ResolvableType.forClassWithGenerics(Flux.class, String.class))
351+
.errorChannel(errorFlow().getInputChannel()))
352+
.channel(MessageChannels.flux())
353+
.handle((p, h) -> {
354+
throw new RuntimeException("errorTest");
355+
})
356+
.get();
357+
}
358+
359+
@Bean
360+
public IntegrationFlow errorFlow() {
361+
return f -> f
362+
.enrichHeaders(h -> h.header(HttpHeaders.STATUS_CODE, HttpStatus.BAD_GATEWAY));
363+
}
364+
335365
@Bean
336366
public IntegrationFlow sseFlow() {
337367
return IntegrationFlows

0 commit comments

Comments
 (0)