Skip to content

Commit e77d78f

Browse files
committed
Dispose of async tasks
1 parent 8280acc commit e77d78f

File tree

7 files changed

+74
-7
lines changed

7 files changed

+74
-7
lines changed

src/main/java/com/google/devtools/build/lib/remote/RemoteExecutionService.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,7 @@
152152
import java.util.concurrent.CompletableFuture;
153153
import java.util.concurrent.CompletionException;
154154
import java.util.concurrent.ConcurrentLinkedQueue;
155+
import java.util.concurrent.CountDownLatch;
155156
import java.util.concurrent.ExecutionException;
156157
import java.util.concurrent.Executor;
157158
import java.util.concurrent.Phaser;
@@ -1815,7 +1816,8 @@ public void uploadOutputs(RemoteAction action, SpawnResult spawnResult, Runnable
18151816
if (remoteOptions.remoteCacheAsync
18161817
&& !action.getSpawn().getResourceOwner().mayModifySpawnOutputsAfterExecution()) {
18171818
AtomicLong startTime = new AtomicLong();
1818-
var unused =
1819+
CountDownLatch done = new CountDownLatch(1);
1820+
var asyncUpload =
18191821
Single.using(
18201822
() -> {
18211823
backgroundTaskPhaser.register();
@@ -1837,10 +1839,20 @@ public void uploadOutputs(RemoteAction action, SpawnResult spawnResult, Runnable
18371839
backgroundTaskPhaser.arriveAndDeregister();
18381840
onUploadComplete.run();
18391841
cacheResource.release();
1842+
done.countDown();
1843+
// Clean up the done task, cancelling it is a no-op at this point.
1844+
outputService.cancelTasks(
1845+
action.getRemoteActionExecutionContext().getSpawnOwner());
18401846
},
18411847
/* eager= */ false)
18421848
.subscribeOn(scheduler)
18431849
.subscribe(result -> {}, this::reportUploadError);
1850+
outputService.registerCancellableTask(
1851+
action.getRemoteActionExecutionContext().getSpawnOwner(),
1852+
() -> {
1853+
asyncUpload.dispose();
1854+
done.await();
1855+
});
18441856
} else {
18451857
try (SilentCloseable c =
18461858
Profiler.instance().profile(ProfilerTask.UPLOAD_TIME, "upload outputs")) {

src/main/java/com/google/devtools/build/lib/remote/RemoteOutputService.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,13 +49,16 @@
4949
import java.io.IOException;
5050
import java.util.Map;
5151
import java.util.UUID;
52+
import java.util.concurrent.ConcurrentHashMap;
5253
import java.util.function.Supplier;
5354
import javax.annotation.Nullable;
5455

5556
/** Output service implementation for the remote build without local output service daemon. */
5657
public class RemoteOutputService implements OutputService {
5758

5859
private final BlazeDirectories directories;
60+
private final ConcurrentHashMap<ActionExecutionMetadata, Cancellable> cancellableTasks =
61+
new ConcurrentHashMap<>();
5962

6063
@Nullable private RemoteOutputChecker remoteOutputChecker;
6164
@Nullable private RemoteActionInputFetcher actionInputFetcher;
@@ -240,4 +243,26 @@ public ArtifactPathResolver createPathResolverForArtifactValues(
240243
actionInputFetcher);
241244
return ArtifactPathResolver.createPathResolver(remoteFileSystem, fileSystem.getPath(execRoot));
242245
}
246+
247+
/**
* Stores {@code task} in {@code cancellableTasks} as the cancellation callback for {@code action}.
*
* <p>If a callback is already stored for {@code action}, the entry is replaced by a combined
* callback that invokes the earlier callback first and {@code task} second.
*/
@Override
248+
public void registerCancellableTask(ActionExecutionMetadata action, Cancellable task) {
249+
// We don't expect to have multiple cancellable tasks for the same action, so we avoid the
250+
// overhead of a multi-valued map.
251+
cancellableTasks.merge(
252+
action,
253+
task,
254+
(oldTask, newTask) ->
255+
() -> {
256+
// The two calls run in sequence: if this one throws InterruptedException, the call to
// newTask.cancel() below is not reached.
oldTask.cancel();
257+
newTask.cancel();
258+
});
259+
}
260+
261+
/**
* Removes the callback stored for {@code action} in {@code cancellableTasks}, if any, and invokes
* it.
*
* @throws InterruptedException if the removed callback's {@code cancel()} throws it
*/
@Override
262+
public void cancelTasks(ActionExecutionMetadata action) throws InterruptedException {
263+
// The entry is removed before it is invoked, so a later call for the same action finds nothing
// unless a new task has been registered in the meantime.
Cancellable task = cancellableTasks.remove(action);
264+
if (task != null) {
265+
task.cancel();
266+
}
267+
}
243268
}

src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnCache.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,7 @@ public CacheHandle lookup(Spawn spawn, SpawnExecutionContext context)
116116
try {
117117
thisExecution = LocalExecution.createIfDeduplicatable(action);
118118
if (shouldUploadLocalResults && thisExecution != null) {
119+
System.err.println("Deduplication");
119120
LocalExecution previousOrThisExecution =
120121
inFlightExecutions.merge(
121122
action.getActionKey(),
@@ -145,6 +146,7 @@ public CacheHandle lookup(Spawn spawn, SpawnExecutionContext context)
145146
Stopwatch fetchTime = Stopwatch.createStarted();
146147
InMemoryOutput inMemoryOutput;
147148
try (SilentCloseable c = prof.profile(REMOTE_DOWNLOAD, "download outputs")) {
149+
System.err.println("RemoteSpawnCache.lookup: download outputs " + result);
148150
inMemoryOutput = remoteExecutionService.downloadOutputs(action, result);
149151
}
150152
fetchTime.stop();

src/main/java/com/google/devtools/build/lib/skyframe/SkyframeActionExecutor.java

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1070,23 +1070,27 @@ public ActionStepOrResult run(Environment env)
10701070
boolean unlockRewindingLock = false;
10711071

10721072
try {
1073+
ActionStartedEvent event = new ActionStartedEvent(action, actionStartTimeNanos);
1074+
if (statusReporter != null) {
1075+
statusReporter.updateStatus(event);
1076+
}
1077+
env.getListener().post(event);
10731078
if (mustAcquireExclusiveRewindingLock(action)) {
10741079
try (SilentCloseable d =
10751080
profiler.profile(ProfilerTask.ACTION_LOCK, "action.acquireWriteLockForRewinding")) {
10761081
rewindingLock.writeLock().lockInterruptibly();
1082+
System.err.println("Acquired exclusive rewinding lock for " + action);
10771083
unlockRewindingLock = true;
1084+
outputService.cancelTasks(action);
10781085
}
10791086
}
1080-
ActionStartedEvent event = new ActionStartedEvent(action, actionStartTimeNanos);
1081-
if (statusReporter != null) {
1082-
statusReporter.updateStatus(event);
1083-
}
1084-
env.getListener().post(event);
10851087
if (actionFileSystemType().shouldDoEagerActionPrep()) {
10861088
try (SilentCloseable d = profiler.profile(ProfilerTask.INFO, "action.prepare")) {
10871089
// This call generally deletes any files at locations that are declared outputs of the
10881090
// action, although some actions perform additional work, while others intentionally
10891091
// keep previous outputs in place.
1092+
System.err.println(
1093+
"Deleting outputs of " + action + (wasRewound(action) ? " (rewound)" : ""));
10901094
action.prepare(
10911095
actionExecutionContext.getExecRoot(),
10921096
actionExecutionContext.getPathResolver(),
@@ -1121,6 +1125,7 @@ public ActionStepOrResult run(Environment env)
11211125
return ActionStepOrResult.of(e);
11221126
} finally {
11231127
if (unlockRewindingLock) {
1128+
System.err.println("Released exclusive rewinding lock for " + action);
11241129
rewindingLock.writeLock().unlock();
11251130
}
11261131
notifyActionCompletion(env.getListener(), !lostInputs);
@@ -1230,12 +1235,14 @@ private ActionStepOrResult executeAction(ExtendedEventHandler eventHandler, Acti
12301235
boolean unlockRewindingLock = false;
12311236
if (mustAcquireSharedRewindingLock()) {
12321237
rewindingLock.readLock().lockInterruptibly();
1238+
System.err.println("Acquired shared rewinding lock for " + action);
12331239
unlockRewindingLock = true;
12341240
}
12351241
try {
12361242
result = action.execute(actionExecutionContext);
12371243
} finally {
12381244
if (unlockRewindingLock) {
1245+
System.err.println("Released shared rewinding lock for " + action);
12391246
rewindingLock.readLock().unlock();
12401247
}
12411248
}

src/main/java/com/google/devtools/build/lib/vfs/BUILD

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,5 +103,6 @@ java_library(
103103
"//src/main/protobuf:failure_details_java_proto",
104104
"//third_party:guava",
105105
"//third_party:jsr305",
106+
"//third_party:rxjava3",
106107
],
107108
)

src/main/java/com/google/devtools/build/lib/vfs/OutputService.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -265,4 +265,24 @@ default BulkDeleter bulkDeleter() {
265265
default XattrProvider getXattrProvider(XattrProvider delegate) {
266266
return delegate;
267267
}
268+
269+
/** A task with a cancellation callback. */
270+
@FunctionalInterface
271+
interface Cancellable {
272+
// Cancels the task. The checked InterruptedException lets an implementation block (for example,
// while waiting for the task to finish) and still respond to thread interruption.
void cancel() throws InterruptedException;
273+
}
274+
275+
/**
276+
* Registers a cancellation callback for a task that may still be running after the action has
277+
* completed.
*
* <p>This default implementation always throws {@link UnsupportedOperationException}.
*
* @param action the action the task belongs to
* @param task the callback to invoke when the task should be cancelled
* @throws UnsupportedOperationException always, unless the method is overridden
278+
*/
279+
default void registerCancellableTask(ActionExecutionMetadata action, Cancellable task) {
280+
throw new UnsupportedOperationException();
281+
}
282+
283+
/**
284+
* Cancels and awaits the completion of all tasks registered for {@code action} with {@link
285+
* #registerCancellableTask}.
*
* <p>This default implementation does nothing.
*
* @param action the action whose registered tasks should be cancelled
* @throws InterruptedException declared for overriding implementations; never thrown by this
*     default
286+
*/
287+
default void cancelTasks(ActionExecutionMetadata action) throws InterruptedException {}
268288
}

src/test/java/com/google/devtools/build/lib/skyframe/rewinding/RewindingTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,7 @@ public void multipleLostInputsForRewindPlan() throws Exception {
144144

145145
@Test
146146
public void multipleLostInputsForRewindPlan_standalone() throws Exception {
147-
addOptions("--remote_executor=", "--spawn_strategy=standalone");
147+
addOptions("--spawn_strategy=standalone", "--remote_executor=");
148148
helper.runMultipleLostInputsForRewindPlan();
149149
}
150150

0 commit comments

Comments
 (0)