Skip to content

Commit 8b0602d

Browse files
author
Doug Lea
committed
8319447: Improve performance of delayed task handling
Reviewed-by: vklang, alanb
1 parent fe8bd75 commit 8b0602d

File tree

12 files changed

+1765
-373
lines changed

12 files changed

+1765
-373
lines changed

src/java.base/share/classes/java/util/concurrent/CompletableFuture.java

Lines changed: 103 additions & 154 deletions
Large diffs are not rendered by default.

src/java.base/share/classes/java/util/concurrent/DelayScheduler.java

Lines changed: 568 additions & 0 deletions
Large diffs are not rendered by default.

src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java

Lines changed: 516 additions & 98 deletions
Large diffs are not rendered by default.

src/java.base/share/classes/java/util/concurrent/ForkJoinTask.java

Lines changed: 88 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -273,6 +273,8 @@ final boolean casNext(Aux c, Aux v) { // used only in cancellation
273273
static final int ABNORMAL = 1 << 16;
274274
static final int THROWN = 1 << 17;
275275
static final int HAVE_EXCEPTION = DONE | ABNORMAL | THROWN;
276+
static final int NUH_BIT = 24; // no external caller helping
277+
static final int NO_USER_HELP = 1 << NUH_BIT;
276278
static final int MARKER = 1 << 30; // utility marker
277279
static final int SMASK = 0xffff; // short bits for tags
278280
static final int UNCOMPENSATE = 1 << 16; // helpJoin sentinel
@@ -292,6 +294,12 @@ private int getAndBitwiseOrStatus(int v) {
292294
private boolean casStatus(int c, int v) {
293295
return U.compareAndSetInt(this, STATUS, c, v);
294296
}
297+
final int noUserHelp() { // nonvolatile read; return 0 or 1
298+
return (U.getInt(this, STATUS) & NO_USER_HELP) >>> NUH_BIT;
299+
}
300+
final void setNoUserHelp() { // for use in constructors only
301+
U.putInt(this, STATUS, NO_USER_HELP);
302+
}
295303

296304
// Support for waiting and signalling
297305

@@ -330,14 +338,9 @@ private void setDone() {
330338
*/
331339
final int trySetCancelled() {
332340
int s;
333-
for (;;) {
334-
if ((s = status) < 0)
335-
break;
336-
if (casStatus(s, s | (DONE | ABNORMAL))) {
337-
signalWaiters();
338-
break;
339-
}
340-
}
341+
if ((s = status) >= 0 &&
342+
(s = getAndBitwiseOrStatus(DONE | ABNORMAL)) >= 0)
343+
signalWaiters();
341344
return s;
342345
}
343346

@@ -481,7 +484,7 @@ else if (casAux(a, next))
481484
*/
482485
private int awaitDone(boolean interruptible, long deadline) {
483486
ForkJoinWorkerThread wt; ForkJoinPool p; ForkJoinPool.WorkQueue q;
484-
Thread t; boolean internal; int s;
487+
Thread t; boolean internal; int s, ss;
485488
if (internal =
486489
(t = Thread.currentThread()) instanceof ForkJoinWorkerThread) {
487490
p = (wt = (ForkJoinWorkerThread)t).pool;
@@ -492,7 +495,7 @@ private int awaitDone(boolean interruptible, long deadline) {
492495
return (((s = (p == null) ? 0 :
493496
((this instanceof CountedCompleter) ?
494497
p.helpComplete(this, q, internal) :
495-
(this instanceof InterruptibleTask) && !internal ? status :
498+
!internal && ((ss = status) & NO_USER_HELP) != 0 ? ss :
496499
p.helpJoin(this, q, internal))) < 0)) ? s :
497500
awaitDone(internal ? p : null, s, interruptible, deadline);
498501
}
@@ -642,7 +645,7 @@ public final ForkJoinTask<V> fork() {
642645
p = wt.pool;
643646
}
644647
else
645-
q = (p = ForkJoinPool.common).externalSubmissionQueue();
648+
q = (p = ForkJoinPool.common).externalSubmissionQueue(false);
646649
q.push(this, p, internal);
647650
return this;
648651
}
@@ -1160,7 +1163,7 @@ public static void helpQuiesce() {
11601163
*/
11611164
public void reinitialize() {
11621165
aux = null;
1163-
status = 0;
1166+
status &= NO_USER_HELP;
11641167
}
11651168

11661169
/**
@@ -1414,7 +1417,8 @@ public final boolean compareAndSetForkJoinTaskTag(short expect, short update) {
14141417
* @return the task
14151418
*/
14161419
public static ForkJoinTask<?> adapt(Runnable runnable) {
1417-
return new AdaptedRunnableAction(runnable);
1420+
return new AdaptedRunnableAction(
1421+
Objects.requireNonNull(runnable));
14181422
}
14191423

14201424
/**
@@ -1428,7 +1432,8 @@ public static ForkJoinTask<?> adapt(Runnable runnable) {
14281432
* @return the task
14291433
*/
14301434
public static <T> ForkJoinTask<T> adapt(Runnable runnable, T result) {
1431-
return new AdaptedRunnable<T>(runnable, result);
1435+
return new AdaptedRunnable<T>(
1436+
Objects.requireNonNull(runnable), result);
14321437
}
14331438

14341439
/**
@@ -1442,7 +1447,8 @@ public static <T> ForkJoinTask<T> adapt(Runnable runnable, T result) {
14421447
* @return the task
14431448
*/
14441449
public static <T> ForkJoinTask<T> adapt(Callable<? extends T> callable) {
1445-
return new AdaptedCallable<T>(callable);
1450+
return new AdaptedCallable<T>(
1451+
Objects.requireNonNull(callable));
14461452
}
14471453

14481454
/**
@@ -1460,7 +1466,8 @@ public static <T> ForkJoinTask<T> adapt(Callable<? extends T> callable) {
14601466
* @since 19
14611467
*/
14621468
public static <T> ForkJoinTask<T> adaptInterruptible(Callable<? extends T> callable) {
1463-
return new AdaptedInterruptibleCallable<T>(callable);
1469+
return new AdaptedInterruptibleCallable<T>(
1470+
Objects.requireNonNull(callable));
14641471
}
14651472

14661473
/**
@@ -1479,7 +1486,8 @@ public static <T> ForkJoinTask<T> adaptInterruptible(Callable<? extends T> calla
14791486
* @since 22
14801487
*/
14811488
public static <T> ForkJoinTask<T> adaptInterruptible(Runnable runnable, T result) {
1482-
return new AdaptedInterruptibleRunnable<T>(runnable, result);
1489+
return new AdaptedInterruptibleRunnable<T>(
1490+
Objects.requireNonNull(runnable), result);
14831491
}
14841492

14851493
/**
@@ -1497,7 +1505,8 @@ public static <T> ForkJoinTask<T> adaptInterruptible(Runnable runnable, T result
14971505
* @since 22
14981506
*/
14991507
public static ForkJoinTask<?> adaptInterruptible(Runnable runnable) {
1500-
return new AdaptedInterruptibleRunnable<Void>(runnable, null);
1508+
return new AdaptedInterruptibleRunnable<Void>(
1509+
Objects.requireNonNull(runnable), null);
15011510
}
15021511

15031512
// Serialization support
@@ -1556,7 +1565,6 @@ static final class AdaptedRunnable<T> extends ForkJoinTask<T>
15561565
@SuppressWarnings("serial") // Conditionally serializable
15571566
T result;
15581567
AdaptedRunnable(Runnable runnable, T result) {
1559-
Objects.requireNonNull(runnable);
15601568
this.runnable = runnable;
15611569
this.result = result; // OK to set this even before completion
15621570
}
@@ -1578,7 +1586,6 @@ static final class AdaptedRunnableAction extends ForkJoinTask<Void>
15781586
@SuppressWarnings("serial") // Conditionally serializable
15791587
final Runnable runnable;
15801588
AdaptedRunnableAction(Runnable runnable) {
1581-
Objects.requireNonNull(runnable);
15821589
this.runnable = runnable;
15831590
}
15841591
public final Void getRawResult() { return null; }
@@ -1601,7 +1608,6 @@ static final class AdaptedCallable<T> extends ForkJoinTask<T>
16011608
@SuppressWarnings("serial") // Conditionally serializable
16021609
T result;
16031610
AdaptedCallable(Callable<? extends T> callable) {
1604-
Objects.requireNonNull(callable);
16051611
this.callable = callable;
16061612
}
16071613
public final T getRawResult() { return result; }
@@ -1636,6 +1642,9 @@ public String toString() {
16361642
abstract static class InterruptibleTask<T> extends ForkJoinTask<T>
16371643
implements RunnableFuture<T> {
16381644
transient volatile Thread runner;
1645+
InterruptibleTask() {
1646+
setNoUserHelp();
1647+
}
16391648
abstract T compute() throws Exception;
16401649
public final boolean exec() {
16411650
Thread.interrupted();
@@ -1655,20 +1664,29 @@ public final boolean exec() {
16551664
} finally {
16561665
runner = null;
16571666
}
1667+
return postExec();
1668+
}
1669+
boolean postExec() { // cleanup and return completion status to doExec
16581670
return true;
16591671
}
1672+
final boolean interruptIfRunning(boolean enabled) {
1673+
Thread t;
1674+
if ((t = runner) == null) // return false if not running
1675+
return false;
1676+
if (enabled) {
1677+
try {
1678+
t.interrupt();
1679+
} catch (Throwable ignore) {
1680+
}
1681+
}
1682+
return true;
1683+
}
16601684
public boolean cancel(boolean mayInterruptIfRunning) {
1661-
Thread t;
1662-
if (trySetCancelled() >= 0) {
1663-
if (mayInterruptIfRunning && (t = runner) != null) {
1664-
try {
1665-
t.interrupt();
1666-
} catch (Throwable ignore) {
1667-
}
1668-
}
1669-
return true;
1670-
}
1671-
return isCancelled();
1685+
int s;
1686+
if ((s = trySetCancelled()) < 0)
1687+
return ((s & (ABNORMAL | THROWN)) == ABNORMAL);
1688+
interruptIfRunning(mayInterruptIfRunning);
1689+
return true;
16721690
}
16731691
public final void run() { quietlyInvoke(); }
16741692
Object adaptee() { return null; } // for printing and diagnostics
@@ -1690,7 +1708,6 @@ static final class AdaptedInterruptibleCallable<T> extends InterruptibleTask<T>
16901708
@SuppressWarnings("serial") // Conditionally serializable
16911709
T result;
16921710
AdaptedInterruptibleCallable(Callable<? extends T> callable) {
1693-
Objects.requireNonNull(callable);
16941711
this.callable = callable;
16951712
}
16961713
public final T getRawResult() { return result; }
@@ -1709,7 +1726,6 @@ static final class AdaptedInterruptibleRunnable<T> extends InterruptibleTask<T>
17091726
@SuppressWarnings("serial") // Conditionally serializable
17101727
final T result;
17111728
AdaptedInterruptibleRunnable(Runnable runnable, T result) {
1712-
Objects.requireNonNull(runnable);
17131729
this.runnable = runnable;
17141730
this.result = result;
17151731
}
@@ -1727,7 +1743,6 @@ static final class RunnableExecuteAction extends InterruptibleTask<Void> {
17271743
@SuppressWarnings("serial") // Conditionally serializable
17281744
final Runnable runnable;
17291745
RunnableExecuteAction(Runnable runnable) {
1730-
Objects.requireNonNull(runnable);
17311746
this.runnable = runnable;
17321747
}
17331748
public final Void getRawResult() { return null; }
@@ -1793,9 +1808,11 @@ final T invokeAny(Collection<? extends Callable<T>> tasks,
17931808
throw new NullPointerException();
17941809
InvokeAnyTask<T> t = null; // list of submitted tasks
17951810
try {
1796-
for (Callable<T> c : tasks)
1811+
for (Callable<T> c : tasks) {
1812+
Objects.requireNonNull(c);
17971813
pool.execute((ForkJoinTask<?>)
17981814
(t = new InvokeAnyTask<T>(c, this, t)));
1815+
}
17991816
return timed ? get(nanos, TimeUnit.NANOSECONDS) : get();
18001817
} finally {
18011818
for (; t != null; t = t.pred)
@@ -1822,7 +1839,6 @@ static final class InvokeAnyTask<T> extends InterruptibleTask<Void> {
18221839
final InvokeAnyTask<T> pred; // to traverse on cancellation
18231840
InvokeAnyTask(Callable<T> callable, InvokeAnyRoot<T> root,
18241841
InvokeAnyTask<T> pred) {
1825-
Objects.requireNonNull(callable);
18261842
this.callable = callable;
18271843
this.root = root;
18281844
this.pred = pred;
@@ -1857,4 +1873,39 @@ final void onRootCompletion() {
18571873
public final void setRawResult(Void v) { }
18581874
final Object adaptee() { return callable; }
18591875
}
1876+
1877+
/**
1878+
* Adapter for Callable-based interruptible tasks with timeout actions.
1879+
*/
1880+
@SuppressWarnings("serial") // Conditionally serializable
1881+
static final class CallableWithTimeout<T> extends InterruptibleTask<T> {
1882+
Callable<? extends T> callable; // nulled out after use
1883+
ForkJoinTask<?> timeoutAction;
1884+
T result;
1885+
CallableWithTimeout(Callable<? extends T> callable,
1886+
ForkJoinTask<?> timeoutAction) {
1887+
this.callable = callable;
1888+
this.timeoutAction = timeoutAction;
1889+
}
1890+
public final T getRawResult() { return result; }
1891+
public final void setRawResult(T v) { result = v; }
1892+
final Object adaptee() { return callable; }
1893+
final T compute() throws Exception {
1894+
Callable<? extends T> c;
1895+
return ((c = callable) != null) ? c.call() : null;
1896+
}
1897+
final boolean postExec() { // cancel timeout action
1898+
ForkJoinTask<?> t;
1899+
callable = null;
1900+
if ((t = timeoutAction) != null) {
1901+
timeoutAction = null;
1902+
try {
1903+
t.cancel(false);
1904+
} catch (Error | RuntimeException ex) {
1905+
}
1906+
}
1907+
return true;
1908+
}
1909+
}
1910+
18601911
}

src/java.base/share/classes/java/util/concurrent/SubmissionPublisher.java

Lines changed: 1 addition & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -206,22 +206,6 @@ static final int roundCapacity(int cap) {
206206
(n >= BUFFER_CAPACITY_LIMIT) ? BUFFER_CAPACITY_LIMIT : n + 1;
207207
}
208208

209-
// default Executor setup; nearly the same as CompletableFuture
210-
211-
/**
212-
* Default executor -- ForkJoinPool.commonPool() unless it cannot
213-
* support parallelism.
214-
*/
215-
private static final Executor ASYNC_POOL =
216-
(ForkJoinPool.getCommonPoolParallelism() > 1) ?
217-
ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();
218-
219-
/** Fallback if ForkJoinPool.commonPool() cannot support parallelism */
220-
private static final class ThreadPerTaskExecutor implements Executor {
221-
ThreadPerTaskExecutor() {} // prevent access constructor creation
222-
public void execute(Runnable r) { new Thread(r).start(); }
223-
}
224-
225209
/**
226210
* Clients (BufferedSubscriptions) are maintained in a linked list
227211
* (via their "next" fields). This works well for publish loops.
@@ -316,7 +300,7 @@ public SubmissionPublisher(Executor executor, int maxBufferCapacity) {
316300
* Flow.Subscriber#onNext(Object) onNext}.
317301
*/
318302
public SubmissionPublisher() {
319-
this(ASYNC_POOL, Flow.defaultBufferSize(), null);
303+
this(ForkJoinPool.asyncCommonPool(), Flow.defaultBufferSize(), null);
320304
}
321305

322306
/**

0 commit comments

Comments
 (0)