Skip to content

Commit 809f134

Browse files
committed
Make Promise implement all default methods without delegation to the same methods of ConfigurableDependentPromise, make ConfigurableDependentPromise merely wraps results;
Fix JDK 12 exceptional methods in *Wrapper classes Create concrete CompletionStageDecorator & ExecutorBoundCompletionStage classes Implement thenFilterAsync in AbstractCompletableTask
1 parent 92187d7 commit 809f134

12 files changed

+372
-438
lines changed

examples/net/tascalate/concurrent/ExecutorBoundCompletionStage.java

Lines changed: 0 additions & 263 deletions
This file was deleted.

src/main/java/net/tascalate/concurrent/AbstractCompletableTask.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
import java.util.function.BiFunction;
4444
import java.util.function.Consumer;
4545
import java.util.function.Function;
46+
import java.util.function.Predicate;
4647

4748
/**
4849
* Base superclass for both root and intermediate {@link Promise}-s that
@@ -345,11 +346,10 @@ public Promise<T> exceptionallyComposeAsync(Function<Throwable, ? extends Comple
345346
return nextStage;
346347
}
347348

348-
// Default operation in Promise is ok - it just delegates to thenCompose[Async]
349-
/*
350349
@Override
351350
public Promise<T> thenFilterAsync(Predicate<? super T> predicate, Function<? super T, Throwable> errorSupplier, Executor executor) {
352-
*/
351+
return thenApplyAsync(r -> predicate.test(r) ? r : forwardException(errorSupplier.apply(r)), executor);
352+
}
353353

354354
@Override
355355
public Promise<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action, Executor executor) {

src/main/java/net/tascalate/concurrent/CompletableFutureWrapper.java

Lines changed: 19 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,7 @@
2222

2323
import net.tascalate.concurrent.decorators.CompletableFutureDecorator;
2424

25-
public class CompletableFutureWrapper<T>
26-
extends CompletableFutureDecorator<T> {
25+
public class CompletableFutureWrapper<T> extends CompletableFutureDecorator<T> {
2726

2827
protected CompletableFutureWrapper() {
2928
super();
@@ -76,27 +75,37 @@ public <U> Promise<U> thenComposeAsync(Function<? super T, ? extends CompletionS
7675

7776
@Override
7877
public <U> Promise<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn,
79-
Executor executor) {
78+
Executor executor) {
8079
CompletionStageRef<U> ref = new CompletionStageRef<>();
8180
return super.thenComposeAsync(ref.captureResult(fn), executor).onCancel(ref.cancel);
8281
}
8382

83+
// Default CompletionStage API implementation for exceptionallyAsync / exceptionallyCompose[Async]
84+
// doesn't handle cancellation well due to numerous orchestrated calls (handle->handleAsync->thenCompose
85+
// Use own implementation here for safety
86+
@Override
87+
public Promise<T> exceptionallyAsync(Function<Throwable, ? extends T> fn) {
88+
return PromiseHelper.exceptionallyAsync(this, fn);
89+
}
90+
91+
@Override
92+
public Promise<T> exceptionallyAsync(Function<Throwable, ? extends T> fn, Executor executor) {
93+
return PromiseHelper.exceptionallyAsync(this, fn, executor);
94+
}
95+
8496
@Override
8597
public Promise<T> exceptionallyCompose(Function<Throwable, ? extends CompletionStage<T>> fn) {
86-
CompletionStageRef<T> ref = new CompletionStageRef<>();
87-
return super.exceptionallyCompose(ref.captureResult(fn)).onCancel(ref.cancel);
98+
return PromiseHelper.exceptionallyCompose(this, fn);
8899
}
89100

90101
@Override
91102
public Promise<T> exceptionallyComposeAsync(Function<Throwable, ? extends CompletionStage<T>> fn) {
92-
CompletionStageRef<T> ref = new CompletionStageRef<>();
93-
return super.exceptionallyComposeAsync(ref.captureResult(fn)).onCancel(ref.cancel);
103+
return PromiseHelper.exceptionallyComposeAsync(this, fn);
94104
}
95105

96106
@Override
97107
public Promise<T> exceptionallyComposeAsync(Function<Throwable, ? extends CompletionStage<T>> fn,
98108
Executor executor) {
99-
CompletionStageRef<T> ref = new CompletionStageRef<>();
100-
return super.exceptionallyComposeAsync(ref.captureResult(fn), executor).onCancel(ref.cancel);
101-
}
109+
return PromiseHelper.exceptionallyComposeAsync(this, fn, executor);
110+
}
102111
}

src/main/java/net/tascalate/concurrent/CompletablePromise.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,7 @@ public CompletablePromise<T> copy() {
9494

9595
public Promise<T> minimalPromise() {
9696
CompletableFutureWrapper<T> result = new CompletableFutureWrapper<>();
97+
// Should we cancel result along this?
9798
if (isDone()) {
9899
try {
99100
result.success(join());

0 commit comments

Comments
 (0)