Skip to content

Commit 8f0ad73

Browse files
committed
Merge pull request #26133 from alexfeigin/master
* gh-26133: Expose future response in new AsyncServerResponse
2 parents 227d85a + 3714d0e commit 8f0ad73

File tree

6 files changed

+277
-147
lines changed

6 files changed

+277
-147
lines changed

spring-webmvc/src/main/java/org/springframework/web/servlet/function/AsyncServerResponse.java

Lines changed: 41 additions & 141 deletions
Original file line numberDiff line numberDiff line change
@@ -16,161 +16,61 @@
1616

1717
package org.springframework.web.servlet.function;
1818

19-
import java.io.IOException;
2019
import java.time.Duration;
2120
import java.util.concurrent.CompletableFuture;
22-
import java.util.concurrent.CompletionException;
23-
import java.util.function.Function;
24-
25-
import javax.servlet.ServletException;
26-
import javax.servlet.http.Cookie;
27-
import javax.servlet.http.HttpServletRequest;
28-
import javax.servlet.http.HttpServletResponse;
2921

3022
import org.reactivestreams.Publisher;
3123

32-
import org.springframework.core.ReactiveAdapter;
3324
import org.springframework.core.ReactiveAdapterRegistry;
34-
import org.springframework.http.HttpHeaders;
35-
import org.springframework.http.HttpStatus;
36-
import org.springframework.lang.Nullable;
37-
import org.springframework.util.Assert;
38-
import org.springframework.util.ClassUtils;
39-
import org.springframework.util.MultiValueMap;
40-
import org.springframework.web.context.request.async.AsyncWebRequest;
41-
import org.springframework.web.context.request.async.DeferredResult;
42-
import org.springframework.web.context.request.async.WebAsyncManager;
43-
import org.springframework.web.context.request.async.WebAsyncUtils;
44-
import org.springframework.web.servlet.ModelAndView;
4525

4626
/**
47-
* Implementation of {@link ServerResponse} based on a {@link CompletableFuture}.
27+
* Asynchronous subtype of {@link ServerResponse} that exposes the future
28+
* response.
4829
*
4930
* @author Arjen Poutsma
50-
* @since 5.3
31+
* @since 5.3.2
5132
* @see ServerResponse#async(Object)
5233
*/
53-
final class AsyncServerResponse extends ErrorHandlingServerResponse {
54-
55-
static final boolean reactiveStreamsPresent = ClassUtils.isPresent(
56-
"org.reactivestreams.Publisher", AsyncServerResponse.class.getClassLoader());
57-
58-
59-
private final CompletableFuture<ServerResponse> futureResponse;
60-
61-
@Nullable
62-
private final Duration timeout;
63-
64-
65-
private AsyncServerResponse(CompletableFuture<ServerResponse> futureResponse, @Nullable Duration timeout) {
66-
this.futureResponse = futureResponse;
67-
this.timeout = timeout;
68-
}
69-
70-
@Override
71-
public HttpStatus statusCode() {
72-
return delegate(ServerResponse::statusCode);
73-
}
74-
75-
@Override
76-
public int rawStatusCode() {
77-
return delegate(ServerResponse::rawStatusCode);
78-
}
79-
80-
@Override
81-
public HttpHeaders headers() {
82-
return delegate(ServerResponse::headers);
83-
}
84-
85-
@Override
86-
public MultiValueMap<String, Cookie> cookies() {
87-
return delegate(ServerResponse::cookies);
34+
public interface AsyncServerResponse extends ServerResponse {
35+
36+
/**
37+
* Blocks indefinitely until the future response is obtained.
38+
*/
39+
ServerResponse block();
40+
41+
42+
// Static creation methods
43+
44+
/**
45+
* Create a {@code AsyncServerResponse} with the given asynchronous response.
46+
* Parameter {@code asyncResponse} can be a
47+
* {@link CompletableFuture CompletableFuture&lt;ServerResponse&gt;} or
48+
* {@link Publisher Publisher&lt;ServerResponse&gt;} (or any
49+
* asynchronous producer of a single {@code ServerResponse} that can be
50+
* adapted via the {@link ReactiveAdapterRegistry}).
51+
* @param asyncResponse a {@code CompletableFuture<ServerResponse>} or
52+
* {@code Publisher<ServerResponse>}
53+
* @return the asynchronous response
54+
*/
55+
static AsyncServerResponse create(Object asyncResponse) {
56+
return DefaultAsyncServerResponse.create(asyncResponse, null);
8857
}
8958

90-
private <R> R delegate(Function<ServerResponse, R> function) {
91-
ServerResponse response = this.futureResponse.getNow(null);
92-
if (response != null) {
93-
return function.apply(response);
94-
}
95-
else {
96-
throw new IllegalStateException("Future ServerResponse has not yet completed");
97-
}
59+
/**
60+
* Create a (built) response with the given asynchronous response.
61+
* Parameter {@code asyncResponse} can be a
62+
* {@link CompletableFuture CompletableFuture&lt;ServerResponse&gt;} or
63+
* {@link Publisher Publisher&lt;ServerResponse&gt;} (or any
64+
* asynchronous producer of a single {@code ServerResponse} that can be
65+
* adapted via the {@link ReactiveAdapterRegistry}).
66+
* @param asyncResponse a {@code CompletableFuture<ServerResponse>} or
67+
* {@code Publisher<ServerResponse>}
68+
* @param timeout maximum time period to wait for before timing out
69+
* @return the asynchronous response
70+
*/
71+
static AsyncServerResponse create(Object asyncResponse, Duration timeout) {
72+
return DefaultAsyncServerResponse.create(asyncResponse, timeout);
9873
}
9974

100-
@Nullable
101-
@Override
102-
public ModelAndView writeTo(HttpServletRequest request, HttpServletResponse response, Context context)
103-
throws ServletException, IOException {
104-
105-
writeAsync(request, response, createDeferredResult());
106-
return null;
107-
}
108-
109-
static void writeAsync(HttpServletRequest request, HttpServletResponse response, DeferredResult<?> deferredResult)
110-
throws ServletException, IOException {
111-
112-
WebAsyncManager asyncManager = WebAsyncUtils.getAsyncManager(request);
113-
AsyncWebRequest asyncWebRequest = WebAsyncUtils.createAsyncWebRequest(request, response);
114-
asyncManager.setAsyncWebRequest(asyncWebRequest);
115-
try {
116-
asyncManager.startDeferredResultProcessing(deferredResult);
117-
}
118-
catch (IOException | ServletException ex) {
119-
throw ex;
120-
}
121-
catch (Exception ex) {
122-
throw new ServletException("Async processing failed", ex);
123-
}
124-
125-
}
126-
127-
private DeferredResult<ServerResponse> createDeferredResult() {
128-
DeferredResult<ServerResponse> result;
129-
if (this.timeout != null) {
130-
result = new DeferredResult<>(this.timeout.toMillis());
131-
}
132-
else {
133-
result = new DeferredResult<>();
134-
}
135-
this.futureResponse.handle((value, ex) -> {
136-
if (ex != null) {
137-
if (ex instanceof CompletionException && ex.getCause() != null) {
138-
ex = ex.getCause();
139-
}
140-
result.setErrorResult(ex);
141-
}
142-
else {
143-
result.setResult(value);
144-
}
145-
return null;
146-
});
147-
return result;
148-
}
149-
150-
151-
@SuppressWarnings({"unchecked"})
152-
public static ServerResponse create(Object o, @Nullable Duration timeout) {
153-
Assert.notNull(o, "Argument to async must not be null");
154-
155-
if (o instanceof CompletableFuture) {
156-
CompletableFuture<ServerResponse> futureResponse = (CompletableFuture<ServerResponse>) o;
157-
return new AsyncServerResponse(futureResponse, timeout);
158-
}
159-
else if (reactiveStreamsPresent) {
160-
ReactiveAdapterRegistry registry = ReactiveAdapterRegistry.getSharedInstance();
161-
ReactiveAdapter publisherAdapter = registry.getAdapter(o.getClass());
162-
if (publisherAdapter != null) {
163-
Publisher<ServerResponse> publisher = publisherAdapter.toPublisher(o);
164-
ReactiveAdapter futureAdapter = registry.getAdapter(CompletableFuture.class);
165-
if (futureAdapter != null) {
166-
CompletableFuture<ServerResponse> futureResponse =
167-
(CompletableFuture<ServerResponse>) futureAdapter.fromPublisher(publisher);
168-
return new AsyncServerResponse(futureResponse, timeout);
169-
}
170-
}
171-
}
172-
throw new IllegalArgumentException("Asynchronous type not supported: " + o.getClass());
173-
}
174-
175-
17675
}
76+
Lines changed: 191 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,191 @@
1+
/*
2+
* Copyright 2002-2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.web.servlet.function;
18+
19+
import java.io.IOException;
20+
import java.time.Duration;
21+
import java.util.concurrent.CompletableFuture;
22+
import java.util.concurrent.CompletionException;
23+
import java.util.concurrent.ExecutionException;
24+
import java.util.concurrent.TimeUnit;
25+
import java.util.concurrent.TimeoutException;
26+
import java.util.function.Function;
27+
28+
import javax.servlet.ServletException;
29+
import javax.servlet.http.Cookie;
30+
import javax.servlet.http.HttpServletRequest;
31+
import javax.servlet.http.HttpServletResponse;
32+
33+
import org.reactivestreams.Publisher;
34+
35+
import org.springframework.core.ReactiveAdapter;
36+
import org.springframework.core.ReactiveAdapterRegistry;
37+
import org.springframework.http.HttpHeaders;
38+
import org.springframework.http.HttpStatus;
39+
import org.springframework.lang.Nullable;
40+
import org.springframework.util.Assert;
41+
import org.springframework.util.ClassUtils;
42+
import org.springframework.util.MultiValueMap;
43+
import org.springframework.web.context.request.async.AsyncWebRequest;
44+
import org.springframework.web.context.request.async.DeferredResult;
45+
import org.springframework.web.context.request.async.WebAsyncManager;
46+
import org.springframework.web.context.request.async.WebAsyncUtils;
47+
import org.springframework.web.servlet.ModelAndView;
48+
49+
/**
50+
* Default {@link AsyncServerResponse} implementation.
51+
*
52+
* @author Arjen Poutsma
53+
* @since 5.3.2
54+
*/
55+
final class DefaultAsyncServerResponse extends ErrorHandlingServerResponse implements AsyncServerResponse {
56+
57+
static final boolean reactiveStreamsPresent = ClassUtils.isPresent(
58+
"org.reactivestreams.Publisher", DefaultAsyncServerResponse.class.getClassLoader());
59+
60+
private final CompletableFuture<ServerResponse> futureResponse;
61+
62+
@Nullable
63+
private final Duration timeout;
64+
65+
66+
private DefaultAsyncServerResponse(CompletableFuture<ServerResponse> futureResponse, @Nullable Duration timeout) {
67+
this.futureResponse = futureResponse;
68+
this.timeout = timeout;
69+
}
70+
71+
@Override
72+
public ServerResponse block() {
73+
try {
74+
if (this.timeout != null) {
75+
return this.futureResponse.get(this.timeout.toMillis(), TimeUnit.MILLISECONDS);
76+
}
77+
else {
78+
return this.futureResponse.get();
79+
}
80+
}
81+
catch (InterruptedException | ExecutionException | TimeoutException ex) {
82+
throw new IllegalStateException("Failed to get future response", ex);
83+
}
84+
}
85+
86+
@Override
87+
public HttpStatus statusCode() {
88+
return delegate(ServerResponse::statusCode);
89+
}
90+
91+
@Override
92+
public int rawStatusCode() {
93+
return delegate(ServerResponse::rawStatusCode);
94+
}
95+
96+
@Override
97+
public HttpHeaders headers() {
98+
return delegate(ServerResponse::headers);
99+
}
100+
101+
@Override
102+
public MultiValueMap<String, Cookie> cookies() {
103+
return delegate(ServerResponse::cookies);
104+
}
105+
106+
private <R> R delegate(Function<ServerResponse, R> function) {
107+
ServerResponse response = this.futureResponse.getNow(null);
108+
if (response != null) {
109+
return function.apply(response);
110+
}
111+
else {
112+
throw new IllegalStateException("Future ServerResponse has not yet completed");
113+
}
114+
}
115+
116+
@Nullable
117+
@Override
118+
public ModelAndView writeTo(HttpServletRequest request, HttpServletResponse response, Context context)
119+
throws ServletException, IOException {
120+
121+
writeAsync(request, response, createDeferredResult());
122+
return null;
123+
}
124+
125+
static void writeAsync(HttpServletRequest request, HttpServletResponse response, DeferredResult<?> deferredResult)
126+
throws ServletException, IOException {
127+
128+
WebAsyncManager asyncManager = WebAsyncUtils.getAsyncManager(request);
129+
AsyncWebRequest asyncWebRequest = WebAsyncUtils.createAsyncWebRequest(request, response);
130+
asyncManager.setAsyncWebRequest(asyncWebRequest);
131+
try {
132+
asyncManager.startDeferredResultProcessing(deferredResult);
133+
}
134+
catch (IOException | ServletException ex) {
135+
throw ex;
136+
}
137+
catch (Exception ex) {
138+
throw new ServletException("Async processing failed", ex);
139+
}
140+
141+
}
142+
143+
private DeferredResult<ServerResponse> createDeferredResult() {
144+
DeferredResult<ServerResponse> result;
145+
if (this.timeout != null) {
146+
result = new DeferredResult<>(this.timeout.toMillis());
147+
}
148+
else {
149+
result = new DeferredResult<>();
150+
}
151+
this.futureResponse.handle((value, ex) -> {
152+
if (ex != null) {
153+
if (ex instanceof CompletionException && ex.getCause() != null) {
154+
ex = ex.getCause();
155+
}
156+
result.setErrorResult(ex);
157+
}
158+
else {
159+
result.setResult(value);
160+
}
161+
return null;
162+
});
163+
return result;
164+
}
165+
166+
@SuppressWarnings({"unchecked"})
167+
public static AsyncServerResponse create(Object o, @Nullable Duration timeout) {
168+
Assert.notNull(o, "Argument to async must not be null");
169+
170+
if (o instanceof CompletableFuture) {
171+
CompletableFuture<ServerResponse> futureResponse = (CompletableFuture<ServerResponse>) o;
172+
return new DefaultAsyncServerResponse(futureResponse, timeout);
173+
}
174+
else if (reactiveStreamsPresent) {
175+
ReactiveAdapterRegistry registry = ReactiveAdapterRegistry.getSharedInstance();
176+
ReactiveAdapter publisherAdapter = registry.getAdapter(o.getClass());
177+
if (publisherAdapter != null) {
178+
Publisher<ServerResponse> publisher = publisherAdapter.toPublisher(o);
179+
ReactiveAdapter futureAdapter = registry.getAdapter(CompletableFuture.class);
180+
if (futureAdapter != null) {
181+
CompletableFuture<ServerResponse> futureResponse =
182+
(CompletableFuture<ServerResponse>) futureAdapter.fromPublisher(publisher);
183+
return new DefaultAsyncServerResponse(futureResponse, timeout);
184+
}
185+
}
186+
}
187+
throw new IllegalArgumentException("Asynchronous type not supported: " + o.getClass());
188+
}
189+
190+
191+
}

0 commit comments

Comments
 (0)