Skip to content

Commit b8ccec7

Browse files
author
duke
committed
Backport cc5b35bf69dcf9e7e8037642c94e8d7e5847952d
1 parent 5de7202 commit b8ccec7

File tree

2 files changed

+191
-18
lines changed

2 files changed

+191
-18
lines changed

src/java.base/share/classes/java/util/concurrent/DelayScheduler.java

Lines changed: 51 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -360,31 +360,36 @@ private static int replace(ScheduledForkJoinTask<?>[] h, int k, int n) {
360360
u.heapIndex = -1;
361361
}
362362
}
363-
if (t != null) { // sift down
364-
for (int cs; (cs = (k << 2) + 1) < n; ) {
365-
ScheduledForkJoinTask<?> leastChild = null, c;
363+
if (t != null) {
364+
while (k > 0) { // sift up if replaced with smaller value
365+
ScheduledForkJoinTask<?> parent; int pk;
366+
if ((parent = h[pk = (k - 1) >>> 2]) == null ||
367+
parent.when <= d)
368+
break;
369+
parent.heapIndex = k;
370+
h[k] = parent;
371+
k = pk;
372+
}
373+
for (int cs; (cs = (k << 2) + 1) < n; ) { // sift down
374+
ScheduledForkJoinTask<?> leastChild = null;
366375
int leastIndex = 0;
367-
long leastValue = Long.MAX_VALUE;
368-
for (int ck = cs, j = 4;;) { // at most 4 children
369-
if ((c = h[ck]) == null)
370-
break;
371-
long cd = c.when;
372-
if (c.status < 0 && alsoReplace < 0) {
373-
alsoReplace = ck; // at most once per pass
374-
c.heapIndex = -1;
375-
}
376-
else if (leastChild == null || cd < leastValue) {
376+
long leastValue = d; // at most 4 children
377+
for (int ck, j = 0; j < 4 && (ck = j + cs) < n; ++j) {
378+
ScheduledForkJoinTask<?> c; long cd;
379+
if ((c = h[ck]) != null && (cd = c.when) < leastValue) {
377380
leastValue = cd;
378381
leastIndex = ck;
379382
leastChild = c;
380383
}
381-
if (--j == 0 || ++ck >= n)
382-
break;
383384
}
384-
if (leastChild == null || d <= leastValue)
385+
if (leastChild == null) // already ordered
385386
break;
386-
leastChild.heapIndex = k;
387-
h[k] = leastChild;
387+
if ((h[k] = leastChild).status >= 0 || alsoReplace >= 0)
388+
leastChild.heapIndex = k;
389+
else {
390+
leastChild.heapIndex = -1;
391+
alsoReplace = k;
392+
}
388393
k = leastIndex;
389394
}
390395
t.heapIndex = k;
@@ -393,6 +398,7 @@ else if (leastChild == null || cd < leastValue) {
393398
k = alsoReplace;
394399
}
395400
}
401+
assert checkHeap(h, n);
396402
return n;
397403
}
398404

@@ -451,6 +457,33 @@ private static void cancelAll(ScheduledForkJoinTask<?>[] h, int n) {
451457
}
452458
}
453459

460+
/**
461+
* Invariant checks
462+
*/
463+
private static boolean checkHeap(ScheduledForkJoinTask<?>[] h, int n) {
464+
for (int i = 0; i < h.length; ++i) {
465+
ScheduledForkJoinTask<?> t = h[i];
466+
if (t == null) { // unused slots all null
467+
if (i < n)
468+
return false;
469+
}
470+
else {
471+
long v = t.when;
472+
int x = t.heapIndex;
473+
if (x != i && x >= 0) // valid index unless removing
474+
return false;
475+
if (i > 0 && h[(i - 1) >>> 2].when > v) // ordered wrt parent
476+
return false;
477+
int cs = (i << 2) + 1; // ordered wrt children
478+
for (int ck, j = 0; j < 4 && (ck = cs + j) < n; ++j) {
479+
if (h[ck].when < v)
480+
return false;
481+
}
482+
}
483+
}
484+
return true;
485+
}
486+
454487
/**
455488
* Task class for DelayScheduler operations
456489
*/
Lines changed: 140 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,140 @@
1+
/*
2+
* Copyright (c) 2025, Oracle and/or its affiliates. All rights reserved.
3+
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4+
*
5+
* This code is free software; you can redistribute it and/or modify it
6+
* under the terms of the GNU General Public License version 2 only, as
7+
* published by the Free Software Foundation.
8+
*
9+
* This code is distributed in the hope that it will be useful, but WITHOUT
10+
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11+
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
12+
* version 2 for more details (a copy is included in the LICENSE file that
13+
* accompanied this code).
14+
*
15+
* You should have received a copy of the GNU General Public License version
16+
* 2 along with this work; if not, write to the Free Software Foundation,
17+
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
18+
*
19+
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
20+
* or visit www.oracle.com if you need additional information or have any
21+
* questions.
22+
*/
23+
24+
/*
25+
* @test
26+
* @bug 8370887
27+
* @summary Test that cancelling a delayed task doesn't impact the ordering that other
28+
* delayed tasks execute
29+
*/
30+
31+
import java.time.Duration;
32+
import java.time.Instant;
33+
import java.util.Arrays;
34+
import java.util.concurrent.ForkJoinPool;
35+
import java.util.concurrent.Future;
36+
import java.util.concurrent.LinkedTransferQueue;
37+
import java.util.stream.Collectors;
38+
import java.util.stream.IntStream;
39+
import static java.util.concurrent.TimeUnit.MILLISECONDS;
40+
41+
public class AscendingOrderAfterReplace {
42+
43+
private static final int[] DELAYS_IN_MS = { 3000, 3400, 3900, 3800, 3700, 3600, 3430, 3420, 3310, 3500, 3200 };
44+
45+
public static void main(String[] args) throws Exception {
46+
for (int i = 1; i < DELAYS_IN_MS.length; i++) {
47+
System.out.println("=== Test " + i + " ===");
48+
while (!testCancel(DELAYS_IN_MS, i)) { }
49+
}
50+
}
51+
52+
/**
53+
* Schedule the delayed tasks, cancel one of them, and check that the remaining tasks
54+
* execute in the ascending order of delay.
55+
* @return true if the test passed, false if a retry is needed
56+
* @throws RuntimeException if the test fails
57+
*/
58+
private static boolean testCancel(int[] delays, int indexToCancel) throws Exception {
59+
log("Delayed tasks: " + toString(delays));
60+
61+
// delayed tasks add to this queue when they execute
62+
var queue = new LinkedTransferQueue<Integer>();
63+
64+
// pool with one thread to ensure that delayed tasks don't execute concurrently
65+
try (var pool = new ForkJoinPool(1)) {
66+
long startNanos = System.nanoTime();
67+
Future<?>[] futures = Arrays.stream(delays)
68+
.mapToObj(d -> pool.schedule(() -> {
69+
log("Triggered " + d);
70+
queue.add(d);
71+
}, d, MILLISECONDS))
72+
.toArray(Future[]::new);
73+
long endNanos = System.nanoTime();
74+
log("Delayed tasks submitted");
75+
76+
// check submit took < min diffs between two delays
77+
long submitTime = Duration.ofNanos(endNanos - startNanos).toMillis();
78+
long minDiff = minDifference(delays);
79+
if (submitTime >= minDiff) {
80+
log("Submit took >= " + minDiff + " ms, need to retry");
81+
pool.shutdownNow();
82+
return false;
83+
}
84+
85+
// give a bit of time for -delayScheduler thread to process pending tasks
86+
Thread.sleep(minValue(delays) / 2);
87+
log("Cancel " + delays[indexToCancel]);
88+
futures[indexToCancel].cancel(true);
89+
}
90+
91+
// delayed tasks should have executed in ascending order of their delay
92+
int[] executed = queue.stream().mapToInt(Integer::intValue).toArray();
93+
log("Executed: " + toString(executed));
94+
if (!isAscendingOrder(executed)) {
95+
throw new RuntimeException("Not in ascending order!");
96+
}
97+
return true;
98+
}
99+
100+
/**
101+
* Return the minimum element.
102+
*/
103+
private static int minValue(int[] array) {
104+
return IntStream.of(array).min().orElseThrow();
105+
}
106+
107+
/**
108+
* Return the minimum difference between any two elements.
109+
*/
110+
private static int minDifference(int[] array) {
111+
int[] sorted = array.clone();
112+
Arrays.sort(sorted);
113+
return IntStream.range(1, sorted.length)
114+
.map(i -> sorted[i] - sorted[i - 1])
115+
.min()
116+
.orElse(0);
117+
}
118+
119+
/**
120+
* Return true if the array is in ascending order.
121+
*/
122+
private static boolean isAscendingOrder(int[] array) {
123+
return IntStream.range(1, array.length)
124+
.allMatch(i -> array[i - 1] <= array[i]);
125+
}
126+
127+
/**
128+
* Returns a String containing the elements of an array in index order.
129+
*/
130+
private static String toString(int[] array) {
131+
return IntStream.of(array)
132+
.mapToObj(Integer::toString)
133+
.collect(Collectors.joining(", ", "[", "]"));
134+
}
135+
136+
private static void log(String message) {
137+
System.out.println(Instant.now() + " " + message);
138+
}
139+
}
140+

0 commit comments

Comments
 (0)