Skip to content

Commit 8df0bc8

Browse files
committed
Improve access to raw content in WebTestClient
If the content has not been consumed, cause it to be produced, and wait for a certain amount of time before giving up, so the raw content can be made available. This can occur when: 1) In a mock server scenario the Flux representing the client request content is passed directly to the mock server request, but is never consumed because of an error before the body is read. 2) Test obtains FluxExchangeResult (e.g. for streaming) but instead of consuming the Flux, it calls getResponseBodyContent() instead. Issue: SPR-17363
1 parent c567e65 commit 8df0bc8

File tree

9 files changed

+153
-97
lines changed

9 files changed

+153
-97
lines changed

spring-test/src/main/java/org/springframework/test/web/reactive/server/DefaultWebTestClient.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -300,7 +300,7 @@ private static class DefaultResponseSpec implements ResponseSpec {
300300
DefaultResponseSpec(WiretapConnector.Info wiretapInfo, ClientResponse response,
301301
@Nullable String uriTemplate, Duration timeout) {
302302

303-
this.exchangeResult = wiretapInfo.createExchangeResult(uriTemplate);
303+
this.exchangeResult = wiretapInfo.createExchangeResult(timeout, uriTemplate);
304304
this.response = response;
305305
this.timeout = timeout;
306306
}
@@ -357,13 +357,13 @@ public BodyContentSpec expectBody() {
357357
@Override
358358
public <T> FluxExchangeResult<T> returnResult(Class<T> elementType) {
359359
Flux<T> body = this.response.bodyToFlux(elementType);
360-
return new FluxExchangeResult<>(this.exchangeResult, body, this.timeout);
360+
return new FluxExchangeResult<>(this.exchangeResult, body);
361361
}
362362

363363
@Override
364364
public <T> FluxExchangeResult<T> returnResult(ParameterizedTypeReference<T> elementType) {
365365
Flux<T> body = this.response.bodyToFlux(elementType);
366-
return new FluxExchangeResult<>(this.exchangeResult, body, this.timeout);
366+
return new FluxExchangeResult<>(this.exchangeResult, body);
367367
}
368368
}
369369

spring-test/src/main/java/org/springframework/test/web/reactive/server/ExchangeResult.java

Lines changed: 38 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
import java.util.List;
2525
import java.util.stream.Collectors;
2626

27-
import reactor.core.publisher.MonoProcessor;
27+
import reactor.core.publisher.Mono;
2828

2929
import org.springframework.http.HttpHeaders;
3030
import org.springframework.http.HttpMethod;
@@ -36,7 +36,6 @@
3636
import org.springframework.lang.Nullable;
3737
import org.springframework.util.Assert;
3838
import org.springframework.util.MultiValueMap;
39-
import org.springframework.util.ObjectUtils;
4039

4140
/**
4241
* Container for request and response details for exchanges performed through
@@ -64,9 +63,11 @@ public class ExchangeResult {
6463

6564
private final ClientHttpResponse response;
6665

67-
private final MonoProcessor<byte[]> requestBody;
66+
private final Mono<byte[]> requestBody;
6867

69-
private final MonoProcessor<byte[]> responseBody;
68+
private final Mono<byte[]> responseBody;
69+
70+
private final Duration timeout;
7071

7172
@Nullable
7273
private final String uriTemplate;
@@ -80,11 +81,11 @@ public class ExchangeResult {
8081
* @param response the HTTP response
8182
* @param requestBody capture of serialized request body content
8283
* @param responseBody capture of serialized response body content
84+
* @param timeout how long to wait for content to materialize
8385
* @param uriTemplate the URI template used to set up the request, if any
8486
*/
8587
ExchangeResult(ClientHttpRequest request, ClientHttpResponse response,
86-
MonoProcessor<byte[]> requestBody, MonoProcessor<byte[]> responseBody,
87-
@Nullable String uriTemplate) {
88+
Mono<byte[]> requestBody, Mono<byte[]> responseBody, Duration timeout, @Nullable String uriTemplate) {
8889

8990
Assert.notNull(request, "ClientHttpRequest is required");
9091
Assert.notNull(response, "ClientHttpResponse is required");
@@ -95,6 +96,7 @@ public class ExchangeResult {
9596
this.response = response;
9697
this.requestBody = requestBody;
9798
this.responseBody = responseBody;
99+
this.timeout = timeout;
98100
this.uriTemplate = uriTemplate;
99101
}
100102

@@ -106,6 +108,7 @@ public class ExchangeResult {
106108
this.response = other.response;
107109
this.requestBody = other.requestBody;
108110
this.responseBody = other.responseBody;
111+
this.timeout = other.timeout;
109112
this.uriTemplate = other.uriTemplate;
110113
}
111114

@@ -140,14 +143,14 @@ public HttpHeaders getRequestHeaders() {
140143
}
141144

142145
/**
143-
* Return the raw request body content written as a {@code byte[]}.
144-
* @throws IllegalStateException if the request body is not fully written yet.
146+
* Return the raw request body content written through the request.
147+
* <p><strong>Note:</strong> If the request content has not been consumed
148+
* for any reason yet, use of this method will trigger consumption.
149+
* @throws IllegalStateException if the request body is not been fully written.
145150
*/
146151
@Nullable
147152
public byte[] getRequestBodyContent() {
148-
MonoProcessor<byte[]> body = this.requestBody;
149-
Assert.isTrue(body.isTerminated(), "Request body incomplete.");
150-
return body.block(Duration.ZERO);
153+
return this.requestBody.block(this.timeout);
151154
}
152155

153156

@@ -173,14 +176,14 @@ public MultiValueMap<String, ResponseCookie> getResponseCookies() {
173176
}
174177

175178
/**
176-
* Return the raw request body content written as a {@code byte[]}.
177-
* @throws IllegalStateException if the response is not fully read yet.
179+
* Return the raw request body content written to the response.
180+
* <p><strong>Note:</strong> If the response content has not been consumed
181+
* yet, use of this method will trigger consumption.
182+
* @throws IllegalStateException if the response is not been fully read.
178183
*/
179184
@Nullable
180185
public byte[] getResponseBodyContent() {
181-
MonoProcessor<byte[]> body = this.responseBody;
182-
Assert.state(body.isTerminated(), "Response body incomplete");
183-
return body.block(Duration.ZERO);
186+
return this.responseBody.block(this.timeout);
184187
}
185188

186189

@@ -223,30 +226,25 @@ private String formatHeaders(HttpHeaders headers, String delimiter) {
223226
.collect(Collectors.joining(delimiter));
224227
}
225228

226-
private String formatBody(@Nullable MediaType contentType, MonoProcessor<byte[]> body) {
227-
if (body.isSuccess()) {
228-
byte[] bytes = body.block(Duration.ZERO);
229-
if (ObjectUtils.isEmpty(bytes)) {
230-
return "No content";
231-
}
232-
if (contentType == null) {
233-
return "Unknown content type (" + bytes.length + " bytes)";
234-
}
235-
Charset charset = contentType.getCharset();
236-
if (charset != null) {
237-
return new String(bytes, charset);
238-
}
239-
if (PRINTABLE_MEDIA_TYPES.stream().anyMatch(contentType::isCompatibleWith)) {
240-
return new String(bytes, StandardCharsets.UTF_8);
241-
}
242-
return "Unknown charset (" + bytes.length + " bytes)";
243-
}
244-
else if (body.isError()) {
245-
return "I/O failure: " + body.getError();
246-
}
247-
else {
248-
return "Content not available yet";
249-
}
229+
@Nullable
230+
private String formatBody(@Nullable MediaType contentType, Mono<byte[]> body) {
231+
return body
232+
.map(bytes -> {
233+
if (contentType == null) {
234+
return "Unknown content type (" + bytes.length + " bytes)";
235+
}
236+
Charset charset = contentType.getCharset();
237+
if (charset != null) {
238+
return new String(bytes, charset);
239+
}
240+
if (PRINTABLE_MEDIA_TYPES.stream().anyMatch(contentType::isCompatibleWith)) {
241+
return new String(bytes, StandardCharsets.UTF_8);
242+
}
243+
return "Unknown charset (" + bytes.length + " bytes)";
244+
})
245+
.defaultIfEmpty("No content")
246+
.onErrorResume(ex -> Mono.just("Failed to obtain content: " + ex.getMessage()))
247+
.block(this.timeout);
250248
}
251249

252250
}

spring-test/src/main/java/org/springframework/test/web/reactive/server/FluxExchangeResult.java

Lines changed: 1 addition & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,9 @@
1616

1717
package org.springframework.test.web.reactive.server;
1818

19-
import java.time.Duration;
2019
import java.util.function.Consumer;
2120

2221
import reactor.core.publisher.Flux;
23-
import reactor.core.publisher.Mono;
24-
25-
import org.springframework.lang.Nullable;
2622

2723
/**
2824
* {@code ExchangeResult} variant with the response body decoded as
@@ -35,20 +31,12 @@
3531
*/
3632
public class FluxExchangeResult<T> extends ExchangeResult {
3733

38-
private static final IllegalStateException TIMEOUT_ERROR =
39-
new IllegalStateException("Response timeout: for infinite streams " +
40-
"use getResponseBody() first with explicit cancellation, e.g. via take(n).");
41-
42-
4334
private final Flux<T> body;
4435

45-
private final Duration timeout;
46-
4736

48-
FluxExchangeResult(ExchangeResult result, Flux<T> body, Duration timeout) {
37+
FluxExchangeResult(ExchangeResult result, Flux<T> body) {
4938
super(result);
5039
this.body = body;
51-
this.timeout = timeout;
5240
}
5341

5442

@@ -81,22 +69,6 @@ public Flux<T> getResponseBody() {
8169
return this.body;
8270
}
8371

84-
/**
85-
* {@inheritDoc}
86-
* <p><strong>Note:</strong> this method should typically be called after
87-
* the response has been consumed in full via {@link #getResponseBody()}.
88-
* Calling it first will cause the response {@code Flux<T>} to be consumed
89-
* via {@code getResponseBody.ignoreElements()}.
90-
*/
91-
@Override
92-
@Nullable
93-
public byte[] getResponseBodyContent() {
94-
return this.body.ignoreElements()
95-
.timeout(this.timeout, Mono.error(TIMEOUT_ERROR))
96-
.then(Mono.defer(() -> Mono.justOrEmpty(super.getResponseBodyContent())))
97-
.block();
98-
}
99-
10072
/**
10173
* Invoke the given consumer within {@link #assertWithDiagnostics(Runnable)}
10274
* passing {@code "this"} instance to it. This method allows the following,

spring-test/src/main/java/org/springframework/test/web/reactive/server/WiretapConnector.java

Lines changed: 40 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,14 @@
1717
package org.springframework.test.web.reactive.server;
1818

1919
import java.net.URI;
20+
import java.time.Duration;
2021
import java.util.Map;
2122
import java.util.concurrent.ConcurrentHashMap;
2223
import java.util.concurrent.atomic.AtomicReference;
2324
import java.util.function.Function;
2425

2526
import org.reactivestreams.Publisher;
27+
import org.reactivestreams.Subscription;
2628
import reactor.core.publisher.Flux;
2729
import reactor.core.publisher.Mono;
2830
import reactor.core.publisher.MonoProcessor;
@@ -112,9 +114,11 @@ public Info(WiretapClientHttpRequest request, WiretapClientHttpResponse response
112114
}
113115

114116

115-
public ExchangeResult createExchangeResult(@Nullable String uriTemplate) {
117+
public ExchangeResult createExchangeResult(Duration timeout, @Nullable String uriTemplate) {
116118
return new ExchangeResult(this.request, this.response,
117-
this.request.getRecorder().getContent(), this.response.getRecorder().getContent(), uriTemplate);
119+
Mono.defer(() -> this.request.getRecorder().getContent()),
120+
Mono.defer(() -> this.response.getRecorder().getContent()),
121+
timeout, uriTemplate);
118122
}
119123
}
120124

@@ -126,21 +130,21 @@ final static class WiretapRecorder {
126130

127131
private static final DataBufferFactory bufferFactory = new DefaultDataBufferFactory();
128132

129-
public static final byte[] EMPTY_CONTENT = new byte[0];
130-
131133

132134
@Nullable
133-
private final Publisher<? extends DataBuffer> publisher;
135+
private final Flux<? extends DataBuffer> publisher;
134136

135137
@Nullable
136-
private final Publisher<? extends Publisher<? extends DataBuffer>> publisherNested;
138+
private final Flux<? extends Publisher<? extends DataBuffer>> publisherNested;
137139

138140
private final DataBuffer buffer;
139141

140142
private final MonoProcessor<byte[]> content;
141143

144+
private volatile boolean subscriberRegistered;
145+
142146

143-
private WiretapRecorder(@Nullable Publisher<? extends DataBuffer> publisher,
147+
public WiretapRecorder(@Nullable Publisher<? extends DataBuffer> publisher,
144148
@Nullable Publisher<? extends Publisher<? extends DataBuffer>> publisherNested) {
145149

146150
if (publisher != null && publisherNested != null) {
@@ -149,24 +153,22 @@ private WiretapRecorder(@Nullable Publisher<? extends DataBuffer> publisher,
149153

150154
this.publisher = publisher != null ?
151155
Flux.from(publisher)
156+
.doOnSubscribe(this::handleOnSubscribe)
152157
.doOnNext(this::handleOnNext)
153158
.doOnError(this::handleOnError)
154159
.doOnCancel(this::handleOnComplete)
155160
.doOnComplete(this::handleOnComplete) : null;
156161

157162
this.publisherNested = publisherNested != null ?
158163
Flux.from(publisherNested)
164+
.doOnSubscribe(this::handleOnSubscribe)
159165
.map(p -> Flux.from(p).doOnNext(this::handleOnNext).doOnError(this::handleOnError))
160166
.doOnError(this::handleOnError)
161167
.doOnCancel(this::handleOnComplete)
162168
.doOnComplete(this::handleOnComplete) : null;
163169

164170
this.buffer = bufferFactory.allocateBuffer();
165171
this.content = MonoProcessor.create();
166-
167-
if (this.publisher == null && this.publisherNested == null) {
168-
this.content.onNext(EMPTY_CONTENT);
169-
}
170172
}
171173

172174

@@ -180,11 +182,36 @@ public Publisher<? extends Publisher<? extends DataBuffer>> getNestedPublisherTo
180182
return this.publisherNested;
181183
}
182184

183-
public MonoProcessor<byte[]> getContent() {
184-
return this.content;
185+
public Mono<byte[]> getContent() {
186+
// No publisher (e.g. request#setComplete)
187+
if (this.publisher == null && this.publisherNested == null) {
188+
return Mono.empty();
189+
}
190+
if (this.content.isTerminated()) {
191+
return this.content;
192+
}
193+
if (this.subscriberRegistered) {
194+
return Mono.error(new IllegalStateException(
195+
"Subscriber registered but content is not yet fully consumed."));
196+
}
197+
else {
198+
// No subscriber, e.g.:
199+
// - mock server request body never consumed (error before read)
200+
// - FluxExchangeResult#getResponseBodyContent called
201+
(this.publisher != null ? this.publisher : this.publisherNested)
202+
.onErrorMap(ex -> new IllegalStateException(
203+
"Content was not been consumed and " +
204+
"an error was raised on attempt to produce it:", ex))
205+
.subscribe();
206+
return this.content;
207+
}
185208
}
186209

187210

211+
private void handleOnSubscribe(Subscription subscription) {
212+
this.subscriberRegistered = true;
213+
}
214+
188215
private void handleOnNext(DataBuffer nextBuffer) {
189216
this.buffer.write(nextBuffer);
190217
}

spring-test/src/test/java/org/springframework/test/web/reactive/server/HeaderAssertionTests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package org.springframework.test.web.reactive.server;
1818

1919
import java.net.URI;
20+
import java.time.Duration;
2021
import java.time.ZoneId;
2122
import java.time.ZonedDateTime;
2223
import java.util.concurrent.TimeUnit;
@@ -257,7 +258,7 @@ private HeaderAssertions headerAssertions(HttpHeaders responseHeaders) {
257258
MonoProcessor<byte[]> emptyContent = MonoProcessor.create();
258259
emptyContent.onComplete();
259260

260-
ExchangeResult result = new ExchangeResult(request, response, emptyContent, emptyContent, null);
261+
ExchangeResult result = new ExchangeResult(request, response, emptyContent, emptyContent, Duration.ZERO, null);
261262
return new HeaderAssertions(result, mock(WebTestClient.ResponseSpec.class));
262263
}
263264

0 commit comments

Comments
 (0)