Commit 025396b

Implement failIfAlreadyExists in S3 repositories (#133030)
Today, S3-backed repositories ignore the `failIfAlreadyExists` flag and may therefore overwrite a blob which already exists, potentially corrupting a repository subject to concurrent writes, rather than failing the second write. AWS S3 now supports writes conditional on the non-existence of an object via the `If-None-Match: *` HTTP header. This commit adjusts the S3-backed repository implementation to respect the `failIfAlreadyExists` flag using these conditional writes, eliminating the possibility of overwriting blobs which should not be overwritten. Relates #128565
1 parent 4db0011 · commit 025396b
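As background on the mechanism: a PUT conditional on `If-None-Match: *` succeeds only if no object currently exists at the target key, and a losing writer receives HTTP 412 (Precondition Failed), which the SDK raises as an `S3Exception`. Below is a minimal standalone sketch of such a conditional write with the AWS SDK for Java v2, using the same `ifNoneMatch("*")` builder call the commit relies on; the bucket, key, and payload are hypothetical, not from the commit.

    import software.amazon.awssdk.core.sync.RequestBody;
    import software.amazon.awssdk.services.s3.S3Client;
    import software.amazon.awssdk.services.s3.model.PutObjectRequest;
    import software.amazon.awssdk.services.s3.model.S3Exception;

    public class ConditionalPutSketch {
        public static void main(String[] args) {
            try (S3Client s3 = S3Client.create()) {
                PutObjectRequest request = PutObjectRequest.builder()
                    .bucket("example-repo-bucket") // hypothetical bucket
                    .key("snapshots/index-N")      // hypothetical key
                    .ifNoneMatch("*")              // only create; never overwrite
                    .build();
                try {
                    s3.putObject(request, RequestBody.fromString("payload"));
                    System.out.println("blob created");
                } catch (S3Exception e) {
                    if (e.statusCode() == 412) {
                        // Another writer created the blob first: the failIfAlreadyExists case.
                        System.out.println("blob already exists");
                    } else {
                        throw e;
                    }
                }
            }
        }
    }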

File tree

8 files changed: +351 additions, −35 deletions

docs/changelog/133030.yaml

Lines changed: 6 additions & 0 deletions

@@ -0,0 +1,6 @@
+pr: 133030
+summary: Implement `failIfAlreadyExists` in S3 repositories
+area: Snapshot/Restore
+type: enhancement
+issues:
+ - 128565

modules/repository-s3/src/internalClusterTest/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java

Lines changed: 52 additions & 2 deletions

@@ -83,8 +83,11 @@
 import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
 import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
@@ -105,6 +108,7 @@
 import static org.hamcrest.Matchers.hasSize;
 import static org.hamcrest.Matchers.lessThan;
 import static org.hamcrest.Matchers.not;
+import static org.hamcrest.Matchers.startsWith;
 
 @SuppressForbidden(reason = "this test uses a HttpServer to emulate an S3 endpoint")
 // Need to set up a new cluster for each test because cluster settings use randomized authentication settings
@@ -425,7 +429,7 @@ public void testEnforcedCooldownPeriod() throws IOException {
         if (randomBoolean()) {
             repository.blobStore()
                 .blobContainer(repository.basePath())
-                .writeBlobAtomic(randomNonDataPurpose(), getRepositoryDataBlobName(modifiedRepositoryData.getGenId()), serialized, true);
+                .writeBlobAtomic(randomNonDataPurpose(), getRepositoryDataBlobName(modifiedRepositoryData.getGenId()), serialized, false);
         } else {
             repository.blobStore()
                 .blobContainer(repository.basePath())
@@ -434,7 +438,7 @@ public void testEnforcedCooldownPeriod() throws IOException {
                     getRepositoryDataBlobName(modifiedRepositoryData.getGenId()),
                     serialized.streamInput(),
                     serialized.length(),
-                    true
+                    false
                 );
         }
 
@@ -568,6 +572,52 @@ public void match(LogEvent event) {
         }
     }
 
+    public void testFailIfAlreadyExists() throws IOException, InterruptedException {
+        try (BlobStore store = newBlobStore()) {
+            final BlobContainer container = store.blobContainer(BlobPath.EMPTY);
+            final String blobName = randomAlphaOfLengthBetween(8, 12);
+
+            final byte[] data;
+            if (randomBoolean()) {
+                // single upload
+                data = randomBytes(randomIntBetween(10, scaledRandomIntBetween(1024, 1 << 16)));
+            } else {
+                // multipart upload
+                int thresholdInBytes = Math.toIntExact(((S3BlobContainer) container).getLargeBlobThresholdInBytes());
+                data = randomBytes(randomIntBetween(thresholdInBytes, thresholdInBytes + scaledRandomIntBetween(1024, 1 << 16)));
+            }
+
+            // initial write blob
+            AtomicInteger exceptionCount = new AtomicInteger(0);
+            try (var executor = Executors.newFixedThreadPool(2)) {
+                for (int i = 0; i < 2; i++) {
+                    executor.submit(() -> {
+                        try {
+                            writeBlob(container, blobName, new BytesArray(data), true);
+                        } catch (IOException e) {
+                            exceptionCount.incrementAndGet();
+                        }
+                    });
+                }
+                executor.shutdown();
+                var done = executor.awaitTermination(1, TimeUnit.SECONDS);
+                assertTrue(done);
+            }
+
+            assertEquals(1, exceptionCount.get());
+
+            // overwrite if failIfAlreadyExists is set to false
+            writeBlob(container, blobName, new BytesArray(data), false);
+
+            // throw exception if failIfAlreadyExists is set to true
+            var exception = expectThrows(IOException.class, () -> writeBlob(container, blobName, new BytesArray(data), true));
+
+            assertThat(exception.getMessage(), startsWith("Unable to upload"));
+
+            container.delete(randomPurpose());
+        }
+    }
+
     /**
      * S3RepositoryPlugin that allows to disable chunked encoding and to set a low threshold between single upload and multipart upload.
      */
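The new integration test races two writers against the same blob with `failIfAlreadyExists` set to `true` and asserts that exactly one of them fails; it then checks that a follow-up write with the flag set to `false` silently overwrites, while another write with `true` throws an `IOException`. Randomizing the blob size around `getLargeBlobThresholdInBytes()` exercises both the single-upload and the multipart code paths.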

modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java

Lines changed: 20 additions & 10 deletions

@@ -137,18 +137,15 @@ public long readBlobPreferredLength() {
         return ByteSizeValue.of(32, ByteSizeUnit.MB).getBytes();
     }
 
-    /**
-     * This implementation ignores the failIfAlreadyExists flag as the S3 API has no way to enforce this due to its weak consistency model.
-     */
     @Override
     public void writeBlob(OperationPurpose purpose, String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists)
         throws IOException {
         assert BlobContainer.assertPurposeConsistency(purpose, blobName);
         assert inputStream.markSupported() : "No mark support on inputStream breaks the S3 SDK's ability to retry requests";
         if (blobSize <= getLargeBlobThresholdInBytes()) {
-            executeSingleUpload(purpose, blobStore, buildKey(blobName), inputStream, blobSize);
+            executeSingleUpload(purpose, blobStore, buildKey(blobName), inputStream, blobSize, failIfAlreadyExists);
         } else {
-            executeMultipartUpload(purpose, blobStore, buildKey(blobName), inputStream, blobSize);
+            executeMultipartUpload(purpose, blobStore, buildKey(blobName), inputStream, blobSize, failIfAlreadyExists);
         }
     }
 
@@ -545,7 +542,8 @@ void executeSingleUpload(
         final S3BlobStore s3BlobStore,
         final String blobName,
         final InputStream input,
-        final long blobSize
+        final long blobSize,
+        final boolean failIfAlreadyExists
     ) throws IOException {
         try (var clientReference = s3BlobStore.clientReference()) {
             // Extra safety checks
@@ -565,6 +563,9 @@ void executeSingleUpload(
             if (s3BlobStore.serverSideEncryption()) {
                 putRequestBuilder.serverSideEncryption(ServerSideEncryption.AES256);
             }
+            if (failIfAlreadyExists) {
+                putRequestBuilder.ifNoneMatch("*");
+            }
             S3BlobStore.configureRequestForMetrics(putRequestBuilder, blobStore, Operation.PUT_OBJECT, purpose);
 
             final var putRequest = putRequestBuilder.build();
@@ -586,7 +587,8 @@ private void executeMultipart(
         final String blobName,
         final long partSize,
         final long blobSize,
-        final PartOperation partOperation
+        final PartOperation partOperation,
+        final boolean failIfAlreadyExists
     ) throws IOException {
 
         ensureMultiPartUploadSize(blobSize);
@@ -639,6 +641,11 @@ private void executeMultipart(
             .key(blobName)
             .uploadId(uploadId)
             .multipartUpload(b -> b.parts(parts));
+
+        if (failIfAlreadyExists) {
+            completeMultipartUploadRequestBuilder.ifNoneMatch("*");
+        }
+
         S3BlobStore.configureRequestForMetrics(completeMultipartUploadRequestBuilder, blobStore, operation, purpose);
         final var completeMultipartUploadRequest = completeMultipartUploadRequestBuilder.build();
         try (var clientReference = s3BlobStore.clientReference()) {
@@ -663,7 +670,8 @@ void executeMultipartUpload(
         final S3BlobStore s3BlobStore,
         final String blobName,
         final InputStream input,
-        final long blobSize
+        final long blobSize,
+        final boolean failIfAlreadyExists
     ) throws IOException {
         executeMultipart(
             purpose,
@@ -680,7 +688,8 @@ void executeMultipartUpload(
                     .uploadPart(uploadRequest, RequestBody.fromInputStream(input, partSize));
                 return CompletedPart.builder().partNumber(partNum).eTag(uploadResponse.eTag()).build();
             }
-        }
+        },
+        failIfAlreadyExists
     );
 }
 
@@ -727,7 +736,8 @@ void executeMultipartCopy(
                 final var uploadPartCopyResponse = clientReference.client().uploadPartCopy(uploadPartCopyRequest);
                 return CompletedPart.builder().partNumber(partNum).eTag(uploadPartCopyResponse.copyPartResult().eTag()).build();
             }
-        })
+        }),
+        false
     );
 }
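A design point visible in `executeMultipart`: the `If-None-Match: *` condition rides on the `CompleteMultipartUpload` request rather than on the individual part uploads, so S3 enforces the no-overwrite check atomically at the moment the parts are assembled into an object. A minimal sketch of that call shape with the AWS SDK for Java v2 follows; the helper and its parameters are hypothetical, not from the commit.

    import java.util.List;
    import software.amazon.awssdk.services.s3.S3Client;
    import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest;
    import software.amazon.awssdk.services.s3.model.CompletedMultipartUpload;
    import software.amazon.awssdk.services.s3.model.CompletedPart;

    class MultipartConditionalComplete {
        // Finalizes an already-uploaded set of parts, failing if the key is now occupied.
        static void completeIfAbsent(S3Client s3, String bucket, String key, String uploadId, List<CompletedPart> parts) {
            CompleteMultipartUploadRequest request = CompleteMultipartUploadRequest.builder()
                .bucket(bucket)
                .key(key)
                .uploadId(uploadId)
                .multipartUpload(CompletedMultipartUpload.builder().parts(parts).build())
                .ifNoneMatch("*") // condition is evaluated only at completion time
                .build();
            // Throws S3Exception with HTTP status 412 if an object already exists at the key.
            s3.completeMultipartUpload(request);
        }
    }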

modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobStoreContainerTests.java

Lines changed: 48 additions & 7 deletions

@@ -69,7 +69,14 @@ public void testExecuteSingleUploadBlobSizeTooLarge() {
 
         final IllegalArgumentException e = expectThrows(
             IllegalArgumentException.class,
-            () -> blobContainer.executeSingleUpload(randomPurpose(), blobStore, randomAlphaOfLengthBetween(1, 10), null, blobSize)
+            () -> blobContainer.executeSingleUpload(
+                randomPurpose(),
+                blobStore,
+                randomAlphaOfLengthBetween(1, 10),
+                null,
+                blobSize,
+                randomBoolean()
+            )
         );
         assertEquals("Upload request size [" + blobSize + "] can't be larger than 5gb", e.getMessage());
     }
@@ -88,7 +95,8 @@ public void testExecuteSingleUploadBlobSizeLargerThanBufferSize() {
                 blobStore,
                 blobName,
                 new ByteArrayInputStream(new byte[0]),
-                ByteSizeUnit.MB.toBytes(2)
+                ByteSizeUnit.MB.toBytes(2),
+                randomBoolean()
             )
         );
         assertEquals("Upload request size [2097152] can't be larger than buffer size", e.getMessage());
@@ -123,6 +131,8 @@ public void testExecuteSingleUpload() throws IOException {
             when(blobStore.getCannedACL()).thenReturn(cannedAccessControlList);
         }
 
+        final boolean failIfAlreadyExists = randomBoolean();
+
         final S3Client client = configureMockClient(blobStore);
 
         final ArgumentCaptor<PutObjectRequest> requestCaptor = ArgumentCaptor.forClass(PutObjectRequest.class);
@@ -131,7 +141,7 @@ public void testExecuteSingleUpload() throws IOException {
         when(client.putObject(requestCaptor.capture(), bodyCaptor.capture())).thenReturn(PutObjectResponse.builder().build());
 
         final ByteArrayInputStream inputStream = new ByteArrayInputStream(new byte[blobSize]);
-        blobContainer.executeSingleUpload(randomPurpose(), blobStore, blobName, inputStream, blobSize);
+        blobContainer.executeSingleUpload(randomPurpose(), blobStore, blobName, inputStream, blobSize, failIfAlreadyExists);
 
         final PutObjectRequest request = requestCaptor.getValue();
         assertEquals(bucketName, request.bucket());
@@ -147,6 +157,10 @@ public void testExecuteSingleUpload() throws IOException {
             );
         }
 
+        if (failIfAlreadyExists) {
+            assertEquals("*", request.ifNoneMatch());
+        }
+
         final RequestBody requestBody = bodyCaptor.getValue();
         try (var contentStream = requestBody.contentStreamProvider().newStream()) {
             assertEquals(inputStream.available(), blobSize);
@@ -164,7 +178,14 @@ public void testExecuteMultipartUploadBlobSizeTooLarge() {
 
         final IllegalArgumentException e = expectThrows(
             IllegalArgumentException.class,
-            () -> blobContainer.executeMultipartUpload(randomPurpose(), blobStore, randomAlphaOfLengthBetween(1, 10), null, blobSize)
+            () -> blobContainer.executeMultipartUpload(
+                randomPurpose(),
+                blobStore,
+                randomAlphaOfLengthBetween(1, 10),
+                null,
+                blobSize,
+                randomBoolean()
+            )
         );
         assertEquals("Multipart upload request size [" + blobSize + "] can't be larger than 5tb", e.getMessage());
     }
@@ -176,7 +197,14 @@ public void testExecuteMultipartUploadBlobSizeTooSmall() {
 
         final IllegalArgumentException e = expectThrows(
             IllegalArgumentException.class,
-            () -> blobContainer.executeMultipartUpload(randomPurpose(), blobStore, randomAlphaOfLengthBetween(1, 10), null, blobSize)
+            () -> blobContainer.executeMultipartUpload(
+                randomPurpose(),
+                blobStore,
+                randomAlphaOfLengthBetween(1, 10),
+                null,
+                blobSize,
+                randomBoolean()
+            )
         );
         assertEquals("Multipart upload request size [" + blobSize + "] can't be smaller than 5mb", e.getMessage());
     }
@@ -225,6 +253,8 @@ void testExecuteMultipart(boolean doCopy) throws IOException {
             when(blobStore.getCannedACL()).thenReturn(cannedAccessControlList);
         }
 
+        final boolean failIfAlreadyExists = doCopy ? false : randomBoolean();
+
         final S3Client client = configureMockClient(blobStore);
 
         final var uploadId = randomIdentifier();
@@ -273,7 +303,7 @@ void testExecuteMultipart(boolean doCopy) throws IOException {
         if (doCopy) {
             blobContainer.executeMultipartCopy(randomPurpose(), sourceContainer, sourceBlobName, blobName, blobSize);
         } else {
-            blobContainer.executeMultipartUpload(randomPurpose(), blobStore, blobName, inputStream, blobSize);
+            blobContainer.executeMultipartUpload(randomPurpose(), blobStore, blobName, inputStream, blobSize, failIfAlreadyExists);
         }
 
         final CreateMultipartUploadRequest initRequest = createMultipartUploadRequestCaptor.getValue();
@@ -340,6 +370,10 @@ void testExecuteMultipart(boolean doCopy) throws IOException {
         assertEquals(blobPath.buildAsString() + blobName, compRequest.key());
         assertEquals(uploadId, compRequest.uploadId());
 
+        if (failIfAlreadyExists) {
+            assertEquals("*", compRequest.ifNoneMatch());
+        }
+
         final List<String> actualETags = compRequest.multipartUpload()
             .parts()
             .stream()
@@ -419,7 +453,14 @@ public void close() {}
 
         final IOException e = expectThrows(IOException.class, () -> {
             final S3BlobContainer blobContainer = new S3BlobContainer(BlobPath.EMPTY, blobStore);
-            blobContainer.executeMultipartUpload(randomPurpose(), blobStore, blobName, new ByteArrayInputStream(new byte[0]), blobSize);
+            blobContainer.executeMultipartUpload(
+                randomPurpose(),
+                blobStore,
+                blobName,
+                new ByteArrayInputStream(new byte[0]),
+                blobSize,
+                randomBoolean()
+            );
         });
 
         assertEquals("Unable to upload or copy object [" + blobName + "] using multipart upload", e.getMessage());

plugins/repository-hdfs/src/test/java/org/elasticsearch/repositories/hdfs/HdfsRepositoryTests.java

Lines changed: 5 additions & 0 deletions

@@ -62,4 +62,9 @@ protected void assertCleanupResponse(CleanupRepositoryResponse response, long by
         assertThat(response.result().blobs(), equalTo(0L));
         }
     }
+
+    @Override
+    public void testFailIfAlreadyExists() {
+        // HDFS does not implement failIfAlreadyExists correctly
+    }
 }
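The `@Override` here suggests `testFailIfAlreadyExists` is declared in a shared base test case, presumably among the four changed files not shown on this page; giving it an empty body opts the HDFS repository out of the new check since, per the comment, HDFS does not implement `failIfAlreadyExists` correctly.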
