Skip to content
This repository was archived by the owner on May 28, 2018. It is now read-only.

Commit 65aa8f1

Browse files
committed
Support for returning CompletionStage from a resource method.
Change-Id: I6c8ed84cce803559bd4cfe40da411d8784671380
1 parent da42801 commit 65aa8f1

File tree

6 files changed

+301
-13
lines changed

6 files changed

+301
-13
lines changed

core-server/src/main/java/org/glassfish/jersey/server/ServerRuntime.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -854,6 +854,11 @@ public void run() {
854854
});
855855
}
856856

857+
@Override
858+
public void invokeManaged(Runnable runnable) {
859+
responder.runtime.managedAsyncExecutor.get().submit(runnable);
860+
}
861+
857862
@Override
858863
public boolean suspend() {
859864
synchronized (stateLock) {

core-server/src/main/java/org/glassfish/jersey/server/internal/process/AsyncContext.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
/*
22
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER.
33
*
4-
* Copyright (c) 2012-2014 Oracle and/or its affiliates. All rights reserved.
4+
* Copyright (c) 2012-2017 Oracle and/or its affiliates. All rights reserved.
55
*
66
* The contents of this file are subject to the terms of either the GNU
77
* General Public License Version 2 only ("GPL") or the Common Development
@@ -97,4 +97,11 @@ public static enum State {
9797
* @param producer response producer.
9898
*/
9999
public void invokeManaged(Producer<Response> producer);
100+
101+
/**
102+
* Invoke the provided runnable in a Jersey-managed asynchronous thread.
103+
*
104+
* @param runnable to be invoked.
105+
*/
106+
public void invokeManaged(Runnable runnable);
100107
}

core-server/src/main/java/org/glassfish/jersey/server/model/ResourceMethodInvoker.java

Lines changed: 42 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,10 @@
5252
import java.util.List;
5353
import java.util.Map;
5454
import java.util.Set;
55+
import java.util.concurrent.CancellationException;
56+
import java.util.concurrent.CompletableFuture;
57+
import java.util.concurrent.CompletionStage;
58+
import java.util.function.BiConsumer;
5559
import java.util.function.Supplier;
5660
import java.util.stream.Collectors;
5761
import java.util.stream.StreamSupport;
@@ -114,7 +118,7 @@ public class ResourceMethodInvoker implements Endpoint, ResourceInfo {
114118

115119
/**
116120
* Resource method invoker helper.
117-
*
121+
* <p>
118122
* The builder API provides means for constructing a properly initialized
119123
* {@link ResourceMethodInvoker resource method invoker} instances.
120124
*/
@@ -409,10 +413,46 @@ public ContainerResponse apply(final RequestProcessingContext processingContext)
409413
return null; // return null on current thread
410414
} else {
411415
// TODO replace with processing context factory method.
412-
return new ContainerResponse(request, invoke(processingContext, resource));
416+
Response response = invoke(processingContext, resource);
417+
418+
if (response.hasEntity()) {
419+
Object entityFuture = response.getEntity();
420+
if (entityFuture instanceof CompletionStage) {
421+
CompletableFuture completableFuture = ((CompletionStage) entityFuture).toCompletableFuture();
422+
423+
// suspend - we know that this feature is not done, see AbstractJavaResourceMethodDispatcher#invoke
424+
if (!processingContext.asyncContext().suspend()) {
425+
throw new ProcessingException(LocalizationMessages.ERROR_SUSPENDING_ASYNC_REQUEST());
426+
}
427+
428+
// wait for a response
429+
completableFuture.whenCompleteAsync(
430+
whenComplete(processingContext),
431+
command -> processingContext.asyncContext().invokeManaged(command));
432+
433+
return null; // return null on the current thread
434+
}
435+
}
436+
437+
return new ContainerResponse(request, response);
413438
}
414439
}
415440

441+
private BiConsumer whenComplete(RequestProcessingContext processingContext) {
442+
return (entity, exception) -> {
443+
444+
if (exception != null) {
445+
if (exception instanceof CancellationException) {
446+
processingContext.asyncContext().resume(Response.status(Response.Status.SERVICE_UNAVAILABLE).build());
447+
} else {
448+
processingContext.asyncContext().resume(((Throwable) exception));
449+
}
450+
} else {
451+
processingContext.asyncContext().resume(entity);
452+
}
453+
};
454+
}
455+
416456
private Response invoke(final RequestProcessingContext context, final Object resource) {
417457

418458
Response jaxrsResponse;

core-server/src/main/java/org/glassfish/jersey/server/model/internal/AbstractJavaResourceMethodDispatcher.java

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
/*
22
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER.
33
*
4-
* Copyright (c) 2011-2015 Oracle and/or its affiliates. All rights reserved.
4+
* Copyright (c) 2011-2017 Oracle and/or its affiliates. All rights reserved.
55
*
66
* The contents of this file are subject to the terms of either the GNU
77
* General Public License Version 2 only ("GPL") or the Common Development
@@ -44,6 +44,9 @@
4444
import java.lang.reflect.Method;
4545
import java.lang.reflect.UndeclaredThrowableException;
4646
import java.security.PrivilegedAction;
47+
import java.util.concurrent.CompletableFuture;
48+
import java.util.concurrent.CompletionStage;
49+
import java.util.concurrent.ExecutionException;
4750

4851
import javax.ws.rs.ProcessingException;
4952
import javax.ws.rs.WebApplicationException;
@@ -141,7 +144,26 @@ public Object run() {
141144
final long timestamp = tracingLogger.timestamp(ServerTraceEvent.METHOD_INVOKE);
142145
try {
143146

144-
return methodHandler.invoke(resource, method, args);
147+
Object result = methodHandler.invoke(resource, method, args);
148+
149+
// if a response is a CompletionStage and is done, we don't need to suspend and resume
150+
if (result instanceof CompletionStage) {
151+
CompletableFuture resultFuture = ((CompletionStage) result).toCompletableFuture();
152+
153+
if (resultFuture.isDone()) {
154+
if (resultFuture.isCancelled()) {
155+
return Response.status(Response.Status.SERVICE_UNAVAILABLE).build();
156+
} else {
157+
try {
158+
return resultFuture.get();
159+
} catch (ExecutionException e) {
160+
throw new InvocationTargetException(e.getCause());
161+
}
162+
}
163+
}
164+
}
165+
166+
return result;
145167

146168
} catch (IllegalAccessException | IllegalArgumentException | UndeclaredThrowableException ex) {
147169
throw new ProcessingException(LocalizationMessages.ERROR_RESOURCE_JAVA_METHOD_INVOCATION(), ex);

core-server/src/main/java/org/glassfish/jersey/server/model/internal/JavaResourceMethodDispatcherProvider.java

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -209,8 +209,6 @@ protected Response doDispatch(final Object resource, final ContainerRequest cont
209209

210210
if (o instanceof Response) {
211211
return Response.class.cast(o);
212-
// } else if (o instanceof JResponse) {
213-
// context.getResponseContext().setResponse(((JResponse)o).toResponse());
214212
} else if (o != null) {
215213
return Response.ok().entity(o).build();
216214
} else {
@@ -236,12 +234,10 @@ private static final class TypeOutInvoker extends AbstractMethodParamInvoker {
236234
protected Response doDispatch(final Object resource, final ContainerRequest containerRequest) throws ProcessingException {
237235
final Object o = invoke(containerRequest, resource, getParamValues(containerRequest));
238236
if (o != null) {
239-
240-
Response response = Response.ok().entity(o).build();
241-
// TODO set the method return Java type to the proper context.
242-
// Response r = new ResponseBuilderImpl().
243-
// entityWithType(o, t).status(200).build();
244-
return response;
237+
if (o instanceof Response) {
238+
return Response.class.cast(o);
239+
}
240+
return Response.ok().entity(o).build();
245241
} else {
246242
return Response.noContent().build();
247243
}
Lines changed: 218 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,218 @@
1+
/*
2+
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER.
3+
*
4+
* Copyright (c) 2017 Oracle and/or its affiliates. All rights reserved.
5+
*
6+
* The contents of this file are subject to the terms of either the GNU
7+
* General Public License Version 2 only ("GPL") or the Common Development
8+
* and Distribution License("CDDL") (collectively, the "License"). You
9+
* may not use this file except in compliance with the License. You can
10+
* obtain a copy of the License at
11+
* http://glassfish.java.net/public/CDDL+GPL_1_1.html
12+
* or packager/legal/LICENSE.txt. See the License for the specific
13+
* language governing permissions and limitations under the License.
14+
*
15+
* When distributing the software, include this License Header Notice in each
16+
* file and include the License file at packager/legal/LICENSE.txt.
17+
*
18+
* GPL Classpath Exception:
19+
* Oracle designates this particular file as subject to the "Classpath"
20+
* exception as provided by Oracle in the GPL Version 2 section of the License
21+
* file that accompanied this code.
22+
*
23+
* Modifications:
24+
* If applicable, add the following below the License Header, with the fields
25+
* enclosed by brackets [] replaced by your own identifying information:
26+
* "Portions Copyright [year] [name of copyright owner]"
27+
*
28+
* Contributor(s):
29+
* If you wish your version of this file to be governed by only the CDDL or
30+
* only the GPL Version 2, indicate your decision by adding "[Contributor]
31+
* elects to include this software in this distribution under the [CDDL or GPL
32+
* Version 2] license." If you don't indicate a single choice of license, a
33+
* recipient has the option to distribute your version of this file under
34+
* either the CDDL, the GPL Version 2 or to extend the choice of license to
35+
* its licensees as provided above. However, if you add GPL Version 2 code
36+
* and therefore, elected the GPL Version 2 license, then the option applies
37+
* only if the new code is made subject to such option by the copyright
38+
* holder.
39+
*/
40+
41+
package org.glassfish.jersey.tests.e2e.server;
42+
43+
import java.util.concurrent.CompletableFuture;
44+
import java.util.concurrent.CompletionStage;
45+
import java.util.concurrent.ExecutorService;
46+
import java.util.concurrent.Executors;
47+
48+
import javax.ws.rs.GET;
49+
import javax.ws.rs.Path;
50+
import javax.ws.rs.WebApplicationException;
51+
import javax.ws.rs.core.Application;
52+
import javax.ws.rs.core.Response;
53+
54+
import org.glassfish.jersey.server.ResourceConfig;
55+
import org.glassfish.jersey.test.JerseyTest;
56+
57+
import org.junit.Test;
58+
import static org.hamcrest.CoreMatchers.is;
59+
import static org.junit.Assert.assertThat;
60+
61+
/**
62+
* @author Pavel Bucek (pavel.bucek at oracle.com)
63+
*/
64+
public class CompletionStageTest extends JerseyTest {
65+
66+
static final String ENTITY = "entity";
67+
// delay of async operations in seconds.
68+
static final int DELAY = 1;
69+
70+
@Override
71+
protected Application configure() {
72+
return new ResourceConfig(CompletionStageResource.class);
73+
}
74+
75+
@Test
76+
public void testGetCompleted() {
77+
Response response = target("cs/completed").request().get();
78+
79+
assertThat(response.getStatus(), is(200));
80+
assertThat(response.readEntity(String.class), is(ENTITY));
81+
}
82+
83+
@Test
84+
public void testGetException400() {
85+
Response response = target("cs/exception400").request().get();
86+
87+
assertThat(response.getStatus(), is(400));
88+
}
89+
90+
@Test
91+
public void testGetException405() {
92+
Response response = target("cs/exception405").request().get();
93+
94+
assertThat(response.getStatus(), is(405));
95+
}
96+
97+
@Test
98+
public void testGetCancelled() {
99+
Response response = target("cs/cancelled").request().get();
100+
101+
assertThat(response.getStatus(), is(503));
102+
}
103+
104+
@Test
105+
public void testGetCompletedAsync() {
106+
Response response = target("cs/completedAsync").request().get();
107+
108+
assertThat(response.getStatus(), is(200));
109+
assertThat(response.readEntity(String.class), is(ENTITY));
110+
}
111+
112+
@Test
113+
public void testGetException400Async() {
114+
Response response = target("cs/exception400Async").request().get();
115+
116+
assertThat(response.getStatus(), is(400));
117+
}
118+
119+
@Test
120+
public void testGetException405Async() {
121+
Response response = target("cs/exception405Async").request().get();
122+
123+
assertThat(response.getStatus(), is(405));
124+
}
125+
126+
@Test
127+
public void testGetCancelledAsync() {
128+
Response response = target("cs/cancelledAsync").request().get();
129+
130+
assertThat(response.getStatus(), is(503));
131+
}
132+
133+
@Path("/cs")
134+
public static class CompletionStageResource {
135+
136+
private static final ExecutorService EXECUTOR_SERVICE = Executors.newCachedThreadPool();
137+
138+
@GET
139+
@Path("/completed")
140+
public CompletionStage<String> getCompleted() {
141+
return CompletableFuture.completedFuture(ENTITY);
142+
}
143+
144+
@GET
145+
@Path("/exception400")
146+
public CompletionStage<String> getException400() {
147+
CompletableFuture<String> cs = new CompletableFuture<>();
148+
cs.completeExceptionally(new WebApplicationException(400));
149+
150+
return cs;
151+
}
152+
153+
@GET
154+
@Path("/exception405")
155+
public CompletionStage<String> getException405() {
156+
CompletableFuture<String> cs = new CompletableFuture<>();
157+
cs.completeExceptionally(new WebApplicationException(405));
158+
159+
return cs;
160+
}
161+
162+
@GET
163+
@Path("/cancelled")
164+
public CompletionStage<String> getCancelled() {
165+
CompletableFuture<String> cs = new CompletableFuture<>();
166+
cs.cancel(true);
167+
168+
return cs;
169+
}
170+
171+
@GET
172+
@Path("/completedAsync")
173+
public CompletionStage<String> getCompletedAsync() {
174+
CompletableFuture<String> cs = new CompletableFuture<>();
175+
delaySubmit(() -> cs.complete(ENTITY));
176+
return cs;
177+
}
178+
179+
@GET
180+
@Path("/exception400Async")
181+
public CompletionStage<String> getException400Async() {
182+
CompletableFuture<String> cs = new CompletableFuture<>();
183+
delaySubmit(() -> cs.completeExceptionally(new WebApplicationException(400)));
184+
185+
return cs;
186+
}
187+
188+
@GET
189+
@Path("/exception405Async")
190+
public CompletionStage<String> getException405Async() {
191+
CompletableFuture<String> cs = new CompletableFuture<>();
192+
delaySubmit(() -> cs.completeExceptionally(new WebApplicationException(405)));
193+
194+
return cs;
195+
}
196+
197+
@GET
198+
@Path("/cancelledAsync")
199+
public CompletionStage<String> getCancelledAsync() {
200+
CompletableFuture<String> cs = new CompletableFuture<>();
201+
delaySubmit(() -> cs.cancel(true));
202+
203+
return cs;
204+
}
205+
206+
private void delaySubmit(Runnable runnable) {
207+
EXECUTOR_SERVICE.submit(() -> {
208+
try {
209+
Thread.sleep(DELAY * 1000);
210+
} catch (InterruptedException e) {
211+
// ignore
212+
}
213+
214+
runnable.run();
215+
});
216+
}
217+
}
218+
}

0 commit comments

Comments
 (0)