|
32 | 32 | import com.google.devtools.build.lib.remote.common.OutputDigestMismatchException; |
33 | 33 | import com.google.devtools.build.lib.remote.common.RemoteCacheClient.ActionKey; |
34 | 34 | import com.google.devtools.build.lib.remote.util.DigestUtil; |
| 35 | +import com.google.devtools.build.lib.testutil.TestUtils; |
35 | 36 | import com.google.devtools.build.lib.vfs.DigestHashFunction; |
36 | 37 | import com.google.devtools.build.lib.vfs.FileSystem; |
37 | 38 | import com.google.devtools.build.lib.vfs.FileSystemUtils; |
38 | 39 | import com.google.devtools.build.lib.vfs.Path; |
39 | 40 | import com.google.devtools.build.lib.vfs.SyscallCache; |
40 | 41 | import com.google.devtools.build.lib.vfs.bazel.BazelHashFunctions; |
41 | 42 | import com.google.devtools.build.lib.vfs.inmemoryfs.InMemoryFileSystem; |
| 43 | +import com.google.devtools.build.lib.vfs.util.FileSystems; |
42 | 44 | import com.google.errorprone.annotations.CanIgnoreReturnValue; |
43 | 45 | import com.google.protobuf.ByteString; |
44 | 46 | import com.google.protobuf.Message; |
45 | 47 | import java.io.IOException; |
| 48 | +import java.io.OutputStream; |
| 49 | +import java.util.ArrayList; |
| 50 | +import java.util.concurrent.CountDownLatch; |
| 51 | +import java.util.concurrent.ExecutionException; |
| 52 | +import java.util.concurrent.Executors; |
| 53 | +import java.util.concurrent.Future; |
46 | 54 | import org.junit.After; |
47 | 55 | import org.junit.Before; |
48 | 56 | import org.junit.Test; |
@@ -346,6 +354,86 @@ public void downloadActionResult_withReferencedTreeFileMissing_returnsNull() thr |
346 | 354 | assertThat(result).isNull(); |
347 | 355 | } |
348 | 356 |
|
| 357 | + @Test |
| 358 | + public void concurrentUploadDownload() |
| 359 | + throws IOException, ExecutionException, InterruptedException { |
| 360 | + var nativeDiskCacheDir = TestUtils.createUniqueTmpDir(FileSystems.getNativeFileSystem()); |
| 361 | + var nativeClient = |
| 362 | + new DiskCacheClient(nativeDiskCacheDir, DIGEST_UTIL, /* verifyDownloads= */ false); |
| 363 | + var tasks = new ArrayList<Future<?>>(); |
| 364 | + // Use 1 MB blobs to increase the window for concurrent access during write/rename. |
| 365 | + var contentSize = 1024 * 1024; |
| 366 | + var numConcurrentOps = 10; |
| 367 | + try (var executor = Executors.newVirtualThreadPerTaskExecutor()) { |
| 368 | + for (int attempt = 0; attempt < 100; attempt++) { |
| 369 | + var contentArray = new byte[contentSize]; |
| 370 | + // Fill with a pattern based on the attempt number. |
| 371 | + for (int i = 0; i < contentSize; i++) { |
| 372 | + contentArray[i] = (byte) (attempt + i); |
| 373 | + } |
| 374 | + var contentBytes = ByteString.copyFrom(contentArray); |
| 375 | + var contentDigest = DIGEST_UTIL.compute(contentArray); |
| 376 | + // Use a latch to ensure all concurrent tasks start at roughly the same time. |
| 377 | + var startLatch = new CountDownLatch(numConcurrentOps); |
| 378 | + // Half the tasks do uploads, half do downloads with a slow OutputStream to keep the file |
| 379 | + // open longer. This maximizes the chance of a rename failing because a download has the |
| 380 | + // file open. |
| 381 | + for (int concurrentOp = 0; concurrentOp < numConcurrentOps; concurrentOp++) { |
| 382 | + boolean isUploader = concurrentOp % 2 == 0; |
| 383 | + tasks.add( |
| 384 | + executor.submit( |
| 385 | + () -> { |
| 386 | + // Signal ready and wait for all tasks to be ready. |
| 387 | + startLatch.countDown(); |
| 388 | + startLatch.await(); |
| 389 | + if (isUploader) { |
| 390 | + getFromFuture(nativeClient.uploadBlob(contentDigest, contentBytes)); |
| 391 | + } else { |
| 392 | + // Use a slow OutputStream that pauses periodically to keep the file open |
| 393 | + // longer during download. |
| 394 | + var out = |
| 395 | + new OutputStream() { |
| 396 | + private int bytesWritten = 0; |
| 397 | + |
| 398 | + @Override |
| 399 | + public void write(int b) throws IOException { |
| 400 | + bytesWritten++; |
| 401 | + maybeSleep(); |
| 402 | + } |
| 403 | + |
| 404 | + @Override |
| 405 | + public void write(byte[] b, int off, int len) throws IOException { |
| 406 | + bytesWritten += len; |
| 407 | + maybeSleep(); |
| 408 | + } |
| 409 | + |
| 410 | + private void maybeSleep() { |
| 411 | + // Sleep every 64KB to slow down the download. |
| 412 | + if (bytesWritten % (64 * 1024) < 100) { |
| 413 | + try { |
| 414 | + Thread.sleep(1); |
| 415 | + } catch (InterruptedException e) { |
| 416 | + Thread.currentThread().interrupt(); |
| 417 | + } |
| 418 | + } |
| 419 | + } |
| 420 | + }; |
| 421 | + try { |
| 422 | + getFromFuture(nativeClient.downloadBlob(contentDigest, out)); |
| 423 | + } catch (CacheNotFoundException ignored) { |
| 424 | + // File not yet uploaded by another task. |
| 425 | + } |
| 426 | + } |
| 427 | + return null; |
| 428 | + })); |
| 429 | + } |
| 430 | + } |
| 431 | + for (var task : tasks) { |
| 432 | + task.get(); |
| 433 | + } |
| 434 | + } |
| 435 | + } |
| 436 | + |
349 | 437 | private Tree getTreeWithFile(Digest fileDigest) { |
350 | 438 | return Tree.newBuilder() |
351 | 439 | .addChildren(Directory.newBuilder().addFiles(FileNode.newBuilder().setDigest(fileDigest))) |
|
0 commit comments