Skip to content

Commit dcbfe39

Browse files
committed
Address comment by moving classes around
1 parent d8d9c1e commit dcbfe39

File tree

4 files changed

+349
-328
lines changed

4 files changed

+349
-328
lines changed

src/main/java/com/google/devtools/build/lib/remote/RemoteExecutionService.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1779,8 +1779,10 @@ public void uploadOutputs(
17791779
}
17801780
});
17811781

1782-
if (outputService instanceof RemoteOutputService remoteOutputService) {
1783-
remoteOutputService.registerOutputUploadTask(
1782+
if (outputService instanceof RemoteOutputService remoteOutputService
1783+
&& remoteOutputService.getRewoundActionSynchronizer()
1784+
instanceof RemoteRewoundActionSynchronizer remoteRewoundActionSynchronizer) {
1785+
remoteRewoundActionSynchronizer.registerOutputUploadTask(
17841786
action.getRemoteActionExecutionContext().getSpawnOwner(),
17851787
() -> {
17861788
future.cancel(true);

src/main/java/com/google/devtools/build/lib/remote/RemoteImportantOutputHandler.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
import com.google.devtools.build.lib.actions.ImportantOutputHandler;
2828
import com.google.devtools.build.lib.actions.InputMetadataProvider;
2929
import com.google.devtools.build.lib.profiler.SilentCloseable;
30-
import com.google.devtools.build.lib.remote.RemoteOutputService.RemoteRewoundActionSynchronizer;
3130
import com.google.devtools.build.lib.remote.common.BulkTransferException;
3231
import com.google.devtools.build.lib.remote.util.Utils;
3332
import com.google.devtools.build.lib.server.FailureDetails.FailureDetail;

src/main/java/com/google/devtools/build/lib/remote/RemoteOutputService.java

Lines changed: 1 addition & 325 deletions
Original file line numberDiff line numberDiff line change
@@ -16,17 +16,12 @@
1616

1717
import static com.google.common.base.Preconditions.checkNotNull;
1818

19-
import com.github.benmanes.caffeine.cache.Caffeine;
20-
import com.github.benmanes.caffeine.cache.LoadingCache;
2119
import com.google.common.collect.ImmutableList;
22-
import com.google.common.collect.Iterables;
2320
import com.google.common.eventbus.Subscribe;
2421
import com.google.devtools.build.lib.actions.Action;
2522
import com.google.devtools.build.lib.actions.ActionExecutionMetadata;
2623
import com.google.devtools.build.lib.actions.ActionInputMap;
27-
import com.google.devtools.build.lib.actions.ActionLookupData;
2824
import com.google.devtools.build.lib.actions.Artifact;
29-
import com.google.devtools.build.lib.actions.Artifact.DerivedArtifact;
3025
import com.google.devtools.build.lib.actions.ArtifactPathResolver;
3126
import com.google.devtools.build.lib.actions.InputMetadataProvider;
3227
import com.google.devtools.build.lib.actions.LostInputsActionExecutionException;
@@ -35,9 +30,6 @@
3530
import com.google.devtools.build.lib.analysis.BlazeDirectories;
3631
import com.google.devtools.build.lib.buildtool.buildevent.ExecutionPhaseCompleteEvent;
3732
import com.google.devtools.build.lib.events.EventHandler;
38-
import com.google.devtools.build.lib.profiler.Profiler;
39-
import com.google.devtools.build.lib.profiler.ProfilerTask;
40-
import com.google.devtools.build.lib.profiler.SilentCloseable;
4133
import com.google.devtools.build.lib.server.FailureDetails.Execution;
4234
import com.google.devtools.build.lib.server.FailureDetails.Execution.Code;
4335
import com.google.devtools.build.lib.server.FailureDetails.FailureDetail;
@@ -53,20 +45,13 @@
5345
import java.io.IOException;
5446
import java.util.Map;
5547
import java.util.UUID;
56-
import java.util.concurrent.ConcurrentHashMap;
57-
import java.util.concurrent.locks.Condition;
58-
import java.util.concurrent.locks.Lock;
59-
import java.util.concurrent.locks.ReadWriteLock;
60-
import java.util.concurrent.locks.ReentrantReadWriteLock;
6148
import javax.annotation.Nullable;
6249

6350
/** Output service implementation for the remote build without local output service daemon. */
6451
public class RemoteOutputService implements OutputService {
6552

6653
private final BlazeDirectories directories;
6754
private final boolean rewindLostInputs;
68-
private final ConcurrentHashMap<ActionExecutionMetadata, Cancellable> outputUploadTasks =
69-
new ConcurrentHashMap<>();
7055

7156
private RewoundActionSynchronizer rewoundActionSynchronizer = RewoundActionSynchronizer.NOOP;
7257

@@ -86,7 +71,7 @@ void setRemoteOutputChecker(RemoteOutputChecker remoteOutputChecker) {
8671
void setActionInputFetcher(RemoteActionInputFetcher actionInputFetcher) {
8772
this.actionInputFetcher = checkNotNull(actionInputFetcher, "actionInputFetcher");
8873
if (rewindLostInputs) {
89-
rewoundActionSynchronizer = new RemoteRewoundActionSynchronizer();
74+
this.rewoundActionSynchronizer = new RemoteRewoundActionSynchronizer(actionInputFetcher);
9075
}
9176
}
9277

@@ -249,317 +234,8 @@ public void checkActionFileSystemForLostInputs(FileSystem actionFileSystem, Acti
249234
}
250235
}
251236

252-
/** A task with a cancellation callback. */
253-
public interface Cancellable {
254-
void cancel() throws InterruptedException;
255-
}
256-
257-
/**
258-
* Registers a cancellation callback for an upload of action outputs that may still be running
259-
* after the action has completed.
260-
*/
261-
public void registerOutputUploadTask(ActionExecutionMetadata action, Cancellable task) {
262-
// We don't expect to have multiple output upload tasks for the same action registered at the
263-
// same time.
264-
outputUploadTasks.merge(
265-
action,
266-
task,
267-
(oldTask, newTask) -> {
268-
throw new IllegalStateException(
269-
"Attempted to register multiple output upload tasks for %s: %s and %s"
270-
.formatted(action, oldTask, newTask));
271-
});
272-
}
273-
274-
/**
275-
* Cancels and awaits the completion of all tasks registered with {@link
276-
* #registerOutputUploadTask}.
277-
*/
278-
public void cancelOutputUploadTasks(ActionExecutionMetadata action) throws InterruptedException {
279-
Cancellable task = outputUploadTasks.remove(action);
280-
if (task != null) {
281-
task.cancel();
282-
}
283-
}
284-
285237
@Override
286238
public RewoundActionSynchronizer getRewoundActionSynchronizer() {
287239
return rewoundActionSynchronizer;
288240
}
289-
290-
/**
291-
* A {@link RewoundActionSynchronizer} implementation for Bazel's remote filesystem, which is
292-
* backed by actual files on disk and requires synchronization to ensure that action outputs
293-
* aren't deleted while they are being read.
294-
*/
295-
final class RemoteRewoundActionSynchronizer implements RewoundActionSynchronizer {
296-
// A single coarse lock is used to synchronize rewound actions (writers) and both rewound and
297-
// non-rewound actions (readers) as long as no rewound action has attempted to prepare for its
298-
// execution.
299-
// This ensures high throughput and low memory footprint for the common case of no rewound
300-
// actions. In this case, there won't be any writers and the performance characteristics of a
301-
// ReentrantReadWriteLock are comparable to that of an atomic counter.
302-
// Note that it wouldn't be correct to only start using this lock once an action is rewound,
303-
// because a non-rewound action consuming its non-lost outputs could have already started
304-
// executing.
305-
@Nullable private volatile ReadWriteLock coarseLock = new ReentrantReadWriteLock();
306-
307-
// A fine-grained lock structure that is switched to when the first rewound action attempts to
308-
// prepare for its execution. This structure is used to ensure that rewound actions do not
309-
// delete their outputs while they are being read by other actions, while still allowing
310-
// rewound actions and non-rewound actions to run concurrently (i.e., not force the equivalent
311-
// of --jobs=1 for as long as a rewound action is running, as the coarse lock would).
312-
// A rewound action will acquire a write lock on its lookup data before it prepares for
313-
// execution, while any action will acquire a read lock on the lookup data of any generating
314-
// action of its inputs before it starts executing.
315-
// The values of this cache are weakly referenced to ensure that locks are cleaned up when they
316-
// are no longer needed.
317-
@Nullable private volatile LoadingCache<ActionLookupData, ReadWriteLock> fineLocks;
318-
319-
/*
320-
Proof of deadlock freedom:
321-
322-
As long as the coarse lock is used, there can't be any deadlock because there is only a single
323-
read-write lock.
324-
325-
Now assume that there is a deadlock while the fine locks are used. First, note that the logic in
326-
ImportantOutputHandler that is guarded by enterProcessOutputsAndGetLostArtifacts does not block
327-
on any (rewound or non-rewound) action executions while it holds read locks and can thus be
328-
ignored in the following. Consider the directed labeled "wait-for" graph defined as follows:
329-
330-
* Nodes are given by the currently active Skyframe action execution threads, each of which is
331-
identified with the action it is (or will be) executing. Actions are in one-to-one
332-
correspondence with the ActionLookupData that is used as the key in the fine locks map.
333-
* For each pair of actions A_1 and A_2, there is an edge from A_1 to A_2 labeled with XY(A_3)
334-
if A_1 is waiting for the X lock of A_3 and A_2 currently holds the Y lock of A_3, where X and
335-
Y are either R (for read) or W (for write). The resulting graph may have parallel edges with
336-
distinct labels.
337-
338-
Let C be any directed cycle in the graph representing a deadlock, let A_1 -[XY(A_3)]-> A_2 be an
339-
edge in C and consider the following cases for the pair XY:
340-
341-
* RR: Since a read-write lock whose read lock is held by at least one thread doesn't
342-
block any other thread from acquiring its read lock, this case doesn't occur.
343-
* WW: The write lock of A_3 is only ever (attempted to be) acquired by A_3 itself when it is
344-
rewound, which means that the edge would necessarily be of the shape A_3 -[WW(A_3)]-> A_3.
345-
But this isn't possible since the write lock for an action is only acquired in one place (
346-
enterActionPreparationForRewinding) and not recursively.
347-
* WR: In this case, A_1 attempts to acquire a write lock, which only happens when A_1 is a
348-
rewound action about to prepare for its (re-)execution. This means that the edge is
349-
necessarily of the shape A_1 -[WR(A_1)]-> A_2. While a rewound action is waiting for its
350-
own write lock in enterActionPreparation, it doesn't hold any locks since
351-
enterActionExecution hasn't been called yet in SkyframeActionExecutor and all past
352-
executions of the action have released all their locks due to use of try-with-resources.
353-
This means that A_1 can't have any incoming edges in the wait-for graph, which is a
354-
contradiction to the assumption that it is contained in the directed cycle C.
355-
356-
We conclude that XY = RW. Since the write lock of A_3 is only ever acquired by A_3 itself, all
357-
edges in C are of the form A_1 -[RW(A_2)]-> A_2. But by construction of inputKeysFor, the
358-
action A_1 is attempting to acquire the read locks of all its inputs' generating actions, and
359-
thus the action A_1 depends on one of the outputs of A_2 (*).
360-
361-
Applied to all edges of C, we conclude that there is a corresponding directed cycle in the
362-
action graph, which is a contradiction since Bazel disallows dependency cycles.
363-
364-
Notes:
365-
* The proof would not go through at (*) if fineLocks were replaced by a Striped lock structure
366-
with a fixed number of locks. In fact, this gives rise to a deadlock if the number of stripes
367-
is at least 2, but low enough that distinct generating actions hash to the same stripe.
368-
*/
369-
370-
@Override
371-
public SilentCloseable enterActionPreparation(Action action, boolean wasRewound)
372-
throws InterruptedException {
373-
// Skyframe schedules non-rewound actions such that they never run concurrently with actions
374-
// that consume their outputs.
375-
if (!wasRewound) {
376-
return () -> {};
377-
}
378-
try (SilentCloseable c =
379-
Profiler.instance().profile(ProfilerTask.ACTION_LOCK, "action.enterActionPreparation")) {
380-
return enterActionPreparationForRewinding(action);
381-
}
382-
}
383-
384-
private SilentCloseable enterActionPreparationForRewinding(Action action)
385-
throws InterruptedException {
386-
var localCoarseLock = coarseLock;
387-
if (localCoarseLock != null) {
388-
// This is the first time a rewound action has attempted to prepare for its execution.
389-
// Switch to using the fine locks under the protection of the coarse write lock.
390-
localCoarseLock.writeLock().lockInterruptibly();
391-
try {
392-
// Check again under the lock to avoid a race between multiple rewound actions attempting
393-
// to prepare for execution at the same time.
394-
if (fineLocks == null) {
395-
fineLocks =
396-
Caffeine.newBuilder()
397-
.weakValues()
398-
// TODO: Investigate whether fair locks would be beneficial.
399-
.build((ActionLookupData unused) -> new WeakSafeReentrantReadWriteLock());
400-
coarseLock = null;
401-
}
402-
} finally {
403-
localCoarseLock.writeLock().unlock();
404-
}
405-
}
406-
407-
var writeLock = fineLocks.get(outputKeyFor(action)).writeLock();
408-
writeLock.lockInterruptibly();
409-
prepareOutputsForRewinding(action);
410-
return writeLock::unlock;
411-
}
412-
413-
/**
414-
* Cancels all async tasks that operate on the action's outputs and resets any cached data about
415-
* their prefetching state.
416-
*/
417-
private void prepareOutputsForRewinding(Action action) throws InterruptedException {
418-
cancelOutputUploadTasks(action);
419-
actionInputFetcher.handleRewoundActionOutputs(action.getOutputs());
420-
}
421-
422-
@Override
423-
public SilentCloseable enterActionExecution(
424-
Action action, InputMetadataProvider metadataProvider) throws InterruptedException {
425-
try (SilentCloseable c =
426-
Profiler.instance().profile(ProfilerTask.ACTION_LOCK, "action.enterActionExecution")) {
427-
return lockArtifactsForConsumption(
428-
() -> action.getInputs().toList().iterator(), metadataProvider);
429-
}
430-
}
431-
432-
/**
433-
* Guards a call to {@link
434-
* RemoteImportantOutputHandler#processOutputsAndGetLostArtifacts(Iterable,
435-
* InputMetadataProvider, InputMetadataProvider)}.
436-
*/
437-
public SilentCloseable enterProcessOutputsAndGetLostArtifacts(
438-
Iterable<Artifact> importantOutputs, InputMetadataProvider fullMetadataProvider)
439-
throws InterruptedException {
440-
try (SilentCloseable c =
441-
Profiler.instance()
442-
.profile(ProfilerTask.ACTION_LOCK, "action.enterProcessOutputsAndGetLostArtifacts")) {
443-
return lockArtifactsForConsumption(importantOutputs, fullMetadataProvider);
444-
}
445-
}
446-
447-
private SilentCloseable lockArtifactsForConsumption(
448-
Iterable<Artifact> artifacts, InputMetadataProvider metadataProvider)
449-
throws InterruptedException {
450-
var localCoarseLock = coarseLock;
451-
if (localCoarseLock != null) {
452-
// Common case for builds without any rewound actions: acquire the single lock that is never
453-
// acquired by a writer.
454-
localCoarseLock.readLock().lockInterruptibly();
455-
}
456-
// Read the fine locks after acquiring the coarse lock to allow the fine locks to be inflated
457-
// lazily.
458-
var localFineLocks = fineLocks;
459-
if (localFineLocks == null) {
460-
// Continuation of the common case for builds without any rewound actions: the fine locks
461-
// have not been inflated.
462-
return localCoarseLock.readLock()::unlock;
463-
}
464-
465-
// At this point, there has been at least one rewound action that has inflated the fine locks.
466-
// We need to switch to it.
467-
if (localCoarseLock != null) {
468-
localCoarseLock.readLock().unlock();
469-
}
470-
var allReadWriteLocks =
471-
localFineLocks.getAll(inputKeysFor(artifacts, metadataProvider)).values();
472-
var locksToUnlockBuilder =
473-
ImmutableList.<Lock>builderWithExpectedSize(allReadWriteLocks.size());
474-
try {
475-
for (var readWriteLock : allReadWriteLocks) {
476-
var readLock = readWriteLock.readLock();
477-
readLock.lockInterruptibly();
478-
locksToUnlockBuilder.add(readLock);
479-
}
480-
} catch (InterruptedException e) {
481-
for (var readLock : locksToUnlockBuilder.build()) {
482-
readLock.unlock();
483-
}
484-
throw e;
485-
}
486-
var locksToUnlock = locksToUnlockBuilder.build();
487-
return () -> locksToUnlock.forEach(Lock::unlock);
488-
}
489-
490-
private static Iterable<ActionLookupData> inputKeysFor(
491-
Iterable<Artifact> artifacts, InputMetadataProvider metadataProvider) {
492-
var allArtifacts =
493-
Iterables.concat(
494-
artifacts,
495-
Iterables.concat(
496-
Iterables.transform(
497-
metadataProvider.getRunfilesTrees(),
498-
runfilesTree -> runfilesTree.getArtifacts().toList())));
499-
return Iterables.transform(
500-
Iterables.filter(allArtifacts, artifact -> artifact instanceof DerivedArtifact),
501-
artifact -> ((DerivedArtifact) artifact).getGeneratingActionKey());
502-
}
503-
504-
private static ActionLookupData outputKeyFor(Action action) {
505-
return ((DerivedArtifact) action.getPrimaryOutput()).getGeneratingActionKey();
506-
}
507-
508-
// Classes below are based on Guava's Striped class, but optimized for memory usage by using
509-
// extension rather than delegation:
510-
// https://github.com/google/guava/blob/d25d62fc843ece1c3866859bc8639b815093eac8/guava/src/com/google/common/util/concurrent/Striped.java#L282-L326
511-
512-
/**
513-
* ReadWriteLock implementation whose read and write locks retain a reference back to this lock.
514-
* Otherwise, a reference to just the read lock or just the write lock would not suffice to
515-
* ensure the {@code ReadWriteLock} is retained.
516-
*
517-
* <p>{@see https://bugs.openjdk.org/browse/JDK-8189598}
518-
*/
519-
private static final class WeakSafeReentrantReadWriteLock extends ReentrantReadWriteLock {
520-
private final WeakSafeReadLock readLock = new WeakSafeReadLock(this);
521-
private final WeakSafeWriteLock writeLock = new WeakSafeWriteLock(this);
522-
523-
@Override
524-
public WeakSafeReadLock readLock() {
525-
return readLock;
526-
}
527-
528-
@Override
529-
public WeakSafeWriteLock writeLock() {
530-
return writeLock;
531-
}
532-
}
533-
534-
/**
535-
* A read lock that ensures a strong reference is retained to the owning {@link ReadWriteLock}.
536-
*/
537-
private static final class WeakSafeReadLock extends ReentrantReadWriteLock.ReadLock {
538-
@SuppressWarnings({"unused", "FieldCanBeLocal"})
539-
private final WeakSafeReentrantReadWriteLock strongReference;
540-
541-
WeakSafeReadLock(WeakSafeReentrantReadWriteLock readWriteLock) {
542-
super(readWriteLock);
543-
this.strongReference = readWriteLock;
544-
}
545-
}
546-
547-
/**
548-
* A write lock that ensures a strong reference is retained to the owning {@link ReadWriteLock}.
549-
*/
550-
private static final class WeakSafeWriteLock extends ReentrantReadWriteLock.WriteLock {
551-
@SuppressWarnings({"unused", "FieldCanBeLocal"})
552-
private final WeakSafeReentrantReadWriteLock strongReference;
553-
554-
WeakSafeWriteLock(WeakSafeReentrantReadWriteLock readWriteLock) {
555-
super(readWriteLock);
556-
this.strongReference = readWriteLock;
557-
}
558-
559-
@Override
560-
public Condition newCondition() {
561-
throw new UnsupportedOperationException();
562-
}
563-
}
564-
}
565241
}

0 commit comments

Comments
 (0)