Merged

Commits (19)
f3c380d Implement failIfAlreadyExists in S3 repositories (kvanerum, Aug 16, 2025)
35d427c Test failIfAlreadyExists for concurrent writes (kvanerum, Aug 18, 2025)
3d54119 Disable testFailIfAlreadyExists for HdfsRepositoryTests (kvanerum, Aug 21, 2025)
8e8f3e7 Fix race in S3HttpHandler overwrite protection (kvanerum, Aug 21, 2025)
3c2c833 Replace `failIfAlreadyExists` variable with inline `randomBoolean` in… (kvanerum, Aug 21, 2025)
c1974ad Don't remove uploads when precondition failed on S3 completeMultipart… (kvanerum, Aug 23, 2025)
4c479f4 Test failIfAlreadyExists in S3BlobStoreRepositoryTests with multipart… (kvanerum, Aug 23, 2025)
a903309 Merge branch 'main' into s3-failIfAlreadyExists (kvanerum, Aug 23, 2025)
ad360ca Cleanup (kvanerum, Aug 23, 2025)
cef778c Merge branch 'main' into 0reviews/2025/09/01/kvanerum/s3-failIfAlread… (DaveCTurner, Sep 1, 2025)
6979fbe Add changelog (DaveCTurner, Sep 1, 2025)
9c860cd Inline failIfAlreadyExists in executeMultipartCopy (kvanerum, Sep 3, 2025)
b5c0088 Throw error on multiple If-None-Match headers in S3HttpHandler (kvanerum, Sep 3, 2025)
75a91cf Refactor S3HttpHandlerTests to simplify task creation with helper met… (kvanerum, Sep 3, 2025)
c975a60 Use constant for safe await timeout in S3HttpHandlerTests (kvanerum, Sep 3, 2025)
249e8bb Improve readability in AbstractThirdPartyRepositoryTestCase.testFailI… (kvanerum, Sep 3, 2025)
884dcc8 Simplify copy object logic in S3HttpHandler by removing unused precon… (kvanerum, Sep 4, 2025)
a839a47 Merge branch 'main' into s3-failIfAlreadyExists (kvanerum, Sep 4, 2025)
419f4ab Merge branch 'main' into s3-failIfAlreadyExists (DaveCTurner, Sep 12, 2025)
@@ -264,6 +264,7 @@ public void testMultipartCopy() {
// executeMultipart requires a minimum part size of 5 MiB
final var blobBytes = randomBytesReference(randomIntBetween(5 * 1024 * 1024, 10 * 1024 * 1024));
final var destinationBlobName = randomIdentifier();
final var failIfAlreadyExists = randomBoolean();

final var repository = getRepository();

@@ -277,7 +278,8 @@ public void testMultipartCopy() {
(S3BlobContainer) sourceBlobContainer,
sourceBlobName,
destinationBlobName,
blobBytes.length()
blobBytes.length(),
failIfAlreadyExists
);

return destinationBlobContainer.readBlob(randomPurpose(), destinationBlobName).readAllBytes();
@@ -105,6 +105,7 @@
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.lessThan;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.startsWith;

@SuppressForbidden(reason = "this test uses a HttpServer to emulate an S3 endpoint")
// Need to set up a new cluster for each test because cluster settings use randomized authentication settings
@@ -425,7 +426,7 @@ public void testEnforcedCooldownPeriod() throws IOException {
if (randomBoolean()) {
repository.blobStore()
.blobContainer(repository.basePath())
.writeBlobAtomic(randomNonDataPurpose(), getRepositoryDataBlobName(modifiedRepositoryData.getGenId()), serialized, true);
.writeBlobAtomic(randomNonDataPurpose(), getRepositoryDataBlobName(modifiedRepositoryData.getGenId()), serialized, false);
} else {
repository.blobStore()
.blobContainer(repository.basePath())
@@ -434,7 +435,7 @@ public void testEnforcedCooldownPeriod() throws IOException {
getRepositoryDataBlobName(modifiedRepositoryData.getGenId()),
serialized.streamInput(),
serialized.length(),
true
false
);
}

@@ -568,6 +569,27 @@ public void match(LogEvent event) {
}
}

public void testFailIfAlreadyExists() throws IOException {
try (BlobStore store = newBlobStore()) {
final BlobContainer container = store.blobContainer(BlobPath.EMPTY);
final String blobName = randomAlphaOfLengthBetween(8, 12);
byte[] data = randomBytes(randomIntBetween(10, scaledRandomIntBetween(1024, 1 << 16)));

// initial write blob
writeBlob(container, blobName, new BytesArray(data), true);
Contributor:

This would be a good spot to check for potential races in S3HttpHandler, if we initially wrote two blobs concurrently and verified that exactly one of those writes succeeded.

Could we also sometimes write a much larger blob to trigger the multipart upload path?
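A minimal sketch of the suggested race check, in the style of this test (the barrier/thread scaffolding and fixed writer count are illustrative assumptions, not part of this PR; it needs java.util.concurrent.CyclicBarrier and java.util.concurrent.atomic.AtomicInteger):

    final String contestedBlobName = randomAlphaOfLengthBetween(8, 12);
    final AtomicInteger successCount = new AtomicInteger();
    final CyclicBarrier barrier = new CyclicBarrier(2);
    final List<Thread> writers = new ArrayList<>();
    for (int i = 0; i < 2; i++) {
        final Thread writer = new Thread(() -> {
            try {
                barrier.await(); // line the two writers up so the PUTs race
                writeBlob(container, contestedBlobName, new BytesArray(data), true);
                successCount.incrementAndGet();
            } catch (Exception e) {
                // expected for the losing writer: precondition failed
            }
        });
        writer.start();
        writers.add(writer);
    }
    for (Thread writer : writers) {
        writer.join();
    }
    // with failIfAlreadyExists=true, exactly one concurrent write may win
    assertEquals(1, successCount.get());

Sizing the data above the repository's multipart threshold some of the time would exercise the multipart upload path as well, as the comment asks.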


// overwrite the blob if failIfAlreadyExists is set to false
writeBlob(container, blobName, new BytesArray(data), false);

// throw exception if failIfAlreadyExists is set to true
var exception = expectThrows(IOException.class, () -> writeBlob(container, blobName, new BytesArray(data), true));

assertThat(exception.getMessage(), startsWith("Unable to upload object"));

container.delete(randomPurpose());
}
}

/**
* S3RepositoryPlugin that allows to disable chunked encoding and to set a low threshold between single upload and multipart upload.
*/
@@ -137,18 +137,15 @@ public long readBlobPreferredLength() {
return ByteSizeValue.of(32, ByteSizeUnit.MB).getBytes();
}

/**
* This implementation ignores the failIfAlreadyExists flag as the S3 API has no way to enforce this due to its weak consistency model.
*/
@Override
public void writeBlob(OperationPurpose purpose, String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists)
throws IOException {
assert BlobContainer.assertPurposeConsistency(purpose, blobName);
assert inputStream.markSupported() : "No mark support on inputStream breaks the S3 SDK's ability to retry requests";
if (blobSize <= getLargeBlobThresholdInBytes()) {
executeSingleUpload(purpose, blobStore, buildKey(blobName), inputStream, blobSize);
executeSingleUpload(purpose, blobStore, buildKey(blobName), inputStream, blobSize, failIfAlreadyExists);
} else {
executeMultipartUpload(purpose, blobStore, buildKey(blobName), inputStream, blobSize);
executeMultipartUpload(purpose, blobStore, buildKey(blobName), inputStream, blobSize, failIfAlreadyExists);
}
}

@@ -366,7 +363,7 @@ public void copyBlob(

try {
if (blobSize > getMaxCopySizeBeforeMultipart()) {
executeMultipartCopy(purpose, s3SourceBlobContainer, sourceBlobName, blobName, blobSize);
executeMultipartCopy(purpose, s3SourceBlobContainer, sourceBlobName, blobName, blobSize, false);
} else {
// metadata is inherited from source, but not canned ACL or storage class
final var blobKey = buildKey(blobName);
@@ -545,7 +542,8 @@ void executeSingleUpload(
final S3BlobStore s3BlobStore,
final String blobName,
final InputStream input,
final long blobSize
final long blobSize,
final boolean failIfAlreadyExists
) throws IOException {
try (var clientReference = s3BlobStore.clientReference()) {
// Extra safety checks
@@ -565,6 +563,9 @@ void executeSingleUpload(
if (s3BlobStore.serverSideEncryption()) {
putRequestBuilder.serverSideEncryption(ServerSideEncryption.AES256);
}
if (failIfAlreadyExists) {
putRequestBuilder.ifNoneMatch("*");
}
S3BlobStore.configureRequestForMetrics(putRequestBuilder, blobStore, Operation.PUT_OBJECT, purpose);

final var putRequest = putRequestBuilder.build();
@@ -586,7 +587,8 @@ private void executeMultipart(
final String blobName,
final long partSize,
final long blobSize,
final PartOperation partOperation
final PartOperation partOperation,
final boolean failIfAlreadyExists
) throws IOException {

ensureMultiPartUploadSize(blobSize);
@@ -639,6 +641,11 @@ private void executeMultipart(
.key(blobName)
.uploadId(uploadId)
.multipartUpload(b -> b.parts(parts));

if (failIfAlreadyExists) {
completeMultipartUploadRequestBuilder.ifNoneMatch("*");
}

S3BlobStore.configureRequestForMetrics(completeMultipartUploadRequestBuilder, blobStore, operation, purpose);
final var completeMultipartUploadRequest = completeMultipartUploadRequestBuilder.build();
try (var clientReference = s3BlobStore.clientReference()) {
@@ -663,7 +670,8 @@ void executeMultipartUpload(
final S3BlobStore s3BlobStore,
final String blobName,
final InputStream input,
final long blobSize
final long blobSize,
final boolean failIfAlreadyExists
) throws IOException {
executeMultipart(
purpose,
@@ -680,7 +688,8 @@ void executeMultipartUpload(
.uploadPart(uploadRequest, RequestBody.fromInputStream(input, partSize));
return CompletedPart.builder().partNumber(partNum).eTag(uploadResponse.eTag()).build();
}
}
},
failIfAlreadyExists
);
}

@@ -699,7 +708,8 @@ void executeMultipartCopy(
final S3BlobContainer sourceContainer,
final String sourceBlobName,
final String destinationBlobName,
final long blobSize
final long blobSize,
final boolean failIfAlreadyExists
Contributor:

This parameter is always false (in production code anyway) and the true case isn't really tested - I'd rather we inlined it, passing the literal false to executeMultipart, instead.
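For illustration, the inlined shape the comment asks for might look like the sketch below, with the unchanged arguments elided (commit 9c860cd appears to implement this suggestion):

    void executeMultipartCopy(
        final OperationPurpose purpose,
        final S3BlobContainer sourceContainer,
        final String sourceBlobName,
        final String destinationBlobName,
        final long blobSize
    ) throws IOException {
        // ... unchanged part-copy setup ...
        executeMultipart(
            // ... unchanged leading arguments and PartOperation ...
            false // copies never request overwrite protection in production
        );
    }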

) throws IOException {
final long copyPartSize = MAX_FILE_SIZE.getBytes();
final var destinationKey = buildKey(destinationBlobName);
@@ -727,7 +737,8 @@ void executeMultipartCopy(
final var uploadPartCopyResponse = clientReference.client().uploadPartCopy(uploadPartCopyRequest);
return CompletedPart.builder().partNumber(partNum).eTag(uploadPartCopyResponse.copyPartResult().eTag()).build();
}
})
}),
failIfAlreadyExists
);
}

@@ -69,7 +69,14 @@ public void testExecuteSingleUploadBlobSizeTooLarge() {

final IllegalArgumentException e = expectThrows(
IllegalArgumentException.class,
() -> blobContainer.executeSingleUpload(randomPurpose(), blobStore, randomAlphaOfLengthBetween(1, 10), null, blobSize)
() -> blobContainer.executeSingleUpload(
randomPurpose(),
blobStore,
randomAlphaOfLengthBetween(1, 10),
null,
blobSize,
randomBoolean()
)
);
assertEquals("Upload request size [" + blobSize + "] can't be larger than 5gb", e.getMessage());
}
@@ -88,7 +95,8 @@ public void testExecuteSingleUploadBlobSizeLargerThanBufferSize() {
blobStore,
blobName,
new ByteArrayInputStream(new byte[0]),
ByteSizeUnit.MB.toBytes(2)
ByteSizeUnit.MB.toBytes(2),
randomBoolean()
)
);
assertEquals("Upload request size [2097152] can't be larger than buffer size", e.getMessage());
@@ -123,6 +131,8 @@ public void testExecuteSingleUpload() throws IOException {
when(blobStore.getCannedACL()).thenReturn(cannedAccessControlList);
}

final boolean failIfAlreadyExists = randomBoolean();

final S3Client client = configureMockClient(blobStore);

final ArgumentCaptor<PutObjectRequest> requestCaptor = ArgumentCaptor.forClass(PutObjectRequest.class);
@@ -131,7 +141,7 @@ public void testExecuteSingleUpload() throws IOException {
when(client.putObject(requestCaptor.capture(), bodyCaptor.capture())).thenReturn(PutObjectResponse.builder().build());

final ByteArrayInputStream inputStream = new ByteArrayInputStream(new byte[blobSize]);
blobContainer.executeSingleUpload(randomPurpose(), blobStore, blobName, inputStream, blobSize);
blobContainer.executeSingleUpload(randomPurpose(), blobStore, blobName, inputStream, blobSize, failIfAlreadyExists);

final PutObjectRequest request = requestCaptor.getValue();
assertEquals(bucketName, request.bucket());
@@ -147,6 +157,10 @@ public void testExecuteSingleUpload() throws IOException {
);
}

if (failIfAlreadyExists) {
assertEquals("*", request.ifNoneMatch());
}

final RequestBody requestBody = bodyCaptor.getValue();
try (var contentStream = requestBody.contentStreamProvider().newStream()) {
assertEquals(inputStream.available(), blobSize);
@@ -164,7 +178,14 @@ public void testExecuteMultipartUploadBlobSizeTooLarge() {

final IllegalArgumentException e = expectThrows(
IllegalArgumentException.class,
() -> blobContainer.executeMultipartUpload(randomPurpose(), blobStore, randomAlphaOfLengthBetween(1, 10), null, blobSize)
() -> blobContainer.executeMultipartUpload(
randomPurpose(),
blobStore,
randomAlphaOfLengthBetween(1, 10),
null,
blobSize,
randomBoolean()
)
);
assertEquals("Multipart upload request size [" + blobSize + "] can't be larger than 5tb", e.getMessage());
}
@@ -176,7 +197,14 @@ public void testExecuteMultipartUploadBlobSizeTooSmall() {

final IllegalArgumentException e = expectThrows(
IllegalArgumentException.class,
() -> blobContainer.executeMultipartUpload(randomPurpose(), blobStore, randomAlphaOfLengthBetween(1, 10), null, blobSize)
() -> blobContainer.executeMultipartUpload(
randomPurpose(),
blobStore,
randomAlphaOfLengthBetween(1, 10),
null,
blobSize,
randomBoolean()
)
);
assertEquals("Multipart upload request size [" + blobSize + "] can't be smaller than 5mb", e.getMessage());
}
@@ -225,6 +253,8 @@ void testExecuteMultipart(boolean doCopy) throws IOException {
when(blobStore.getCannedACL()).thenReturn(cannedAccessControlList);
}

final boolean failIfAlreadyExists = doCopy ? false : randomBoolean();

final S3Client client = configureMockClient(blobStore);

final var uploadId = randomIdentifier();
@@ -271,9 +301,9 @@ void testExecuteMultipart(boolean doCopy) throws IOException {
final S3BlobContainer sourceContainer = new S3BlobContainer(sourceBlobPath, sourceBlobStore);

if (doCopy) {
blobContainer.executeMultipartCopy(randomPurpose(), sourceContainer, sourceBlobName, blobName, blobSize);
blobContainer.executeMultipartCopy(randomPurpose(), sourceContainer, sourceBlobName, blobName, blobSize, false);
} else {
blobContainer.executeMultipartUpload(randomPurpose(), blobStore, blobName, inputStream, blobSize);
blobContainer.executeMultipartUpload(randomPurpose(), blobStore, blobName, inputStream, blobSize, failIfAlreadyExists);
}

final CreateMultipartUploadRequest initRequest = createMultipartUploadRequestCaptor.getValue();
@@ -340,6 +370,10 @@ void testExecuteMultipart(boolean doCopy) throws IOException {
assertEquals(blobPath.buildAsString() + blobName, compRequest.key());
assertEquals(uploadId, compRequest.uploadId());

if (failIfAlreadyExists) {
assertEquals("*", compRequest.ifNoneMatch());
}

final List<String> actualETags = compRequest.multipartUpload()
.parts()
.stream()
@@ -419,7 +453,14 @@ public void close() {}

final IOException e = expectThrows(IOException.class, () -> {
final S3BlobContainer blobContainer = new S3BlobContainer(BlobPath.EMPTY, blobStore);
blobContainer.executeMultipartUpload(randomPurpose(), blobStore, blobName, new ByteArrayInputStream(new byte[0]), blobSize);
blobContainer.executeMultipartUpload(
randomPurpose(),
blobStore,
blobName,
new ByteArrayInputStream(new byte[0]),
blobSize,
randomBoolean()
);
});

assertEquals("Unable to upload or copy object [" + blobName + "] using multipart upload", e.getMessage());
@@ -189,6 +189,7 @@ public void handle(final HttpExchange exchange) throws IOException {

} else if (request.isCompleteMultipartUploadRequest()) {
final byte[] responseBody;
boolean preconditionFailed = false;
synchronized (uploads) {
final var upload = removeUpload(request.getQueryParamOnce("uploadId"));
if (upload == null) {
@@ -206,19 +207,27 @@ public void handle(final HttpExchange exchange) throws IOException {
}
} else {
final var blobContents = upload.complete(extractPartEtags(Streams.readFully(exchange.getRequestBody())));
blobs.put(request.path(), blobContents);
responseBody = ("<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n"
+ "<CompleteMultipartUploadResult>\n"
+ "<Bucket>"
+ bucket
+ "</Bucket>\n"
+ "<Key>"
+ request.path()
+ "</Key>\n"
+ "</CompleteMultipartUploadResult>").getBytes(StandardCharsets.UTF_8);

if (isProtectOverwrite(exchange) && blobs.containsKey(request.path())) {
Contributor:

Checking containsKey before the put is racy. I think we need a putIfAbsent if If-None-Match: * is specified, and therefore also a test which tries to catch a race here.
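A sketch of the atomic variant being requested, assuming blobs is a ConcurrentMap as elsewhere in S3HttpHandler (buildCompleteMultipartUploadResponse is a hypothetical helper wrapping the XML response built above):

    if (isProtectOverwrite(exchange)) {
        // putIfAbsent publishes the blob and checks for an existing one
        // in a single atomic step, closing the check-then-act race
        if (blobs.putIfAbsent(request.path(), blobContents) != null) {
            preconditionFailed = true;
            responseBody = null;
        } else {
            responseBody = buildCompleteMultipartUploadResponse(bucket, request.path());
        }
    } else {
        blobs.put(request.path(), blobContents);
        responseBody = buildCompleteMultipartUploadResponse(bucket, request.path());
    }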

preconditionFailed = true;
responseBody = null;
} else {
blobs.put(request.path(), blobContents);
responseBody = ("<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n"
+ "<CompleteMultipartUploadResult>\n"
+ "<Bucket>"
+ bucket
+ "</Bucket>\n"
+ "<Key>"
+ request.path()
+ "</Key>\n"
+ "</CompleteMultipartUploadResult>").getBytes(StandardCharsets.UTF_8);
}
}
}
if (responseBody == null) {
if (preconditionFailed) {
exchange.sendResponseHeaders(RestStatus.PRECONDITION_FAILED.getStatus(), -1);
} else if (responseBody == null) {
exchange.sendResponseHeaders(RestStatus.NOT_FOUND.getStatus(), -1);
} else {
exchange.getResponseHeaders().add("Content-Type", "application/xml");
@@ -232,7 +241,9 @@ public void handle(final HttpExchange exchange) throws IOException {
} else if (request.isPutObjectRequest()) {
// a copy request is a put request with an X-amz-copy-source header
final var copySource = copySourceName(exchange);
if (copySource != null) {
if (isProtectOverwrite(exchange) && blobs.containsKey(request.path())) {
Contributor:

Likewise here, we need to use putIfAbsent to avoid the race on this path.

exchange.sendResponseHeaders(RestStatus.PRECONDITION_FAILED.getStatus(), -1);
} else if (copySource != null) {
var sourceBlob = blobs.get(copySource);
if (sourceBlob == null) {
exchange.sendResponseHeaders(RestStatus.NOT_FOUND.getStatus(), -1);
@@ -540,6 +551,18 @@ private static HttpHeaderParser.Range parsePartRange(final HttpExchange exchange
return parseRangeHeader(sourceRangeHeaders.getFirst());
}

private static boolean isProtectOverwrite(final HttpExchange exchange) {
final var ifNoneMatch = exchange.getRequestHeaders().getFirst("If-None-Match");

if (ifNoneMatch == null) {
return false;
} else if (ifNoneMatch.equals("*")) {
return true;
}

throw new AssertionError("invalid If-None-Match header: " + ifNoneMatch);
}

MultipartUpload putUpload(String path) {
final var upload = new MultipartUpload(UUIDs.randomBase64UUID(), path);
synchronized (uploads) {