4646import com .google .protobuf .ByteString ;
4747import com .google .protobuf .Message ;
4848import java .io .IOException ;
49+ import java .io .OutputStream ;
4950import java .util .ArrayList ;
5051import java .util .concurrent .CountDownLatch ;
5152import java .util .concurrent .ExecutionException ;
@@ -364,7 +365,7 @@ public void concurrentUploadDownload()
364365 var tasks = new ArrayList <Future <?>>();
365366 // Use 1 MB blobs to increase the window for concurrent access during write/rename.
366367 var contentSize = 1024 * 1024 ;
367- var numConcurrentOps = 5 ;
368+ var numConcurrentOps = 10 ;
368369 for (int attempt = 0 ; attempt < 100 ; attempt ++) {
369370 var contentArray = new byte [contentSize ];
370371 // Fill with a pattern based on the attempt number.
@@ -373,24 +374,56 @@ public void concurrentUploadDownload()
373374 }
374375 var contentBytes = ByteString .copyFrom (contentArray );
375376 var contentDigest = DIGEST_UTIL .compute (contentArray );
376- // Use a latch to ensure all concurrent tasks start their upload at roughly the same time,
377- // maximizing the chance that multiple threads pass the "file exists" check before any
378- // completes the rename.
377+ // Use a latch to ensure all concurrent tasks start at roughly the same time.
379378 var startLatch = new CountDownLatch (numConcurrentOps );
379+ // Half the tasks do uploads, half do downloads with a slow OutputStream to keep the file
380+ // open longer. This maximizes the chance of a rename failing because a download has the
381+ // file open.
380382 for (int concurrentOp = 0 ; concurrentOp < numConcurrentOps ; concurrentOp ++) {
383+ boolean isUploader = concurrentOp % 2 == 0 ;
381384 tasks .add (
382385 executor .submit (
383386 () -> {
384387 // Signal ready and wait for all tasks to be ready.
385388 startLatch .countDown ();
386389 startLatch .await ();
387- nativeClient .uploadBlob (contentDigest , contentBytes );
388- try (var out = ByteString .newOutput ()) {
389- getFromFuture (nativeClient .downloadBlob (contentDigest , out ));
390- var downloadedBytes = out .toByteString ();
391- assertThat (downloadedBytes ).isEqualTo (contentBytes );
392- } catch (CacheNotFoundException ignored ) {
393- // This task won the race over the upload task.
390+ if (isUploader ) {
391+ getFromFuture (nativeClient .uploadBlob (contentDigest , contentBytes ));
392+ } else {
393+ // Use a slow OutputStream that pauses periodically to keep the file open
394+ // longer during download.
395+ var out =
396+ new OutputStream () {
397+ private int bytesWritten = 0 ;
398+
399+ @ Override
400+ public void write (int b ) throws IOException {
401+ bytesWritten ++;
402+ maybeSleep ();
403+ }
404+
405+ @ Override
406+ public void write (byte [] b , int off , int len ) throws IOException {
407+ bytesWritten += len ;
408+ maybeSleep ();
409+ }
410+
411+ private void maybeSleep () {
412+ // Sleep every 64KB to slow down the download.
413+ if (bytesWritten % (64 * 1024 ) < 100 ) {
414+ try {
415+ Thread .sleep (1 );
416+ } catch (InterruptedException e ) {
417+ Thread .currentThread ().interrupt ();
418+ }
419+ }
420+ }
421+ };
422+ try {
423+ getFromFuture (nativeClient .downloadBlob (contentDigest , out ));
424+ } catch (CacheNotFoundException ignored ) {
425+ // File not yet uploaded by another task.
426+ }
394427 }
395428 return null ;
396429 }));
0 commit comments