Skip to content

Commit 16f3f8d

Browse files
committed
Add close() method on HTTP client response
Before this commit, there was no way to signal the HTTP client that we were done consuming the response. Without that, the underlying client library cannot know when it is safe to release the associated resources (e.g. the HTTP connection). This commit adds new `close()` methods on both `ClientHttpResponse` and `ClientResponse`. This methods is non-blocking and its behavior depends on the library, its configuration, HTTP version, etc. At the `WebClient` level, `close()` is called automatically if we consume the response body through the `ResponseSpec` or the `ClientResponse` itself. Note that it is *required* to call `close()` manually otherwise; not doing so might create resource leaks or connection issues. Issue: SPR-15920
1 parent ba6b617 commit 16f3f8d

File tree

11 files changed

+238
-16
lines changed

11 files changed

+238
-16
lines changed

spring-test/src/main/java/org/springframework/mock/http/client/reactive/MockClientHttpResponse.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,8 @@ public class MockClientHttpResponse implements ClientHttpResponse {
5555

5656
private final DataBufferFactory bufferFactory = new DefaultDataBufferFactory();
5757

58+
private boolean closed = false;
59+
5860

5961
public MockClientHttpResponse(HttpStatus status) {
6062
Assert.notNull(status, "HttpStatus is required");
@@ -94,10 +96,23 @@ private DataBuffer toDataBuffer(String body, Charset charset) {
9496
return this.bufferFactory.wrap(byteBuffer);
9597
}
9698

99+
@Override
97100
public Flux<DataBuffer> getBody() {
101+
if (this.closed) {
102+
return Flux.error(new IllegalStateException("Connection has been closed."));
103+
}
98104
return this.body;
99105
}
100106

107+
@Override
108+
public void close() {
109+
this.closed = true;
110+
}
111+
112+
public boolean isClosed() {
113+
return this.closed;
114+
}
115+
101116
/**
102117
* Return the response body aggregated and converted to a String using the
103118
* charset of the Content-Type response or otherwise as "UTF-8".

spring-web/src/main/java/org/springframework/http/client/reactive/ClientHttpResponse.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616

1717
package org.springframework.http.client.reactive;
1818

19+
import java.io.Closeable;
20+
1921
import org.springframework.http.HttpStatus;
2022
import org.springframework.http.ReactiveHttpInputMessage;
2123
import org.springframework.http.ResponseCookie;
@@ -25,9 +27,10 @@
2527
* Represents a client-side reactive HTTP response.
2628
*
2729
* @author Arjen Poutsma
30+
* @author Brian Clozel
2831
* @since 5.0
2932
*/
30-
public interface ClientHttpResponse extends ReactiveHttpInputMessage {
33+
public interface ClientHttpResponse extends ReactiveHttpInputMessage, Closeable {
3134

3235
/**
3336
* Return the HTTP status as an {@link HttpStatus} enum value.
@@ -39,4 +42,15 @@ public interface ClientHttpResponse extends ReactiveHttpInputMessage {
3942
*/
4043
MultiValueMap<String, ResponseCookie> getCookies();
4144

45+
/**
46+
* Close this response, freeing any resources created.
47+
* <p>This non-blocking method has to be called once the response has been
48+
* processed and the resources are no longer needed; not doing so might
49+
* create resource leaks or connection issues.
50+
* <p>Depending on the client configuration and HTTP version,
51+
* this can lead to closing the connection or returning it to a connection pool.
52+
*/
53+
@Override
54+
void close();
55+
4256
}

spring-web/src/main/java/org/springframework/http/client/reactive/ClientHttpResponseDecorator.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,10 @@ public Flux<DataBuffer> getBody() {
7070
return this.delegate.getBody();
7171
}
7272

73+
@Override
74+
public void close() {
75+
this.delegate.close();
76+
}
7377

7478
@Override
7579
public String toString() {

spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpResponse.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,10 @@ public MultiValueMap<String, ResponseCookie> getCookies() {
8989
return CollectionUtils.unmodifiableMultiValueMap(result);
9090
}
9191

92+
@Override
93+
public void close() {
94+
this.response.dispose();
95+
}
9296

9397
@Override
9498
public String toString() {

spring-web/src/test/java/org/springframework/mock/http/client/reactive/test/MockClientHttpResponse.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,13 +55,15 @@ public class MockClientHttpResponse implements ClientHttpResponse {
5555

5656
private final DataBufferFactory bufferFactory = new DefaultDataBufferFactory();
5757

58+
private boolean closed = false;
5859

5960
public MockClientHttpResponse(HttpStatus status) {
6061
Assert.notNull(status, "HttpStatus is required");
6162
this.status = status;
6263
}
6364

6465

66+
@Override
6567
public HttpStatus getStatusCode() {
6668
return this.status;
6769
}
@@ -71,6 +73,7 @@ public HttpHeaders getHeaders() {
7173
return this.headers;
7274
}
7375

76+
@Override
7477
public MultiValueMap<String, ResponseCookie> getCookies() {
7578
return this.cookies;
7679
}
@@ -94,10 +97,23 @@ private DataBuffer toDataBuffer(String body, Charset charset) {
9497
return this.bufferFactory.wrap(byteBuffer);
9598
}
9699

100+
@Override
97101
public Flux<DataBuffer> getBody() {
102+
if (this.closed) {
103+
return Flux.error(new IllegalStateException("Connection has been closed."));
104+
}
98105
return this.body;
99106
}
100107

108+
@Override
109+
public void close() {
110+
this.closed = true;
111+
}
112+
113+
public boolean isClosed() {
114+
return this.closed;
115+
}
116+
101117
/**
102118
* Return the response body aggregated and converted to a String using the
103119
* charset of the Content-Type response or otherwise as "UTF-8".

spring-webflux/src/main/java/org/springframework/web/reactive/function/client/ClientResponse.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package org.springframework.web.reactive.function.client;
1818

19+
import java.io.Closeable;
1920
import java.util.List;
2021
import java.util.Optional;
2122
import java.util.OptionalLong;
@@ -43,7 +44,7 @@
4344
* @author Arjen Poutsma
4445
* @since 5.0
4546
*/
46-
public interface ClientResponse {
47+
public interface ClientResponse extends Closeable {
4748

4849
/**
4950
* Return the status code of this response.
@@ -132,6 +133,18 @@ public interface ClientResponse {
132133
*/
133134
<T> Mono<ResponseEntity<List<T>>> toEntityList(ParameterizedTypeReference<T> typeReference);
134135

136+
/**
137+
* Close this response, freeing any resources created.
138+
* <p>This non-blocking method has to be called once the response has been processed
139+
* and the resources are no longer needed.
140+
* <p>{@code ClientResponse.bodyTo*}, {@code ClientResponse.toEntity*}
141+
* and all methods under {@code WebClient.retrieve()} will close the response
142+
* automatically.
143+
* <p>It is required to call close() manually otherwise; not doing so might
144+
* create resource leaks or connection issues.
145+
*/
146+
@Override
147+
void close();
135148

136149
/**
137150
* Represents the headers of the HTTP response.

spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultClientResponse.java

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
* Default implementation of {@link ClientResponse}.
4343
*
4444
* @author Arjen Poutsma
45+
* @author Brian Clozel
4546
* @since 5.0
4647
*/
4748
class DefaultClientResponse implements ClientResponse {
@@ -97,22 +98,24 @@ public Map<String, Object> hints() {
9798

9899
@Override
99100
public <T> Mono<T> bodyToMono(Class<? extends T> elementClass) {
100-
return body(BodyExtractors.toMono(elementClass));
101+
Mono<T> body = body(BodyExtractors.toMono(elementClass));
102+
return body.doOnTerminate(this.response::close);
101103
}
102104

103105
@Override
104106
public <T> Mono<T> bodyToMono(ParameterizedTypeReference<T> typeReference) {
105-
return body(BodyExtractors.toMono(typeReference));
107+
return body(BodyExtractors.toMono(typeReference)).doOnTerminate(this.response::close);
106108
}
107109

108110
@Override
109111
public <T> Flux<T> bodyToFlux(Class<? extends T> elementClass) {
110-
return body(BodyExtractors.toFlux(elementClass));
112+
Flux<T> body = body(BodyExtractors.toFlux(elementClass));
113+
return body.doOnTerminate(this.response::close);
111114
}
112115

113116
@Override
114117
public <T> Flux<T> bodyToFlux(ParameterizedTypeReference<T> typeReference) {
115-
return body(BodyExtractors.toFlux(typeReference));
118+
return body(BodyExtractors.toFlux(typeReference)).doOnTerminate(this.response::close);
116119
}
117120

118121
@Override
@@ -131,7 +134,8 @@ private <T> Mono<ResponseEntity<T>> toEntityInternal(Mono<T> bodyMono) {
131134
return bodyMono
132135
.map(body -> new ResponseEntity<>(body, headers, statusCode))
133136
.switchIfEmpty(Mono.defer(
134-
() -> Mono.just(new ResponseEntity<>(headers, statusCode))));
137+
() -> Mono.just(new ResponseEntity<>(headers, statusCode))))
138+
.doOnTerminate(this.response::close);
135139
}
136140

137141
@Override
@@ -150,9 +154,14 @@ private <T> Mono<ResponseEntity<List<T>>> toEntityListInternal(Flux<T> bodyFlux)
150154
HttpStatus statusCode = statusCode();
151155
return bodyFlux
152156
.collectList()
153-
.map(body -> new ResponseEntity<>(body, headers, statusCode));
157+
.map(body -> new ResponseEntity<>(body, headers, statusCode))
158+
.doOnTerminate(this.response::close);
154159
}
155160

161+
@Override
162+
public void close() {
163+
this.response.close();
164+
}
156165

157166
private class DefaultHeaders implements Headers {
158167

spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultWebClient.java

Lines changed: 22 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -413,50 +413,64 @@ public ResponseSpec onStatus(Predicate<HttpStatus> statusPredicate,
413413
@SuppressWarnings("unchecked")
414414
public <T> Mono<T> bodyToMono(Class<T> bodyType) {
415415
return this.responseMono.flatMap(
416-
response -> bodyToPublisher(response, BodyExtractors.toMono(bodyType),
416+
response -> bodyToMono(response, BodyExtractors.toMono(bodyType),
417417
this::monoThrowableToMono));
418418
}
419419

420420
@Override
421421
@SuppressWarnings("unchecked")
422422
public <T> Mono<T> bodyToMono(ParameterizedTypeReference<T> typeReference) {
423423
return this.responseMono.flatMap(
424-
response -> bodyToPublisher(response, BodyExtractors.toMono(typeReference),
424+
response -> bodyToMono(response, BodyExtractors.toMono(typeReference),
425425
mono -> (Mono<T>)mono));
426426
}
427427

428428
private <T> Mono<T> monoThrowableToMono(Mono<? extends Throwable> mono) {
429429
return mono.flatMap(Mono::error);
430430
}
431431

432+
private <T> Mono<T> bodyToMono(ClientResponse response,
433+
BodyExtractor<Mono<T>, ? super ClientHttpResponse> extractor,
434+
Function<Mono<? extends Throwable>, Mono<T>> errorFunction) {
435+
436+
return this.statusHandlers.stream()
437+
.filter(statusHandler -> statusHandler.test(response.statusCode()))
438+
.findFirst()
439+
.map(statusHandler -> statusHandler.apply(response))
440+
.map(errorFunction::apply)
441+
.orElse(response.body(extractor))
442+
.doAfterTerminate(response::close);
443+
}
444+
432445
@Override
433446
public <T> Flux<T> bodyToFlux(Class<T> elementType) {
434447
return this.responseMono.flatMapMany(
435-
response -> bodyToPublisher(response, BodyExtractors.toFlux(elementType),
448+
response -> bodyToFlux(response, BodyExtractors.toFlux(elementType),
436449
this::monoThrowableToFlux));
437450
}
438451

439452
@Override
440453
public <T> Flux<T> bodyToFlux(ParameterizedTypeReference<T> typeReference) {
441454
return this.responseMono.flatMapMany(
442-
response -> bodyToPublisher(response, BodyExtractors.toFlux(typeReference),
455+
response -> bodyToFlux(response, BodyExtractors.toFlux(typeReference),
443456
this::monoThrowableToFlux));
444457
}
445458

446459
private <T> Flux<T> monoThrowableToFlux(Mono<? extends Throwable> mono) {
447460
return mono.flatMapMany(Flux::error);
448461
}
449462

450-
private <T extends Publisher<?>> T bodyToPublisher(ClientResponse response,
451-
BodyExtractor<T, ? super ClientHttpResponse> extractor,
452-
Function<Mono<? extends Throwable>, T> errorFunction) {
463+
private <T> Flux<T> bodyToFlux(ClientResponse response,
464+
BodyExtractor<Flux<T>, ? super ClientHttpResponse> extractor,
465+
Function<Mono<? extends Throwable>, Flux<T>> errorFunction) {
453466

454467
return this.statusHandlers.stream()
455468
.filter(statusHandler -> statusHandler.test(response.statusCode()))
456469
.findFirst()
457470
.map(statusHandler -> statusHandler.apply(response))
458471
.map(errorFunction::apply)
459-
.orElse(response.body(extractor));
472+
.orElse(response.body(extractor))
473+
.doAfterTerminate(response::close);
460474
}
461475

462476
private static Mono<WebClientResponseException> createResponseException(ClientResponse response) {

spring-webflux/src/main/java/org/springframework/web/reactive/function/client/WebClient.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -461,6 +461,17 @@ interface RequestHeadersSpec<S extends RequestHeadersSpec<S>> {
461461
* .exchange()
462462
* .flatMapMany(response -> response.bodyToFlux(Pojo.class));
463463
* </pre>
464+
* <p>If the response body is not consumed with {@code bodyTo*}
465+
* or {@code toEntity*} methods, it is your responsibility
466+
* to release the HTTP resources with {@link ClientResponse#close()}.
467+
* <pre>
468+
* Mono&lt;HttpStatus&gt; mono = client.get().uri("/")
469+
* .exchange()
470+
* .map(response -> {
471+
* response.close();
472+
* return response.statusCode();
473+
* });
474+
* </pre>
464475
* @return a {@code Mono} with the response
465476
* @see #retrieve()
466477
*/

spring-webflux/src/test/java/org/springframework/web/reactive/function/client/WebClientIntegrationTests.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,12 @@ public void shutdown() throws Exception {
7070
public void headers() throws Exception {
7171
this.server.enqueue(new MockResponse().setHeader("Content-Type", "text/plain").setBody("Hello Spring!"));
7272

73+
this.webClient.get().uri("/test")
74+
.exchange()
75+
.map(response -> {
76+
response.close();
77+
return response.statusCode();
78+
});
7379
Mono<HttpHeaders> result = this.webClient.get()
7480
.uri("/greeting?name=Spring")
7581
.exchange()

0 commit comments

Comments
 (0)