Skip to content

Commit 5018448

Browse files
committed
Fix onCancel in ConfigurableDependentPromise; making wrappers more smart in decorators; new DelayPolicies.
1 parent ec411d9 commit 5018448

27 files changed

+152
-76
lines changed

src/main/java/net/tascalate/concurrent/CompletableFutureWrapper.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ public CompletableFuture<T> toCompletableFuture() {
6767
}
6868

6969
@Override
70-
protected <U> Promise<U> wrap(CompletionStage<U> original) {
70+
protected <U> Promise<U> wrapNew(CompletionStage<U> original) {
7171
return new CompletableFutureWrapper<>((CompletableFuture<U>)original);
7272
}
7373

src/main/java/net/tascalate/concurrent/CompletablePromise.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ public Promise<T> completeAsync(Supplier<? extends T> supplier, Executor executo
7878
}
7979

8080
public CompletablePromise<T> copy() {
81-
CompletablePromise<T> result = wrap(new CompletableFuture<>());
81+
CompletablePromise<T> result = wrapNew(new CompletableFuture<>());
8282
if (isDone()) {
8383
try {
8484
result.complete(join());
@@ -125,7 +125,7 @@ public CompletableFuture<T> toCompletableFuture() {
125125
}
126126

127127
@Override
128-
protected <U> CompletablePromise<U> wrap(CompletionStage<U> original) {
128+
protected <U> CompletablePromise<U> wrapNew(CompletionStage<U> original) {
129129
return new CompletablePromise<>((CompletableFuture<U>)original);
130130
}
131131

@@ -135,7 +135,7 @@ public MinimalCompletionStage(CompletionStage<T> delegate) {
135135
}
136136

137137
@Override
138-
protected <U> CompletionStage<U> wrap(CompletionStage<U> original) {
138+
protected <U> CompletionStage<U> wrapNew(CompletionStage<U> original) {
139139
return new MinimalCompletionStage<>(original);
140140
}
141141

src/main/java/net/tascalate/concurrent/CompletableTask.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,7 @@ public EnforcedExecutorBoundPromise(Promise<T> delegate) {
142142
}
143143

144144
@Override
145-
protected <U> Promise<U> wrap(CompletionStage<U> original) {
145+
protected <U> Promise<U> wrapNew(CompletionStage<U> original) {
146146
return new EnforcedExecutorBoundPromise<>((Promise<U>)original);
147147
}
148148

src/main/java/net/tascalate/concurrent/CompletionStageWrapper.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ public T join() {
123123
}
124124

125125
@Override
126-
protected <U> Promise<U> wrap(CompletionStage<U> original) {
126+
protected final <U> Promise<U> wrapNew(CompletionStage<U> original) {
127127
return Promises.from(original);
128128
}
129129

@@ -164,7 +164,7 @@ public Promise<T> raw() {
164164
}
165165

166166
@Override
167-
protected <U> Promise<U> wrap(CompletionStage<U> original) {
167+
protected <U> Promise<U> wrapNew(CompletionStage<U> original) {
168168
return new StrictPromise<>((Promise<U>)original);
169169
}
170170

src/main/java/net/tascalate/concurrent/ConfigurableDependentPromise.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717

1818
import static net.tascalate.concurrent.SharedFunctions.cancelPromise;
1919
import static net.tascalate.concurrent.SharedFunctions.iif;
20-
import static net.tascalate.concurrent.SharedFunctions.whenCancel;
2120

2221
import java.time.Duration;
2322
import java.util.Arrays;
@@ -116,11 +115,17 @@ private static <U> DependentPromise<U> doWrap(Promise<U> original,
116115
}
117116

118117
return new ConfigurableDependentPromise<>(
119-
noOrigins ? original : whenCancel(original, () -> cancelPromises(cancellableOrigins, true)),
118+
noOrigins ? original : original.onCancel(() -> cancelPromises(cancellableOrigins, true)),
120119
defaultEnlistOptions, cancellableOrigins
121120
);
122121
}
123122

123+
@Override
124+
public DependentPromise<T> onCancel(Runnable action) {
125+
delegate.onCancel(action);
126+
return this;
127+
}
128+
124129
// All delay overloads delegate to these methods
125130
@Override
126131
public DependentPromise<T> delay(Duration duration, boolean delayOnError) {

src/main/java/net/tascalate/concurrent/DelayPolicy.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@
3131
import net.tascalate.concurrent.delays.ExponentialDelayPolicy;
3232
import net.tascalate.concurrent.delays.FirstRetryNoDelayPolicy;
3333
import net.tascalate.concurrent.delays.FixedIntervalDelayPolicy;
34+
import net.tascalate.concurrent.delays.OnFailureNoDelayPolicy;
35+
import net.tascalate.concurrent.delays.OnSuccessNoDelayPolicy;
3436
import net.tascalate.concurrent.delays.ProportionalRandomDelayPolicy;
3537
import net.tascalate.concurrent.delays.UniformRandomDelayPolicy;
3638

@@ -132,6 +134,14 @@ default DelayPolicy<T> withFirstRetryNoDelay() {
132134
return new FirstRetryNoDelayPolicy<>(this);
133135
}
134136

137+
default DelayPolicy<T> withOnFailureNoDelay() {
138+
return new OnFailureNoDelayPolicy<>(this);
139+
}
140+
141+
default DelayPolicy<T> withOnSuccessNoDelay() {
142+
return new OnSuccessNoDelayPolicy<>(this);
143+
}
144+
135145
default <D extends DelayPolicy<T>> D withCusomizer(Function<? super DelayPolicy<T>, D> fn) {
136146
return fn.apply(this);
137147
}

src/main/java/net/tascalate/concurrent/DependentPromise.java

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -88,12 +88,7 @@ public static <U> DependentPromise<U> from(Promise<U> source, Set<PromiseOrigin>
8888
}
8989

9090
@Override
91-
default DependentPromise<T> onCancel(Runnable action) {
92-
// Safe here regardless of origins used
93-
// Max is self-origin, but whenComplete used
94-
// when self is completed already
95-
return SharedFunctions.whenCancel(this, action);
96-
}
91+
DependentPromise<T> onCancel(Runnable action);
9792

9893
@Override
9994
default DependentPromise<T> defaultAsyncOn(Executor executor) {

src/main/java/net/tascalate/concurrent/Promise.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
import static net.tascalate.concurrent.SharedFunctions.failure;
2020
import static net.tascalate.concurrent.SharedFunctions.selectFirst;
2121
import static net.tascalate.concurrent.SharedFunctions.unwrapExecutionException;
22-
import static net.tascalate.concurrent.SharedFunctions.whenCancel;
2322
import static net.tascalate.concurrent.SharedFunctions.wrapCompletionException;
2423

2524
import java.time.Duration;
@@ -101,7 +100,17 @@ default boolean isCompletedExceptionally() {
101100
}
102101

103102
default Promise<T> onCancel(Runnable action) {
104-
return whenCancel(this, action);
103+
if (isCancelled()) {
104+
action.run();
105+
} else if (!isDone()) {
106+
exceptionally(__ -> {
107+
if (isCancelled()) {
108+
action.run();
109+
}
110+
return null;
111+
});
112+
}
113+
return this;
105114
}
106115

107116
default Promise<T> delay(long timeout, TimeUnit unit) {

src/main/java/net/tascalate/concurrent/PromiseOrigin.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,13 @@
1818
import java.util.EnumSet;
1919
import java.util.Set;
2020

21+
import static java.util.Collections.unmodifiableSet;
22+
2123
public enum PromiseOrigin {
2224
THIS, PARAM;
2325

24-
public static Set<PromiseOrigin> ALL = EnumSet.of(THIS, PARAM);
25-
public static Set<PromiseOrigin> NONE = EnumSet.noneOf(PromiseOrigin.class);
26-
public static Set<PromiseOrigin> THIS_ONLY = EnumSet.of(THIS);
27-
public static Set<PromiseOrigin> PARAM_ONLY = EnumSet.of(PARAM);
26+
public static final Set<PromiseOrigin> ALL = unmodifiableSet(EnumSet.of(THIS, PARAM));
27+
public static final Set<PromiseOrigin> NONE = unmodifiableSet(EnumSet.noneOf(PromiseOrigin.class));
28+
public static final Set<PromiseOrigin> THIS_ONLY = unmodifiableSet(EnumSet.of(THIS));
29+
public static final Set<PromiseOrigin> PARAM_ONLY = unmodifiableSet(EnumSet.of(PARAM));
2830
}

src/main/java/net/tascalate/concurrent/SharedFunctions.java

Lines changed: 1 addition & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -68,22 +68,7 @@ static <T> CompletionStage<T> failure(Function<? super T, Throwable> errorSuppli
6868
result.completeExceptionally(errorSupplier.apply(value));
6969
return result;
7070
}
71-
72-
static <T, D extends Promise<T>> D whenCancel(D promise, Runnable action) {
73-
if (promise.isCancelled()) {
74-
action.run();
75-
} else if (promise.isDone()) {
76-
//
77-
} else {
78-
promise.whenComplete((r, e) -> {
79-
if (promise.isCancelled()) {
80-
action.run();
81-
}
82-
});
83-
}
84-
return promise;
85-
}
86-
71+
8772
static boolean cancelPromise(CompletionStage<?> promise, boolean mayInterruptIfRunning) {
8873
if (promise instanceof Future) {
8974
Future<?> future = (Future<?>) promise;

0 commit comments

Comments
 (0)