Skip to content

Commit c70c98b

Browse files
djgraff209artembilan
authored andcommitted
GH-3610: Fix WebFluxMH for error handling
Fixes #3610 This fix changes the WebFluxRequestExecutingMessageHandler The change specifically changes the contruction of the WebClientResponseException to use the `create` factory method. * Added changes and unit test to cover updated code Updated code to use simplified exception construction that is more tolerant of larger payloads. Updated unit tests to check for specific exception types. * Corrected checkstyle errors Corrected checkstyle error in imports. Corrected whitespace between casts. Corrected trailing whitespace. * Added @author **Cherry-pick to `5.4.x` & `5.3.x`** # Conflicts: # spring-integration-webflux/src/main/java/org/springframework/integration/webflux/outbound/WebFluxRequestExecutingMessageHandler.java # spring-integration-webflux/src/test/java/org/springframework/integration/webflux/outbound/WebFluxRequestExecutingMessageHandlerTests.java
1 parent 125ac45 commit c70c98b

File tree

3 files changed

+73
-43
lines changed

3 files changed

+73
-43
lines changed

spring-integration-webflux/src/main/java/org/springframework/integration/webflux/outbound/WebFluxRequestExecutingMessageHandler.java

Lines changed: 10 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2017-2020 the original author or authors.
2+
* Copyright 2017-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -17,15 +17,13 @@
1717
package org.springframework.integration.webflux.outbound;
1818

1919
import java.net.URI;
20-
import java.nio.charset.StandardCharsets;
2120
import java.util.Map;
2221

2322
import org.reactivestreams.Publisher;
2423

2524
import org.springframework.core.ParameterizedTypeReference;
2625
import org.springframework.core.io.Resource;
2726
import org.springframework.core.io.buffer.DataBuffer;
28-
import org.springframework.core.io.buffer.DataBufferUtils;
2927
import org.springframework.expression.Expression;
3028
import org.springframework.expression.common.LiteralExpression;
3129
import org.springframework.http.HttpEntity;
@@ -41,15 +39,13 @@
4139
import org.springframework.lang.Nullable;
4240
import org.springframework.messaging.Message;
4341
import org.springframework.util.Assert;
44-
import org.springframework.util.MimeType;
4542
import org.springframework.util.MultiValueMap;
4643
import org.springframework.web.reactive.function.BodyExtractor;
4744
import org.springframework.web.reactive.function.BodyExtractors;
4845
import org.springframework.web.reactive.function.BodyInserter;
4946
import org.springframework.web.reactive.function.BodyInserters;
5047
import org.springframework.web.reactive.function.client.ClientResponse;
5148
import org.springframework.web.reactive.function.client.WebClient;
52-
import org.springframework.web.reactive.function.client.WebClientResponseException;
5349
import org.springframework.web.util.DefaultUriBuilderFactory;
5450

5551
import reactor.core.publisher.Flux;
@@ -62,6 +58,7 @@
6258
* @author Shiliang Li
6359
* @author Artem Bilan
6460
* @author Gary Russell
61+
* @author David Graff
6562
*
6663
* @since 5.0
6764
*
@@ -298,38 +295,14 @@ private WebClient.RequestBodySpec createRequestBodySpec(Object uri, HttpMethod h
298295
}
299296

300297
private Mono<ClientResponse> exchangeForResponseMono(WebClient.RequestBodySpec requestSpec) {
301-
return requestSpec.exchange()
302-
.flatMap(response -> {
303-
HttpStatus httpStatus = response.statusCode();
304-
if (httpStatus.isError()) {
305-
return response.body(BodyExtractors.toDataBuffers())
306-
.reduce(DataBuffer::write)
307-
.map(dataBuffer -> {
308-
byte[] bytes = new byte[dataBuffer.readableByteCount()];
309-
dataBuffer.read(bytes);
310-
DataBufferUtils.release(dataBuffer);
311-
return bytes;
312-
})
313-
.defaultIfEmpty(new byte[0])
314-
.map(bodyBytes -> {
315-
throw new WebClientResponseException(
316-
"ClientResponse has erroneous status code: "
317-
+ httpStatus.value() + " "
318-
+ httpStatus.getReasonPhrase(),
319-
httpStatus.value(),
320-
httpStatus.getReasonPhrase(),
321-
response.headers().asHttpHeaders(),
322-
bodyBytes,
323-
response.headers().contentType()
324-
.map(MimeType::getCharset)
325-
.orElse(StandardCharsets.ISO_8859_1));
326-
}
327-
);
328-
}
329-
else {
330-
return Mono.just(response);
331-
}
332-
});
298+
return requestSpec.retrieve()
299+
.onStatus(HttpStatus::isError, ClientResponse::createException)
300+
.toEntityList(DataBuffer.class)
301+
.map((entity) ->
302+
ClientResponse.create(entity.getStatusCode())
303+
.headers((headers) -> headers.addAll(entity.getHeaders()))
304+
.body(Flux.fromIterable(entity.getBody())) // NOSONAR - not null according toEntityList()
305+
.build());
333306
}
334307

335308
@Nullable

spring-integration-webflux/src/test/java/org/springframework/integration/webflux/dsl/WebFluxDslTests.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2020 the original author or authors.
2+
* Copyright 2016-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -25,8 +25,6 @@
2525
import java.time.Duration;
2626
import java.util.Collections;
2727

28-
import javax.annotation.Resource;
29-
3028
import org.hamcrest.Matchers;
3129
import org.junit.jupiter.api.BeforeEach;
3230
import org.junit.jupiter.api.Test;
@@ -118,7 +116,8 @@ public class WebFluxDslTests {
118116
@Qualifier("webFluxWithReplyPayloadToFlux.handler")
119117
private WebFluxRequestExecutingMessageHandler webFluxWithReplyPayloadToFlux;
120118

121-
@Resource(name = "httpReactiveProxyFlow.webflux:outbound-gateway#0")
119+
@Autowired
120+
@Qualifier("httpReactiveProxyFlow.webflux:outbound-gateway#0")
122121
private WebFluxRequestExecutingMessageHandler httpReactiveProxyFlow;
123122

124123
@Autowired

spring-integration-webflux/src/test/java/org/springframework/integration/webflux/outbound/WebFluxRequestExecutingMessageHandlerTests.java

Lines changed: 60 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2017-2019 the original author or authors.
2+
* Copyright 2017-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -18,6 +18,7 @@
1818

1919
import static org.assertj.core.api.Assertions.assertThat;
2020

21+
import java.nio.charset.StandardCharsets;
2122
import java.time.Duration;
2223

2324
import org.junit.jupiter.api.Test;
@@ -47,6 +48,7 @@
4748
/**
4849
* @author Shiliang Li
4950
* @author Artem Bilan
51+
* @author David Graff
5052
*
5153
* @since 5.0
5254
*/
@@ -108,6 +110,8 @@ void testReactiveErrorOneWay() {
108110
assertThat(errorMessage).isNotNull();
109111
assertThat(errorMessage).isInstanceOf(ErrorMessage.class);
110112
Throwable throwable = (Throwable) errorMessage.getPayload();
113+
assertThat(throwable).isInstanceOf(MessageHandlingException.class);
114+
assertThat(throwable.getCause()).isInstanceOf(WebClientResponseException.Unauthorized.class);
111115
assertThat(throwable.getMessage()).contains("401 Unauthorized");
112116
}
113117

@@ -173,7 +177,8 @@ void testServiceUnavailableWithoutBody() {
173177
assertThat(payload).isInstanceOf(MessageHandlingException.class);
174178

175179
Exception exception = (Exception) payload;
176-
assertThat(exception.getCause()).isInstanceOf(WebClientResponseException.class);
180+
assertThat(exception).isInstanceOf(MessageHandlingException.class);
181+
assertThat(exception.getCause()).isInstanceOf(WebClientResponseException.ServiceUnavailable.class);
177182
assertThat(exception.getMessage()).contains("503 Service Unavailable");
178183

179184
Message<?> replyMessage = errorChannel.receive(10);
@@ -273,4 +278,57 @@ void testClientHttpResponseAsReply() {
273278
.verifyComplete();
274279
}
275280

281+
282+
@Test
283+
void testClientHttpResponseErrorAsReply() {
284+
ClientHttpConnector httpConnector = new HttpHandlerConnector((request, response) -> {
285+
response.setStatusCode(HttpStatus.NOT_FOUND);
286+
response.getHeaders().setContentType(MediaType.APPLICATION_JSON);
287+
288+
DataBufferFactory bufferFactory = response.bufferFactory();
289+
290+
Flux<DataBuffer> data =
291+
Flux.just(
292+
bufferFactory.wrap("{".getBytes(StandardCharsets.UTF_8)),
293+
bufferFactory.wrap(" \"error\": \"Not Found\",".getBytes(StandardCharsets.UTF_8)),
294+
bufferFactory.wrap(" \"message\": \"404 NOT_FOUND\",".getBytes(StandardCharsets.UTF_8)),
295+
bufferFactory.wrap(" \"path\": \"/spring-integration\",".getBytes(StandardCharsets.UTF_8)),
296+
bufferFactory.wrap(" \"status\": 404,".getBytes(StandardCharsets.UTF_8)),
297+
bufferFactory.wrap(" \"timestamp\": \"1970-01-01T00:00:00.000+00:00\",".getBytes(StandardCharsets.UTF_8)),
298+
bufferFactory.wrap(" \"trace\": \"some really\nlong\ntrace\",".getBytes(StandardCharsets.UTF_8)),
299+
bufferFactory.wrap("}".getBytes(StandardCharsets.UTF_8))
300+
);
301+
302+
return response.writeWith(data)
303+
.then(Mono.defer(response::setComplete));
304+
});
305+
306+
WebClient webClient = WebClient.builder()
307+
.clientConnector(httpConnector)
308+
.build();
309+
310+
String destinationUri = "https://www.springsource.org/spring-integration";
311+
WebFluxRequestExecutingMessageHandler reactiveHandler =
312+
new WebFluxRequestExecutingMessageHandler(destinationUri, webClient);
313+
314+
QueueChannel replyChannel = new QueueChannel();
315+
QueueChannel errorChannel = new QueueChannel();
316+
reactiveHandler.setOutputChannel(replyChannel);
317+
reactiveHandler.setBodyExtractor(new ClientHttpResponseBodyExtractor());
318+
319+
final Message<?> message =
320+
MessageBuilder.withPayload("hello, world")
321+
.setErrorChannel(errorChannel)
322+
.build();
323+
reactiveHandler.handleMessage(message);
324+
325+
Message<?> errorMessage = errorChannel.receive(10_000);
326+
327+
assertThat(errorMessage).isNotNull();
328+
assertThat(errorMessage).isInstanceOf(ErrorMessage.class);
329+
final Throwable throwable = (Throwable) errorMessage.getPayload();
330+
assertThat(throwable).isInstanceOf(MessageHandlingException.class);
331+
assertThat(throwable.getCause()).isInstanceOf(WebClientResponseException.NotFound.class);
332+
assertThat(throwable.getMessage()).contains("404 Not Found");
333+
}
276334
}

0 commit comments

Comments
 (0)