Skip to content

Commit 61acdf6

Browse files
Viktor KlangDoug Lea
andcommitted
8365065: cancelled ForkJoinPool tasks no longer throw CancellationException
Co-authored-by: Doug Lea <[email protected]> Reviewed-by: alanb
1 parent f9b91a7 commit 61acdf6

File tree

2 files changed

+70
-10
lines changed

2 files changed

+70
-10
lines changed

src/java.base/share/classes/java/util/concurrent/ForkJoinTask.java

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -531,19 +531,22 @@ final void doExec() {
531531
* still correct, although it may contain a misleading stack
532532
* trace.
533533
*
534-
* @param asExecutionException true if wrap as ExecutionException
534+
* @param asExecutionException true if wrap the result as an
535+
* ExecutionException. This applies only to actual exceptions, not
536+
* implicit CancellationExceptions issued when not THROWN or
537+
* available, which are not wrapped because by default they are
538+
* issued separately from ExecutionExceptions by callers. Which
539+
* may require further handling when this is not true (currently
540+
* only in InvokeAnyTask).
535541
* @return the exception, or null if none
536542
*/
537543
private Throwable getException(boolean asExecutionException) {
538544
int s; Throwable ex; Aux a;
539545
if ((s = status) >= 0 || (s & ABNORMAL) == 0)
540546
return null;
541-
else if ((s & THROWN) == 0 || (a = aux) == null || (ex = a.ex) == null) {
542-
ex = new CancellationException();
543-
if (!asExecutionException || !(this instanceof InterruptibleTask))
544-
return ex; // else wrap below
545-
}
546-
else if (a.thread != Thread.currentThread()) {
547+
if ((s & THROWN) == 0 || (a = aux) == null || (ex = a.ex) == null)
548+
return new CancellationException();
549+
if (a.thread != Thread.currentThread()) {
547550
try {
548551
Constructor<?> noArgCtor = null, oneArgCtor = null;
549552
for (Constructor<?> c : ex.getClass().getConstructors()) {
@@ -1814,6 +1817,8 @@ final T invokeAny(Collection<? extends Callable<T>> tasks,
18141817
(t = new InvokeAnyTask<T>(c, this, t)));
18151818
}
18161819
return timed ? get(nanos, TimeUnit.NANOSECONDS) : get();
1820+
} catch (CancellationException ce) {
1821+
throw new ExecutionException(ce);
18171822
} finally {
18181823
for (; t != null; t = t.pred)
18191824
t.onRootCompletion();

test/jdk/java/util/concurrent/tck/ForkJoinPoolTest.java

Lines changed: 58 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,16 +31,14 @@
3131
* http://creativecommons.org/publicdomain/zero/1.0/
3232
*/
3333

34-
import static java.util.concurrent.TimeUnit.MILLISECONDS;
35-
import static java.util.concurrent.TimeUnit.NANOSECONDS;
36-
3734
import java.security.PrivilegedAction;
3835
import java.security.PrivilegedExceptionAction;
3936
import java.util.ArrayList;
4037
import java.util.Collection;
4138
import java.util.Collections;
4239
import java.util.List;
4340
import java.util.concurrent.Callable;
41+
import java.util.concurrent.CancellationException;
4442
import java.util.concurrent.CountDownLatch;
4543
import java.util.concurrent.ExecutionException;
4644
import java.util.concurrent.Executors;
@@ -51,13 +49,19 @@
5149
import java.util.concurrent.Future;
5250
import java.util.concurrent.RecursiveTask;
5351
import java.util.concurrent.RejectedExecutionException;
52+
import java.util.concurrent.TimeoutException;
53+
import java.util.concurrent.TimeUnit;
54+
import java.util.function.Function;
5455
import java.util.concurrent.atomic.AtomicBoolean;
5556
import java.util.concurrent.atomic.AtomicInteger;
57+
import java.util.concurrent.atomic.AtomicReference;
5658
import java.util.concurrent.locks.ReentrantLock;
5759

5860
import junit.framework.Test;
5961
import junit.framework.TestSuite;
6062

63+
import static java.util.concurrent.TimeUnit.*;
64+
6165
public class ForkJoinPoolTest extends JSR166TestCase {
6266
public static void main(String[] args) {
6367
main(suite(), args);
@@ -482,6 +486,57 @@ public void testSubmitRunnable() throws Throwable {
482486
}
483487
}
484488

489+
public void testCancellationExceptionInGet() throws Exception {
490+
final ExecutorService e = new ForkJoinPool(1);
491+
try (var cleaner = cleaner(e)) {
492+
assertCancellationExceptionFrom(
493+
e::submit,
494+
f -> () -> f.get(1000, TimeUnit.SECONDS)
495+
);
496+
assertCancellationExceptionFrom(
497+
e::submit,
498+
f -> f::get
499+
);
500+
assertCancellationExceptionFrom(
501+
c -> e.submit(() -> { try { c.call(); } catch (Exception ex) { throw new RuntimeException(ex); } }),
502+
f -> () -> f.get(1000, TimeUnit.SECONDS)
503+
);
504+
assertCancellationExceptionFrom(
505+
c -> e.submit(() -> { try { c.call(); } catch (Exception ex) { throw new RuntimeException(ex); } }),
506+
f -> f::get
507+
);
508+
}
509+
}
510+
511+
private void assertCancellationExceptionFrom(
512+
Function<Callable<Void>, Future<?>> createTask,
513+
Function<Future<?>, Callable<?>> getResult) throws Exception {
514+
final var t = new AtomicReference<Thread>();
515+
final var c = new CountDownLatch(1); // Only used to induce WAITING state (never counted down)
516+
final var task = createTask.apply(() -> {
517+
try {
518+
t.set(Thread.currentThread());
519+
c.await();
520+
} catch (InterruptedException ie) {
521+
Thread.currentThread().interrupt();;
522+
}
523+
return null;
524+
});
525+
Thread taskThread;
526+
while((taskThread = t.get()) == null || taskThread.getState() != Thread.State.WAITING) {
527+
if (Thread.interrupted())
528+
throw new InterruptedException();
529+
Thread.onSpinWait();
530+
}
531+
task.cancel(true);
532+
try {
533+
getResult.apply(task).call();
534+
} catch (CancellationException ce) {
535+
return; // Success
536+
}
537+
shouldThrow();
538+
}
539+
485540
/**
486541
* Completed submit(runnable, result) returns result
487542
*/

0 commit comments

Comments
 (0)