Skip to content

Commit 99dc964

Browse files
committed
PROTOTYPE: support remote cache chunking using FastCDC
1 parent d5a9978 commit 99dc964

23 files changed

+1265
-8
lines changed

src/main/java/com/google/devtools/build/lib/remote/BUILD

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,9 @@ java_library(
2828
srcs = glob(
2929
["*.java"],
3030
exclude = [
31+
"ChunkingConfig.java",
3132
"ExecutionStatusException.java",
33+
"FastCDCChunker.java",
3234
"ReferenceCountedChannel.java",
3335
"ChannelConnectionWithServerCapabilitiesFactory.java",
3436
"RemoteRetrier.java",
@@ -53,6 +55,7 @@ java_library(
5355
":Retrier",
5456
":abstract_action_input_prefetcher",
5557
":lease_service",
58+
"//src/main/java/com/google/devtools/build/lib/concurrent:task_deduplicator",
5659
":remote_important_output_handler",
5760
":remote_output_checker",
5861
":scrubber",
@@ -97,6 +100,7 @@ java_library(
97100
"//src/main/java/com/google/devtools/build/lib/exec/local",
98101
"//src/main/java/com/google/devtools/build/lib/packages/semantics",
99102
"//src/main/java/com/google/devtools/build/lib/profiler",
103+
"//src/main/java/com/google/devtools/build/lib/remote/chunking",
100104
"//src/main/java/com/google/devtools/build/lib/remote/circuitbreaker",
101105
"//src/main/java/com/google/devtools/build/lib/remote/common",
102106
"//src/main/java/com/google/devtools/build/lib/remote/common:bulk_transfer_exception",
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
// Copyright 2026 The Bazel Authors. All rights reserved.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package com.google.devtools.build.lib.remote;
16+
17+
import static com.google.devtools.build.lib.remote.util.Utils.getFromFuture;
18+
import build.bazel.remote.execution.v2.Digest;
19+
import build.bazel.remote.execution.v2.SplitBlobResponse;
20+
import com.google.common.flogger.GoogleLogger;
21+
import com.google.common.util.concurrent.ListenableFuture;
22+
import com.google.devtools.build.lib.remote.common.CacheNotFoundException;
23+
import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext;
24+
import java.io.IOException;
25+
import java.io.OutputStream;
26+
import java.util.concurrent.ExecutionException;
27+
import java.util.List;
28+
29+
/**
30+
* Downloads blobs by sequentially fetching chunks via the SplitBlob API.
31+
*/
32+
public class ChunkedBlobDownloader {
33+
private static final GoogleLogger logger = GoogleLogger.forEnclosingClass();
34+
35+
private final GrpcCacheClient grpcCacheClient;
36+
37+
public ChunkedBlobDownloader(GrpcCacheClient grpcCacheClient) {
38+
this.grpcCacheClient = grpcCacheClient;
39+
}
40+
41+
/**
42+
* Downloads a blob using chunked download via the SplitBlob API. This should be
43+
* called with virtual threads.
44+
*/
45+
public void downloadChunked(
46+
RemoteActionExecutionContext context, Digest blobDigest, OutputStream out)
47+
throws CacheNotFoundException, InterruptedException {
48+
try {
49+
doDownloadChunked(context, blobDigest, out);
50+
} catch (IOException e) {
51+
logger.atWarning().withCause(e).log("Chunked download failed for %s", blobDigest.getHash());
52+
throw new CacheNotFoundException(blobDigest);
53+
}
54+
}
55+
56+
private void doDownloadChunked(
57+
RemoteActionExecutionContext context, Digest blobDigest, OutputStream out)
58+
throws IOException, InterruptedException {
59+
ListenableFuture<SplitBlobResponse> splitResponseFuture = grpcCacheClient.splitBlob(context, blobDigest);
60+
if (splitResponseFuture == null) {
61+
throw new CacheNotFoundException(blobDigest);
62+
}
63+
downloadAndReassembleChunks(context, getFromFuture(splitResponseFuture).getChunkDigestsList(), out);
64+
}
65+
66+
private void downloadAndReassembleChunks(
67+
RemoteActionExecutionContext context, List<Digest> chunkDigests, OutputStream out)
68+
throws IOException, InterruptedException {
69+
for (Digest chunkDigest : chunkDigests) {
70+
getFromFuture(grpcCacheClient.downloadBlob(context, chunkDigest, out));
71+
}
72+
}
73+
}
Lines changed: 178 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,178 @@
1+
// Copyright 2026 The Bazel Authors. All rights reserved.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package com.google.devtools.build.lib.remote;
16+
17+
import static com.google.devtools.build.lib.remote.util.Utils.getFromFuture;
18+
import build.bazel.remote.execution.v2.Digest;
19+
import build.bazel.remote.execution.v2.SplitBlobResponse;
20+
import com.google.common.collect.ImmutableSet;
21+
import com.google.common.collect.Lists;
22+
import com.google.common.util.concurrent.Futures;
23+
import com.google.common.util.concurrent.ListenableFuture;
24+
import com.google.devtools.build.lib.concurrent.TaskDeduplicator;
25+
import com.google.devtools.build.lib.remote.chunking.ChunkingConfig;
26+
import com.google.devtools.build.lib.remote.chunking.FastCDCChunker;
27+
import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext;
28+
import com.google.devtools.build.lib.remote.util.DigestUtil;
29+
import com.google.devtools.build.lib.vfs.Path;
30+
import java.io.ByteArrayInputStream;
31+
import java.io.IOException;
32+
import java.io.InputStream;
33+
import java.util.ArrayList;
34+
import java.util.HashMap;
35+
import java.util.List;
36+
import java.util.Map;
37+
import java.util.concurrent.ExecutionException;
38+
39+
/**
40+
* Uploads blobs in chunks using Content-Defined Chunking with FastCDC 2020.
41+
*
42+
* <p>
43+
* Upload flow for blobs above threshold:
44+
*
45+
* <ol>
46+
* <li>Chunk file with FastCDC
47+
* <li>Call findMissingDigests on chunk digests
48+
* <li>Upload only missing chunks
49+
* <li>Call SpliceBlob to register the blob as the concatenation of chunks
50+
* </ol>
51+
*/
52+
public class ChunkedBlobUploader {
53+
private final GrpcCacheClient grpcCacheClient;
54+
private final FastCDCChunker chunker;
55+
private final long chunkingThreshold;
56+
private final TaskDeduplicator<Digest, Void> chunkUploadDeduplicator = new TaskDeduplicator<>();
57+
58+
public ChunkedBlobUploader(GrpcCacheClient grpcCacheClient, DigestUtil digestUtil) {
59+
this(grpcCacheClient, ChunkingConfig.defaults(), digestUtil);
60+
}
61+
62+
public ChunkedBlobUploader(
63+
GrpcCacheClient grpcCacheClient, ChunkingConfig config, DigestUtil digestUtil) {
64+
this.grpcCacheClient = grpcCacheClient;
65+
this.chunker = new FastCDCChunker(config, digestUtil);
66+
this.chunkingThreshold = config.chunkingThreshold();
67+
}
68+
69+
public long getChunkingThreshold() {
70+
return chunkingThreshold;
71+
}
72+
73+
public void uploadChunked(RemoteActionExecutionContext context, Digest blobDigest, Path file)
74+
throws IOException, InterruptedException {
75+
if (isAlreadyChunkedOnServer(context, blobDigest)) {
76+
return;
77+
}
78+
doChunkedUpload(context, blobDigest, file);
79+
}
80+
81+
private boolean isAlreadyChunkedOnServer(
82+
RemoteActionExecutionContext context, Digest blobDigest) throws InterruptedException {
83+
ListenableFuture<SplitBlobResponse> splitFuture = grpcCacheClient.splitBlob(context, blobDigest);
84+
if (splitFuture == null) {
85+
return false;
86+
}
87+
try {
88+
SplitBlobResponse response = splitFuture.get();
89+
return isTrulyChunked(response, blobDigest);
90+
} catch (ExecutionException e) {
91+
return false;
92+
}
93+
}
94+
95+
// TODO(https://github.com/bazelbuild/remote-apis/pull/358): should make this check unnecessary.
96+
private static boolean isTrulyChunked(SplitBlobResponse response, Digest blobDigest) {
97+
if (response == null || response.getChunkDigestsCount() == 0) {
98+
return false;
99+
}
100+
if (response.getChunkDigestsCount() == 1 && response.getChunkDigests(0).equals(blobDigest)) {
101+
return false;
102+
}
103+
return true;
104+
}
105+
106+
private void doChunkedUpload(RemoteActionExecutionContext context, Digest blobDigest, Path file)
107+
throws IOException, InterruptedException {
108+
List<Digest> chunkDigests;
109+
try (InputStream input = file.getInputStream()) {
110+
chunkDigests = chunker.chunkToDigests(input);
111+
}
112+
if (chunkDigests.isEmpty()) {
113+
return;
114+
}
115+
116+
ImmutableSet<Digest> missingDigests;
117+
try {
118+
missingDigests = grpcCacheClient.findMissingDigests(context, chunkDigests).get();
119+
} catch (ExecutionException e) {
120+
throw new IOException("Failed to find missing digests", e.getCause());
121+
}
122+
123+
uploadMissingChunks(context, missingDigests, chunkDigests, file);
124+
125+
try {
126+
grpcCacheClient.spliceBlob(context, blobDigest, chunkDigests).get();
127+
} catch (ExecutionException e) {
128+
throw new IOException("Failed to splice blob", e.getCause());
129+
}
130+
}
131+
132+
private void uploadMissingChunks(
133+
RemoteActionExecutionContext context,
134+
ImmutableSet<Digest> missingDigests,
135+
List<Digest> chunkDigests,
136+
Path file)
137+
throws IOException, InterruptedException {
138+
if (missingDigests.isEmpty()) {
139+
return;
140+
}
141+
142+
// Rather than keeping the offsets of the chunks,
143+
// We can just use the size from the digests of the prev
144+
// chunks to compute just the offsets we need.
145+
Map<Digest, Long> digestToOffset = new HashMap<>();
146+
long offset = 0;
147+
for (Digest digest : chunkDigests) {
148+
if (missingDigests.contains(digest)) {
149+
digestToOffset.put(digest, offset);
150+
}
151+
offset += digest.getSizeBytes();
152+
}
153+
154+
for (Digest chunkDigest : missingDigests) {
155+
long chunkOffset = digestToOffset.get(chunkDigest);
156+
getFromFuture(chunkUploadDeduplicator.executeIfNew(chunkDigest,
157+
() -> uploadChunk(context, chunkDigest, chunkOffset, file)));
158+
}
159+
}
160+
161+
private ListenableFuture<Void> uploadChunk(RemoteActionExecutionContext context, Digest digest, long offset,
162+
Path file) {
163+
try {
164+
byte[] data = readChunkData(file, offset, (int) digest.getSizeBytes());
165+
return grpcCacheClient.uploadBlob(context, digest, () -> new ByteArrayInputStream(data));
166+
} catch (IOException e) {
167+
return Futures.immediateFailedFuture(e);
168+
}
169+
170+
}
171+
172+
private byte[] readChunkData(Path file, long offset, int length) throws IOException {
173+
try (InputStream input = file.getInputStream()) {
174+
input.skipNBytes(offset);
175+
return input.readNBytes(length);
176+
}
177+
}
178+
}

src/main/java/com/google/devtools/build/lib/remote/CombinedCache.java

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,9 +32,12 @@
3232
import com.google.common.flogger.GoogleLogger;
3333
import com.google.common.util.concurrent.Futures;
3434
import com.google.common.util.concurrent.ListenableFuture;
35+
import com.google.common.util.concurrent.ListeningExecutorService;
36+
import com.google.common.util.concurrent.MoreExecutors;
3537
import com.google.devtools.build.lib.concurrent.ThreadSafety;
3638
import com.google.devtools.build.lib.exec.SpawnCheckingCacheEvent;
3739
import com.google.devtools.build.lib.exec.SpawnProgressEvent;
40+
import com.google.devtools.build.lib.remote.chunking.ChunkingConfig;
3841
import com.google.devtools.build.lib.remote.common.CacheNotFoundException;
3942
import com.google.devtools.build.lib.remote.common.LazyFileOutputStream;
4043
import com.google.devtools.build.lib.remote.common.OutputDigestMismatchException;
@@ -64,6 +67,7 @@
6467
import java.util.List;
6568
import java.util.Set;
6669
import java.util.concurrent.CountDownLatch;
70+
import java.util.concurrent.Executors;
6771
import java.util.concurrent.atomic.AtomicLong;
6872
import java.util.regex.Matcher;
6973
import java.util.regex.Pattern;
@@ -93,6 +97,11 @@ public class CombinedCache extends AbstractReferenceCounted {
9397
private final CountDownLatch closeCountDownLatch = new CountDownLatch(1);
9498
protected final AsyncTaskCache.NoResult<Digest> casUploadCache = AsyncTaskCache.NoResult.create();
9599

100+
@SuppressWarnings("AllowVirtualThreads")
101+
private final ListeningExecutorService virtualThreadExecutor =
102+
MoreExecutors.listeningDecorator(
103+
Executors.newThreadPerTaskExecutor(Thread.ofVirtual().name("combined-cache-", 0).factory()));
104+
96105
@Nullable protected final RemoteCacheClient remoteCacheClient;
97106
@Nullable protected final DiskCacheClient diskCacheClient;
98107
@Nullable protected final String symlinkTemplate;
@@ -130,6 +139,15 @@ public ServerCapabilities getRemoteServerCapabilities() throws IOException {
130139
return remoteCacheClient.getServerCapabilities();
131140
}
132141

142+
@Nullable
143+
public ChunkingConfig getChunkingConfig() {
144+
try {
145+
return ChunkingConfig.fromServerCapabilities(getRemoteServerCapabilities());
146+
} catch (IOException e) {
147+
return null;
148+
}
149+
}
150+
133151
/**
134152
* Class to keep track of which cache (disk or remote) a given [cached] ActionResult comes from.
135153
*/
@@ -440,6 +458,29 @@ private ListenableFuture<Void> downloadBlobFromRemote(
440458
RemoteActionExecutionContext context, Digest digest, OutputStream out) {
441459
checkState(remoteCacheClient != null && context.getReadCachePolicy().allowRemoteCache());
442460

461+
if (remoteCacheClient instanceof GrpcCacheClient grpcClient) {
462+
ChunkedBlobDownloader chunkedDownloader = grpcClient.getChunkedDownloader();
463+
if (chunkedDownloader != null && digest.getSizeBytes() > grpcClient.getChunkingThreshold()) {
464+
ListenableFuture<Void> chunkedDownloadFuture =
465+
virtualThreadExecutor.submit(() -> {
466+
chunkedDownloader.downloadChunked(context, digest, out);
467+
return null;
468+
});
469+
return Futures.catchingAsync(
470+
chunkedDownloadFuture,
471+
CacheNotFoundException.class,
472+
(e) -> regularDownloadBlobFromRemote(context, digest, out),
473+
directExecutor());
474+
}
475+
}
476+
477+
return regularDownloadBlobFromRemote(context, digest, out);
478+
}
479+
480+
private ListenableFuture<Void> regularDownloadBlobFromRemote(
481+
RemoteActionExecutionContext context, Digest digest, OutputStream out) {
482+
checkState(remoteCacheClient != null && context.getReadCachePolicy().allowRemoteCache());
483+
443484
if (diskCacheClient != null && context.getWriteCachePolicy().allowDiskCache()) {
444485
Path tempPath = diskCacheClient.getTempPath();
445486
LazyFileOutputStream tempOut = new LazyFileOutputStream(tempPath);

0 commit comments

Comments
 (0)