Skip to content

Commit 126ac84

Browse files
committed
Fix behavior of ClientResponse#bodyTo** with Void
Prior to this commit, asking for a `Void` type using any of the `ClientResponse#bodyTo*` methods would immediately return an empty `Publisher` without consuming the response body. Not doing so can lead to HTTP connection pool inconsistencies and/or memory leaks, since: * a connection that still has a response body being written to it cannot be properly recycled in the connection pool * incoming `DataBuffer` might not be released This commit detects when `Void` types are asked as body types and in those cases does the following: 1. Subscribe to the response body `Publisher` to allow the connection to be returned to the connection pool 2. `cancel()` the body `Publisher` if the response body is not empty; in that case, we choose to close the connection vs. consume the whole response body Those changes imply that `ClientHttpResponse` and other related contracts don't need a `close()` method anymore. Issue: SPR-16018
1 parent ec345bf commit 126ac84

File tree

11 files changed

+122
-152
lines changed

11 files changed

+122
-152
lines changed

spring-test/src/main/java/org/springframework/mock/http/client/reactive/MockClientHttpResponse.java

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -55,8 +55,6 @@ public class MockClientHttpResponse implements ClientHttpResponse {
5555

5656
private final DataBufferFactory bufferFactory = new DefaultDataBufferFactory();
5757

58-
private boolean closed = false;
59-
6058

6159
public MockClientHttpResponse(HttpStatus status) {
6260
Assert.notNull(status, "HttpStatus is required");
@@ -98,21 +96,9 @@ private DataBuffer toDataBuffer(String body, Charset charset) {
9896

9997
@Override
10098
public Flux<DataBuffer> getBody() {
101-
if (this.closed) {
102-
return Flux.error(new IllegalStateException("Connection has been closed."));
103-
}
10499
return this.body;
105100
}
106101

107-
@Override
108-
public void close() {
109-
this.closed = true;
110-
}
111-
112-
public boolean isClosed() {
113-
return this.closed;
114-
}
115-
116102
/**
117103
* Return the response body aggregated and converted to a String using the
118104
* charset of the Content-Type response or otherwise as "UTF-8".

spring-web/src/main/java/org/springframework/http/client/reactive/ClientHttpResponse.java

Lines changed: 1 addition & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,6 @@
1616

1717
package org.springframework.http.client.reactive;
1818

19-
import java.io.Closeable;
20-
2119
import org.springframework.http.HttpStatus;
2220
import org.springframework.http.ReactiveHttpInputMessage;
2321
import org.springframework.http.ResponseCookie;
@@ -30,7 +28,7 @@
3028
* @author Brian Clozel
3129
* @since 5.0
3230
*/
33-
public interface ClientHttpResponse extends ReactiveHttpInputMessage, Closeable {
31+
public interface ClientHttpResponse extends ReactiveHttpInputMessage {
3432

3533
/**
3634
* Return the HTTP status as an {@link HttpStatus} enum value.
@@ -42,16 +40,4 @@ public interface ClientHttpResponse extends ReactiveHttpInputMessage, Closeable
4240
*/
4341
MultiValueMap<String, ResponseCookie> getCookies();
4442

45-
/**
46-
* Close this response and the underlying HTTP connection.
47-
* <p>This non-blocking method has to be called if its body isn't going
48-
* to be consumed. Not doing so might result in HTTP connection pool
49-
* inconsistencies or memory leaks.
50-
* <p>This shouldn't be called if the response body is read,
51-
* because it would prevent connections to be reused and cancel
52-
* the benefits of using a connection pooling.
53-
*/
54-
@Override
55-
void close();
56-
5743
}

spring-web/src/main/java/org/springframework/http/client/reactive/ClientHttpResponseDecorator.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -70,11 +70,6 @@ public Flux<DataBuffer> getBody() {
7070
return this.delegate.getBody();
7171
}
7272

73-
@Override
74-
public void close() {
75-
this.delegate.close();
76-
}
77-
7873
@Override
7974
public String toString() {
8075
return getClass().getSimpleName() + " [delegate=" + getDelegate() + "]";

spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpResponse.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -89,11 +89,6 @@ public MultiValueMap<String, ResponseCookie> getCookies() {
8989
return CollectionUtils.unmodifiableMultiValueMap(result);
9090
}
9191

92-
@Override
93-
public void close() {
94-
this.response.dispose();
95-
}
96-
9792
@Override
9893
public String toString() {
9994
return "ReactorClientHttpResponse{" +

spring-web/src/test/java/org/springframework/mock/http/client/reactive/test/MockClientHttpResponse.java

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,6 @@ public class MockClientHttpResponse implements ClientHttpResponse {
5555

5656
private final DataBufferFactory bufferFactory = new DefaultDataBufferFactory();
5757

58-
private boolean closed = false;
5958

6059
public MockClientHttpResponse(HttpStatus status) {
6160
Assert.notNull(status, "HttpStatus is required");
@@ -99,21 +98,9 @@ private DataBuffer toDataBuffer(String body, Charset charset) {
9998

10099
@Override
101100
public Flux<DataBuffer> getBody() {
102-
if (this.closed) {
103-
return Flux.error(new IllegalStateException("Connection has been closed."));
104-
}
105101
return this.body;
106102
}
107103

108-
@Override
109-
public void close() {
110-
this.closed = true;
111-
}
112-
113-
public boolean isClosed() {
114-
return this.closed;
115-
}
116-
117104
/**
118105
* Return the response body aggregated and converted to a String using the
119106
* charset of the Content-Type response or otherwise as "UTF-8".

spring-webflux/src/main/java/org/springframework/web/reactive/function/client/ClientResponse.java

Lines changed: 1 addition & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616

1717
package org.springframework.web.reactive.function.client;
1818

19-
import java.io.Closeable;
2019
import java.util.List;
2120
import java.util.Optional;
2221
import java.util.OptionalLong;
@@ -44,7 +43,7 @@
4443
* @author Arjen Poutsma
4544
* @since 5.0
4645
*/
47-
public interface ClientResponse extends Closeable {
46+
public interface ClientResponse {
4847

4948
/**
5049
* Return the status code of this response.
@@ -133,18 +132,6 @@ public interface ClientResponse extends Closeable {
133132
*/
134133
<T> Mono<ResponseEntity<List<T>>> toEntityList(ParameterizedTypeReference<T> typeReference);
135134

136-
/**
137-
* Close this response and the underlying HTTP connection.
138-
* <p>This non-blocking method has to be called if its body isn't going
139-
* to be consumed. Not doing so might result in HTTP connection pool
140-
* inconsistencies or memory leaks.
141-
* <p>This shouldn't be called if the response body is read,
142-
* because it would prevent connections to be reused and cancel
143-
* the benefits of using a connection pooling.
144-
*/
145-
@Override
146-
void close();
147-
148135

149136
/**
150137
* Represents the headers of the HTTP response.

spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultClientResponse.java

Lines changed: 52 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import reactor.core.publisher.Mono;
2727

2828
import org.springframework.core.ParameterizedTypeReference;
29+
import org.springframework.core.io.buffer.DataBufferUtils;
2930
import org.springframework.http.HttpHeaders;
3031
import org.springframework.http.HttpStatus;
3132
import org.springframework.http.MediaType;
@@ -98,32 +99,73 @@ public Map<String, Object> hints() {
9899

99100
@Override
100101
public <T> Mono<T> bodyToMono(Class<? extends T> elementClass) {
101-
return body(BodyExtractors.toMono(elementClass));
102+
if (Void.class.isAssignableFrom(elementClass)) {
103+
return consumeAndCancel();
104+
}
105+
else {
106+
return body(BodyExtractors.toMono(elementClass));
107+
}
108+
}
109+
110+
@SuppressWarnings("unchecked")
111+
private <T> Mono<T> consumeAndCancel() {
112+
return (Mono<T>) this.response.getBody()
113+
.map(buffer -> {
114+
DataBufferUtils.release(buffer);
115+
throw new ReadCancellationException();
116+
})
117+
.onErrorResume(ReadCancellationException.class, ex -> Mono.empty())
118+
.then();
102119
}
103120

104121
@Override
105122
public <T> Mono<T> bodyToMono(ParameterizedTypeReference<T> typeReference) {
106-
return body(BodyExtractors.toMono(typeReference));
123+
if (Void.class.isAssignableFrom(typeReference.getType().getClass())) {
124+
return consumeAndCancel();
125+
}
126+
else {
127+
return body(BodyExtractors.toMono(typeReference));
128+
}
107129
}
108130

109131
@Override
110132
public <T> Flux<T> bodyToFlux(Class<? extends T> elementClass) {
111-
return body(BodyExtractors.toFlux(elementClass));
133+
if (Void.class.isAssignableFrom(elementClass)) {
134+
return Flux.from(consumeAndCancel());
135+
}
136+
else {
137+
return body(BodyExtractors.toFlux(elementClass));
138+
}
112139
}
113140

114141
@Override
115142
public <T> Flux<T> bodyToFlux(ParameterizedTypeReference<T> typeReference) {
116-
return body(BodyExtractors.toFlux(typeReference));
143+
if (Void.class.isAssignableFrom(typeReference.getType().getClass())) {
144+
return Flux.from(consumeAndCancel());
145+
}
146+
else {
147+
return body(BodyExtractors.toFlux(typeReference));
148+
}
117149
}
118150

119151
@Override
120152
public <T> Mono<ResponseEntity<T>> toEntity(Class<T> bodyType) {
121-
return toEntityInternal(bodyToMono(bodyType));
153+
if (Void.class.isAssignableFrom(bodyType)) {
154+
return toEntityInternal(consumeAndCancel());
155+
}
156+
else {
157+
return toEntityInternal(bodyToMono(bodyType));
158+
}
122159
}
123160

124161
@Override
125162
public <T> Mono<ResponseEntity<T>> toEntity(ParameterizedTypeReference<T> typeReference) {
126-
return toEntityInternal(bodyToMono(typeReference));
163+
if (Void.class.isAssignableFrom(typeReference.getType().getClass())) {
164+
return toEntityInternal(consumeAndCancel());
165+
}
166+
else {
167+
return toEntityInternal(bodyToMono(typeReference));
168+
}
127169
}
128170

129171
private <T> Mono<ResponseEntity<T>> toEntityInternal(Mono<T> bodyMono) {
@@ -154,10 +196,6 @@ private <T> Mono<ResponseEntity<List<T>>> toEntityListInternal(Flux<T> bodyFlux)
154196
.map(body -> new ResponseEntity<>(body, headers, statusCode));
155197
}
156198

157-
@Override
158-
public void close() {
159-
this.response.close();
160-
}
161199

162200
private class DefaultHeaders implements Headers {
163201

@@ -191,4 +229,8 @@ private OptionalLong toOptionalLong(long value) {
191229
}
192230

193231
}
232+
233+
@SuppressWarnings("serial")
234+
private class ReadCancellationException extends RuntimeException {
235+
}
194236
}

spring-webflux/src/main/java/org/springframework/web/reactive/function/client/WebClient.java

Lines changed: 5 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -461,17 +461,11 @@ interface RequestHeadersSpec<S extends RequestHeadersSpec<S>> {
461461
* .exchange()
462462
* .flatMapMany(response -> response.bodyToFlux(Pojo.class));
463463
* </pre>
464-
* <p>If the response body is not consumed with {@code bodyTo*}
465-
* or {@code toEntity*} methods, it is your responsibility
466-
* to release the HTTP resources with {@link ClientResponse#close()}.
467-
* <pre>
468-
* Mono&lt;HttpStatus&gt; mono = client.get().uri("/")
469-
* .exchange()
470-
* .map(response -> {
471-
* response.close();
472-
* return response.statusCode();
473-
* });
474-
* </pre>
464+
* <p>The response body should always be consumed with {@code bodyTo*}
465+
* or {@code toEntity*} methods; if you do not care about the body,
466+
* you can use {@code bodyToMono(Void.class)}.
467+
* <p>Not consuming the response body might lead to HTTP connection pool
468+
* inconsistencies or memory leaks.
475469
* @return a {@code Mono} with the response
476470
* @see #retrieve()
477471
*/
@@ -491,8 +485,6 @@ interface RequestHeadersSpec<S extends RequestHeadersSpec<S>> {
491485
* .retrieve()
492486
* .bodyToMono(Pojo.class);
493487
* </pre>
494-
* <p>Since this method reads the response body,
495-
* {@link ClientResponse#close()} should not be called.
496488
* @return spec with options for extracting the response body
497489
*/
498490
ResponseSpec retrieve();

spring-webflux/src/test/java/org/springframework/web/reactive/function/client/DefaultClientResponseTests.java

Lines changed: 58 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@
2828
import org.junit.Test;
2929
import reactor.core.publisher.Flux;
3030
import reactor.core.publisher.Mono;
31+
import reactor.test.StepVerifier;
32+
import reactor.test.publisher.TestPublisher;
3133

3234
import org.springframework.core.ParameterizedTypeReference;
3335
import org.springframework.core.codec.StringDecoder;
@@ -46,8 +48,10 @@
4648
import org.springframework.util.LinkedMultiValueMap;
4749
import org.springframework.util.MultiValueMap;
4850

49-
import static org.junit.Assert.*;
50-
import static org.mockito.Mockito.*;
51+
import static org.junit.Assert.assertEquals;
52+
import static org.junit.Assert.assertSame;
53+
import static org.mockito.Mockito.mock;
54+
import static org.mockito.Mockito.when;
5155
import static org.springframework.web.reactive.function.BodyExtractors.toMono;
5256

5357
/**
@@ -214,7 +218,8 @@ public void bodyToFluxTypeReference() throws Exception {
214218
when(mockExchangeStrategies.messageReaders()).thenReturn(messageReaders);
215219

216220
Flux<String> resultFlux =
217-
defaultClientResponse.bodyToFlux(new ParameterizedTypeReference<String>() {});
221+
defaultClientResponse.bodyToFlux(new ParameterizedTypeReference<String>() {
222+
});
218223
Mono<List<String>> result = resultFlux.collectList();
219224
assertEquals(Collections.singletonList("foo"), result.block());
220225
}
@@ -260,7 +265,8 @@ public void toEntityTypeReference() throws Exception {
260265
when(mockExchangeStrategies.messageReaders()).thenReturn(messageReaders);
261266

262267
ResponseEntity<String> result = defaultClientResponse.toEntity(
263-
new ParameterizedTypeReference<String>() {}).block();
268+
new ParameterizedTypeReference<String>() {
269+
}).block();
264270
assertEquals("foo", result.getBody());
265271
assertEquals(HttpStatus.OK, result.getStatusCode());
266272
assertEquals(MediaType.TEXT_PLAIN, result.getHeaders().getContentType());
@@ -307,13 +313,60 @@ public void toEntityListTypeReference() throws Exception {
307313
when(mockExchangeStrategies.messageReaders()).thenReturn(messageReaders);
308314

309315
ResponseEntity<List<String>> result = defaultClientResponse.toEntityList(
310-
new ParameterizedTypeReference<String>() {}).block();
316+
new ParameterizedTypeReference<String>() {
317+
}).block();
311318
assertEquals(Collections.singletonList("foo"), result.getBody());
312319
assertEquals(HttpStatus.OK, result.getStatusCode());
313320
assertEquals(MediaType.TEXT_PLAIN, result.getHeaders().getContentType());
314321
}
315322

323+
@Test
324+
public void toMonoVoid() throws Exception {
325+
TestPublisher<DataBuffer> body = TestPublisher.create();
326+
327+
HttpHeaders httpHeaders = new HttpHeaders();
328+
httpHeaders.setContentType(MediaType.TEXT_PLAIN);
329+
when(mockResponse.getHeaders()).thenReturn(httpHeaders);
330+
when(mockResponse.getStatusCode()).thenReturn(HttpStatus.OK);
331+
when(mockResponse.getBody()).thenReturn(body.flux());
332+
333+
List<HttpMessageReader<?>> messageReaders = Collections
334+
.singletonList(new DecoderHttpMessageReader<>(StringDecoder.allMimeTypes(true)));
335+
when(mockExchangeStrategies.messageReaders()).thenReturn(messageReaders);
336+
337+
StepVerifier.create(defaultClientResponse.bodyToMono(Void.class))
338+
.then(() -> {
339+
body.assertWasSubscribed();
340+
body.complete();
341+
})
342+
.verifyComplete();
343+
}
344+
345+
@Test
346+
public void toMonoVoidNonEmptyBody() throws Exception {
347+
DefaultDataBufferFactory factory = new DefaultDataBufferFactory();
348+
DefaultDataBuffer dataBuffer =
349+
factory.wrap(ByteBuffer.wrap("foo".getBytes(StandardCharsets.UTF_8)));
350+
TestPublisher<DataBuffer> body = TestPublisher.create();
351+
352+
HttpHeaders httpHeaders = new HttpHeaders();
353+
httpHeaders.setContentType(MediaType.TEXT_PLAIN);
354+
when(mockResponse.getHeaders()).thenReturn(httpHeaders);
355+
when(mockResponse.getStatusCode()).thenReturn(HttpStatus.OK);
356+
when(mockResponse.getBody()).thenReturn(body.flux());
316357

358+
List<HttpMessageReader<?>> messageReaders = Collections
359+
.singletonList(new DecoderHttpMessageReader<>(StringDecoder.allMimeTypes(true)));
360+
when(mockExchangeStrategies.messageReaders()).thenReturn(messageReaders);
361+
362+
StepVerifier.create(defaultClientResponse.bodyToMono(Void.class))
363+
.then(() -> {
364+
body.assertWasSubscribed();
365+
body.emit(dataBuffer);
366+
})
367+
.verifyComplete();
317368

369+
body.assertCancelled();
370+
}
318371

319372
}

0 commit comments

Comments
 (0)