Skip to content

Commit 3714d0e

Browse files
Alex Feiginpoutsma
authored andcommitted
Expose future response in new AsyncServerResponse
This commit introduces AsyncServerResponse, an extension of ServerResponse that is returned from ServerResponse.async and that allows users to get the future response by calling the block method. This is particularly useful for testing purposes.
1 parent 227d85a commit 3714d0e

File tree

6 files changed

+277
-147
lines changed

6 files changed

+277
-147
lines changed

spring-webmvc/src/main/java/org/springframework/web/servlet/function/AsyncServerResponse.java

Lines changed: 41 additions & 141 deletions
Original file line numberDiff line numberDiff line change
@@ -16,161 +16,61 @@
1616

1717
package org.springframework.web.servlet.function;
1818

19-
import java.io.IOException;
2019
import java.time.Duration;
2120
import java.util.concurrent.CompletableFuture;
22-
import java.util.concurrent.CompletionException;
23-
import java.util.function.Function;
24-
25-
import javax.servlet.ServletException;
26-
import javax.servlet.http.Cookie;
27-
import javax.servlet.http.HttpServletRequest;
28-
import javax.servlet.http.HttpServletResponse;
2921

3022
import org.reactivestreams.Publisher;
3123

32-
import org.springframework.core.ReactiveAdapter;
3324
import org.springframework.core.ReactiveAdapterRegistry;
34-
import org.springframework.http.HttpHeaders;
35-
import org.springframework.http.HttpStatus;
36-
import org.springframework.lang.Nullable;
37-
import org.springframework.util.Assert;
38-
import org.springframework.util.ClassUtils;
39-
import org.springframework.util.MultiValueMap;
40-
import org.springframework.web.context.request.async.AsyncWebRequest;
41-
import org.springframework.web.context.request.async.DeferredResult;
42-
import org.springframework.web.context.request.async.WebAsyncManager;
43-
import org.springframework.web.context.request.async.WebAsyncUtils;
44-
import org.springframework.web.servlet.ModelAndView;
4525

4626
/**
47-
* Implementation of {@link ServerResponse} based on a {@link CompletableFuture}.
27+
* Asynchronous subtype of {@link ServerResponse} that exposes the future
28+
* response.
4829
*
4930
* @author Arjen Poutsma
50-
* @since 5.3
31+
* @since 5.3.2
5132
* @see ServerResponse#async(Object)
5233
*/
53-
final class AsyncServerResponse extends ErrorHandlingServerResponse {
54-
55-
static final boolean reactiveStreamsPresent = ClassUtils.isPresent(
56-
"org.reactivestreams.Publisher", AsyncServerResponse.class.getClassLoader());
57-
58-
59-
private final CompletableFuture<ServerResponse> futureResponse;
60-
61-
@Nullable
62-
private final Duration timeout;
63-
64-
65-
private AsyncServerResponse(CompletableFuture<ServerResponse> futureResponse, @Nullable Duration timeout) {
66-
this.futureResponse = futureResponse;
67-
this.timeout = timeout;
68-
}
69-
70-
@Override
71-
public HttpStatus statusCode() {
72-
return delegate(ServerResponse::statusCode);
73-
}
74-
75-
@Override
76-
public int rawStatusCode() {
77-
return delegate(ServerResponse::rawStatusCode);
78-
}
79-
80-
@Override
81-
public HttpHeaders headers() {
82-
return delegate(ServerResponse::headers);
83-
}
84-
85-
@Override
86-
public MultiValueMap<String, Cookie> cookies() {
87-
return delegate(ServerResponse::cookies);
34+
public interface AsyncServerResponse extends ServerResponse {
35+
36+
/**
37+
* Blocks indefinitely until the future response is obtained.
38+
*/
39+
ServerResponse block();
40+
41+
42+
// Static creation methods
43+
44+
/**
45+
* Create a {@code AsyncServerResponse} with the given asynchronous response.
46+
* Parameter {@code asyncResponse} can be a
47+
* {@link CompletableFuture CompletableFuture&lt;ServerResponse&gt;} or
48+
* {@link Publisher Publisher&lt;ServerResponse&gt;} (or any
49+
* asynchronous producer of a single {@code ServerResponse} that can be
50+
* adapted via the {@link ReactiveAdapterRegistry}).
51+
* @param asyncResponse a {@code CompletableFuture<ServerResponse>} or
52+
* {@code Publisher<ServerResponse>}
53+
* @return the asynchronous response
54+
*/
55+
static AsyncServerResponse create(Object asyncResponse) {
56+
return DefaultAsyncServerResponse.create(asyncResponse, null);
8857
}
8958

90-
private <R> R delegate(Function<ServerResponse, R> function) {
91-
ServerResponse response = this.futureResponse.getNow(null);
92-
if (response != null) {
93-
return function.apply(response);
94-
}
95-
else {
96-
throw new IllegalStateException("Future ServerResponse has not yet completed");
97-
}
59+
/**
60+
* Create a (built) response with the given asynchronous response.
61+
* Parameter {@code asyncResponse} can be a
62+
* {@link CompletableFuture CompletableFuture&lt;ServerResponse&gt;} or
63+
* {@link Publisher Publisher&lt;ServerResponse&gt;} (or any
64+
* asynchronous producer of a single {@code ServerResponse} that can be
65+
* adapted via the {@link ReactiveAdapterRegistry}).
66+
* @param asyncResponse a {@code CompletableFuture<ServerResponse>} or
67+
* {@code Publisher<ServerResponse>}
68+
* @param timeout maximum time period to wait for before timing out
69+
* @return the asynchronous response
70+
*/
71+
static AsyncServerResponse create(Object asyncResponse, Duration timeout) {
72+
return DefaultAsyncServerResponse.create(asyncResponse, timeout);
9873
}
9974

100-
@Nullable
101-
@Override
102-
public ModelAndView writeTo(HttpServletRequest request, HttpServletResponse response, Context context)
103-
throws ServletException, IOException {
104-
105-
writeAsync(request, response, createDeferredResult());
106-
return null;
107-
}
108-
109-
static void writeAsync(HttpServletRequest request, HttpServletResponse response, DeferredResult<?> deferredResult)
110-
throws ServletException, IOException {
111-
112-
WebAsyncManager asyncManager = WebAsyncUtils.getAsyncManager(request);
113-
AsyncWebRequest asyncWebRequest = WebAsyncUtils.createAsyncWebRequest(request, response);
114-
asyncManager.setAsyncWebRequest(asyncWebRequest);
115-
try {
116-
asyncManager.startDeferredResultProcessing(deferredResult);
117-
}
118-
catch (IOException | ServletException ex) {
119-
throw ex;
120-
}
121-
catch (Exception ex) {
122-
throw new ServletException("Async processing failed", ex);
123-
}
124-
125-
}
126-
127-
private DeferredResult<ServerResponse> createDeferredResult() {
128-
DeferredResult<ServerResponse> result;
129-
if (this.timeout != null) {
130-
result = new DeferredResult<>(this.timeout.toMillis());
131-
}
132-
else {
133-
result = new DeferredResult<>();
134-
}
135-
this.futureResponse.handle((value, ex) -> {
136-
if (ex != null) {
137-
if (ex instanceof CompletionException && ex.getCause() != null) {
138-
ex = ex.getCause();
139-
}
140-
result.setErrorResult(ex);
141-
}
142-
else {
143-
result.setResult(value);
144-
}
145-
return null;
146-
});
147-
return result;
148-
}
149-
150-
151-
@SuppressWarnings({"unchecked"})
152-
public static ServerResponse create(Object o, @Nullable Duration timeout) {
153-
Assert.notNull(o, "Argument to async must not be null");
154-
155-
if (o instanceof CompletableFuture) {
156-
CompletableFuture<ServerResponse> futureResponse = (CompletableFuture<ServerResponse>) o;
157-
return new AsyncServerResponse(futureResponse, timeout);
158-
}
159-
else if (reactiveStreamsPresent) {
160-
ReactiveAdapterRegistry registry = ReactiveAdapterRegistry.getSharedInstance();
161-
ReactiveAdapter publisherAdapter = registry.getAdapter(o.getClass());
162-
if (publisherAdapter != null) {
163-
Publisher<ServerResponse> publisher = publisherAdapter.toPublisher(o);
164-
ReactiveAdapter futureAdapter = registry.getAdapter(CompletableFuture.class);
165-
if (futureAdapter != null) {
166-
CompletableFuture<ServerResponse> futureResponse =
167-
(CompletableFuture<ServerResponse>) futureAdapter.fromPublisher(publisher);
168-
return new AsyncServerResponse(futureResponse, timeout);
169-
}
170-
}
171-
}
172-
throw new IllegalArgumentException("Asynchronous type not supported: " + o.getClass());
173-
}
174-
175-
17675
}
76+
Lines changed: 191 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,191 @@
1+
/*
2+
* Copyright 2002-2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.web.servlet.function;
18+
19+
import java.io.IOException;
20+
import java.time.Duration;
21+
import java.util.concurrent.CompletableFuture;
22+
import java.util.concurrent.CompletionException;
23+
import java.util.concurrent.ExecutionException;
24+
import java.util.concurrent.TimeUnit;
25+
import java.util.concurrent.TimeoutException;
26+
import java.util.function.Function;
27+
28+
import javax.servlet.ServletException;
29+
import javax.servlet.http.Cookie;
30+
import javax.servlet.http.HttpServletRequest;
31+
import javax.servlet.http.HttpServletResponse;
32+
33+
import org.reactivestreams.Publisher;
34+
35+
import org.springframework.core.ReactiveAdapter;
36+
import org.springframework.core.ReactiveAdapterRegistry;
37+
import org.springframework.http.HttpHeaders;
38+
import org.springframework.http.HttpStatus;
39+
import org.springframework.lang.Nullable;
40+
import org.springframework.util.Assert;
41+
import org.springframework.util.ClassUtils;
42+
import org.springframework.util.MultiValueMap;
43+
import org.springframework.web.context.request.async.AsyncWebRequest;
44+
import org.springframework.web.context.request.async.DeferredResult;
45+
import org.springframework.web.context.request.async.WebAsyncManager;
46+
import org.springframework.web.context.request.async.WebAsyncUtils;
47+
import org.springframework.web.servlet.ModelAndView;
48+
49+
/**
50+
* Default {@link AsyncServerResponse} implementation.
51+
*
52+
* @author Arjen Poutsma
53+
* @since 5.3.2
54+
*/
55+
final class DefaultAsyncServerResponse extends ErrorHandlingServerResponse implements AsyncServerResponse {
56+
57+
static final boolean reactiveStreamsPresent = ClassUtils.isPresent(
58+
"org.reactivestreams.Publisher", DefaultAsyncServerResponse.class.getClassLoader());
59+
60+
private final CompletableFuture<ServerResponse> futureResponse;
61+
62+
@Nullable
63+
private final Duration timeout;
64+
65+
66+
private DefaultAsyncServerResponse(CompletableFuture<ServerResponse> futureResponse, @Nullable Duration timeout) {
67+
this.futureResponse = futureResponse;
68+
this.timeout = timeout;
69+
}
70+
71+
@Override
72+
public ServerResponse block() {
73+
try {
74+
if (this.timeout != null) {
75+
return this.futureResponse.get(this.timeout.toMillis(), TimeUnit.MILLISECONDS);
76+
}
77+
else {
78+
return this.futureResponse.get();
79+
}
80+
}
81+
catch (InterruptedException | ExecutionException | TimeoutException ex) {
82+
throw new IllegalStateException("Failed to get future response", ex);
83+
}
84+
}
85+
86+
@Override
87+
public HttpStatus statusCode() {
88+
return delegate(ServerResponse::statusCode);
89+
}
90+
91+
@Override
92+
public int rawStatusCode() {
93+
return delegate(ServerResponse::rawStatusCode);
94+
}
95+
96+
@Override
97+
public HttpHeaders headers() {
98+
return delegate(ServerResponse::headers);
99+
}
100+
101+
@Override
102+
public MultiValueMap<String, Cookie> cookies() {
103+
return delegate(ServerResponse::cookies);
104+
}
105+
106+
private <R> R delegate(Function<ServerResponse, R> function) {
107+
ServerResponse response = this.futureResponse.getNow(null);
108+
if (response != null) {
109+
return function.apply(response);
110+
}
111+
else {
112+
throw new IllegalStateException("Future ServerResponse has not yet completed");
113+
}
114+
}
115+
116+
@Nullable
117+
@Override
118+
public ModelAndView writeTo(HttpServletRequest request, HttpServletResponse response, Context context)
119+
throws ServletException, IOException {
120+
121+
writeAsync(request, response, createDeferredResult());
122+
return null;
123+
}
124+
125+
static void writeAsync(HttpServletRequest request, HttpServletResponse response, DeferredResult<?> deferredResult)
126+
throws ServletException, IOException {
127+
128+
WebAsyncManager asyncManager = WebAsyncUtils.getAsyncManager(request);
129+
AsyncWebRequest asyncWebRequest = WebAsyncUtils.createAsyncWebRequest(request, response);
130+
asyncManager.setAsyncWebRequest(asyncWebRequest);
131+
try {
132+
asyncManager.startDeferredResultProcessing(deferredResult);
133+
}
134+
catch (IOException | ServletException ex) {
135+
throw ex;
136+
}
137+
catch (Exception ex) {
138+
throw new ServletException("Async processing failed", ex);
139+
}
140+
141+
}
142+
143+
private DeferredResult<ServerResponse> createDeferredResult() {
144+
DeferredResult<ServerResponse> result;
145+
if (this.timeout != null) {
146+
result = new DeferredResult<>(this.timeout.toMillis());
147+
}
148+
else {
149+
result = new DeferredResult<>();
150+
}
151+
this.futureResponse.handle((value, ex) -> {
152+
if (ex != null) {
153+
if (ex instanceof CompletionException && ex.getCause() != null) {
154+
ex = ex.getCause();
155+
}
156+
result.setErrorResult(ex);
157+
}
158+
else {
159+
result.setResult(value);
160+
}
161+
return null;
162+
});
163+
return result;
164+
}
165+
166+
@SuppressWarnings({"unchecked"})
167+
public static AsyncServerResponse create(Object o, @Nullable Duration timeout) {
168+
Assert.notNull(o, "Argument to async must not be null");
169+
170+
if (o instanceof CompletableFuture) {
171+
CompletableFuture<ServerResponse> futureResponse = (CompletableFuture<ServerResponse>) o;
172+
return new DefaultAsyncServerResponse(futureResponse, timeout);
173+
}
174+
else if (reactiveStreamsPresent) {
175+
ReactiveAdapterRegistry registry = ReactiveAdapterRegistry.getSharedInstance();
176+
ReactiveAdapter publisherAdapter = registry.getAdapter(o.getClass());
177+
if (publisherAdapter != null) {
178+
Publisher<ServerResponse> publisher = publisherAdapter.toPublisher(o);
179+
ReactiveAdapter futureAdapter = registry.getAdapter(CompletableFuture.class);
180+
if (futureAdapter != null) {
181+
CompletableFuture<ServerResponse> futureResponse =
182+
(CompletableFuture<ServerResponse>) futureAdapter.fromPublisher(publisher);
183+
return new DefaultAsyncServerResponse(futureResponse, timeout);
184+
}
185+
}
186+
}
187+
throw new IllegalArgumentException("Asynchronous type not supported: " + o.getClass());
188+
}
189+
190+
191+
}

0 commit comments

Comments
 (0)