Skip to content

Commit f3c380d

Browse files
committed
Implement failIfAlreadyExists in S3 repositories
1 parent eac438b commit f3c380d

File tree

7 files changed

+200
-35
lines changed

7 files changed

+200
-35
lines changed

modules/repository-s3/qa/third-party/src/test/java/org/elasticsearch/repositories/s3/S3RepositoryThirdPartyTests.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -264,6 +264,7 @@ public void testMultipartCopy() {
264264
// executeMultipart requires a minimum part size of 5 MiB
265265
final var blobBytes = randomBytesReference(randomIntBetween(5 * 1024 * 1024, 10 * 1024 * 1024));
266266
final var destinationBlobName = randomIdentifier();
267+
final var failIfAlreadyExists = randomBoolean();
267268

268269
final var repository = getRepository();
269270

@@ -277,7 +278,8 @@ public void testMultipartCopy() {
277278
(S3BlobContainer) sourceBlobContainer,
278279
sourceBlobName,
279280
destinationBlobName,
280-
blobBytes.length()
281+
blobBytes.length(),
282+
failIfAlreadyExists
281283
);
282284

283285
return destinationBlobContainer.readBlob(randomPurpose(), destinationBlobName).readAllBytes();

modules/repository-s3/src/internalClusterTest/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,7 @@
105105
import static org.hamcrest.Matchers.hasSize;
106106
import static org.hamcrest.Matchers.lessThan;
107107
import static org.hamcrest.Matchers.not;
108+
import static org.hamcrest.Matchers.startsWith;
108109

109110
@SuppressForbidden(reason = "this test uses a HttpServer to emulate an S3 endpoint")
110111
// Need to set up a new cluster for each test because cluster settings use randomized authentication settings
@@ -425,7 +426,7 @@ public void testEnforcedCooldownPeriod() throws IOException {
425426
if (randomBoolean()) {
426427
repository.blobStore()
427428
.blobContainer(repository.basePath())
428-
.writeBlobAtomic(randomNonDataPurpose(), getRepositoryDataBlobName(modifiedRepositoryData.getGenId()), serialized, true);
429+
.writeBlobAtomic(randomNonDataPurpose(), getRepositoryDataBlobName(modifiedRepositoryData.getGenId()), serialized, false);
429430
} else {
430431
repository.blobStore()
431432
.blobContainer(repository.basePath())
@@ -434,7 +435,7 @@ public void testEnforcedCooldownPeriod() throws IOException {
434435
getRepositoryDataBlobName(modifiedRepositoryData.getGenId()),
435436
serialized.streamInput(),
436437
serialized.length(),
437-
true
438+
false
438439
);
439440
}
440441

@@ -568,6 +569,27 @@ public void match(LogEvent event) {
568569
}
569570
}
570571

572+
public void testFailIfAlreadyExists() throws IOException {
573+
try (BlobStore store = newBlobStore()) {
574+
final BlobContainer container = store.blobContainer(BlobPath.EMPTY);
575+
final String blobName = randomAlphaOfLengthBetween(8, 12);
576+
byte[] data = randomBytes(randomIntBetween(10, scaledRandomIntBetween(1024, 1 << 16)));
577+
578+
// initial write blob
579+
writeBlob(container, blobName, new BytesArray(data), true);
580+
581+
// override if failIfAlreadyExists is set to false
582+
writeBlob(container, blobName, new BytesArray(data), false);
583+
584+
// throw exception if failIfAlreadyExists is set to true
585+
var exception = expectThrows(IOException.class, () -> writeBlob(container, blobName, new BytesArray(data), true));
586+
587+
assertThat(exception.getMessage(), startsWith("Unable to upload object"));
588+
589+
container.delete(randomPurpose());
590+
}
591+
}
592+
571593
/**
572594
* S3RepositoryPlugin that allows to disable chunked encoding and to set a low threshold between single upload and multipart upload.
573595
*/

modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java

Lines changed: 23 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -137,18 +137,15 @@ public long readBlobPreferredLength() {
137137
return ByteSizeValue.of(32, ByteSizeUnit.MB).getBytes();
138138
}
139139

140-
/**
141-
* This implementation ignores the failIfAlreadyExists flag as the S3 API has no way to enforce this due to its weak consistency model.
142-
*/
143140
@Override
144141
public void writeBlob(OperationPurpose purpose, String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists)
145142
throws IOException {
146143
assert BlobContainer.assertPurposeConsistency(purpose, blobName);
147144
assert inputStream.markSupported() : "No mark support on inputStream breaks the S3 SDK's ability to retry requests";
148145
if (blobSize <= getLargeBlobThresholdInBytes()) {
149-
executeSingleUpload(purpose, blobStore, buildKey(blobName), inputStream, blobSize);
146+
executeSingleUpload(purpose, blobStore, buildKey(blobName), inputStream, blobSize, failIfAlreadyExists);
150147
} else {
151-
executeMultipartUpload(purpose, blobStore, buildKey(blobName), inputStream, blobSize);
148+
executeMultipartUpload(purpose, blobStore, buildKey(blobName), inputStream, blobSize, failIfAlreadyExists);
152149
}
153150
}
154151

@@ -366,7 +363,7 @@ public void copyBlob(
366363

367364
try {
368365
if (blobSize > getMaxCopySizeBeforeMultipart()) {
369-
executeMultipartCopy(purpose, s3SourceBlobContainer, sourceBlobName, blobName, blobSize);
366+
executeMultipartCopy(purpose, s3SourceBlobContainer, sourceBlobName, blobName, blobSize, false);
370367
} else {
371368
// metadata is inherited from source, but not canned ACL or storage class
372369
final var blobKey = buildKey(blobName);
@@ -545,7 +542,8 @@ void executeSingleUpload(
545542
final S3BlobStore s3BlobStore,
546543
final String blobName,
547544
final InputStream input,
548-
final long blobSize
545+
final long blobSize,
546+
final boolean failIfAlreadyExists
549547
) throws IOException {
550548
try (var clientReference = s3BlobStore.clientReference()) {
551549
// Extra safety checks
@@ -565,6 +563,9 @@ void executeSingleUpload(
565563
if (s3BlobStore.serverSideEncryption()) {
566564
putRequestBuilder.serverSideEncryption(ServerSideEncryption.AES256);
567565
}
566+
if (failIfAlreadyExists) {
567+
putRequestBuilder.ifNoneMatch("*");
568+
}
568569
S3BlobStore.configureRequestForMetrics(putRequestBuilder, blobStore, Operation.PUT_OBJECT, purpose);
569570

570571
final var putRequest = putRequestBuilder.build();
@@ -586,7 +587,8 @@ private void executeMultipart(
586587
final String blobName,
587588
final long partSize,
588589
final long blobSize,
589-
final PartOperation partOperation
590+
final PartOperation partOperation,
591+
final boolean failIfAlreadyExists
590592
) throws IOException {
591593

592594
ensureMultiPartUploadSize(blobSize);
@@ -639,6 +641,11 @@ private void executeMultipart(
639641
.key(blobName)
640642
.uploadId(uploadId)
641643
.multipartUpload(b -> b.parts(parts));
644+
645+
if (failIfAlreadyExists) {
646+
completeMultipartUploadRequestBuilder.ifNoneMatch("*");
647+
}
648+
642649
S3BlobStore.configureRequestForMetrics(completeMultipartUploadRequestBuilder, blobStore, operation, purpose);
643650
final var completeMultipartUploadRequest = completeMultipartUploadRequestBuilder.build();
644651
try (var clientReference = s3BlobStore.clientReference()) {
@@ -663,7 +670,8 @@ void executeMultipartUpload(
663670
final S3BlobStore s3BlobStore,
664671
final String blobName,
665672
final InputStream input,
666-
final long blobSize
673+
final long blobSize,
674+
final boolean failIfAlreadyExists
667675
) throws IOException {
668676
executeMultipart(
669677
purpose,
@@ -680,7 +688,8 @@ void executeMultipartUpload(
680688
.uploadPart(uploadRequest, RequestBody.fromInputStream(input, partSize));
681689
return CompletedPart.builder().partNumber(partNum).eTag(uploadResponse.eTag()).build();
682690
}
683-
}
691+
},
692+
failIfAlreadyExists
684693
);
685694
}
686695

@@ -699,7 +708,8 @@ void executeMultipartCopy(
699708
final S3BlobContainer sourceContainer,
700709
final String sourceBlobName,
701710
final String destinationBlobName,
702-
final long blobSize
711+
final long blobSize,
712+
final boolean failIfAlreadyExists
703713
) throws IOException {
704714
final long copyPartSize = MAX_FILE_SIZE.getBytes();
705715
final var destinationKey = buildKey(destinationBlobName);
@@ -727,7 +737,8 @@ void executeMultipartCopy(
727737
final var uploadPartCopyResponse = clientReference.client().uploadPartCopy(uploadPartCopyRequest);
728738
return CompletedPart.builder().partNumber(partNum).eTag(uploadPartCopyResponse.copyPartResult().eTag()).build();
729739
}
730-
})
740+
}),
741+
failIfAlreadyExists
731742
);
732743
}
733744

modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobStoreContainerTests.java

Lines changed: 49 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,14 @@ public void testExecuteSingleUploadBlobSizeTooLarge() {
6969

7070
final IllegalArgumentException e = expectThrows(
7171
IllegalArgumentException.class,
72-
() -> blobContainer.executeSingleUpload(randomPurpose(), blobStore, randomAlphaOfLengthBetween(1, 10), null, blobSize)
72+
() -> blobContainer.executeSingleUpload(
73+
randomPurpose(),
74+
blobStore,
75+
randomAlphaOfLengthBetween(1, 10),
76+
null,
77+
blobSize,
78+
randomBoolean()
79+
)
7380
);
7481
assertEquals("Upload request size [" + blobSize + "] can't be larger than 5gb", e.getMessage());
7582
}
@@ -88,7 +95,8 @@ public void testExecuteSingleUploadBlobSizeLargerThanBufferSize() {
8895
blobStore,
8996
blobName,
9097
new ByteArrayInputStream(new byte[0]),
91-
ByteSizeUnit.MB.toBytes(2)
98+
ByteSizeUnit.MB.toBytes(2),
99+
randomBoolean()
92100
)
93101
);
94102
assertEquals("Upload request size [2097152] can't be larger than buffer size", e.getMessage());
@@ -123,6 +131,8 @@ public void testExecuteSingleUpload() throws IOException {
123131
when(blobStore.getCannedACL()).thenReturn(cannedAccessControlList);
124132
}
125133

134+
final boolean failIfAlreadyExists = randomBoolean();
135+
126136
final S3Client client = configureMockClient(blobStore);
127137

128138
final ArgumentCaptor<PutObjectRequest> requestCaptor = ArgumentCaptor.forClass(PutObjectRequest.class);
@@ -131,7 +141,7 @@ public void testExecuteSingleUpload() throws IOException {
131141
when(client.putObject(requestCaptor.capture(), bodyCaptor.capture())).thenReturn(PutObjectResponse.builder().build());
132142

133143
final ByteArrayInputStream inputStream = new ByteArrayInputStream(new byte[blobSize]);
134-
blobContainer.executeSingleUpload(randomPurpose(), blobStore, blobName, inputStream, blobSize);
144+
blobContainer.executeSingleUpload(randomPurpose(), blobStore, blobName, inputStream, blobSize, failIfAlreadyExists);
135145

136146
final PutObjectRequest request = requestCaptor.getValue();
137147
assertEquals(bucketName, request.bucket());
@@ -147,6 +157,10 @@ public void testExecuteSingleUpload() throws IOException {
147157
);
148158
}
149159

160+
if (failIfAlreadyExists) {
161+
assertEquals("*", request.ifNoneMatch());
162+
}
163+
150164
final RequestBody requestBody = bodyCaptor.getValue();
151165
try (var contentStream = requestBody.contentStreamProvider().newStream()) {
152166
assertEquals(inputStream.available(), blobSize);
@@ -164,7 +178,14 @@ public void testExecuteMultipartUploadBlobSizeTooLarge() {
164178

165179
final IllegalArgumentException e = expectThrows(
166180
IllegalArgumentException.class,
167-
() -> blobContainer.executeMultipartUpload(randomPurpose(), blobStore, randomAlphaOfLengthBetween(1, 10), null, blobSize)
181+
() -> blobContainer.executeMultipartUpload(
182+
randomPurpose(),
183+
blobStore,
184+
randomAlphaOfLengthBetween(1, 10),
185+
null,
186+
blobSize,
187+
randomBoolean()
188+
)
168189
);
169190
assertEquals("Multipart upload request size [" + blobSize + "] can't be larger than 5tb", e.getMessage());
170191
}
@@ -176,7 +197,14 @@ public void testExecuteMultipartUploadBlobSizeTooSmall() {
176197

177198
final IllegalArgumentException e = expectThrows(
178199
IllegalArgumentException.class,
179-
() -> blobContainer.executeMultipartUpload(randomPurpose(), blobStore, randomAlphaOfLengthBetween(1, 10), null, blobSize)
200+
() -> blobContainer.executeMultipartUpload(
201+
randomPurpose(),
202+
blobStore,
203+
randomAlphaOfLengthBetween(1, 10),
204+
null,
205+
blobSize,
206+
randomBoolean()
207+
)
180208
);
181209
assertEquals("Multipart upload request size [" + blobSize + "] can't be smaller than 5mb", e.getMessage());
182210
}
@@ -225,6 +253,8 @@ void testExecuteMultipart(boolean doCopy) throws IOException {
225253
when(blobStore.getCannedACL()).thenReturn(cannedAccessControlList);
226254
}
227255

256+
final boolean failIfAlreadyExists = doCopy ? false : randomBoolean();
257+
228258
final S3Client client = configureMockClient(blobStore);
229259

230260
final var uploadId = randomIdentifier();
@@ -271,9 +301,9 @@ void testExecuteMultipart(boolean doCopy) throws IOException {
271301
final S3BlobContainer sourceContainer = new S3BlobContainer(sourceBlobPath, sourceBlobStore);
272302

273303
if (doCopy) {
274-
blobContainer.executeMultipartCopy(randomPurpose(), sourceContainer, sourceBlobName, blobName, blobSize);
304+
blobContainer.executeMultipartCopy(randomPurpose(), sourceContainer, sourceBlobName, blobName, blobSize, false);
275305
} else {
276-
blobContainer.executeMultipartUpload(randomPurpose(), blobStore, blobName, inputStream, blobSize);
306+
blobContainer.executeMultipartUpload(randomPurpose(), blobStore, blobName, inputStream, blobSize, failIfAlreadyExists);
277307
}
278308

279309
final CreateMultipartUploadRequest initRequest = createMultipartUploadRequestCaptor.getValue();
@@ -340,6 +370,10 @@ void testExecuteMultipart(boolean doCopy) throws IOException {
340370
assertEquals(blobPath.buildAsString() + blobName, compRequest.key());
341371
assertEquals(uploadId, compRequest.uploadId());
342372

373+
if (failIfAlreadyExists) {
374+
assertEquals("*", compRequest.ifNoneMatch());
375+
}
376+
343377
final List<String> actualETags = compRequest.multipartUpload()
344378
.parts()
345379
.stream()
@@ -419,7 +453,14 @@ public void close() {}
419453

420454
final IOException e = expectThrows(IOException.class, () -> {
421455
final S3BlobContainer blobContainer = new S3BlobContainer(BlobPath.EMPTY, blobStore);
422-
blobContainer.executeMultipartUpload(randomPurpose(), blobStore, blobName, new ByteArrayInputStream(new byte[0]), blobSize);
456+
blobContainer.executeMultipartUpload(
457+
randomPurpose(),
458+
blobStore,
459+
blobName,
460+
new ByteArrayInputStream(new byte[0]),
461+
blobSize,
462+
randomBoolean()
463+
);
423464
});
424465

425466
assertEquals("Unable to upload or copy object [" + blobName + "] using multipart upload", e.getMessage());

test/fixtures/s3-fixture/src/main/java/fixture/s3/S3HttpHandler.java

Lines changed: 35 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,7 @@ public void handle(final HttpExchange exchange) throws IOException {
189189

190190
} else if (request.isCompleteMultipartUploadRequest()) {
191191
final byte[] responseBody;
192+
boolean preconditionFailed = false;
192193
synchronized (uploads) {
193194
final var upload = removeUpload(request.getQueryParamOnce("uploadId"));
194195
if (upload == null) {
@@ -206,19 +207,27 @@ public void handle(final HttpExchange exchange) throws IOException {
206207
}
207208
} else {
208209
final var blobContents = upload.complete(extractPartEtags(Streams.readFully(exchange.getRequestBody())));
209-
blobs.put(request.path(), blobContents);
210-
responseBody = ("<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n"
211-
+ "<CompleteMultipartUploadResult>\n"
212-
+ "<Bucket>"
213-
+ bucket
214-
+ "</Bucket>\n"
215-
+ "<Key>"
216-
+ request.path()
217-
+ "</Key>\n"
218-
+ "</CompleteMultipartUploadResult>").getBytes(StandardCharsets.UTF_8);
210+
211+
if (isProtectOverwrite(exchange) && blobs.containsKey(request.path())) {
212+
preconditionFailed = true;
213+
responseBody = null;
214+
} else {
215+
blobs.put(request.path(), blobContents);
216+
responseBody = ("<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n"
217+
+ "<CompleteMultipartUploadResult>\n"
218+
+ "<Bucket>"
219+
+ bucket
220+
+ "</Bucket>\n"
221+
+ "<Key>"
222+
+ request.path()
223+
+ "</Key>\n"
224+
+ "</CompleteMultipartUploadResult>").getBytes(StandardCharsets.UTF_8);
225+
}
219226
}
220227
}
221-
if (responseBody == null) {
228+
if (preconditionFailed) {
229+
exchange.sendResponseHeaders(RestStatus.PRECONDITION_FAILED.getStatus(), -1);
230+
} else if (responseBody == null) {
222231
exchange.sendResponseHeaders(RestStatus.NOT_FOUND.getStatus(), -1);
223232
} else {
224233
exchange.getResponseHeaders().add("Content-Type", "application/xml");
@@ -232,7 +241,9 @@ public void handle(final HttpExchange exchange) throws IOException {
232241
} else if (request.isPutObjectRequest()) {
233242
// a copy request is a put request with an X-amz-copy-source header
234243
final var copySource = copySourceName(exchange);
235-
if (copySource != null) {
244+
if (isProtectOverwrite(exchange) && blobs.containsKey(request.path())) {
245+
exchange.sendResponseHeaders(RestStatus.PRECONDITION_FAILED.getStatus(), -1);
246+
} else if (copySource != null) {
236247
var sourceBlob = blobs.get(copySource);
237248
if (sourceBlob == null) {
238249
exchange.sendResponseHeaders(RestStatus.NOT_FOUND.getStatus(), -1);
@@ -540,6 +551,18 @@ private static HttpHeaderParser.Range parsePartRange(final HttpExchange exchange
540551
return parseRangeHeader(sourceRangeHeaders.getFirst());
541552
}
542553

554+
private static boolean isProtectOverwrite(final HttpExchange exchange) {
555+
final var ifNoneMatch = exchange.getRequestHeaders().getFirst("If-None-Match");
556+
557+
if (ifNoneMatch == null) {
558+
return false;
559+
} else if (ifNoneMatch.equals("*")) {
560+
return true;
561+
}
562+
563+
throw new AssertionError("invalid If-None-Match header: " + ifNoneMatch);
564+
}
565+
543566
MultipartUpload putUpload(String path) {
544567
final var upload = new MultipartUpload(UUIDs.randomBase64UUID(), path);
545568
synchronized (uploads) {

0 commit comments

Comments
 (0)