Skip to content
This repository was archived by the owner on May 28, 2018. It is now read-only.

Commit aa46795

Browse files
committed
CompletionStage returned from the resource methods might not support #toCompletableFuture.
Change-Id: I30b4e57c44979df35a643a214b14e8572a84b517
1 parent 324cfb0 commit aa46795

File tree

4 files changed

+269
-6
lines changed

4 files changed

+269
-6
lines changed

core-client/src/main/java/org/glassfish/jersey/client/JerseyCompletionStageRxInvoker.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@
5757
*/
5858
public class JerseyCompletionStageRxInvoker extends AbstractRxInvoker<CompletionStage> implements CompletionStageRxInvoker {
5959

60-
public JerseyCompletionStageRxInvoker(Invocation.Builder builder, ExecutorService executor) {
60+
JerseyCompletionStageRxInvoker(Invocation.Builder builder, ExecutorService executor) {
6161
super(builder, executor);
6262
}
6363

core-server/src/main/java/org/glassfish/jersey/server/model/ResourceMethodInvoker.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,6 @@
5353
import java.util.Map;
5454
import java.util.Set;
5555
import java.util.concurrent.CancellationException;
56-
import java.util.concurrent.CompletableFuture;
5756
import java.util.concurrent.CompletionStage;
5857
import java.util.function.BiConsumer;
5958
import java.util.function.Supplier;
@@ -418,15 +417,15 @@ public ContainerResponse apply(final RequestProcessingContext processingContext)
418417
if (response.hasEntity()) {
419418
Object entityFuture = response.getEntity();
420419
if (entityFuture instanceof CompletionStage) {
421-
CompletableFuture completableFuture = ((CompletionStage) entityFuture).toCompletableFuture();
420+
CompletionStage completionStage = ((CompletionStage) entityFuture);
422421

423422
// suspend - we know that this feature is not done, see AbstractJavaResourceMethodDispatcher#invoke
424423
if (!processingContext.asyncContext().suspend()) {
425424
throw new ProcessingException(LocalizationMessages.ERROR_SUSPENDING_ASYNC_REQUEST());
426425
}
427426

428427
// wait for a response
429-
completableFuture.whenComplete(whenComplete(processingContext));
428+
completionStage.whenComplete(whenComplete(processingContext));
430429

431430
return null; // return null on the current thread
432431
}

core-server/src/main/java/org/glassfish/jersey/server/model/internal/AbstractJavaResourceMethodDispatcher.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -148,9 +148,16 @@ public Object run() {
148148

149149
// if a response is a CompletionStage and is done, we don't need to suspend and resume
150150
if (result instanceof CompletionStage) {
151-
CompletableFuture resultFuture = ((CompletionStage) result).toCompletableFuture();
151+
CompletableFuture resultFuture;
152+
try {
153+
resultFuture = ((CompletionStage) result).toCompletableFuture();
154+
} catch (UnsupportedOperationException e) {
155+
// CompletionStage is not required to implement "toCompletableFuture". If it doesn't
156+
// we treat it as "uncompleted" future.
157+
return result;
158+
}
152159

153-
if (resultFuture.isDone()) {
160+
if (resultFuture != null && resultFuture.isDone()) {
154161
if (resultFuture.isCancelled()) {
155162
return Response.status(Response.Status.SERVICE_UNAVAILABLE).build();
156163
} else {

tests/e2e-server/src/test/java/org/glassfish/jersey/tests/e2e/server/CompletionStageTest.java

Lines changed: 257 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,13 @@
4242

4343
import java.util.concurrent.CompletableFuture;
4444
import java.util.concurrent.CompletionStage;
45+
import java.util.concurrent.Executor;
4546
import java.util.concurrent.ExecutorService;
4647
import java.util.concurrent.Executors;
48+
import java.util.function.BiConsumer;
49+
import java.util.function.BiFunction;
50+
import java.util.function.Consumer;
51+
import java.util.function.Function;
4752

4853
import javax.ws.rs.GET;
4954
import javax.ws.rs.Path;
@@ -130,6 +135,22 @@ public void testGetCancelledAsync() {
130135
assertThat(response.getStatus(), is(503));
131136
}
132137

138+
@Test
139+
public void testGetCustomCompleted() {
140+
Response response = target("cs/custom").request().get();
141+
142+
assertThat(response.getStatus(), is(200));
143+
assertThat(response.readEntity(String.class), is(ENTITY));
144+
}
145+
146+
@Test
147+
public void testGetCustomAsync() {
148+
Response response = target("cs/customAsync").request().get();
149+
150+
assertThat(response.getStatus(), is(200));
151+
assertThat(response.readEntity(String.class), is(ENTITY));
152+
}
153+
133154
@Path("/cs")
134155
public static class CompletionStageResource {
135156

@@ -203,6 +224,32 @@ public CompletionStage<String> getCancelledAsync() {
203224
return cs;
204225
}
205226

227+
/**
228+
* Return completed CompletionStage which doesn't support #toCompletableFuture().
229+
*
230+
* @return CompletionStage which doesn't support #toCompletableFuture().
231+
*/
232+
@GET
233+
@Path("/custom")
234+
public CompletionStage<String> getCustomCompletionStage() {
235+
return new CustomCompletionStage<>(CompletableFuture.completedFuture(ENTITY));
236+
}
237+
238+
/**
239+
* Return uncompleted CompletionStage which doesn't support #toCompletableFuture().
240+
*
241+
* @return CompletionStage which doesn't support #toCompletableFuture().
242+
*/
243+
@GET
244+
@Path("/customAsync")
245+
public CompletionStage<String> getCustomCompletionStageAsync() {
246+
CompletableFuture<String> cf = new CompletableFuture<>();
247+
CustomCompletionStage<String> cs = new CustomCompletionStage<>(cf);
248+
delaySubmit(() -> cf.complete(ENTITY));
249+
250+
return cs;
251+
}
252+
206253
private void delaySubmit(Runnable runnable) {
207254
EXECUTOR_SERVICE.submit(() -> {
208255
try {
@@ -215,4 +262,214 @@ private void delaySubmit(Runnable runnable) {
215262
});
216263
}
217264
}
265+
266+
private static class CustomCompletionStage<T> implements CompletionStage<T> {
267+
268+
private final CompletionStage<T> completedFuture;
269+
270+
CustomCompletionStage(CompletionStage<T> completedFuture) {
271+
this.completedFuture = completedFuture;
272+
}
273+
274+
@Override
275+
public <U> CompletionStage<U> thenApply(Function<? super T, ? extends U> fn) {
276+
return completedFuture.thenApply(fn);
277+
}
278+
279+
@Override
280+
public <U> CompletionStage<U> thenApplyAsync(Function<? super T, ? extends U> fn) {
281+
return completedFuture.thenApplyAsync(fn);
282+
}
283+
284+
@Override
285+
public <U> CompletionStage<U> thenApplyAsync(Function<? super T, ? extends U> fn, Executor executor) {
286+
return completedFuture.thenApplyAsync(fn, executor);
287+
}
288+
289+
@Override
290+
public CompletionStage<Void> thenAccept(Consumer<? super T> action) {
291+
return completedFuture.thenAccept(action);
292+
}
293+
294+
@Override
295+
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action) {
296+
return completedFuture.thenAcceptAsync(action);
297+
}
298+
299+
@Override
300+
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor) {
301+
return completedFuture.thenAcceptAsync(action, executor);
302+
}
303+
304+
@Override
305+
public CompletionStage<Void> thenRun(Runnable action) {
306+
return completedFuture.thenRun(action);
307+
}
308+
309+
@Override
310+
public CompletionStage<Void> thenRunAsync(Runnable action) {
311+
return completedFuture.thenRunAsync(action);
312+
}
313+
314+
@Override
315+
public CompletionStage<Void> thenRunAsync(Runnable action, Executor executor) {
316+
return completedFuture.thenRunAsync(action, executor);
317+
}
318+
319+
@Override
320+
public <U, V> CompletionStage<V> thenCombine(CompletionStage<? extends U> other,
321+
BiFunction<? super T, ? super U, ? extends V> fn) {
322+
return completedFuture.thenCombine(other, fn);
323+
}
324+
325+
@Override
326+
public <U, V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other,
327+
BiFunction<? super T, ? super U, ? extends V> fn) {
328+
return completedFuture.thenCombineAsync(other, fn);
329+
}
330+
331+
@Override
332+
public <U, V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other,
333+
BiFunction<? super T, ? super U, ? extends V> fn, Executor executor) {
334+
return completedFuture.thenCombineAsync(other, fn, executor);
335+
}
336+
337+
@Override
338+
public <U> CompletionStage<Void> thenAcceptBoth(CompletionStage<? extends U> other,
339+
BiConsumer<? super T, ? super U> action) {
340+
return completedFuture.thenAcceptBoth(other, action);
341+
}
342+
343+
@Override
344+
public <U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,
345+
BiConsumer<? super T, ? super U> action) {
346+
return completedFuture.thenAcceptBothAsync(other, action);
347+
}
348+
349+
@Override
350+
public <U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,
351+
BiConsumer<? super T, ? super U> action, Executor executor) {
352+
return completedFuture.thenAcceptBothAsync(other, action, executor);
353+
}
354+
355+
@Override
356+
public CompletionStage<Void> runAfterBoth(CompletionStage<?> other, Runnable action) {
357+
return completedFuture.runAfterBoth(other, action);
358+
}
359+
360+
@Override
361+
public CompletionStage<Void> runAfterBothAsync(CompletionStage<?> other, Runnable action) {
362+
return completedFuture.runAfterBothAsync(other, action);
363+
}
364+
365+
@Override
366+
public CompletionStage<Void> runAfterBothAsync(CompletionStage<?> other, Runnable action, Executor executor) {
367+
return completedFuture.runAfterBothAsync(other, action, executor);
368+
}
369+
370+
@Override
371+
public <U> CompletionStage<U> applyToEither(CompletionStage<? extends T> other, Function<? super T, U> fn) {
372+
return completedFuture.applyToEither(other, fn);
373+
}
374+
375+
@Override
376+
public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T, U> fn) {
377+
return completedFuture.applyToEitherAsync(other, fn);
378+
}
379+
380+
@Override
381+
public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other,
382+
Function<? super T, U> fn,
383+
Executor executor) {
384+
return completedFuture.applyToEitherAsync(other, fn, executor);
385+
}
386+
387+
@Override
388+
public CompletionStage<Void> acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action) {
389+
return completedFuture.acceptEither(other, action);
390+
}
391+
392+
@Override
393+
public CompletionStage<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action) {
394+
return completedFuture.acceptEitherAsync(other, action);
395+
}
396+
397+
@Override
398+
public CompletionStage<Void> acceptEitherAsync(CompletionStage<? extends T> other,
399+
Consumer<? super T> action,
400+
Executor executor) {
401+
return completedFuture.acceptEitherAsync(other, action, executor);
402+
}
403+
404+
@Override
405+
public CompletionStage<Void> runAfterEither(CompletionStage<?> other, Runnable action) {
406+
return completedFuture.runAfterEither(other, action);
407+
}
408+
409+
@Override
410+
public CompletionStage<Void> runAfterEitherAsync(CompletionStage<?> other, Runnable action) {
411+
return completedFuture.runAfterEitherAsync(other, action);
412+
}
413+
414+
@Override
415+
public CompletionStage<Void> runAfterEitherAsync(CompletionStage<?> other, Runnable action, Executor executor) {
416+
return completedFuture.runAfterEitherAsync(other, action, executor);
417+
}
418+
419+
@Override
420+
public <U> CompletionStage<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn) {
421+
return completedFuture.thenCompose(fn);
422+
}
423+
424+
@Override
425+
public <U> CompletionStage<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn) {
426+
return completedFuture.thenComposeAsync(fn);
427+
}
428+
429+
@Override
430+
public <U> CompletionStage<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn,
431+
Executor executor) {
432+
return completedFuture.thenComposeAsync(fn, executor);
433+
}
434+
435+
@Override
436+
public CompletionStage<T> exceptionally(Function<Throwable, ? extends T> fn) {
437+
return completedFuture.exceptionally(fn);
438+
}
439+
440+
@Override
441+
public CompletionStage<T> whenComplete(BiConsumer<? super T, ? super Throwable> action) {
442+
return completedFuture.whenComplete(action);
443+
}
444+
445+
@Override
446+
public CompletionStage<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action) {
447+
return completedFuture.whenCompleteAsync(action);
448+
}
449+
450+
@Override
451+
public CompletionStage<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action, Executor executor) {
452+
return completedFuture.whenCompleteAsync(action, executor);
453+
}
454+
455+
@Override
456+
public <U> CompletionStage<U> handle(BiFunction<? super T, Throwable, ? extends U> fn) {
457+
return completedFuture.handle(fn);
458+
}
459+
460+
@Override
461+
public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn) {
462+
return completedFuture.handleAsync(fn);
463+
}
464+
465+
@Override
466+
public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn, Executor executor) {
467+
return completedFuture.handleAsync(fn, executor);
468+
}
469+
470+
@Override
471+
public CompletableFuture<T> toCompletableFuture() {
472+
throw new UnsupportedOperationException();
473+
}
474+
}
218475
}

0 commit comments

Comments
 (0)