Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public abstract class AbstractActionInputPrefetcher implements ActionInputPrefet
protected final Path execRoot;
protected final RemoteOutputChecker remoteOutputChecker;

private final ActionOutputDirectoryHelper outputDirectoryHelper;
protected final ActionOutputDirectoryHelper outputDirectoryHelper;

/** The state of a directory tracked by {@link DirectoryTracker}, as explained below. */
enum DirectoryState {
Expand Down Expand Up @@ -129,8 +129,9 @@ private void setWritable(Path dir, DirectoryState newState) throws IOException {
directoryStateMap.compute(
dir,
(unusedKey, oldState) -> {
if (oldState == DirectoryState.TEMPORARILY_WRITABLE
|| oldState == DirectoryState.PERMANENTLY_WRITABLE) {
if (!forceRefetch(dir)
&& (oldState == DirectoryState.TEMPORARILY_WRITABLE
|| oldState == DirectoryState.PERMANENTLY_WRITABLE)) {
// Already writable, but must potentially upgrade from temporary to permanent.
return newState == DirectoryState.PERMANENTLY_WRITABLE ? newState : oldState;
}
Expand Down Expand Up @@ -162,8 +163,9 @@ void setOutputPermissions(Path dir) throws IOException {
directoryStateMap.compute(
dir,
(unusedKey, oldState) -> {
if (oldState == DirectoryState.OUTPUT_PERMISSIONS
|| oldState == DirectoryState.PERMANENTLY_WRITABLE) {
if (!forceRefetch(dir)
&& (oldState == DirectoryState.OUTPUT_PERMISSIONS
|| oldState == DirectoryState.PERMANENTLY_WRITABLE)) {
// Either the output permissions have already been set, or we're not changing the
// permissions ever again.
return oldState;
Expand Down Expand Up @@ -240,6 +242,12 @@ private boolean shouldDownloadFile(Path path, FileArtifactValue metadata)

protected abstract boolean canDownloadFile(Path path, FileArtifactValue metadata);

/**
* If true, then all previously acquired knowledge of the file system state of this path (e.g. the
* existence of tree artifact directories or previously downloaded files) must be discarded.
*/
protected abstract boolean forceRefetch(Path path);

/**
* Downloads file to the given path via its metadata.
*
Expand Down Expand Up @@ -565,15 +573,16 @@ private Completable downloadFileNoCheckRx(
return Completable.error(new CacheNotFoundException(digest, execPath));
}));

return downloadCache.executeIfNot(
return downloadCache.execute(
finalPath,
Completable.defer(
() -> {
if (shouldDownloadFile(finalPath, metadata)) {
return download;
}
return Completable.complete();
}));
}),
forceRefetch(finalPath));
}

private void finalizeDownload(
Expand Down Expand Up @@ -646,7 +655,7 @@ private void deletePartialDownload(Path path) {
}

private Completable plantSymlink(Symlink symlink) {
return downloadCache.executeIfNot(
return downloadCache.execute(
execRoot.getRelative(symlink.linkExecPath()),
Completable.defer(
() -> {
Expand All @@ -657,7 +666,8 @@ private Completable plantSymlink(Symlink symlink) {
link.delete();
link.createSymbolicLink(target);
return Completable.complete();
}));
}),
forceRefetch(execRoot.getRelative(symlink.linkExecPath())));
}

public ImmutableSet<Path> downloadedFiles() {
Expand Down Expand Up @@ -694,7 +704,7 @@ public void finalizeAction(Action action, OutputMetadataStore outputMetadataStor
}

var metadata = outputMetadataStore.getOutputMetadata(output);
if (!metadata.isRemote()) {
if (!canDownloadFile(output.getPath(), metadata)) {
continue;
}

Expand Down
13 changes: 13 additions & 0 deletions src/main/java/com/google/devtools/build/lib/remote/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ java_library(
srcs = glob(
["*.java"],
exclude = [
"ConcurrentArtifactPathTrie.java",
"ExecutionStatusException.java",
"ReferenceCountedChannel.java",
"ChannelConnectionWithServerCapabilitiesFactory.java",
Expand All @@ -51,6 +52,7 @@ java_library(
":ReferenceCountedChannel",
":Retrier",
":abstract_action_input_prefetcher",
":concurrent_artifact_path_trie",
":lease_service",
":remote_output_checker",
":scrubber",
Expand Down Expand Up @@ -208,6 +210,7 @@ java_library(
name = "remote_output_checker",
srcs = ["RemoteOutputChecker.java"],
deps = [
":concurrent_artifact_path_trie",
"//src/main/java/com/google/devtools/build/lib/actions:artifacts",
"//src/main/java/com/google/devtools/build/lib/actions:file_metadata",
"//src/main/java/com/google/devtools/build/lib/analysis:analysis_cluster",
Expand Down Expand Up @@ -281,6 +284,16 @@ java_library(
],
)

java_library(
name = "concurrent_artifact_path_trie",
srcs = ["ConcurrentArtifactPathTrie.java"],
deps = [
"//src/main/java/com/google/devtools/build/lib/actions:artifacts",
"//src/main/java/com/google/devtools/build/lib/vfs:pathfragment",
"//third_party:guava",
],
)

java_library(
name = "store",
srcs = ["Store.java"],
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
// Copyright 2026 The Bazel Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.google.devtools.build.lib.remote;

import com.google.common.base.Preconditions;
import com.google.devtools.build.lib.actions.ActionInput;
import com.google.devtools.build.lib.actions.Artifact;
import com.google.devtools.build.lib.vfs.PathFragment;
import java.util.concurrent.ConcurrentSkipListSet;

/**
* A specialized concurrent trie that stores paths of artifacts and allows checking whether a given
* path is contained in (in the case of a tree artifact) or exactly matches (in any other case) an
* artifact in the trie.
*/
final class ConcurrentArtifactPathTrie {
// Invariant: no path in this set is a prefix of another path.
private final ConcurrentSkipListSet<PathFragment> paths =
new ConcurrentSkipListSet<>(PathFragment.HIERARCHICAL_COMPARATOR);

/**
* Adds the given {@link ActionInput} to the trie.
*
* <p>The caller must ensure that no object's path passed to this method is a prefix of any
* previously added object's path. Bazel enforces this for non-aggregate artifacts. Callers must
* not pass in {@link Artifact.TreeFileArtifact}s (which have exec paths that have their parent
* tree artifact's exec path as a prefix) or non-Artifact {@link ActionInput}s that violate this
* invariant.
*/
void add(ActionInput input) {
Preconditions.checkArgument(
!(input instanceof Artifact.TreeFileArtifact),
"TreeFileArtifacts should not be added to the trie: %s",
input);
paths.add(input.getExecPath());
}

/** Checks whether the given {@link PathFragment} is contained in an artifact in the trie. */
boolean contains(PathFragment execPath) {
// By the invariant of this set, there is at most one prefix of execPath in the set. Since the
// comparator sorts all children of a path right after the path itself, if such a prefix
// exists, it must thus sort right before execPath (or be equal to it).
var floorPath = paths.floor(execPath);
return floorPath != null && execPath.startsWith(floorPath);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,14 @@
// limitations under the License.
package com.google.devtools.build.lib.remote;

import static com.google.common.base.Preconditions.checkArgument;

import build.bazel.remote.execution.v2.Digest;
import build.bazel.remote.execution.v2.RequestMetadata;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.devtools.build.lib.actions.ActionExecutionMetadata;
import com.google.devtools.build.lib.actions.ActionOutputDirectoryHelper;
import com.google.devtools.build.lib.actions.Artifact;
import com.google.devtools.build.lib.actions.FileArtifactValue;
import com.google.devtools.build.lib.actions.cache.VirtualActionInput;
import com.google.devtools.build.lib.events.Reporter;
Expand All @@ -31,7 +31,10 @@
import com.google.devtools.build.lib.vfs.OutputPermissions;
import com.google.devtools.build.lib.vfs.Path;
import com.google.devtools.build.lib.vfs.PathFragment;
import com.google.devtools.build.lib.vfs.Symlinks;
import java.io.IOException;
import java.util.Collection;
import javax.annotation.Nullable;

/**
* Stages output files that are stored remotely to the local filesystem.
Expand All @@ -44,6 +47,7 @@ public class RemoteActionInputFetcher extends AbstractActionInputPrefetcher {
private final String buildRequestId;
private final String commandId;
private final CombinedCache combinedCache;
private final ConcurrentArtifactPathTrie rewoundActionOutputs = new ConcurrentArtifactPathTrie();

RemoteActionInputFetcher(
Reporter reporter,
Expand Down Expand Up @@ -79,7 +83,18 @@ protected void prefetchVirtualActionInput(VirtualActionInput input) throws IOExc

@Override
protected boolean canDownloadFile(Path path, FileArtifactValue metadata) {
return metadata.isRemote();
// When action rewinding is enabled, an action that had remote metadata at some point during the
// build may have been re-executed locally to regenerate lost inputs, but may then be rewound
// again and thus have its (now local) outputs deleted. In this case, we need to download the
// outputs again, even if they are now considered local.
return metadata.isRemote() || (forceRefetch(path) && !path.exists(Symlinks.NOFOLLOW));
}

@Override
protected boolean forceRefetch(Path path) {
// Caches for download operations and output directory creation need to be disregarded for the
// outputs of rewound actions as they may have been deleted after they were first created.
return path.startsWith(execRoot) && rewoundActionOutputs.contains(path.relativeTo(execRoot));
}

@Override
Expand All @@ -92,7 +107,6 @@ protected ListenableFuture<Void> doDownloadFile(
Priority priority,
Reason reason)
throws IOException {
checkArgument(metadata.isRemote(), "Cannot download file that is not a remote file.");
RequestMetadata requestMetadata =
TracingMetadataUtils.buildMetadata(
buildRequestId,
Expand All @@ -117,4 +131,22 @@ protected ListenableFuture<Void> doDownloadFile(
execPath.toString(),
digest.getSizeBytes()));
}

public void handleRewoundActionOutputs(Collection<Artifact> outputs) {
// SkyframeActionExecutor#prepareForRewinding does *not* call this method because the
// RemoteActionFileSystem corresponds to an ActionFileSystemType with inMemoryFileSystem() ==
// true. While it is true that resetting outputDirectoryHelper isn't necessary to undo the
// caching of output directory creation during action preparation, we still need to reset here
// since outputDirectoryHelper is also used by AbstractActionInputPrefetcher.
outputDirectoryHelper.invalidateTreeArtifactDirectoryCreation(outputs);
for (Artifact output : outputs) {
// Action templates have TreeFileArtifacts as outputs, which isn't supported by the trie. We
// only need to track the tree artifacts themselves.
if (output instanceof Artifact.TreeFileArtifact) {
rewoundActionOutputs.add(output.getParent());
} else {
rewoundActionOutputs.add(output);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@
import com.google.common.eventbus.Subscribe;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import com.google.devtools.build.lib.actions.ActionInput;
import com.google.devtools.build.lib.actions.Artifact;
Expand Down Expand Up @@ -156,8 +158,8 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Phaser;
import java.util.concurrent.Semaphore;
Expand Down Expand Up @@ -194,9 +196,10 @@ public class RemoteExecutionService {
private final Set<String> reportedErrors = new HashSet<>();

@SuppressWarnings("AllowVirtualThreads")
private final ExecutorService backgroundTaskExecutor =
Executors.newThreadPerTaskExecutor(
Thread.ofVirtual().name("remote-execution-bg-", 0).factory());
private final ListeningExecutorService backgroundTaskExecutor =
MoreExecutors.listeningDecorator(
Executors.newThreadPerTaskExecutor(
Thread.ofVirtual().name("remote-execution-bg-", 0).factory()));

private final AtomicBoolean shutdown = new AtomicBoolean(false);
private final AtomicBoolean buildInterrupted = new AtomicBoolean(false);
Expand Down Expand Up @@ -1873,16 +1876,31 @@ public void uploadOutputs(

if (remoteOptions.remoteCacheAsync
&& !action.getSpawn().getResourceOwner().mayModifySpawnOutputsAfterExecution()) {
backgroundTaskExecutor.execute(
() -> {
try {
doUploadOutputs(action, spawnResult, onUploadComplete);
} catch (ExecException e) {
reportUploadError(e);
} catch (InterruptedException ignored) {
// ThreadPerTaskExecutor does not care about interrupt status.
}
});
var uploadDone = new CountDownLatch(1);
var future =
backgroundTaskExecutor.submit(
() -> {
try {
doUploadOutputs(action, spawnResult, onUploadComplete);
} catch (ExecException e) {
reportUploadError(e);
} catch (InterruptedException ignored) {
// ThreadPerTaskExecutor does not care about interrupt status.
} finally {
uploadDone.countDown();
}
});

if (outputService instanceof RemoteOutputService remoteOutputService
&& remoteOutputService.getRewoundActionSynchronizer()
instanceof RemoteRewoundActionSynchronizer remoteRewoundActionSynchronizer) {
remoteRewoundActionSynchronizer.registerOutputUploadTask(
action.getRemoteActionExecutionContext().getSpawnOwner(),
() -> {
future.cancel(true);
uploadDone.await();
});
}
} else {
doUploadOutputs(action, spawnResult, onUploadComplete);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -504,7 +504,10 @@ public void beforeCommand(CommandEnvironment env) throws AbruptExitException {
bazelOutputServiceChannel,
lastBuildId);
} else {
outputService = new RemoteOutputService(env.getDirectories());
outputService =
new RemoteOutputService(
env.getDirectories(),
buildRequestOptions != null && buildRequestOptions.rewindLostInputs);
}

if ((enableHttpCache || enableDiskCache) && !enableGrpcCache) {
Expand Down
Loading
Loading