Skip to content

Commit 4bf983d

Browse files
committed
Fix cancellation in AsyncLoop
1 parent 84f351c commit 4bf983d

File tree

3 files changed

+17
-8
lines changed

3 files changed

+17
-8
lines changed

src/main/java/net/tascalate/concurrent/AggregatingPromise.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -181,7 +181,7 @@ void onComplete(int idx, T result, Throwable error) {
181181
}
182182
}
183183

184-
Promise<List<R>> postConstruct() {
184+
Promise<List<R>> start() {
185185
int i = 0;
186186
for (CompletionStage<? extends T> promise : promises) {
187187
int idx = i++;

src/main/java/net/tascalate/concurrent/AsyncLoop.java

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -35,10 +35,7 @@ class AsyncLoop<T> extends CompletableFutureWrapper<T> {
3535
@Override
3636
public boolean cancel(boolean mayInterruptIfRunning) {
3737
if (super.cancel(mayInterruptIfRunning)) {
38-
CompletionStage<?> dependent = currentStage;
39-
if (null != dependent) {
40-
SharedFunctions.cancelPromise(dependent, mayInterruptIfRunning);
41-
}
38+
cancelCurrentStage(mayInterruptIfRunning);
4239
return true;
4340
} else {
4441
return false;
@@ -49,13 +46,13 @@ void run(T initialValue) {
4946
run(initialValue, null, null);
5047
}
5148

52-
private void run(T resolvedValue, Thread initialThread, AsyncLoop.IterationState<T> initialState) {
49+
private void run(T resolvedValue, Thread initialThread, IterationState<T> initialState) {
5350
Thread currentThread = Thread.currentThread();
5451
if (currentThread.equals(initialThread) && initialState.running) {
5552
initialState.put(resolvedValue);
5653
return;
5754
}
58-
AsyncLoop.IterationState<T> currentState = new AsyncLoop.IterationState<>();
55+
IterationState<T> currentState = new IterationState<>();
5956
T currentValue = resolvedValue;
6057
try {
6158
do {
@@ -70,6 +67,11 @@ private void run(T resolvedValue, Thread initialThread, AsyncLoop.IterationState
7067
run(next, currentThread, currentState);
7168
}
7269
});
70+
if (isDone()) {
71+
// If race between this.cancel and this.run
72+
// Double-cancel is not an issue
73+
cancelCurrentStage(true);
74+
}
7375
} else {
7476
success(currentValue);
7577
break;
@@ -84,6 +86,13 @@ private void run(T resolvedValue, Thread initialThread, AsyncLoop.IterationState
8486
}
8587
}
8688

89+
private void cancelCurrentStage(boolean mayInterruptIfRunning) {
90+
CompletionStage<?> dependent = currentStage;
91+
if (null != dependent) {
92+
SharedFunctions.cancelPromise(dependent, mayInterruptIfRunning);
93+
}
94+
}
95+
8796
private static final class IterationState<T> {
8897
static final Object END = new Object();
8998

src/main/java/net/tascalate/concurrent/Promises.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -990,7 +990,7 @@ private static <T, R> Promise<List<R>> atLeast(int minResultsCount, int maxError
990990
return transform(stage, singleResultMapper, Promises::wrapMultitargetException);
991991
} else {
992992
return ctr.create(minResultsCount, maxErrorsCount, cancelRemaining, promises)
993-
.postConstruct();
993+
.start();
994994
}
995995
}
996996

0 commit comments

Comments
 (0)