Commit a0760f1

tjgq authored and copybara-github committed
Fix infinite spin-wait and missing cancellation propagation in TaskDeduplicator. (#28938)
The test TaskDeduplicatorTest.executeIfNeeded_executeAndCancelLoop_noErrors sporadically hung during the ExecutorService.close() call. This was due to two issues:

1. In executeIfNew, if a thread encountered a RefcountedFuture that was already canceled (refcount = 0), it would call Thread.yield() and continue the while(true) loop. It relied on a listener attached to the canceled future to eventually remove it from the map. However, when using virtual threads, many threads could enter this spin loop simultaneously, saturating the underlying carrier threads. This prevented the listener from being scheduled, creating a deadlock where the spinning threads never saw the entry removed.

2. RefcountedFuture was not propagating the cancel() call to its delegate future. This meant that even if all callers canceled their interest in a task, the task would continue to execute in the background, wasting resources and increasing contention.

This PR makes the following changes:

1. executeIfNew and maybeJoinExecution are modified to explicitly call inFlightTasks.remove(key, future) if retain() fails. This ensures the spin loop is broken immediately by the next thread to encounter the canceled future, rather than waiting for an asynchronous listener.

2. RefcountedFuture.cancel now calls delegate.cancel(mayInterruptIfRunning) when the internal reference count drops to zero.

3. Thread.yield() and the associated @SuppressWarnings("ThreadPriorityCheck") are removed, as they are no longer necessary.

Fixes #28302.

Closes #28938.

PiperOrigin-RevId: 883147973
Change-Id: I28c1db252573a4c39b1a9e53d32e218327340054
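The first change described above (evicting a dead entry from the calling thread instead of waiting for a listener) can be sketched with JDK classes only. This is a simplified illustration, not Bazel's code: DedupSketch, Entry, acquire and release are hypothetical names, the map is assumed to be a ConcurrentHashMap, and plain values stand in for futures.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

/** Hypothetical, simplified stand-in for TaskDeduplicator; not Bazel's actual code. */
final class DedupSketch<K, V> {
  /** Hypothetical entry: a value plus a reference count; a count of zero means "canceled". */
  static final class Entry<V> {
    final V value;
    final AtomicInteger refcount = new AtomicInteger(1);

    Entry(V value) {
      this.value = value;
    }

    /** Increments the count unless it has already dropped to zero. */
    boolean retain() {
      return refcount.updateAndGet(c -> c > 0 ? c + 1 : 0) > 0;
    }

    /** Decrements the count, never going below zero. */
    void release() {
      refcount.updateAndGet(c -> c >= 1 ? c - 1 : 0);
    }
  }

  final ConcurrentHashMap<K, Entry<V>> inFlight = new ConcurrentHashMap<>();

  /** Returns a live entry for the key, creating one if none exists or the existing one is dead. */
  Entry<V> acquire(K key, Supplier<V> supplier) {
    while (true) {
      boolean[] isNew = new boolean[1];
      Entry<V> entry =
          inFlight.computeIfAbsent(
              key,
              k -> {
                isNew[0] = true;
                return new Entry<>(supplier.get());
              });
      if (isNew[0] || entry.retain()) {
        return entry;
      }
      // The entry is dead. Evict it here rather than yielding and hoping that some
      // other piece of code removes it; the next iteration then installs a fresh entry.
      inFlight.remove(key, entry);
    }
  }

  public static void main(String[] args) {
    DedupSketch<String, String> dedup = new DedupSketch<>();
    Entry<String> first = dedup.acquire("k", () -> "v1");
    first.release(); // Count drops to zero, but the dead entry is still in the map.
    Entry<String> second = dedup.acquire("k", () -> "v2");
    System.out.println(second.value);
  }
}
```

The two-argument remove(key, value) is conditional: it removes the mapping only if the key is still mapped to that value, so a thread holding a stale entry cannot evict a replacement that another thread has already installed. Since every iteration that sees a dead entry removes it, the loop makes progress without depending on any other thread being scheduled.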
1 parent 17b4415 commit a0760f1

1 file changed: +13 -12 lines changed

src/main/java/com/google/devtools/build/lib/concurrent/TaskDeduplicator.java

Lines changed: 13 additions & 12 deletions
@@ -45,7 +45,6 @@ public final class TaskDeduplicator<K, V> {
    * effects.
    */
   @CheckReturnValue
-  @SuppressWarnings("ThreadPriorityCheck") // for Thread.yield()
   public ListenableFuture<V> executeIfNew(K key, Supplier<ListenableFuture<V>> taskSupplier) {
     while (true) {
       var isNewHolder = new boolean[1];
@@ -59,12 +58,9 @@ public ListenableFuture<V> executeIfNew(K key, Supplier<ListenableFuture<V>> tas
       if (isNewHolder[0]) {
         future.addListener(() -> inFlightTasks.remove(key, future), directExecutor());
       } else {
-        // The shared future may have been canceled between the lookup and the call to retain(). In
-        // that unlikely case, just look it up again - the listener above will remove it.
+        // The shared future may have been canceled between the lookup and the call to retain().
         if (!future.retain()) {
-          // Avoid spinning to increase the chance that the listener gets to run and removes the
-          // canceled future.
-          Thread.yield();
+          inFlightTasks.remove(key, future);
           continue;
         }
       }
@@ -100,7 +96,11 @@ public ListenableFuture<V> executeUnconditionally(
   @Nullable
   public ListenableFuture<V> maybeJoinExecution(K key) {
     var future = inFlightTasks.get(key);
-    if (future == null || !future.retain()) {
+    if (future == null) {
+      return null;
+    }
+    if (!future.retain()) {
+      inFlightTasks.remove(key, future);
       return null;
     }
     return IndividuallyCancelableFuture.wrap(future);
@@ -125,20 +125,21 @@ static <V> RefcountedFuture<V> wrap(ListenableFuture<V> delegate) {

     RefcountedFuture(ListenableFuture<V> delegate) {
       this.delegate = delegate;
+      setFuture(delegate);
     }

     @Override
-    public void run() {
-      setFuture(delegate);
-    }
+    public void run() {}

     @Override
     public boolean cancel(boolean mayInterruptIfRunning) {
       if (!mayInterruptIfRunning) {
         this.mayInterruptIfRunning = false;
       }
-      return refcount.updateAndGet(oldCount -> oldCount >= 1 ? oldCount - 1 : 0) == 0
-          && super.cancel(this.mayInterruptIfRunning);
+      if (refcount.updateAndGet(oldCount -> oldCount >= 1 ? oldCount - 1 : 0) == 0) {
+        return super.cancel(this.mayInterruptIfRunning);
+      }
+      return false;
     }

     @Nullable
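
The reference-counted cancellation in the last hunk can be illustrated with a JDK-only sketch. In the diff, propagation to the delegate appears to come from calling setFuture(delegate) in the constructor, so that super.cancel reaches the delegate; the sketch below instead calls delegate.cancel directly on a CompletableFuture, which is a swapped-in simplification. RefcountedCancelSketch and its members are hypothetical names, not Bazel's classes.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical JDK-only stand-in for RefcountedFuture; not Bazel's actual class. */
final class RefcountedCancelSketch<V> {
  private final CompletableFuture<V> delegate;
  // One reference for the creator; each successful retain() adds another.
  private final AtomicInteger refcount = new AtomicInteger(1);
  // Sticky flag: once any caller asks not to interrupt, the final cancel honors that.
  private volatile boolean mayInterruptIfRunning = true;

  RefcountedCancelSketch(CompletableFuture<V> delegate) {
    this.delegate = delegate;
  }

  /** Adds a reference unless the count has already dropped to zero. */
  boolean retain() {
    return refcount.updateAndGet(c -> c > 0 ? c + 1 : 0) > 0;
  }

  /** Drops one reference; only the drop to zero cancels the underlying work. */
  boolean cancel(boolean mayInterrupt) {
    if (!mayInterrupt) {
      this.mayInterruptIfRunning = false;
    }
    if (refcount.updateAndGet(c -> c >= 1 ? c - 1 : 0) == 0) {
      // CompletableFuture ignores the interrupt flag; it is passed only to mirror the diff.
      return delegate.cancel(this.mayInterruptIfRunning);
    }
    return false;
  }

  public static void main(String[] args) {
    CompletableFuture<String> work = new CompletableFuture<>();
    RefcountedCancelSketch<String> shared = new RefcountedCancelSketch<>(work);
    shared.retain(); // A second caller joins the same work.
    System.out.println(shared.cancel(true) + " " + work.isCancelled());
    System.out.println(shared.cancel(true) + " " + work.isCancelled());
  }
}
```

With two holders, the first cancel only drops a reference and the work keeps running; the second brings the count to zero and cancels the delegate. After that, retain() fails, which is the condition the callers in the earlier hunks react to by removing the entry from the map.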
