@@ -608,7 +608,24 @@ private void executeMultipart(
final String uploadId;
try (AmazonS3Reference clientReference = s3BlobStore.clientReference()) {
uploadId = clientReference.client().createMultipartUpload(createMultipartUpload(purpose, operation, blobName)).uploadId();
cleanupOnFailureActions.add(() -> abortMultiPartUpload(purpose, uploadId, blobName));
cleanupOnFailureActions.add(() -> {
try {
abortMultiPartUpload(purpose, uploadId, blobName);
} catch (Exception e) {
if (e instanceof SdkServiceException sdkServiceException
&& sdkServiceException.statusCode() == RestStatus.NOT_FOUND.getStatus()) {
// NOT_FOUND is what we wanted
logger.atDebug()
Contributor: I think there is a change of behaviour. Correct me if I'm wrong, but previously, if abortMultiPartUpload threw an SdkServiceException, the exception would propagate upwards, get caught in the outer catch block here, and then surface to the user as a NoSuchFileException. Now that we're swallowing the exception, this is no longer true.

If this is an intentional change in behaviour then I think it is worth a unit test (we don't want this code to change in future and start throwing an exception where previously there was none).

Contributor Author: That's correct. This is covered by tests already in this PR - it's the reason why 5a9eace failed.

Contributor Author: I'll open a separate PR to address this; it's somewhat orthogonal to the point of this PR.

Contributor Author: Ok, see #138569.

.withThrowable(e)
.log("multipart upload of [{}] with ID [{}] not found on abort", blobName, uploadId);
} else {
// aborting the upload on failure is a best-effort cleanup step - if it fails then we must just move on
logger.atWarn()
Contributor: TIL that there was an alternate logging format!

.withThrowable(e)
.log("failed to clean up multipart upload of [{}] with ID [{}] after earlier failure", blobName, uploadId);
}
}
});
}
if (Strings.isEmpty(uploadId)) {
throw new IOException("Failed to initialize multipart operation for " + blobName);
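The review thread above asks for a unit test that pins down the new swallow-on-abort behaviour. Below is a minimal, self-contained sketch of that idea, not the PR's actual test code; StatusException and abortQuietly are hypothetical stand-ins for SdkServiceException and the real cleanup action.

// Sketch only: mirrors the cleanup logic above - a 404 from AbortMultipartUpload means the upload
// is already gone (logged at DEBUG), and any other failure is swallowed with a WARN instead of
// masking the original error.
import java.util.ArrayList;
import java.util.List;

class AbortCleanupSketch {

    /** Hypothetical stand-in for SdkServiceException, carrying only an HTTP status code. */
    static class StatusException extends RuntimeException {
        final int statusCode;

        StatusException(int statusCode) {
            super("status=" + statusCode);
            this.statusCode = statusCode;
        }
    }

    // Never rethrows: logs at DEBUG for 404 and at WARN for anything else.
    static void abortQuietly(Runnable abortCall, List<String> log) {
        try {
            abortCall.run();
        } catch (Exception e) {
            if (e instanceof StatusException statusException && statusException.statusCode == 404) {
                log.add("DEBUG: multipart upload not found on abort"); // already aborted/completed - that is what we wanted
            } else {
                log.add("WARN: failed to clean up multipart upload");  // best-effort cleanup, just move on
            }
        }
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        abortQuietly(() -> { throw new StatusException(404); }, log);
        abortQuietly(() -> { throw new StatusException(500); }, log);
        // Neither exception propagated; both outcomes were logged instead.
        System.out.println(log);
    }
}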
test/fixtures/s3-fixture/src/main/java/fixture/s3/S3HttpHandler.java (100 changes: 77 additions & 23 deletions)
@@ -26,6 +26,7 @@
import org.elasticsearch.logging.LogManager;
import org.elasticsearch.logging.Logger;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.fixture.HttpHeaderParser;

import java.io.IOException;
@@ -44,6 +45,7 @@
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

@@ -190,14 +192,15 @@ public void handle(final HttpExchange exchange) throws IOException {

} else if (request.isCompleteMultipartUploadRequest()) {
final byte[] responseBody;
final boolean preconditionFailed;
final RestStatus responseCode;
synchronized (uploads) {
final var upload = getUpload(request.getQueryParamOnce("uploadId"));
if (upload == null) {
preconditionFailed = false;
if (Randomness.get().nextBoolean()) {
responseCode = RestStatus.NOT_FOUND;
responseBody = null;
} else {
responseCode = RestStatus.OK;
responseBody = """
<?xml version="1.0" encoding="UTF-8"?>
<Error>
@@ -209,10 +212,8 @@ public void handle(final HttpExchange exchange) throws IOException {
}
} else {
final var blobContents = upload.complete(extractPartEtags(Streams.readFully(exchange.getRequestBody())));

preconditionFailed = updateBlobContents(exchange, request.path(), blobContents) == false;

if (preconditionFailed == false) {
responseCode = updateBlobContents(exchange, request.path(), blobContents);
if (responseCode == RestStatus.OK) {
responseBody = ("<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n"
+ "<CompleteMultipartUploadResult>\n"
+ "<Bucket>"
@@ -222,20 +223,20 @@ public void handle(final HttpExchange exchange) throws IOException {
+ request.path()
+ "</Key>\n"
+ "</CompleteMultipartUploadResult>").getBytes(StandardCharsets.UTF_8);
removeUpload(upload.getUploadId());
} else {
responseBody = null;
}
if (responseCode != RestStatus.PRECONDITION_FAILED) {
removeUpload(upload.getUploadId());
}
}
}
if (preconditionFailed) {
exchange.sendResponseHeaders(RestStatus.PRECONDITION_FAILED.getStatus(), -1);
} else if (responseBody == null) {
exchange.sendResponseHeaders(RestStatus.NOT_FOUND.getStatus(), -1);
} else {
if (responseCode == RestStatus.OK) {
exchange.getResponseHeaders().add("Content-Type", "application/xml");
exchange.sendResponseHeaders(RestStatus.OK.getStatus(), responseBody.length);
exchange.getResponseBody().write(responseBody);
} else {
exchange.sendResponseHeaders(responseCode.getStatus(), -1);
}
} else if (request.isAbortMultipartUploadRequest()) {
final var upload = removeUpload(request.getQueryParamOnce("uploadId"));
@@ -248,6 +249,9 @@ public void handle(final HttpExchange exchange) throws IOException {
if (isProtectOverwrite(exchange)) {
throw new AssertionError("If-None-Match: * header is not supported here");
}
if (getRequiredExistingETag(exchange) != null) {
throw new AssertionError("If-Match: * header is not supported here");
}

var sourceBlob = blobs.get(copySource);
if (sourceBlob == null) {
@@ -264,14 +268,12 @@ public void handle(final HttpExchange exchange) throws IOException {
}
} else {
final Tuple<String, BytesReference> blob = parseRequestBody(exchange);
final var preconditionFailed = updateBlobContents(exchange, request.path(), blob.v2()) == false;
final var updateResponseCode = updateBlobContents(exchange, request.path(), blob.v2());

if (preconditionFailed) {
exchange.sendResponseHeaders(RestStatus.PRECONDITION_FAILED.getStatus(), -1);
} else {
if (updateResponseCode == RestStatus.OK) {
exchange.getResponseHeaders().add("ETag", blob.v1());
exchange.sendResponseHeaders(RestStatus.OK.getStatus(), -1);
}
exchange.sendResponseHeaders(updateResponseCode.getStatus(), -1);
}

} else if (request.isListObjectsRequest()) {
@@ -407,15 +409,41 @@ public void handle(final HttpExchange exchange) throws IOException {
/**
* Update the blob contents if and only if the preconditions in the request are satisfied.
*
* @return whether the blob contents were updated: if {@code false} then a requested precondition was not satisfied.
* @return {@link RestStatus#OK} if the blob contents were updated, or else a different status code to indicate the error: possibly
* {@link RestStatus#CONFLICT} in any case, but if not then either {@link RestStatus#PRECONDITION_FAILED} if the object exists
* but doesn't match the specified precondition, or {@link RestStatus#NOT_FOUND} if the object doesn't exist but is required to
* do so by the precondition.
*
* @see <a href="https://docs.aws.amazon.com/AmazonS3/latest/userguide/conditional-writes.html#conditional-error-response">AWS docs</a>
*/
private boolean updateBlobContents(HttpExchange exchange, String path, BytesReference newContents) {
private RestStatus updateBlobContents(HttpExchange exchange, String path, BytesReference newContents) {
if (isProtectOverwrite(exchange)) {
return blobs.putIfAbsent(path, newContents) == null;
} else {
blobs.put(path, newContents);
return true;
return blobs.putIfAbsent(path, newContents) == null
? RestStatus.OK
: ESTestCase.randomFrom(RestStatus.PRECONDITION_FAILED, RestStatus.CONFLICT);
}

final var requireExistingETag = getRequiredExistingETag(exchange);
if (requireExistingETag != null) {
final var responseCode = new AtomicReference<>(RestStatus.OK);
blobs.compute(path, (ignoredPath, existingContents) -> {
if (existingContents != null && requireExistingETag.equals(getEtagFromContents(existingContents))) {
Contributor: If there is no existing object and the request uses If-Match, it should return 404. I assume in this case the fixture would currently return 412 Precondition Failed.

https://docs.aws.amazon.com/AmazonS3/latest/userguide/conditional-writes.html#conditional-error-response

"If there's no current object version with the same name, or if the current object version is a delete marker, the operation fails with a 404 Not Found error."

Contributor: Maybe add a test for If-Match on a non-existing object.

Contributor Author: Ah thanks, well spotted. Bit of an odd choice tbh, but I'll try and find a way to match that behaviour.

return newContents;
}

responseCode.set(
ESTestCase.randomFrom(
existingContents == null ? RestStatus.NOT_FOUND : RestStatus.PRECONDITION_FAILED,
RestStatus.CONFLICT
)
);
Comment on lines +434 to +439 - Contributor: Why do you need randomization for the conflict? Is it because we synchronize on blobs and can't detect conflicting uploads, i.e. the handler linearizes writes?

Contributor Author: The AWS docs are not 100% clear on exactly which conditions lead to a 409 here, but it doesn't really matter from our point of view anyway: we just need to make sure we handle both 409s and 412s as acceptable failures.
return existingContents;
});
return responseCode.get();
}

blobs.put(path, newContents);
return RestStatus.OK;
}
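As a quick reference, the status-code matrix implemented by updateBlobContents (following the AWS conditional-writes documentation linked in the javadoc) can be summarised in a small standalone sketch. Plain HTTP codes stand in for RestStatus, and the fixture's deliberate randomisation towards 409 is ignored here; it exists only because callers must accept both 409 and 412 as conditional-write failures.

// Standalone sketch of the conditional-write outcomes modelled by the fixture above.
class ConditionalWriteStatusSketch {

    // ifNoneMatchStar: the request carried "If-None-Match: *" (create only if absent).
    // requiredETag:    the ETag from an "If-Match" header (overwrite only if unchanged), or null.
    // existingETag:    the ETag of the stored object, or null if it does not exist.
    static int status(boolean ifNoneMatchStar, String requiredETag, String existingETag) {
        if (ifNoneMatchStar) {
            return existingETag == null ? 200 : 412; // object already exists -> precondition failed
        }
        if (requiredETag != null) {
            if (existingETag == null) {
                return 404;                          // If-Match on a missing object -> not found
            }
            return requiredETag.equals(existingETag) ? 200 : 412;
        }
        return 200;                                  // unconditional write always succeeds
    }

    public static void main(String[] args) {
        assert status(true, null, null) == 200;          // create succeeds when nothing is there
        assert status(true, null, "etag-1") == 412;      // create fails when the object already exists
        assert status(false, "etag-1", null) == 404;     // If-Match but no object
        assert status(false, "etag-1", "etag-2") == 412; // If-Match with a stale ETag
        assert status(false, "etag-1", "etag-1") == 200; // If-Match with the current ETag
    }
}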

/**
@@ -594,6 +622,9 @@ private static boolean isProtectOverwrite(final HttpExchange exchange) {
return false;
}

if (exchange.getRequestHeaders().get("If-Match") != null) {
throw new AssertionError("Handling both If-None-Match and If-Match headers is not supported");
}
if (ifNoneMatch.size() != 1) {
throw new AssertionError("multiple If-None-Match headers found: " + ifNoneMatch);
}
@@ -605,6 +636,29 @@ private static boolean isProtectOverwrite(final HttpExchange exchange) {
throw new AssertionError("invalid If-None-Match header: " + ifNoneMatch);
}

@Nullable // if no If-Match header found
private static String getRequiredExistingETag(final HttpExchange exchange) {
final var ifMatch = exchange.getRequestHeaders().get("If-Match");

if (ifMatch == null) {
return null;
}

if (exchange.getRequestHeaders().get("If-None-Match") != null) {
throw new AssertionError("Handling both If-None-Match and If-Match headers is not supported");
}

final var iterator = ifMatch.iterator();
if (iterator.hasNext()) {
final var result = iterator.next();
if (iterator.hasNext() == false) {
return result;
}
}
Comment on lines +651 to +657 - Contributor: nit: I think it should be exactly-one-item, not last-item.

throw new AssertionError("multiple If-Match headers found: " + ifMatch);
}

MultipartUpload putUpload(String path) {
final var upload = new MultipartUpload(UUIDs.randomBase64UUID(), path);
synchronized (uploads) {
@@ -32,14 +32,13 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.oneOf;

public class S3HttpHandlerTests extends ESTestCase {

@@ -412,22 +411,33 @@ public void testExtractPartEtags() {

}

public void testPreventObjectOverwrite() throws InterruptedException {
public void testPreventObjectOverwrite() {
ensureExactlyOneSuccess(new S3HttpHandler("bucket", "path"), null);
}

public void testConditionalOverwrite() {
final var handler = new S3HttpHandler("bucket", "path");

var tasks = List.of(
createPutObjectTask(handler),
createPutObjectTask(handler),
createMultipartUploadTask(handler),
createMultipartUploadTask(handler)
final var originalBody = new BytesArray(randomAlphaOfLength(50).getBytes(StandardCharsets.UTF_8));
final var originalETag = S3HttpHandler.getEtagFromContents(originalBody);
assertEquals(RestStatus.OK, handleRequest(handler, "PUT", "/bucket/path/blob", originalBody).status());
assertEquals(
new TestHttpResponse(RestStatus.OK, originalBody, addETag(originalETag, TestHttpExchange.EMPTY_HEADERS)),
handleRequest(handler, "GET", "/bucket/path/blob")
);

try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
tasks.forEach(task -> executor.submit(task.consumer));
executor.shutdown();
var done = executor.awaitTermination(SAFE_AWAIT_TIMEOUT.seconds(), TimeUnit.SECONDS);
assertTrue(done);
}
ensureExactlyOneSuccess(handler, originalETag);
}

private static void ensureExactlyOneSuccess(S3HttpHandler handler, String originalETag) {
final var tasks = List.of(
createPutObjectTask(handler, originalETag),
createPutObjectTask(handler, originalETag),
createMultipartUploadTask(handler, originalETag),
createMultipartUploadTask(handler, originalETag)
);

runInParallel(tasks.size(), i -> tasks.get(i).consumer.run());

List<TestWriteTask> successfulTasks = tasks.stream().filter(task -> task.status == RestStatus.OK).toList();
assertThat(successfulTasks, hasSize(1));
@@ -436,6 +446,7 @@ public void testPreventObjectOverwrite() throws InterruptedException {
if (task.status == RestStatus.PRECONDITION_FAILED) {
assertNotNull(handler.getUpload(task.uploadId));
} else {
assertThat(task.status, oneOf(RestStatus.OK, RestStatus.CONFLICT));
Contributor: Technically, this test only checks that exactly one of the four requests succeeds; it doesn't assert on the order in which we expect the rest statuses to be returned.

List<TestWriteTask> successfulTasks = tasks.stream().filter(task -> task.status == RestStatus.OK).toList();
assertThat(successfulTasks, hasSize(1));

passes if the first three requests fail and the fourth succeeds, and then again

assertThat(task.status, oneOf(RestStatus.OK, RestStatus.CONFLICT));

is indifferent to the order.

Is there value in making stronger assertions about when we expect the RestStatus to be OK versus CONFLICT? I.e. we put an object that doesn't exist and expect OK, then put the same object again N times, each time expecting CONFLICT?

Contributor Author: We're running the requests in parallel and have no expectations about which one of them might win the race to succeed. We just need to know that the other three fail.
assertNull(handler.getUpload(task.uploadId));
}
});
@@ -450,13 +461,38 @@ public void testPreventObjectOverwrite() throws InterruptedException {
);
}

private static TestWriteTask createPutObjectTask(S3HttpHandler handler) {
public void testPutObjectIfMatchWithBlobNotFound() {
final var handler = new S3HttpHandler("bucket", "path");
while (true) {
final var task = createPutObjectTask(handler, randomIdentifier());
task.consumer.run();
if (task.status == RestStatus.NOT_FOUND) {
break;
}
assertEquals(RestStatus.CONFLICT, task.status); // chosen randomly so eventually we will escape the loop
}
}

public void testCompleteMultipartUploadIfMatchWithBlobNotFound() {
final var handler = new S3HttpHandler("bucket", "path");
while (true) {
final var task = createMultipartUploadTask(handler, randomIdentifier());
task.consumer.run();
if (task.status == RestStatus.NOT_FOUND) {
break;
}
assertEquals(RestStatus.CONFLICT, task.status); // chosen randomly so eventually we will escape the loop
}
}

private static TestWriteTask createPutObjectTask(S3HttpHandler handler, @Nullable String originalETag) {
return new TestWriteTask(
(task) -> task.status = handleRequest(handler, "PUT", "/bucket/path/blob", task.body, ifNoneMatchHeader()).status()
(task) -> task.status = handleRequest(handler, "PUT", "/bucket/path/blob", task.body, conditionalWriteHeader(originalETag))
.status()
);
}

private static TestWriteTask createMultipartUploadTask(S3HttpHandler handler) {
private static TestWriteTask createMultipartUploadTask(S3HttpHandler handler, @Nullable String originalETag) {
final var multipartUploadTask = new TestWriteTask(
(task) -> task.status = handleRequest(
handler,
@@ -470,7 +506,7 @@ private static TestWriteTask createMultipartUploadTask(S3HttpHandler handler) {
<PartNumber>1</PartNumber>
</Part>
</CompleteMultipartUpload>""", task.etag)),
ifNoneMatchHeader()
conditionalWriteHeader(originalETag)
).status()
);

@@ -599,9 +635,13 @@ private static Headers contentRangeHeader(long start, long end, long length) {
return headers;
}

private static Headers ifNoneMatchHeader() {
private static Headers conditionalWriteHeader(@Nullable String originalEtag) {
var headers = new Headers();
headers.put("If-None-Match", List.of("*"));
if (originalEtag == null) {
headers.put("If-None-Match", List.of("*"));
} else {
headers.put("If-Match", List.of(originalEtag));
}
return headers;
}
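For clarity, here are the two request shapes this helper produces, as a tiny standalone illustration; the "abc123" ETag is invented for the example.

// conditionalWriteHeader(null)     -> "If-None-Match: *"   (create only if the blob is absent)
// conditionalWriteHeader("abc123") -> "If-Match: abc123"   (overwrite only if the blob still has this ETag)
import com.sun.net.httpserver.Headers;
import java.util.List;

class ConditionalWriteHeaderExample {
    public static void main(String[] args) {
        Headers createOnly = new Headers();
        createOnly.put("If-None-Match", List.of("*"));

        Headers overwriteIfUnchanged = new Headers();
        overwriteIfUnchanged.put("If-Match", List.of("abc123"));

        System.out.println(createOnly);
        System.out.println(overwriteIfUnchanged);
    }
}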
