Skip to content

Commit 00be57a

Browse files
author
duke
committed
Backport b5b83247da9caea30c88b69543e350783663bc46
1 parent a428b6b commit 00be57a

File tree

2 files changed

+47
-4
lines changed

2 files changed

+47
-4
lines changed

src/java.base/share/classes/java/util/concurrent/CompletableFuture.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1904,8 +1904,8 @@ private Object waitingGet(boolean interruptible) {
19041904
while ((r = result) == null) {
19051905
if (q == null) {
19061906
q = new Signaller(interruptible, 0L, 0L);
1907-
if (Thread.currentThread() instanceof ForkJoinWorkerThread)
1908-
ForkJoinPool.helpAsyncBlocker(defaultExecutor(), q);
1907+
if (Thread.currentThread() instanceof ForkJoinWorkerThread wt)
1908+
ForkJoinPool.helpAsyncBlocker(wt.pool, q);
19091909
}
19101910
else if (!queued)
19111911
queued = tryPushStack(q);
@@ -1950,8 +1950,8 @@ else if (nanos <= 0L)
19501950
break;
19511951
else if (q == null) {
19521952
q = new Signaller(true, nanos, deadline);
1953-
if (Thread.currentThread() instanceof ForkJoinWorkerThread)
1954-
ForkJoinPool.helpAsyncBlocker(defaultExecutor(), q);
1953+
if (Thread.currentThread() instanceof ForkJoinWorkerThread wt)
1954+
ForkJoinPool.helpAsyncBlocker(wt.pool, q);
19551955
}
19561956
else if (!queued)
19571957
queued = tryPushStack(q);

test/jdk/java/util/concurrent/tck/CompletableFutureTest.java

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@
5858
import java.util.concurrent.Executor;
5959
import java.util.concurrent.ForkJoinPool;
6060
import java.util.concurrent.ForkJoinTask;
61+
import java.util.concurrent.ForkJoinWorkerThread;
6162
import java.util.concurrent.RejectedExecutionException;
6263
import java.util.concurrent.TimeoutException;
6364
import java.util.concurrent.atomic.AtomicInteger;
@@ -5133,4 +5134,46 @@ public void testDefaultExceptionallyComposeAsyncExecutor_actionFailed() {
51335134
checkCompletedWithWrappedException(g.toCompletableFuture(), r.ex);
51345135
r.assertInvoked();
51355136
}}
5137+
5138+
public void testOnlyHelpsIfInTheSamePool() throws Exception {
5139+
class Logic {
5140+
interface Extractor { ForkJoinPool pool(CompletableFuture<ForkJoinPool> cf) throws Exception; }
5141+
static final List<ForkJoinPool> executeInnerOuter(
5142+
ForkJoinPool outer, ForkJoinPool inner, Logic.Extractor extractor
5143+
) throws Exception {
5144+
return CompletableFuture.supplyAsync(() ->
5145+
Stream.iterate(1, i -> i + 1)
5146+
.limit(64)
5147+
.map(i -> CompletableFuture.supplyAsync(
5148+
() -> Thread.currentThread() instanceof ForkJoinWorkerThread wt ? wt.getPool() : null, inner)
5149+
)
5150+
.map(cf -> {
5151+
try {
5152+
return extractor.pool(cf);
5153+
} catch (Exception ex) {
5154+
throw new AssertionError("Unexpected", ex);
5155+
}
5156+
})
5157+
.toList()
5158+
, outer).join();
5159+
}
5160+
}
5161+
5162+
List<Logic.Extractor> extractors =
5163+
List.of(
5164+
c -> c.get(60, SECONDS),
5165+
CompletableFuture::get,
5166+
CompletableFuture::join
5167+
);
5168+
5169+
try (var pool = new ForkJoinPool(2)) {
5170+
for (var extractor : extractors) {
5171+
for (var p : Logic.executeInnerOuter(pool, ForkJoinPool.commonPool(), extractor))
5172+
assertTrue(p != pool); // The inners should have all been executed by commonPool
5173+
5174+
for (var p : Logic.executeInnerOuter(pool, pool, extractor))
5175+
assertTrue(p == pool); // The inners could have been helped by the outer
5176+
}
5177+
}
5178+
}
51365179
}

0 commit comments

Comments
 (0)