Skip to content

Commit 83a488c

Browse files
artembilangaryrussell
authored andcommitted
Fix WebFluxMH for proper response handling
The `WebFluxRequestExecutingMessageHandler` does direct `ClientResponse.create(entity.getStatusCode())` which comes with a `ExchangeStrategies.withDefaults()`. Even if end-user configures a `WebClient` properly, the response is created with default strategies. * Rework `WebFluxRequestExecutingMessageHandler` internal logic to call `ResponseSpec.toEntityFlux(BodyExtractor)` instead of manual `ClientResponse.create()` * Add unit test to the `WebFluxRequestExecutingMessageHandlerTests` to ensure that configured `maxInMemorySize` on the `WebClient` strategies has an effect when response body is bigger than expected size **Cherry-pick to `5.4.x`**
1 parent 81a50ff commit 83a488c

File tree

3 files changed

+140
-89
lines changed

3 files changed

+140
-89
lines changed

spring-integration-webflux/src/main/java/org/springframework/integration/webflux/dsl/WebFluxMessageHandlerSpec.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2017-2020 the original author or authors.
2+
* Copyright 2017-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -88,7 +88,7 @@ public WebFluxMessageHandlerSpec replyPayloadToFlux(boolean replyPayloadToFlux)
8888
* @since 5.0.1
8989
* @see WebFluxRequestExecutingMessageHandler#setBodyExtractor(BodyExtractor)
9090
*/
91-
public WebFluxMessageHandlerSpec bodyExtractor(BodyExtractor<?, ClientHttpResponse> bodyExtractor) {
91+
public WebFluxMessageHandlerSpec bodyExtractor(BodyExtractor<?, ? super ClientHttpResponse> bodyExtractor) {
9292
this.target.setBodyExtractor(bodyExtractor);
9393
return this;
9494
}

spring-integration-webflux/src/main/java/org/springframework/integration/webflux/outbound/WebFluxRequestExecutingMessageHandler.java

Lines changed: 75 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323

2424
import org.springframework.core.ParameterizedTypeReference;
2525
import org.springframework.core.io.Resource;
26-
import org.springframework.core.io.buffer.DataBuffer;
2726
import org.springframework.expression.Expression;
2827
import org.springframework.expression.common.LiteralExpression;
2928
import org.springframework.http.HttpEntity;
@@ -73,7 +72,7 @@ public class WebFluxRequestExecutingMessageHandler extends AbstractHttpRequestEx
7372

7473
private boolean replyPayloadToFlux;
7574

76-
private BodyExtractor<?, ClientHttpResponse> bodyExtractor;
75+
private BodyExtractor<?, ? super ClientHttpResponse> bodyExtractor;
7776

7877
private Expression publisherElementTypeExpression;
7978

@@ -166,7 +165,7 @@ public void setReplyPayloadToFlux(boolean replyPayloadToFlux) {
166165
* @see #setExpectedResponseType(Class)
167166
* @see #setExpectedResponseTypeExpression(Expression)
168167
*/
169-
public void setBodyExtractor(BodyExtractor<?, ClientHttpResponse> bodyExtractor) {
168+
public void setBodyExtractor(BodyExtractor<?, ? super ClientHttpResponse> bodyExtractor) {
170169
this.bodyExtractor = bodyExtractor;
171170
}
172171

@@ -208,71 +207,16 @@ protected Object exchange(Object uri, HttpMethod httpMethod, HttpEntity<?> httpR
208207
WebClient.RequestBodySpec requestSpec =
209208
createRequestBodySpec(uri, httpMethod, httpRequest, requestMessage, uriVariables);
210209

211-
Mono<ClientResponse> responseMono = exchangeForResponseMono(requestSpec);
210+
Mono<ResponseEntity<Flux<Object>>> responseMono = exchangeForResponseMono(requestSpec, expectedResponseType);
212211

213212
if (isExpectReply()) {
214-
return createReplyFromResponse(expectedResponseType, responseMono);
213+
return createReplyFromResponse(responseMono);
215214
}
216215
else {
217216
return responseMono.then();
218217
}
219218
}
220219

221-
private Object createReplyFromResponse(Object expectedResponseType, Mono<ClientResponse> responseMono) {
222-
return responseMono
223-
.flatMap(response -> {
224-
ResponseEntity.BodyBuilder httpEntityBuilder =
225-
ResponseEntity.status(response.statusCode())
226-
.headers(response.headers().asHttpHeaders());
227-
228-
Mono<?> bodyMono;
229-
230-
if (expectedResponseType != null) {
231-
if (this.replyPayloadToFlux) {
232-
BodyExtractor<? extends Flux<?>, ReactiveHttpInputMessage> extractor;
233-
if (expectedResponseType instanceof ParameterizedTypeReference<?>) {
234-
extractor = BodyExtractors.toFlux(
235-
(ParameterizedTypeReference<?>) expectedResponseType);
236-
}
237-
else {
238-
extractor = BodyExtractors.toFlux((Class<?>) expectedResponseType);
239-
}
240-
Flux<?> flux = response.body(extractor);
241-
bodyMono = Mono.just(flux);
242-
}
243-
else {
244-
BodyExtractor<? extends Mono<?>, ReactiveHttpInputMessage> extractor;
245-
if (expectedResponseType instanceof ParameterizedTypeReference<?>) {
246-
extractor = BodyExtractors.toMono(
247-
(ParameterizedTypeReference<?>) expectedResponseType);
248-
}
249-
else {
250-
extractor = BodyExtractors.toMono((Class<?>) expectedResponseType);
251-
}
252-
bodyMono = response.body(extractor);
253-
}
254-
}
255-
else if (this.bodyExtractor != null) {
256-
Object body = response.body(this.bodyExtractor);
257-
if (body instanceof Mono) {
258-
bodyMono = (Mono<?>) body;
259-
}
260-
else {
261-
bodyMono = Mono.just(body);
262-
}
263-
}
264-
else {
265-
bodyMono = Mono.empty();
266-
}
267-
268-
return bodyMono
269-
.map(httpEntityBuilder::body)
270-
.defaultIfEmpty(httpEntityBuilder.build());
271-
}
272-
)
273-
.map(this::getReply);
274-
}
275-
276220
private WebClient.RequestBodySpec createRequestBodySpec(Object uri, HttpMethod httpMethod,
277221
HttpEntity<?> httpRequest, Message<?> requestMessage, Map<String, ?> uriVariables) {
278222

@@ -294,17 +238,6 @@ private WebClient.RequestBodySpec createRequestBodySpec(Object uri, HttpMethod h
294238
return requestSpec;
295239
}
296240

297-
private Mono<ClientResponse> exchangeForResponseMono(WebClient.RequestBodySpec requestSpec) {
298-
return requestSpec.retrieve()
299-
.onStatus(HttpStatus::isError, ClientResponse::createException)
300-
.toEntityList(DataBuffer.class)
301-
.map((entity) ->
302-
ClientResponse.create(entity.getStatusCode())
303-
.headers((headers) -> headers.addAll(entity.getHeaders()))
304-
.body(Flux.fromIterable(entity.getBody())) // NOSONAR - not null according toEntityList()
305-
.build());
306-
}
307-
308241
@Nullable
309242
private BodyInserter<?, ? super ClientHttpRequest> buildBodyInserterForRequest(Message<?> requestMessage,
310243
HttpEntity<?> httpRequest) {
@@ -366,4 +299,75 @@ else if (MediaType.MULTIPART_FORM_DATA.equals(contentType)) {
366299
}
367300
}
368301

302+
private Mono<ResponseEntity<Flux<Object>>> exchangeForResponseMono(WebClient.RequestBodySpec requestSpec,
303+
Object expectedResponseType) {
304+
305+
return requestSpec.retrieve()
306+
.onStatus(HttpStatus::isError, ClientResponse::createException)
307+
.toEntityFlux(createBodyExtractor(expectedResponseType));
308+
}
309+
310+
@SuppressWarnings({ "unchecked", "rawtypes" })
311+
private BodyExtractor<Flux<Object>, ? super ClientHttpResponse> createBodyExtractor(Object expectedResponseType) {
312+
if (expectedResponseType != null) {
313+
if (this.replyPayloadToFlux) {
314+
if (expectedResponseType instanceof ParameterizedTypeReference) {
315+
return BodyExtractors.toFlux((ParameterizedTypeReference) expectedResponseType);
316+
}
317+
else {
318+
return BodyExtractors.toFlux((Class) expectedResponseType);
319+
}
320+
}
321+
else {
322+
BodyExtractor<? extends Mono<?>, ReactiveHttpInputMessage> monoExtractor;
323+
if (expectedResponseType instanceof ParameterizedTypeReference<?>) {
324+
monoExtractor = BodyExtractors.toMono((ParameterizedTypeReference) expectedResponseType);
325+
}
326+
else {
327+
monoExtractor = BodyExtractors.toMono((Class) expectedResponseType);
328+
}
329+
return (inputMessage, context) -> Flux.from(monoExtractor.extract(inputMessage, context));
330+
}
331+
}
332+
else if (this.bodyExtractor != null) {
333+
return (inputMessage, context) -> {
334+
Object body = this.bodyExtractor.extract(inputMessage, context);
335+
if (body instanceof Publisher) {
336+
return Flux.from((Publisher) body);
337+
}
338+
return Flux.just(body);
339+
};
340+
}
341+
else {
342+
return (inputMessage, context) -> Flux.empty();
343+
}
344+
}
345+
346+
private Object createReplyFromResponse(Mono<ResponseEntity<Flux<Object>>> responseMono) {
347+
return responseMono
348+
.flatMap(response -> {
349+
ResponseEntity.BodyBuilder httpEntityBuilder =
350+
ResponseEntity.status(response.getStatusCode())
351+
.headers(response.getHeaders());
352+
353+
Flux<?> body = response.getBody();
354+
Mono<?> bodyMono = Mono.empty();
355+
356+
if (body != null) {
357+
if (this.replyPayloadToFlux) {
358+
bodyMono = Mono.just(body);
359+
}
360+
else {
361+
bodyMono = body.next();
362+
}
363+
}
364+
365+
return bodyMono
366+
.map(httpEntityBuilder::body)
367+
.defaultIfEmpty(httpEntityBuilder.build());
368+
}
369+
)
370+
.map(this::getReply);
371+
}
372+
369373
}

spring-integration-webflux/src/test/java/org/springframework/integration/webflux/outbound/WebFluxRequestExecutingMessageHandlerTests.java

Lines changed: 63 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525

2626
import org.springframework.core.io.buffer.DataBuffer;
2727
import org.springframework.core.io.buffer.DataBufferFactory;
28+
import org.springframework.core.io.buffer.DataBufferLimitException;
2829
import org.springframework.http.HttpStatus;
2930
import org.springframework.http.MediaType;
3031
import org.springframework.http.client.reactive.ClientHttpConnector;
@@ -38,6 +39,7 @@
3839
import org.springframework.messaging.MessageHandlingException;
3940
import org.springframework.messaging.support.ErrorMessage;
4041
import org.springframework.test.web.reactive.server.HttpHandlerConnector;
42+
import org.springframework.web.reactive.function.client.ExchangeStrategies;
4143
import org.springframework.web.reactive.function.client.WebClient;
4244
import org.springframework.web.reactive.function.client.WebClientResponseException;
4345

@@ -272,8 +274,8 @@ void testClientHttpResponseAsReply() {
272274
assertThat(response.getHeaders().getContentType()).isEqualTo(MediaType.TEXT_PLAIN);
273275

274276
StepVerifier.create(
275-
response.getBody()
276-
.map(dataBuffer -> new String(dataBuffer.asByteBuffer().array())))
277+
response.getBody()
278+
.map(dataBuffer -> new String(dataBuffer.asByteBuffer().array())))
277279
.expectNext("foo", "bar", "baz")
278280
.verifyComplete();
279281
}
@@ -289,37 +291,37 @@ void testClientHttpResponseErrorAsReply() {
289291

290292
Flux<DataBuffer> data =
291293
Flux.just(
292-
bufferFactory.wrap("{".getBytes(StandardCharsets.UTF_8)),
293-
bufferFactory.wrap(" \"error\": \"Not Found\",".getBytes(StandardCharsets.UTF_8)),
294-
bufferFactory.wrap(" \"message\": \"404 NOT_FOUND\",".getBytes(StandardCharsets.UTF_8)),
295-
bufferFactory.wrap(" \"path\": \"/spring-integration\",".getBytes(StandardCharsets.UTF_8)),
296-
bufferFactory.wrap(" \"status\": 404,".getBytes(StandardCharsets.UTF_8)),
297-
bufferFactory.wrap(" \"timestamp\": \"1970-01-01T00:00:00.000+00:00\",".getBytes(StandardCharsets.UTF_8)),
298-
bufferFactory.wrap(" \"trace\": \"some really\nlong\ntrace\",".getBytes(StandardCharsets.UTF_8)),
299-
bufferFactory.wrap("}".getBytes(StandardCharsets.UTF_8))
294+
bufferFactory.wrap("{".getBytes(StandardCharsets.UTF_8)),
295+
bufferFactory.wrap(" \"error\": \"Not Found\",".getBytes(StandardCharsets.UTF_8)),
296+
bufferFactory.wrap(" \"message\": \"404 NOT_FOUND\",".getBytes(StandardCharsets.UTF_8)),
297+
bufferFactory.wrap(" \"path\": \"/spring-integration\",".getBytes(StandardCharsets.UTF_8)),
298+
bufferFactory.wrap(" \"status\": 404,".getBytes(StandardCharsets.UTF_8)),
299+
bufferFactory.wrap(" \"timestamp\": \"1970-01-01T00:00:00.000+00:00\",".getBytes(StandardCharsets.UTF_8)),
300+
bufferFactory.wrap(" \"trace\": \"some really\nlong\ntrace\",".getBytes(StandardCharsets.UTF_8)),
301+
bufferFactory.wrap("}".getBytes(StandardCharsets.UTF_8))
300302
);
301303

302304
return response.writeWith(data)
303305
.then(Mono.defer(response::setComplete));
304306
});
305307

306308
WebClient webClient = WebClient.builder()
307-
.clientConnector(httpConnector)
308-
.build();
309+
.clientConnector(httpConnector)
310+
.build();
309311

310312
String destinationUri = "https://www.springsource.org/spring-integration";
311313
WebFluxRequestExecutingMessageHandler reactiveHandler =
312-
new WebFluxRequestExecutingMessageHandler(destinationUri, webClient);
314+
new WebFluxRequestExecutingMessageHandler(destinationUri, webClient);
313315

314316
QueueChannel replyChannel = new QueueChannel();
315317
QueueChannel errorChannel = new QueueChannel();
316318
reactiveHandler.setOutputChannel(replyChannel);
317319
reactiveHandler.setBodyExtractor(new ClientHttpResponseBodyExtractor());
318320

319321
final Message<?> message =
320-
MessageBuilder.withPayload("hello, world")
321-
.setErrorChannel(errorChannel)
322-
.build();
322+
MessageBuilder.withPayload("hello, world")
323+
.setErrorChannel(errorChannel)
324+
.build();
323325
reactiveHandler.handleMessage(message);
324326

325327
Message<?> errorMessage = errorChannel.receive(10_000);
@@ -331,4 +333,49 @@ void testClientHttpResponseErrorAsReply() {
331333
assertThat(throwable.getCause()).isInstanceOf(WebClientResponseException.NotFound.class);
332334
assertThat(throwable.getMessage()).contains("404 Not Found");
333335
}
336+
337+
@Test
338+
void testMaxInMemorySizeExceeded() {
339+
ClientHttpConnector httpConnector = new HttpHandlerConnector((request, response) -> {
340+
response.setStatusCode(HttpStatus.OK);
341+
342+
DataBufferFactory bufferFactory = response.bufferFactory();
343+
344+
Mono<DataBuffer> data = Mono.just(bufferFactory.wrap("test".getBytes()));
345+
346+
return response.writeWith(data)
347+
.then(Mono.defer(response::setComplete));
348+
});
349+
350+
WebClient webClient = WebClient.builder()
351+
.clientConnector(httpConnector)
352+
.exchangeStrategies(ExchangeStrategies.builder()
353+
.codecs(clientCodecConfigurer -> clientCodecConfigurer
354+
.defaultCodecs()
355+
.maxInMemorySize(1))
356+
.build())
357+
.build();
358+
359+
String destinationUri = "https://www.springsource.org/spring-integration";
360+
WebFluxRequestExecutingMessageHandler reactiveHandler =
361+
new WebFluxRequestExecutingMessageHandler(destinationUri, webClient);
362+
363+
reactiveHandler.setExpectedResponseType(String.class);
364+
365+
QueueChannel errorChannel = new QueueChannel();
366+
reactiveHandler.handleMessage(MessageBuilder.withPayload("").setErrorChannel(errorChannel).build());
367+
368+
Message<?> errorMessage = errorChannel.receive(10000);
369+
assertThat(errorMessage).isNotNull();
370+
371+
Object payload = errorMessage.getPayload();
372+
assertThat(payload).isInstanceOf(MessageHandlingException.class)
373+
.extracting("cause")
374+
.isInstanceOf(WebClientResponseException.class)
375+
.extracting("cause")
376+
.isInstanceOf(DataBufferLimitException.class)
377+
.extracting("message")
378+
.isEqualTo("Exceeded limit on max bytes to buffer : 1");
379+
}
380+
334381
}

0 commit comments

Comments
 (0)