|
83 | 83 | import java.util.Objects; |
84 | 84 | import java.util.Set; |
85 | 85 | import java.util.concurrent.CountDownLatch; |
86 | | -import java.util.concurrent.Executors; |
| 86 | +import java.util.concurrent.CyclicBarrier; |
87 | 87 | import java.util.concurrent.Semaphore; |
88 | | -import java.util.concurrent.TimeUnit; |
89 | 88 | import java.util.concurrent.atomic.AtomicBoolean; |
90 | 89 | import java.util.concurrent.atomic.AtomicInteger; |
91 | 90 | import java.util.concurrent.atomic.AtomicLong; |
| 91 | +import java.util.concurrent.atomic.AtomicReference; |
92 | 92 | import java.util.stream.Collectors; |
93 | 93 | import java.util.stream.StreamSupport; |
94 | 94 |
|
@@ -572,52 +572,65 @@ public void match(LogEvent event) { |
572 | 572 | } |
573 | 573 | } |
574 | 574 |
|
575 | | - public void testFailIfAlreadyExists() throws IOException, InterruptedException { |
| 575 | + public void testFailIfAlreadyExists() throws IOException { |
576 | 576 | try (BlobStore store = newBlobStore()) { |
577 | 577 | final BlobContainer container = store.blobContainer(BlobPath.EMPTY); |
578 | 578 | final String blobName = randomAlphaOfLengthBetween(8, 12); |
579 | 579 |
|
580 | | - final byte[] data; |
581 | | - if (randomBoolean()) { |
582 | | - // single upload |
583 | | - data = randomBytes(randomIntBetween(10, scaledRandomIntBetween(1024, 1 << 16))); |
584 | | - } else { |
585 | | - // multipart upload |
586 | | - int thresholdInBytes = Math.toIntExact(((S3BlobContainer) container).getLargeBlobThresholdInBytes()); |
587 | | - data = randomBytes(randomIntBetween(thresholdInBytes, thresholdInBytes + scaledRandomIntBetween(1024, 1 << 16))); |
588 | | - } |
589 | | - |
590 | | - // initial write blob |
591 | | - AtomicInteger exceptionCount = new AtomicInteger(0); |
592 | | - try (var executor = Executors.newFixedThreadPool(2)) { |
593 | | - for (int i = 0; i < 2; i++) { |
594 | | - executor.submit(() -> { |
595 | | - try { |
596 | | - writeBlob(container, blobName, new BytesArray(data), true); |
597 | | - } catch (IOException e) { |
598 | | - exceptionCount.incrementAndGet(); |
599 | | - } |
600 | | - }); |
| 580 | + // initial writes: exactly one of these concurrent writes should succeed |
| 581 | + final var parallelWrites = between(1, 4); |
| 582 | + final var exceptionCount = new AtomicInteger(0); |
| 583 | + final var successData = new AtomicReference<byte[]>(); |
| 584 | + final var writeBarrier = new CyclicBarrier(parallelWrites); |
| 585 | + runInParallel(parallelWrites, ignored -> { |
| 586 | + final byte[] data = getRandomData(container); |
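| | + // wait until every writer is ready so the conditional writes actually race against each other |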
| 587 | + safeAwait(writeBarrier); |
| 588 | + try { |
| 589 | + writeBlob(container, blobName, new BytesArray(data), true); |
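| | + // record the payload; compareAndSet also verifies that no other writer succeeded first |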
| 590 | + assertTrue(successData.compareAndSet(null, data)); |
| 591 | + } catch (IOException e) { |
| 592 | + exceptionCount.incrementAndGet(); |
601 | 593 | } |
602 | | - executor.shutdown(); |
603 | | - var done = executor.awaitTermination(1, TimeUnit.SECONDS); |
604 | | - assertTrue(done); |
605 | | - } |
| 594 | + }); |
606 | 595 |
|
607 | | - assertEquals(1, exceptionCount.get()); |
| 596 | + // the data in the blob comes from the successful write |
| 597 | + assertNotNull(successData.get()); |
| 598 | + assertArrayEquals(successData.get(), readBlobFully(container, blobName, successData.get().length)); |
608 | 599 |
|
609 | | - // overwrite if failIfAlreadyExists is set to false |
610 | | - writeBlob(container, blobName, new BytesArray(data), false); |
| 600 | + // all the other writes failed with an IOException |
| 601 | + assertEquals(parallelWrites - 1, exceptionCount.get()); |
611 | 602 |
|
612 | | - // throw exception if failIfAlreadyExists is set to true |
613 | | - var exception = expectThrows(IOException.class, () -> writeBlob(container, blobName, new BytesArray(data), true)); |
| 603 | + // verify that the blob can be overwritten when failIfAlreadyExists is false |
| 604 | + final var overwriteData = getRandomData(container); |
| 605 | + writeBlob(container, blobName, new BytesArray(overwriteData), false); |
| 606 | + assertArrayEquals(overwriteData, readBlobFully(container, blobName, overwriteData.length)); |
614 | 607 |
|
| 608 | + // a subsequent write throws an exception if failIfAlreadyExists is set to true |
| 609 | + var exception = expectThrows( |
| 610 | + IOException.class, |
| 611 | + () -> writeBlob(container, blobName, new BytesArray(getRandomData(container)), true) |
| 612 | + ); |
615 | 613 | assertThat(exception.getMessage(), startsWith("Unable to upload")); |
616 | 614 |
|
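| | + // the failed write must not have modified the existing blob contents |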
| 615 | + assertArrayEquals(overwriteData, readBlobFully(container, blobName, overwriteData.length)); |
| 616 | + |
617 | 617 | container.delete(randomPurpose()); |
618 | 618 | } |
619 | 619 | } |
620 | 620 |
|
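| | + /** Returns a random payload sized to exercise either the single-upload or the multipart-upload path. */ |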
| 621 | + private static byte[] getRandomData(BlobContainer container) { |
| 622 | + final byte[] data; |
| 623 | + if (randomBoolean()) { |
| 624 | + // single upload |
| 625 | + data = randomBytes(randomIntBetween(10, scaledRandomIntBetween(1024, 1 << 16))); |
| 626 | + } else { |
| 627 | + // multipart upload |
| 628 | + int thresholdInBytes = Math.toIntExact(((S3BlobContainer) container).getLargeBlobThresholdInBytes()); |
| 629 | + data = randomBytes(randomIntBetween(thresholdInBytes, thresholdInBytes + scaledRandomIntBetween(1024, 1 << 16))); |
| 630 | + } |
| 631 | + return data; |
| 632 | + } |
| 633 | + |
621 | 634 | /** |
622 | 635 | * S3RepositoryPlugin that allows to disable chunked encoding and to set a low threshold between single upload and multipart upload. |
623 | 636 | */ |
|