|
83 | 83 | import java.util.Objects;
|
84 | 84 | import java.util.Set;
|
85 | 85 | import java.util.concurrent.CountDownLatch;
|
86 |
| -import java.util.concurrent.Executors; |
| 86 | +import java.util.concurrent.CyclicBarrier; |
87 | 87 | import java.util.concurrent.Semaphore;
|
88 |
| -import java.util.concurrent.TimeUnit; |
89 | 88 | import java.util.concurrent.atomic.AtomicBoolean;
|
90 | 89 | import java.util.concurrent.atomic.AtomicInteger;
|
91 | 90 | import java.util.concurrent.atomic.AtomicLong;
|
| 91 | +import java.util.concurrent.atomic.AtomicReference; |
92 | 92 | import java.util.stream.Collectors;
|
93 | 93 | import java.util.stream.StreamSupport;
|
94 | 94 |
|
@@ -572,52 +572,65 @@ public void match(LogEvent event) {
|
572 | 572 | }
|
573 | 573 | }
|
574 | 574 |
|
575 |
| - public void testFailIfAlreadyExists() throws IOException, InterruptedException { |
| 575 | + public void testFailIfAlreadyExists() throws IOException { |
576 | 576 | try (BlobStore store = newBlobStore()) {
|
577 | 577 | final BlobContainer container = store.blobContainer(BlobPath.EMPTY);
|
578 | 578 | final String blobName = randomAlphaOfLengthBetween(8, 12);
|
579 | 579 |
|
580 |
| - final byte[] data; |
581 |
| - if (randomBoolean()) { |
582 |
| - // single upload |
583 |
| - data = randomBytes(randomIntBetween(10, scaledRandomIntBetween(1024, 1 << 16))); |
584 |
| - } else { |
585 |
| - // multipart upload |
586 |
| - int thresholdInBytes = Math.toIntExact(((S3BlobContainer) container).getLargeBlobThresholdInBytes()); |
587 |
| - data = randomBytes(randomIntBetween(thresholdInBytes, thresholdInBytes + scaledRandomIntBetween(1024, 1 << 16))); |
588 |
| - } |
589 |
| - |
590 |
| - // initial write blob |
591 |
| - AtomicInteger exceptionCount = new AtomicInteger(0); |
592 |
| - try (var executor = Executors.newFixedThreadPool(2)) { |
593 |
| - for (int i = 0; i < 2; i++) { |
594 |
| - executor.submit(() -> { |
595 |
| - try { |
596 |
| - writeBlob(container, blobName, new BytesArray(data), true); |
597 |
| - } catch (IOException e) { |
598 |
| - exceptionCount.incrementAndGet(); |
599 |
| - } |
600 |
| - }); |
| 580 | + // initial write: exactly one of these may succeed |
| 581 | + final var parallelWrites = between(1, 4); |
| 582 | + final var exceptionCount = new AtomicInteger(0); |
| 583 | + final var successData = new AtomicReference<byte[]>(); |
| 584 | + final var writeBarrier = new CyclicBarrier(parallelWrites); |
| 585 | + runInParallel(parallelWrites, ignored -> { |
| 586 | + final byte[] data = getRandomData(container); |
| 587 | + safeAwait(writeBarrier); |
| 588 | + try { |
| 589 | + writeBlob(container, blobName, new BytesArray(data), true); |
| 590 | + assertTrue(successData.compareAndSet(null, data)); |
| 591 | + } catch (IOException e) { |
| 592 | + exceptionCount.incrementAndGet(); |
601 | 593 | }
|
602 |
| - executor.shutdown(); |
603 |
| - var done = executor.awaitTermination(1, TimeUnit.SECONDS); |
604 |
| - assertTrue(done); |
605 |
| - } |
| 594 | + }); |
606 | 595 |
|
607 |
| - assertEquals(1, exceptionCount.get()); |
| 596 | + // the data in the blob comes from the successful write |
| 597 | + assertNotNull(successData.get()); |
| 598 | + assertArrayEquals(successData.get(), readBlobFully(container, blobName, successData.get().length)); |
608 | 599 |
|
609 |
| - // overwrite if failIfAlreadyExists is set to false |
610 |
| - writeBlob(container, blobName, new BytesArray(data), false); |
| 600 | + // all the other writes failed with an IOException |
| 601 | + assertEquals(parallelWrites - 1, exceptionCount.get()); |
611 | 602 |
|
612 |
| - // throw exception if failIfAlreadyExists is set to true |
613 |
| - var exception = expectThrows(IOException.class, () -> writeBlob(container, blobName, new BytesArray(data), true)); |
| 603 | + // verify that another write succeeds |
| 604 | + final var overwriteData = getRandomData(container); |
| 605 | + writeBlob(container, blobName, new BytesArray(overwriteData), false); |
| 606 | + assertArrayEquals(overwriteData, readBlobFully(container, blobName, overwriteData.length)); |
614 | 607 |
|
| 608 | + // throw exception if failIfAlreadyExists is set to true |
| 609 | + var exception = expectThrows( |
| 610 | + IOException.class, |
| 611 | + () -> writeBlob(container, blobName, new BytesArray(getRandomData(container)), true) |
| 612 | + ); |
615 | 613 | assertThat(exception.getMessage(), startsWith("Unable to upload"));
|
616 | 614 |
|
| 615 | + assertArrayEquals(overwriteData, readBlobFully(container, blobName, overwriteData.length)); |
| 616 | + |
617 | 617 | container.delete(randomPurpose());
|
618 | 618 | }
|
619 | 619 | }
|
620 | 620 |
|
| 621 | + private static byte[] getRandomData(BlobContainer container) { |
| 622 | + final byte[] data; |
| 623 | + if (randomBoolean()) { |
| 624 | + // single upload |
| 625 | + data = randomBytes(randomIntBetween(10, scaledRandomIntBetween(1024, 1 << 16))); |
| 626 | + } else { |
| 627 | + // multipart upload |
| 628 | + int thresholdInBytes = Math.toIntExact(((S3BlobContainer) container).getLargeBlobThresholdInBytes()); |
| 629 | + data = randomBytes(randomIntBetween(thresholdInBytes, thresholdInBytes + scaledRandomIntBetween(1024, 1 << 16))); |
| 630 | + } |
| 631 | + return data; |
| 632 | + } |
| 633 | + |
621 | 634 | /**
|
622 | 635 | * S3RepositoryPlugin that allows to disable chunked encoding and to set a low threshold between single upload and multipart upload.
|
623 | 636 | */
|
|
0 commit comments