|
83 | 83 | import java.util.Objects; |
84 | 84 | import java.util.Set; |
85 | 85 | import java.util.concurrent.CountDownLatch; |
| 86 | +import java.util.concurrent.Executors; |
86 | 87 | import java.util.concurrent.Semaphore; |
| 88 | +import java.util.concurrent.TimeUnit; |
87 | 89 | import java.util.concurrent.atomic.AtomicBoolean; |
| 90 | +import java.util.concurrent.atomic.AtomicInteger; |
88 | 91 | import java.util.concurrent.atomic.AtomicLong; |
89 | 92 | import java.util.stream.Collectors; |
90 | 93 | import java.util.stream.StreamSupport; |
@@ -569,22 +572,47 @@ public void match(LogEvent event) { |
569 | 572 | } |
570 | 573 | } |
571 | 574 |
|
572 | | - public void testFailIfAlreadyExists() throws IOException { |
| 575 | + public void testFailIfAlreadyExists() throws IOException, InterruptedException { |
573 | 576 | try (BlobStore store = newBlobStore()) { |
574 | 577 | final BlobContainer container = store.blobContainer(BlobPath.EMPTY); |
575 | 578 | final String blobName = randomAlphaOfLengthBetween(8, 12); |
576 | | - byte[] data = randomBytes(randomIntBetween(10, scaledRandomIntBetween(1024, 1 << 16))); |
| 579 | + |
| 580 | + final byte[] data; |
| 581 | + if (randomBoolean()) { |
| 582 | + // single upload |
| 583 | + data = randomBytes(randomIntBetween(10, scaledRandomIntBetween(1024, 1 << 16))); |
| 584 | + } else { |
| 585 | + // multipart upload |
| 586 | + int thresholdInBytes = Math.toIntExact(((S3BlobContainer) container).getLargeBlobThresholdInBytes()); |
| 587 | + data = randomBytes(randomIntBetween(thresholdInBytes, thresholdInBytes + scaledRandomIntBetween(1024, 1 << 16))); |
| 588 | + } |
577 | 589 |
|
578 | 590 | // initial write blob |
579 | | - writeBlob(container, blobName, new BytesArray(data), true); |
| 591 | + AtomicInteger exceptionCount = new AtomicInteger(0); |
| 592 | + try (var executor = Executors.newFixedThreadPool(2)) { |
| 593 | + for (int i = 0; i < 2; i++) { |
| 594 | + executor.submit(() -> { |
| 595 | + try { |
| 596 | + writeBlob(container, blobName, new BytesArray(data), true); |
| 597 | + } catch (IOException e) { |
| 598 | + exceptionCount.incrementAndGet(); |
| 599 | + } |
| 600 | + }); |
| 601 | + } |
| 602 | + executor.shutdown(); |
| 603 | + var done = executor.awaitTermination(1, TimeUnit.SECONDS); |
| 604 | + assertTrue(done); |
| 605 | + } |
| 606 | + |
| 607 | + assertEquals(1, exceptionCount.get()); |
580 | 608 |
|
581 | | - // override if failIfAlreadyExists is set to false |
| 609 | + // overwrite if failIfAlreadyExists is set to false |
582 | 610 | writeBlob(container, blobName, new BytesArray(data), false); |
583 | 611 |
|
584 | 612 | // throw exception if failIfAlreadyExists is set to true |
585 | 613 | var exception = expectThrows(IOException.class, () -> writeBlob(container, blobName, new BytesArray(data), true)); |
586 | 614 |
|
587 | | - assertThat(exception.getMessage(), startsWith("Unable to upload object")); |
| 615 | + assertThat(exception.getMessage(), startsWith("Unable to upload")); |
588 | 616 |
|
589 | 617 | container.delete(randomPurpose()); |
590 | 618 | } |
|
0 commit comments