Skip to content

Commit 10139d4

Browse files
committed
Revisit meaning of response.close() in HTTP client
Prior to this issue, SPR-15920 added this new `close()` method which was supposed to be called to clean resources after response processing. This commit changes the meaning of that method: calling `close()` will close the underlying HTTP connection. This has to be called if the response body is not consumed by the application, since at that point the underlying connection might be in an inconsistent state if shared in a connection pool. Issue: SPR-15993
1 parent fb09a75 commit 10139d4

File tree

6 files changed

+33
-112
lines changed

6 files changed

+33
-112
lines changed

spring-web/src/main/java/org/springframework/http/client/reactive/ClientHttpResponse.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -43,12 +43,13 @@ public interface ClientHttpResponse extends ReactiveHttpInputMessage, Closeable
4343
MultiValueMap<String, ResponseCookie> getCookies();
4444

4545
/**
46-
* Close this response, freeing any resources created.
47-
* <p>This non-blocking method has to be called once the response has been
48-
* processed and the resources are no longer needed; not doing so might
49-
* create resource leaks or connection issues.
50-
* <p>Depending on the client configuration and HTTP version,
51-
* this can lead to closing the connection or returning it to a connection pool.
46+
* Close this response and the underlying HTTP connection.
47+
* <p>This non-blocking method has to be called if its body isn't going
48+
* to be consumed. Not doing so might result in HTTP connection pool
49+
* inconsistencies or memory leaks.
50+
* <p>This shouldn't be called if the response body is read,
51+
* because it would prevent connections to be reused and cancel
52+
* the benefits of using a connection pooling.
5253
*/
5354
@Override
5455
void close();

spring-webflux/src/main/java/org/springframework/web/reactive/function/client/ClientResponse.java

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -134,14 +134,13 @@ public interface ClientResponse extends Closeable {
134134
<T> Mono<ResponseEntity<List<T>>> toEntityList(ParameterizedTypeReference<T> typeReference);
135135

136136
/**
137-
* Close this response, freeing any resources created.
138-
* <p>This non-blocking method has to be called once the response has been processed
139-
* and the resources are no longer needed.
140-
* <p>{@code ClientResponse.bodyTo*}, {@code ClientResponse.toEntity*}
141-
* and all methods under {@code WebClient.retrieve()} will close the response
142-
* automatically.
143-
* <p>It is required to call close() manually otherwise; not doing so might
144-
* create resource leaks or connection issues.
137+
* Close this response and the underlying HTTP connection.
138+
* <p>This non-blocking method has to be called if its body isn't going
139+
* to be consumed. Not doing so might result in HTTP connection pool
140+
* inconsistencies or memory leaks.
141+
* <p>This shouldn't be called if the response body is read,
142+
* because it would prevent connections to be reused and cancel
143+
* the benefits of using a connection pooling.
145144
*/
146145
@Override
147146
void close();

spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultClientResponse.java

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -98,24 +98,22 @@ public Map<String, Object> hints() {
9898

9999
@Override
100100
public <T> Mono<T> bodyToMono(Class<? extends T> elementClass) {
101-
Mono<T> body = body(BodyExtractors.toMono(elementClass));
102-
return body.doOnTerminate(this.response::close);
101+
return body(BodyExtractors.toMono(elementClass));
103102
}
104103

105104
@Override
106105
public <T> Mono<T> bodyToMono(ParameterizedTypeReference<T> typeReference) {
107-
return body(BodyExtractors.toMono(typeReference)).doOnTerminate(this.response::close);
106+
return body(BodyExtractors.toMono(typeReference));
108107
}
109108

110109
@Override
111110
public <T> Flux<T> bodyToFlux(Class<? extends T> elementClass) {
112-
Flux<T> body = body(BodyExtractors.toFlux(elementClass));
113-
return body.doOnTerminate(this.response::close);
111+
return body(BodyExtractors.toFlux(elementClass));
114112
}
115113

116114
@Override
117115
public <T> Flux<T> bodyToFlux(ParameterizedTypeReference<T> typeReference) {
118-
return body(BodyExtractors.toFlux(typeReference)).doOnTerminate(this.response::close);
116+
return body(BodyExtractors.toFlux(typeReference));
119117
}
120118

121119
@Override
@@ -134,8 +132,7 @@ private <T> Mono<ResponseEntity<T>> toEntityInternal(Mono<T> bodyMono) {
134132
return bodyMono
135133
.map(body -> new ResponseEntity<>(body, headers, statusCode))
136134
.switchIfEmpty(Mono.defer(
137-
() -> Mono.just(new ResponseEntity<>(headers, statusCode))))
138-
.doOnTerminate(this.response::close);
135+
() -> Mono.just(new ResponseEntity<>(headers, statusCode))));
139136
}
140137

141138
@Override
@@ -154,8 +151,7 @@ private <T> Mono<ResponseEntity<List<T>>> toEntityListInternal(Flux<T> bodyFlux)
154151
HttpStatus statusCode = statusCode();
155152
return bodyFlux
156153
.collectList()
157-
.map(body -> new ResponseEntity<>(body, headers, statusCode))
158-
.doOnTerminate(this.response::close);
154+
.map(body -> new ResponseEntity<>(body, headers, statusCode));
159155
}
160156

161157
@Override

spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultWebClient.java

Lines changed: 10 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -409,64 +409,50 @@ public ResponseSpec onStatus(Predicate<HttpStatus> statusPredicate,
409409
@SuppressWarnings("unchecked")
410410
public <T> Mono<T> bodyToMono(Class<T> bodyType) {
411411
return this.responseMono.flatMap(
412-
response -> bodyToMono(response, BodyExtractors.toMono(bodyType),
412+
response -> bodyToPublisher(response, BodyExtractors.toMono(bodyType),
413413
this::monoThrowableToMono));
414414
}
415415

416416
@Override
417417
@SuppressWarnings("unchecked")
418418
public <T> Mono<T> bodyToMono(ParameterizedTypeReference<T> typeReference) {
419419
return this.responseMono.flatMap(
420-
response -> bodyToMono(response, BodyExtractors.toMono(typeReference),
421-
this::monoThrowableToMono));
420+
response -> bodyToPublisher(response, BodyExtractors.toMono(typeReference),
421+
mono -> (Mono<T>)mono));
422422
}
423423

424424
private <T> Mono<T> monoThrowableToMono(Mono<? extends Throwable> mono) {
425425
return mono.flatMap(Mono::error);
426426
}
427427

428-
private <T> Mono<T> bodyToMono(ClientResponse response,
429-
BodyExtractor<Mono<T>, ? super ClientHttpResponse> extractor,
430-
Function<Mono<? extends Throwable>, Mono<T>> errorFunction) {
431-
432-
return this.statusHandlers.stream()
433-
.filter(statusHandler -> statusHandler.test(response.statusCode()))
434-
.findFirst()
435-
.map(statusHandler -> statusHandler.apply(response))
436-
.map(errorFunction::apply)
437-
.orElse(response.body(extractor))
438-
.doAfterTerminate(response::close);
439-
}
440-
441-
@Override
428+
@Override
442429
public <T> Flux<T> bodyToFlux(Class<T> elementType) {
443430
return this.responseMono.flatMapMany(
444-
response -> bodyToFlux(response, BodyExtractors.toFlux(elementType),
431+
response -> bodyToPublisher(response, BodyExtractors.toFlux(elementType),
445432
this::monoThrowableToFlux));
446433
}
447434

448435
@Override
449436
public <T> Flux<T> bodyToFlux(ParameterizedTypeReference<T> typeReference) {
450437
return this.responseMono.flatMapMany(
451-
response -> bodyToFlux(response, BodyExtractors.toFlux(typeReference),
438+
response -> bodyToPublisher(response, BodyExtractors.toFlux(typeReference),
452439
this::monoThrowableToFlux));
453440
}
454441

455442
private <T> Flux<T> monoThrowableToFlux(Mono<? extends Throwable> mono) {
456443
return mono.flatMapMany(Flux::error);
457444
}
458445

459-
private <T> Flux<T> bodyToFlux(ClientResponse response,
460-
BodyExtractor<Flux<T>, ? super ClientHttpResponse> extractor,
461-
Function<Mono<? extends Throwable>, Flux<T>> errorFunction) {
446+
private <T extends Publisher<?>> T bodyToPublisher(ClientResponse response,
447+
BodyExtractor<T, ? super ClientHttpResponse> extractor,
448+
Function<Mono<? extends Throwable>, T> errorFunction) {
462449

463450
return this.statusHandlers.stream()
464451
.filter(statusHandler -> statusHandler.test(response.statusCode()))
465452
.findFirst()
466453
.map(statusHandler -> statusHandler.apply(response))
467454
.map(errorFunction::apply)
468-
.orElse(response.body(extractor))
469-
.doAfterTerminate(response::close);
455+
.orElse(response.body(extractor));
470456
}
471457

472458
private static Mono<WebClientResponseException> createResponseException(ClientResponse response) {

spring-webflux/src/main/java/org/springframework/web/reactive/function/client/WebClient.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -491,6 +491,8 @@ interface RequestHeadersSpec<S extends RequestHeadersSpec<S>> {
491491
* .retrieve()
492492
* .bodyToMono(Pojo.class);
493493
* </pre>
494+
* <p>Since this method reads the response body,
495+
* {@link ClientResponse#close()} should not be called.
494496
* @return spec with options for extracting the response body
495497
*/
496498
ResponseSpec retrieve();

spring-webflux/src/test/java/org/springframework/web/reactive/function/client/WebClientMockTests.java

Lines changed: 1 addition & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -2,19 +2,15 @@
22

33
import org.junit.Before;
44
import org.junit.Test;
5-
import reactor.core.publisher.Flux;
65
import reactor.core.publisher.Mono;
76
import reactor.test.StepVerifier;
87

98
import org.springframework.http.HttpHeaders;
109
import org.springframework.http.HttpStatus;
11-
import org.springframework.http.ResponseEntity;
1210
import org.springframework.http.client.reactive.ClientHttpConnector;
1311
import org.springframework.mock.http.client.reactive.test.MockClientHttpResponse;
1412

15-
import static org.junit.Assert.assertEquals;
1613
import static org.junit.Assert.assertFalse;
17-
import static org.junit.Assert.assertTrue;
1814
import static org.mockito.ArgumentMatchers.any;
1915
import static org.mockito.BDDMockito.given;
2016
import static org.mockito.Mockito.mock;
@@ -44,7 +40,7 @@ public void setUp() throws Exception {
4440

4541
@Test
4642
public void shouldDisposeResponseManually() {
47-
Mono<HttpHeaders> headers= this.webClient
43+
Mono<HttpHeaders> headers = this.webClient
4844
.get().uri("/test")
4945
.exchange()
5046
.map(response -> response.headers().asHttpHeaders());
@@ -54,63 +50,4 @@ public void shouldDisposeResponseManually() {
5450
assertFalse(this.response.isClosed());
5551
}
5652

57-
@Test
58-
public void shouldDisposeResponseExchangeMono() {
59-
Mono<String> body = this.webClient
60-
.get().uri("/test")
61-
.exchange()
62-
.flatMap(response -> response.bodyToMono(String.class));
63-
StepVerifier.create(body)
64-
.expectNext("example")
65-
.verifyComplete();
66-
assertTrue(this.response.isClosed());
67-
}
68-
69-
@Test
70-
public void shouldDisposeResponseExchangeFlux() {
71-
Flux<String> body = this.webClient
72-
.get().uri("/test")
73-
.exchange()
74-
.flatMapMany(response -> response.bodyToFlux(String.class));
75-
StepVerifier.create(body)
76-
.expectNext("example")
77-
.verifyComplete();
78-
assertTrue(this.response.isClosed());
79-
}
80-
81-
@Test
82-
public void shouldDisposeResponseExchangeEntity() {
83-
ResponseEntity<String> entity = this.webClient
84-
.get().uri("/test")
85-
.exchange()
86-
.flatMap(response -> response.toEntity(String.class))
87-
.block();
88-
assertEquals("example", entity.getBody());
89-
assertTrue(this.response.isClosed());
90-
}
91-
92-
@Test
93-
public void shouldDisposeResponseRetrieveMono() {
94-
Mono<String> body = this.webClient
95-
.get().uri("/test")
96-
.retrieve()
97-
.bodyToMono(String.class);
98-
StepVerifier.create(body)
99-
.expectNext("example")
100-
.verifyComplete();
101-
assertTrue(this.response.isClosed());
102-
}
103-
104-
@Test
105-
public void shouldDisposeResponseRetrieveFlux() {
106-
Flux<String> body = this.webClient
107-
.get().uri("/test")
108-
.retrieve()
109-
.bodyToFlux(String.class);
110-
StepVerifier.create(body)
111-
.expectNext("example")
112-
.verifyComplete();
113-
assertTrue(this.response.isClosed());
114-
}
115-
11653
}

0 commit comments

Comments
 (0)