diff --git a/src/java.base/share/classes/java/util/concurrent/CompletableFuture.java b/src/java.base/share/classes/java/util/concurrent/CompletableFuture.java index 7503c154ddb..1338f2fd804 100644 --- a/src/java.base/share/classes/java/util/concurrent/CompletableFuture.java +++ b/src/java.base/share/classes/java/util/concurrent/CompletableFuture.java @@ -1904,8 +1904,8 @@ private Object waitingGet(boolean interruptible) { while ((r = result) == null) { if (q == null) { q = new Signaller(interruptible, 0L, 0L); - if (Thread.currentThread() instanceof ForkJoinWorkerThread) - ForkJoinPool.helpAsyncBlocker(defaultExecutor(), q); + if (Thread.currentThread() instanceof ForkJoinWorkerThread wt) + ForkJoinPool.helpAsyncBlocker(wt.pool, q); } else if (!queued) queued = tryPushStack(q); @@ -1950,8 +1950,8 @@ else if (nanos <= 0L) break; else if (q == null) { q = new Signaller(true, nanos, deadline); - if (Thread.currentThread() instanceof ForkJoinWorkerThread) - ForkJoinPool.helpAsyncBlocker(defaultExecutor(), q); + if (Thread.currentThread() instanceof ForkJoinWorkerThread wt) + ForkJoinPool.helpAsyncBlocker(wt.pool, q); } else if (!queued) queued = tryPushStack(q); diff --git a/test/jdk/java/util/concurrent/tck/CompletableFutureTest.java b/test/jdk/java/util/concurrent/tck/CompletableFutureTest.java index de3d1dd1050..fc86936d5da 100644 --- a/test/jdk/java/util/concurrent/tck/CompletableFutureTest.java +++ b/test/jdk/java/util/concurrent/tck/CompletableFutureTest.java @@ -58,6 +58,7 @@ import java.util.concurrent.Executor; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.ForkJoinTask; +import java.util.concurrent.ForkJoinWorkerThread; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; @@ -5133,4 +5134,46 @@ public void testDefaultExceptionallyComposeAsyncExecutor_actionFailed() { checkCompletedWithWrappedException(g.toCompletableFuture(), r.ex); r.assertInvoked(); }} + + public void testOnlyHelpsIfInTheSamePool() throws Exception { + class Logic { + interface Extractor { ForkJoinPool pool(CompletableFuture cf) throws Exception; } + static final List executeInnerOuter( + ForkJoinPool outer, ForkJoinPool inner, Logic.Extractor extractor + ) throws Exception { + return CompletableFuture.supplyAsync(() -> + Stream.iterate(1, i -> i + 1) + .limit(64) + .map(i -> CompletableFuture.supplyAsync( + () -> Thread.currentThread() instanceof ForkJoinWorkerThread wt ? wt.getPool() : null, inner) + ) + .map(cf -> { + try { + return extractor.pool(cf); + } catch (Exception ex) { + throw new AssertionError("Unexpected", ex); + } + }) + .toList() + , outer).join(); + } + } + + List extractors = + List.of( + c -> c.get(60, SECONDS), + CompletableFuture::get, + CompletableFuture::join + ); + + try (var pool = new ForkJoinPool(2)) { + for (var extractor : extractors) { + for (var p : Logic.executeInnerOuter(pool, ForkJoinPool.commonPool(), extractor)) + assertTrue(p != pool); // The inners should have all been executed by commonPool + + for (var p : Logic.executeInnerOuter(pool, pool, extractor)) + assertTrue(p == pool); // The inners could have been helped by the outer + } + } + } }