Skip to content

Commit 4356a3e

Browse files
authored
Ignore abort-on-cleanup failure in S3 repo (#138569)
If a `CompleteMultipartUpload` request fails then we attempt to clean up by calling the `AbortMultipartUpload` API. This cleanup may also fail, and a failure here will supersede the original failure. The cleanup is really a best-effort thing so we should ignore failures. In particular if the `CompleteMultipartUpload` has an `If-None-Match` precondition then this may fail with a `409 Conflict` and in that case the upload no longer exists, so we would expect the `AbortMultipartUpload` call to fail with a 404. This commit extends `S3HttpHandler` to respond with both 412 and 409s on precondition failures, cleaning up the upload on a 409 but not a 412, triggering the unwanted failure path in some tests, and adds exception handling to deal with it.
1 parent ef8a008 commit 4356a3e

File tree

4 files changed

+54
-33
lines changed

4 files changed

+54
-33
lines changed

docs/changelog/138569.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 138569
2+
summary: Ignore abort-on-cleanup failure in S3 repo
3+
area: Snapshot/Restore
4+
type: bug
5+
issues: []

modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -608,7 +608,24 @@ private void executeMultipart(
608608
final String uploadId;
609609
try (AmazonS3Reference clientReference = s3BlobStore.clientReference()) {
610610
uploadId = clientReference.client().createMultipartUpload(createMultipartUpload(purpose, operation, blobName)).uploadId();
611-
cleanupOnFailureActions.add(() -> abortMultiPartUpload(purpose, uploadId, blobName));
611+
cleanupOnFailureActions.add(() -> {
612+
try {
613+
abortMultiPartUpload(purpose, uploadId, blobName);
614+
} catch (Exception e) {
615+
if (e instanceof SdkServiceException sdkServiceException
616+
&& sdkServiceException.statusCode() == RestStatus.NOT_FOUND.getStatus()) {
617+
// NOT_FOUND is what we wanted
618+
logger.atDebug()
619+
.withThrowable(e)
620+
.log("multipart upload of [{}] with ID [{}] not found on abort", blobName, uploadId);
621+
} else {
622+
// aborting the upload on failure is a best-effort cleanup step - if it fails then we must just move on
623+
logger.atWarn()
624+
.withThrowable(e)
625+
.log("failed to clean up multipart upload of [{}] with ID [{}] after earlier failure", blobName, uploadId);
626+
}
627+
}
628+
});
612629
}
613630
if (Strings.isEmpty(uploadId)) {
614631
throw new IOException("Failed to initialize multipart operation for " + blobName);

test/fixtures/s3-fixture/src/main/java/fixture/s3/S3HttpHandler.java

Lines changed: 27 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.elasticsearch.logging.LogManager;
2727
import org.elasticsearch.logging.Logger;
2828
import org.elasticsearch.rest.RestStatus;
29+
import org.elasticsearch.test.ESTestCase;
2930
import org.elasticsearch.test.fixture.HttpHeaderParser;
3031

3132
import java.io.IOException;
@@ -190,14 +191,15 @@ public void handle(final HttpExchange exchange) throws IOException {
190191

191192
} else if (request.isCompleteMultipartUploadRequest()) {
192193
final byte[] responseBody;
193-
final boolean preconditionFailed;
194+
final RestStatus responseCode;
194195
synchronized (uploads) {
195196
final var upload = getUpload(request.getQueryParamOnce("uploadId"));
196197
if (upload == null) {
197-
preconditionFailed = false;
198198
if (Randomness.get().nextBoolean()) {
199+
responseCode = RestStatus.NOT_FOUND;
199200
responseBody = null;
200201
} else {
202+
responseCode = RestStatus.OK;
201203
responseBody = """
202204
<?xml version="1.0" encoding="UTF-8"?>
203205
<Error>
@@ -209,10 +211,8 @@ public void handle(final HttpExchange exchange) throws IOException {
209211
}
210212
} else {
211213
final var blobContents = upload.complete(extractPartEtags(Streams.readFully(exchange.getRequestBody())));
212-
213-
preconditionFailed = updateBlobContents(exchange, request.path(), blobContents) == false;
214-
215-
if (preconditionFailed == false) {
214+
responseCode = updateBlobContents(exchange, request.path(), blobContents);
215+
if (responseCode == RestStatus.OK) {
216216
responseBody = ("<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n"
217217
+ "<CompleteMultipartUploadResult>\n"
218218
+ "<Bucket>"
@@ -222,20 +222,20 @@ public void handle(final HttpExchange exchange) throws IOException {
222222
+ request.path()
223223
+ "</Key>\n"
224224
+ "</CompleteMultipartUploadResult>").getBytes(StandardCharsets.UTF_8);
225-
removeUpload(upload.getUploadId());
226225
} else {
227226
responseBody = null;
228227
}
228+
if (responseCode != RestStatus.PRECONDITION_FAILED) {
229+
removeUpload(upload.getUploadId());
230+
}
229231
}
230232
}
231-
if (preconditionFailed) {
232-
exchange.sendResponseHeaders(RestStatus.PRECONDITION_FAILED.getStatus(), -1);
233-
} else if (responseBody == null) {
234-
exchange.sendResponseHeaders(RestStatus.NOT_FOUND.getStatus(), -1);
235-
} else {
233+
if (responseCode == RestStatus.OK) {
236234
exchange.getResponseHeaders().add("Content-Type", "application/xml");
237235
exchange.sendResponseHeaders(RestStatus.OK.getStatus(), responseBody.length);
238236
exchange.getResponseBody().write(responseBody);
237+
} else {
238+
exchange.sendResponseHeaders(responseCode.getStatus(), -1);
239239
}
240240
} else if (request.isAbortMultipartUploadRequest()) {
241241
final var upload = removeUpload(request.getQueryParamOnce("uploadId"));
@@ -264,14 +264,12 @@ public void handle(final HttpExchange exchange) throws IOException {
264264
}
265265
} else {
266266
final Tuple<String, BytesReference> blob = parseRequestBody(exchange);
267-
final var preconditionFailed = updateBlobContents(exchange, request.path(), blob.v2()) == false;
267+
final var updateResponseCode = updateBlobContents(exchange, request.path(), blob.v2());
268268

269-
if (preconditionFailed) {
270-
exchange.sendResponseHeaders(RestStatus.PRECONDITION_FAILED.getStatus(), -1);
271-
} else {
269+
if (updateResponseCode == RestStatus.OK) {
272270
exchange.getResponseHeaders().add("ETag", blob.v1());
273-
exchange.sendResponseHeaders(RestStatus.OK.getStatus(), -1);
274271
}
272+
exchange.sendResponseHeaders(updateResponseCode.getStatus(), -1);
275273
}
276274

277275
} else if (request.isListObjectsRequest()) {
@@ -407,15 +405,21 @@ public void handle(final HttpExchange exchange) throws IOException {
407405
/**
408406
* Update the blob contents if and only if the preconditions in the request are satisfied.
409407
*
410-
* @return whether the blob contents were updated: if {@code false} then a requested precondition was not satisfied.
408+
* @return {@link RestStatus#OK} if the blob contents were updated, or else a different status code to indicate the error: possibly
409+
* {@link RestStatus#CONFLICT} or {@link RestStatus#PRECONDITION_FAILED} if the object exists and the precondition requires it
410+
* not to.
411+
*
412+
* @see <a href="https://docs.aws.amazon.com/AmazonS3/latest/userguide/conditional-writes.html#conditional-error-response">AWS docs</a>
411413
*/
412-
private boolean updateBlobContents(HttpExchange exchange, String path, BytesReference newContents) {
414+
private RestStatus updateBlobContents(HttpExchange exchange, String path, BytesReference newContents) {
413415
if (isProtectOverwrite(exchange)) {
414-
return blobs.putIfAbsent(path, newContents) == null;
415-
} else {
416-
blobs.put(path, newContents);
417-
return true;
416+
return blobs.putIfAbsent(path, newContents) == null
417+
? RestStatus.OK
418+
: ESTestCase.randomFrom(RestStatus.PRECONDITION_FAILED, RestStatus.CONFLICT);
418419
}
420+
421+
blobs.put(path, newContents);
422+
return RestStatus.OK;
419423
}
420424

421425
/**

test/fixtures/s3-fixture/src/test/java/fixture/s3/S3HttpHandlerTests.java

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -32,14 +32,13 @@
3232
import java.util.ArrayList;
3333
import java.util.List;
3434
import java.util.Objects;
35-
import java.util.concurrent.Executors;
36-
import java.util.concurrent.TimeUnit;
3735
import java.util.function.Consumer;
3836

3937
import static org.hamcrest.Matchers.allOf;
4038
import static org.hamcrest.Matchers.containsString;
4139
import static org.hamcrest.Matchers.greaterThan;
4240
import static org.hamcrest.Matchers.hasSize;
41+
import static org.hamcrest.Matchers.oneOf;
4342

4443
public class S3HttpHandlerTests extends ESTestCase {
4544

@@ -412,7 +411,7 @@ public void testExtractPartEtags() {
412411

413412
}
414413

415-
public void testPreventObjectOverwrite() throws InterruptedException {
414+
public void testPreventObjectOverwrite() {
416415
final var handler = new S3HttpHandler("bucket", "path");
417416

418417
var tasks = List.of(
@@ -422,12 +421,7 @@ public void testPreventObjectOverwrite() throws InterruptedException {
422421
createMultipartUploadTask(handler)
423422
);
424423

425-
try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
426-
tasks.forEach(task -> executor.submit(task.consumer));
427-
executor.shutdown();
428-
var done = executor.awaitTermination(SAFE_AWAIT_TIMEOUT.seconds(), TimeUnit.SECONDS);
429-
assertTrue(done);
430-
}
424+
runInParallel(tasks.size(), i -> tasks.get(i).consumer.run());
431425

432426
List<TestWriteTask> successfulTasks = tasks.stream().filter(task -> task.status == RestStatus.OK).toList();
433427
assertThat(successfulTasks, hasSize(1));
@@ -436,6 +430,7 @@ public void testPreventObjectOverwrite() throws InterruptedException {
436430
if (task.status == RestStatus.PRECONDITION_FAILED) {
437431
assertNotNull(handler.getUpload(task.uploadId));
438432
} else {
433+
assertThat(task.status, oneOf(RestStatus.OK, RestStatus.CONFLICT));
439434
assertNull(handler.getUpload(task.uploadId));
440435
}
441436
});

0 commit comments

Comments
 (0)