Skip to content

Commit bc21475

Browse files
committed
Fix currentStage cancelation in AsyncLoop; optimize intermediate stage cancelation in AbstractCompletableTask (thenCompose / exceptionallyCompose)
1 parent 4bf983d commit bc21475

File tree

4 files changed

+60
-55
lines changed

4 files changed

+60
-55
lines changed

src/main/java/net/tascalate/concurrent/AbstractCompletableTask.java

Lines changed: 26 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828
import static net.tascalate.concurrent.SharedFunctions.wrapCompletionException;
2929
import static net.tascalate.concurrent.SharedFunctions.wrapExecutionException;
3030

31-
import java.util.Arrays;
3231
import java.util.concurrent.Callable;
3332
import java.util.concurrent.CancellationException;
3433
import java.util.concurrent.CompletableFuture;
@@ -66,31 +65,18 @@ protected AbstractCompletableTask(Executor defaultExecutor, Callable<T> action)
6665
this.task = new StageTransition(action);
6766
}
6867

69-
private CompletionStage<?>[] cancellableOrigins;
70-
private Object cancellableOriginsLock = new Object();
71-
72-
protected void resetCancellableOrigins(CompletionStage<?>... origins) {
73-
synchronized (cancellableOriginsLock) {
74-
this.cancellableOrigins = origins;
75-
}
76-
}
77-
78-
protected void cancelOrigins(boolean mayInterruptIfRunning) {
79-
synchronized (cancellableOriginsLock) {
80-
if (null == cancellableOrigins) {
81-
return;
82-
}
83-
Arrays.stream(cancellableOrigins).forEach(p -> cancelPromise(p, mayInterruptIfRunning));
84-
}
85-
}
68+
private volatile CompletionStage<?> intermediateStage;
8669

8770
abstract void fireTransition(Callable<T> code);
8871

8972
@Override
9073
public boolean cancel(boolean mayInterruptIfRunning) {
9174
if (task.cancel(mayInterruptIfRunning)) {
9275
failure(new CancellationException());
93-
cancelOrigins(mayInterruptIfRunning);
76+
CompletionStage<?> s = intermediateStage;
77+
if (null != s) {
78+
cancelPromise(s, mayInterruptIfRunning);
79+
}
9480
return true;
9581
} else {
9682
return false;
@@ -242,7 +228,7 @@ public <U> Promise<U> thenComposeAsync(Function<? super T, ? extends CompletionS
242228
AbstractCompletableTask<U> nextStage = internalCreateCompletionStage(executor);
243229
// Need to enlist tempStage while it is non-visible outside
244230
// and may not be used to interrupt fn.apply();
245-
nextStage.resetCancellableOrigins(tempStage);
231+
nextStage.intermediateStage = tempStage;
246232

247233
// We must ALWAYS run through the execution
248234
// of nextStage.task when this nextStage is
@@ -257,10 +243,16 @@ public <U> Promise<U> thenComposeAsync(Function<? super T, ? extends CompletionS
257243
tempStage,
258244
consumerAsFunction(r -> {
259245
try {
260-
CompletionStage<U> returned = fn.apply(r);
261246
// tempStage is completed successfully, so no sense
262247
// to include it in cancellableOrigins
263-
// However, nextStage is in progress
248+
nextStage.intermediateStage = null;
249+
if (nextStage.isDone()) {
250+
// Was canceled by user code
251+
// Don't need to run anything
252+
return;
253+
}
254+
CompletionStage<U> returned = fn.apply(r);
255+
// nextStage is in progress
264256
// IMPORTANT: it COULD be shared, but typically is not
265257
// So in very rare case some nasty behavior MAY exist
266258
// if others depends on it
@@ -280,16 +272,15 @@ public <U> Promise<U> thenComposeAsync(Function<? super T, ? extends CompletionS
280272
//} catch (InterruptedException ex) {
281273
//}
282274

283-
nextStage.resetCancellableOrigins(returned);
275+
// Always assign before check for cancellation to avoid race
276+
nextStage.intermediateStage = returned;
284277
if (nextStage.isCancelled()) {
285-
nextStage.cancelOrigins(true);
278+
cancelPromise(returned, true);
286279
} else {
287280
// Synchronous, while transition to tempStage is asynchronous already
288281
returned.whenComplete(biConsumer(onResult, onError));
289282
}
290283
} catch (Throwable e) {
291-
// must-have if fn.apply above failed
292-
nextStage.resetCancellableOrigins((CompletionStage<U>)null);
293284
// no need to check nextStage.isCancelled()
294285
// while there are no origins to cancel
295286
// propagate error immediately
@@ -320,7 +311,7 @@ public Promise<T> exceptionallyComposeAsync(Function<Throwable, ? extends Comple
320311
AbstractCompletableTask<Void> tempStage = internalCreateCompletionStage(executor);
321312
AbstractCompletableTask<T> nextStage = internalCreateCompletionStage(executor);
322313

323-
nextStage.resetCancellableOrigins(tempStage);
314+
nextStage.intermediateStage = tempStage;
324315

325316
Consumer<? super T> onResult = nextStage.runTransition(Function.identity());
326317
Consumer<? super Throwable> onError = nextStage.runTransition(AbstractCompletableTask::forwardException);
@@ -330,15 +321,20 @@ public Promise<T> exceptionallyComposeAsync(Function<Throwable, ? extends Comple
330321
consumerAsFunction(onResult),
331322
consumerAsFunction(error -> {
332323
try {
324+
nextStage.intermediateStage = null;
325+
if (nextStage.isDone()) {
326+
// Was canceled by user code
327+
// Don't need to run anything
328+
return;
329+
}
333330
CompletionStage<T> returned = fn.apply(error);
334-
nextStage.resetCancellableOrigins(returned);
331+
nextStage.intermediateStage = returned;
335332
if (nextStage.isCancelled()) {
336-
nextStage.cancelOrigins(true);
333+
cancelPromise(returned, true);
337334
} else {
338335
returned.whenComplete(biConsumer(onResult, onError));
339336
}
340337
} catch (Throwable e) {
341-
nextStage.resetCancellableOrigins((CompletionStage<T>)null);
342338
// In JDK 12 CompletionStage.composeExceptionally[Async] uses *.handle[Async]
343339
// So overwrite returned error with the latest one - as in handle()
344340
e.addSuppressed(error);

src/main/java/net/tascalate/concurrent/AsyncLoop.java

Lines changed: 20 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,10 @@ class AsyncLoop<T> extends CompletableFutureWrapper<T> {
3535
@Override
3636
public boolean cancel(boolean mayInterruptIfRunning) {
3737
if (super.cancel(mayInterruptIfRunning)) {
38-
cancelCurrentStage(mayInterruptIfRunning);
38+
CompletionStage<?> s = currentStage;
39+
if (null != s) {
40+
SharedFunctions.cancelPromise(s, mayInterruptIfRunning);
41+
}
3942
return true;
4043
} else {
4144
return false;
@@ -60,17 +63,25 @@ private void run(T resolvedValue, Thread initialThread, IterationState<T> initia
6063
if (isDone()) {
6164
break;
6265
} else if (loopCondition.test(currentValue)) {
63-
(currentStage = loopBody.apply(currentValue)).whenComplete((next, ex) -> {
64-
if (ex != null) {
65-
failure(ex);
66-
} else {
67-
run(next, currentThread, currentState);
68-
}
69-
});
66+
CompletionStage<T> returned = loopBody.apply(currentValue);
67+
// Assign before check to avoid race
68+
currentStage = returned;
69+
// isDone() is never slower than isCancel() --
70+
// actually, the test is for cancellation
71+
// but isDone() is ok.
7072
if (isDone()) {
7173
// If race between this.cancel and this.run
7274
// Double-cancel is not an issue
73-
cancelCurrentStage(true);
75+
SharedFunctions.cancelPromise(returned, true);
76+
break;
77+
} else {
78+
returned.whenComplete((next, ex) -> {
79+
if (ex != null) {
80+
failure(ex);
81+
} else {
82+
run(next, currentThread, currentState);
83+
}
84+
});
7485
}
7586
} else {
7687
success(currentValue);
@@ -86,13 +97,6 @@ private void run(T resolvedValue, Thread initialThread, IterationState<T> initia
8697
}
8798
}
8899

89-
private void cancelCurrentStage(boolean mayInterruptIfRunning) {
90-
CompletionStage<?> dependent = currentStage;
91-
if (null != dependent) {
92-
SharedFunctions.cancelPromise(dependent, mayInterruptIfRunning);
93-
}
94-
}
95-
96100
private static final class IterationState<T> {
97101
static final Object END = new Object();
98102

src/main/java/net/tascalate/concurrent/ConfigurableDependentPromise.java

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,13 @@ protected ConfigurableDependentPromise(Promise<T> delegate,
8787
this.cancellableOrigins = cancellableOrigins;
8888
}
8989

90+
protected ConfigurableDependentPromise<T> setup() {
91+
if (!isEmptyArray(cancellableOrigins)) {
92+
delegate.onCancel(() -> cancelPromises(cancellableOrigins, true));
93+
}
94+
return this;
95+
}
96+
9097
public static <U> DependentPromise<U> from(Promise<U> source) {
9198
return from(source, PromiseOrigin.NONE);
9299
}
@@ -102,8 +109,7 @@ protected <U> DependentPromise<U> wrap(Promise<U> original, CompletionStage<?>[]
102109
private static <U> DependentPromise<U> doWrap(Promise<U> original,
103110
Set<PromiseOrigin> defaultEnlistOptions,
104111
CompletionStage<?>[] cancellableOrigins) {
105-
boolean noOrigins = isEmptyArray(cancellableOrigins);
106-
if (noOrigins) {
112+
if (isEmptyArray(cancellableOrigins)) {
107113
// Nothing to enlist additionally for this "original" instance
108114
if (original instanceof ConfigurableDependentPromise) {
109115
ConfigurableDependentPromise<U> ioriginal = (ConfigurableDependentPromise<U>)original;
@@ -114,10 +120,7 @@ private static <U> DependentPromise<U> doWrap(Promise<U> original,
114120
}
115121
}
116122

117-
return new ConfigurableDependentPromise<>(
118-
noOrigins ? original : original.onCancel(() -> cancelPromises(cancellableOrigins, true)),
119-
defaultEnlistOptions, cancellableOrigins
120-
);
123+
return new ConfigurableDependentPromise<>(original, defaultEnlistOptions, cancellableOrigins).setup();
121124
}
122125

123126
@Override

src/main/java/net/tascalate/concurrent/Promise.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -100,9 +100,11 @@ default boolean isCompletedExceptionally() {
100100
}
101101

102102
default Promise<T> onCancel(Runnable action) {
103-
if (isCancelled()) {
104-
action.run();
105-
} else if (!isDone()) {
103+
if (isDone()) {
104+
if (isCancelled()) {
105+
action.run();
106+
}
107+
} else {
106108
exceptionally(__ -> {
107109
if (isCancelled()) {
108110
action.run();

0 commit comments

Comments
 (0)