Commit e6b830e
Clean up dangling S3 multipart uploads (#111955)

If Elasticsearch fails part-way through a multipart upload to S3 it will generally try to abort the upload, but it is possible that the abort attempt also fails. In this case the upload becomes _dangling_. Dangling uploads consume storage space, and therefore cost money, until they are eventually aborted. Earlier versions of Elasticsearch required users to check for dangling multipart uploads and to manually abort any that they found. This commit instead introduces a cleanup process which aborts all dangling uploads on each snapshot delete.

Closes #44971
Closes #101169
1 parent aa959e6 commit e6b830e
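At its core the cleanup is a list-then-abort pass over the repository's key prefix. A minimal sketch of that S3 interaction, assuming an AWS SDK v1 `AmazonS3` client like the one the change below obtains from the blob store (the real implementation runs asynchronously on the snapshot thread pool, only after the current snapshot deletions have finalized, and treats every failure as best-effort):

    import com.amazonaws.services.s3.AmazonS3;
    import com.amazonaws.services.s3.model.AbortMultipartUploadRequest;
    import com.amazonaws.services.s3.model.ListMultipartUploadsRequest;
    import com.amazonaws.services.s3.model.MultipartUpload;

    // List up to maxUploads in-flight multipart uploads under the repository's
    // key prefix, then abort each one. Aborting deletes any parts already
    // uploaded, so no further storage charges accumulate.
    static void abortDanglingUploads(AmazonS3 client, String bucket, String keyPath, int maxUploads) {
        final var listing = client.listMultipartUploads(
            new ListMultipartUploadsRequest(bucket).withPrefix(keyPath).withMaxUploads(maxUploads)
        );
        for (MultipartUpload upload : listing.getMultipartUploads()) {
            client.abortMultipartUpload(new AbortMultipartUploadRequest(bucket, upload.getKey(), upload.getUploadId()));
        }
    }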

File tree: 6 files changed, +329 -28 lines changed

docs/changelog/111955.yaml

Lines changed: 7 additions & 0 deletions

@@ -0,0 +1,7 @@
+pr: 111955
+summary: Clean up dangling S3 multipart uploads
+area: Snapshot/Restore
+type: enhancement
+issues:
+ - 101169
+ - 44971

docs/reference/snapshot-restore/repository-s3.asciidoc

Lines changed: 9 additions & 27 deletions

@@ -317,6 +317,15 @@ include::repository-shared-settings.asciidoc[]
 https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObjects.html[AWS
 DeleteObjects API].

+`max_multipart_upload_cleanup_size`::
+
+(<<number,numeric>>) Sets the maximum number of possibly-dangling multipart
+uploads to clean up in each batch of snapshot deletions. Defaults to `1000`
+which is the maximum number supported by the
+https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListMultipartUploads.html[AWS
+ListMultipartUploads API]. If set to `0`, {es} will not attempt to clean up
+dangling multipart uploads.
+
 NOTE: The option of defining client settings in the repository settings as
 documented below is considered deprecated, and will be removed in a future
 version.
@@ -492,33 +501,6 @@ by the `elasticsearch` user. By default, {es} runs as user `elasticsearch` using

 If the symlink exists, it will be used by default by all S3 repositories that don't have explicit `client` credentials.

-==== Cleaning up multi-part uploads
-
-{es} uses S3's multi-part upload process to upload larger blobs to the
-repository. The multi-part upload process works by dividing each blob into
-smaller parts, uploading each part independently, and then completing the
-upload in a separate step. This reduces the amount of data that {es} must
-re-send if an upload fails: {es} only needs to re-send the part that failed
-rather than starting from the beginning of the whole blob. The storage for each
-part is charged independently starting from the time at which the part was
-uploaded.
-
-If a multi-part upload cannot be completed then it must be aborted in order to
-delete any parts that were successfully uploaded, preventing further storage
-charges from accumulating. {es} will automatically abort a multi-part upload on
-failure, but sometimes the abort request itself fails. For example, if the
-repository becomes inaccessible or the instance on which {es} is running is
-terminated abruptly then {es} cannot complete or abort any ongoing uploads.
-
-You must make sure that failed uploads are eventually aborted to avoid
-unnecessary storage costs. You can use the
-https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListMultipartUploads.html[List
-multipart uploads API] to list the ongoing uploads and look for any which are
-unusually long-running, or you can
-https://docs.aws.amazon.com/AmazonS3/latest/userguide/mpu-abort-incomplete-mpu-lifecycle-config.html[configure
-a bucket lifecycle policy] to automatically abort incomplete uploads once they
-reach a certain age.
-
 [[repository-s3-aws-vpc]]
 ==== AWS VPC bandwidth settings
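For illustration, a hedged sketch of repository settings that lower the cleanup batch size, built with the `Settings` builder from the {es} codebase; the bucket name is hypothetical and actually registering the repository is outside this diff:

    import org.elasticsearch.common.settings.Settings;

    // Hypothetical: cap cleanup at 500 possibly-dangling multipart uploads per
    // batch of snapshot deletions, rather than the default 1000.
    Settings repositorySettings = Settings.builder()
        .put("bucket", "my-snapshot-bucket")            // hypothetical bucket name
        .put("max_multipart_upload_cleanup_size", 500)  // setting documented above
        .build();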

modules/repository-s3/src/internalClusterTest/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java

Lines changed: 113 additions & 0 deletions

@@ -10,13 +10,20 @@
 import fixture.s3.S3HttpHandler;

 import com.amazonaws.http.AmazonHttpClient;
+import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
+import com.amazonaws.services.s3.model.ListMultipartUploadsRequest;
+import com.amazonaws.services.s3.model.MultipartUpload;
 import com.sun.net.httpserver.Headers;
 import com.sun.net.httpserver.HttpExchange;
 import com.sun.net.httpserver.HttpHandler;

+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.core.LogEvent;
+import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
 import org.elasticsearch.action.support.broadcast.BroadcastResponse;
 import org.elasticsearch.cluster.metadata.RepositoryMetadata;
 import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.blobstore.BlobContainer;
 import org.elasticsearch.common.blobstore.BlobPath;
 import org.elasticsearch.common.blobstore.BlobStore;
@@ -54,6 +61,7 @@
 import org.elasticsearch.telemetry.TestTelemetryPlugin;
 import org.elasticsearch.test.BackgroundIndexer;
 import org.elasticsearch.test.ESIntegTestCase;
+import org.elasticsearch.test.MockLog;
 import org.elasticsearch.test.junit.annotations.TestIssueLogging;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.xcontent.NamedXContentRegistry;
@@ -70,6 +78,7 @@
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
@@ -81,6 +90,7 @@
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
 import static org.hamcrest.Matchers.allOf;
 import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThan;
 import static org.hamcrest.Matchers.hasEntry;
@@ -451,6 +461,106 @@ private Map<S3BlobStore.StatsKey, Long> getServerMetrics() {
         return Collections.emptyMap();
     }

+    public void testMultipartUploadCleanup() {
+        final String repoName = randomRepositoryName();
+        createRepository(repoName, repositorySettings(repoName), true);
+
+        createIndex("test-idx-1");
+        for (int i = 0; i < 100; i++) {
+            prepareIndex("test-idx-1").setId(Integer.toString(i)).setSource("foo", "bar" + i).get();
+        }
+        client().admin().indices().prepareRefresh().get();
+
+        final String snapshotName = randomIdentifier();
+        CreateSnapshotResponse createSnapshotResponse = clusterAdmin().prepareCreateSnapshot(TEST_REQUEST_TIMEOUT, repoName, snapshotName)
+            .setWaitForCompletion(true)
+            .get();
+        assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), greaterThan(0));
+        assertThat(
+            createSnapshotResponse.getSnapshotInfo().successfulShards(),
+            equalTo(createSnapshotResponse.getSnapshotInfo().totalShards())
+        );
+
+        final var repository = asInstanceOf(
+            S3Repository.class,
+            internalCluster().getCurrentMasterNodeInstance(RepositoriesService.class).repository(repoName)
+        );
+        final var blobStore = asInstanceOf(S3BlobStore.class, asInstanceOf(BlobStoreWrapper.class, repository.blobStore()).delegate());
+
+        try (var clientRef = blobStore.clientReference()) {
+            final var danglingBlobName = randomIdentifier();
+            final var initiateMultipartUploadRequest = new InitiateMultipartUploadRequest(
+                blobStore.bucket(),
+                blobStore.blobContainer(repository.basePath().add("test-multipart-upload")).path().buildAsString() + danglingBlobName
+            );
+            initiateMultipartUploadRequest.putCustomQueryParameter(
+                S3BlobStore.CUSTOM_QUERY_PARAMETER_PURPOSE,
+                OperationPurpose.SNAPSHOT_DATA.getKey()
+            );
+            final var multipartUploadResult = clientRef.client().initiateMultipartUpload(initiateMultipartUploadRequest);
+
+            final var listMultipartUploadsRequest = new ListMultipartUploadsRequest(blobStore.bucket()).withPrefix(
+                repository.basePath().buildAsString()
+            );
+            listMultipartUploadsRequest.putCustomQueryParameter(
+                S3BlobStore.CUSTOM_QUERY_PARAMETER_PURPOSE,
+                OperationPurpose.SNAPSHOT_DATA.getKey()
+            );
+            assertEquals(
+                List.of(multipartUploadResult.getUploadId()),
+                clientRef.client()
+                    .listMultipartUploads(listMultipartUploadsRequest)
+                    .getMultipartUploads()
+                    .stream()
+                    .map(MultipartUpload::getUploadId)
+                    .toList()
+            );
+
+            final var seenCleanupLogLatch = new CountDownLatch(1);
+            MockLog.assertThatLogger(() -> {
+                assertAcked(clusterAdmin().prepareDeleteSnapshot(TEST_REQUEST_TIMEOUT, repoName, snapshotName));
+                safeAwait(seenCleanupLogLatch);
+            },
+                S3BlobContainer.class,
+                new MockLog.SeenEventExpectation(
+                    "found-dangling",
+                    S3BlobContainer.class.getCanonicalName(),
+                    Level.INFO,
+                    "found [1] possibly-dangling multipart uploads; will clean them up after finalizing the current snapshot deletions"
+                ),
+                new MockLog.SeenEventExpectation(
+                    "cleaned-dangling",
+                    S3BlobContainer.class.getCanonicalName(),
+                    Level.INFO,
+                    Strings.format(
+                        "cleaned up dangling multipart upload [%s] of blob [%s]*test-multipart-upload/%s]",
+                        multipartUploadResult.getUploadId(),
+                        repoName,
+                        danglingBlobName
+                    )
+                ) {
+                    @Override
+                    public void match(LogEvent event) {
+                        super.match(event);
+                        if (Regex.simpleMatch(message, event.getMessage().getFormattedMessage())) {
+                            seenCleanupLogLatch.countDown();
+                        }
+                    }
+                }
+            );

+            assertThat(
+                clientRef.client()
+                    .listMultipartUploads(listMultipartUploadsRequest)
+                    .getMultipartUploads()
+                    .stream()
+                    .map(MultipartUpload::getUploadId)
+                    .toList(),
+                empty()
+            );
+        }
+    }
+
     /**
      * S3RepositoryPlugin that allows to disable chunked encoding and to set a low threshold between single upload and multipart upload.
      */
@@ -592,6 +702,9 @@ public void maybeTrack(final String rawRequest, Headers requestHeaders) {
             trackRequest("ListObjects");
             metricsCount.computeIfAbsent(new S3BlobStore.StatsKey(S3BlobStore.Operation.LIST_OBJECTS, purpose), k -> new AtomicLong())
                 .incrementAndGet();
+        } else if (Regex.simpleMatch("GET /*/?uploads&*", request)) {
+            // TODO track ListMultipartUploads requests
+            logger.info("--> ListMultipartUploads not tracked [{}] with parsed purpose [{}]", request, purpose.getKey());
         } else if (Regex.simpleMatch("GET /*/*", request)) {
             trackRequest("GetObject");
             metricsCount.computeIfAbsent(new S3BlobStore.StatsKey(S3BlobStore.Operation.GET_OBJECT, purpose), k -> new AtomicLong())

modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java

Lines changed: 95 additions & 0 deletions

@@ -28,13 +28,17 @@
 import com.amazonaws.services.s3.model.UploadPartResult;
 import com.amazonaws.util.ValidationUtils;

+import org.apache.logging.log4j.Level;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.apache.lucene.util.SetOnce;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.ActionRunnable;
 import org.elasticsearch.action.support.RefCountingListener;
+import org.elasticsearch.action.support.RefCountingRunnable;
 import org.elasticsearch.action.support.SubscribableListener;
+import org.elasticsearch.action.support.ThreadedActionListener;
+import org.elasticsearch.cluster.service.MasterService;
 import org.elasticsearch.common.Randomness;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.blobstore.BlobContainer;
@@ -54,6 +58,7 @@
 import org.elasticsearch.core.Nullable;
 import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.core.Tuple;
+import org.elasticsearch.repositories.RepositoryException;
 import org.elasticsearch.repositories.blobstore.ChunkedBlobOutputStream;
 import org.elasticsearch.repositories.s3.S3BlobStore.Operation;
 import org.elasticsearch.threadpool.ThreadPool;
@@ -912,4 +917,94 @@ public void getRegister(OperationPurpose purpose, String key, ActionListener<Opt
             }
         });
     }
+
+    ActionListener<Void> getMultipartUploadCleanupListener(int maxUploads, RefCountingRunnable refs) {
+        try (var clientReference = blobStore.clientReference()) {
+            final var bucket = blobStore.bucket();
+            final var request = new ListMultipartUploadsRequest(bucket).withPrefix(keyPath).withMaxUploads(maxUploads);
+            request.putCustomQueryParameter(S3BlobStore.CUSTOM_QUERY_PARAMETER_PURPOSE, OperationPurpose.SNAPSHOT_DATA.getKey());
+            final var multipartUploadListing = SocketAccess.doPrivileged(() -> clientReference.client().listMultipartUploads(request));
+            final var multipartUploads = multipartUploadListing.getMultipartUploads();
+            if (multipartUploads.isEmpty()) {
+                logger.debug("found no multipart uploads to clean up");
+                return ActionListener.noop();
+            } else {
+                // the uploads are only _possibly_ dangling because it's also possible we're no longer the master and the new master
+                // has started some more shard snapshots
+                if (multipartUploadListing.isTruncated()) {
+                    logger.info("""
+                        found at least [{}] possibly-dangling multipart uploads; will clean up the first [{}] after finalizing \
+                        the current snapshot deletions, and will check for further possibly-dangling multipart uploads in future \
+                        snapshot deletions""", multipartUploads.size(), multipartUploads.size());
+                } else {
+                    logger.info("""
+                        found [{}] possibly-dangling multipart uploads; \
+                        will clean them up after finalizing the current snapshot deletions""", multipartUploads.size());
+                }
+                return newMultipartUploadCleanupListener(
+                    refs,
+                    multipartUploads.stream().map(u -> new AbortMultipartUploadRequest(bucket, u.getKey(), u.getUploadId())).toList()
+                );
+            }
+        } catch (Exception e) {
+            // Cleanup is a best-effort thing, we can't do anything better than log and carry on here.
+            logger.warn("failure while checking for possibly-dangling multipart uploads", e);
+            return ActionListener.noop();
+        }
+    }
+
+    private ActionListener<Void> newMultipartUploadCleanupListener(
+        RefCountingRunnable refs,
+        List<AbortMultipartUploadRequest> abortMultipartUploadRequests
+    ) {
+        return new ThreadedActionListener<>(blobStore.getSnapshotExecutor(), ActionListener.releaseAfter(new ActionListener<>() {
+            @Override
+            public void onResponse(Void unused) {
+                try (var clientReference = blobStore.clientReference()) {
+                    for (final var abortMultipartUploadRequest : abortMultipartUploadRequests) {
+                        abortMultipartUploadRequest.putCustomQueryParameter(
+                            S3BlobStore.CUSTOM_QUERY_PARAMETER_PURPOSE,
+                            OperationPurpose.SNAPSHOT_DATA.getKey()
+                        );
+                        try {
+                            SocketAccess.doPrivilegedVoid(() -> clientReference.client().abortMultipartUpload(abortMultipartUploadRequest));
+                            logger.info(
+                                "cleaned up dangling multipart upload [{}] of blob [{}][{}][{}]",
+                                abortMultipartUploadRequest.getUploadId(),
+                                blobStore.getRepositoryMetadata().name(),
+                                abortMultipartUploadRequest.getBucketName(),
+                                abortMultipartUploadRequest.getKey()
+                            );
+                        } catch (Exception e) {
+                            // Cleanup is a best-effort thing, we can't do anything better than log and carry on here. Note that any
+                            // failure is surprising: even a 404 means that something else aborted/completed the upload at a point
+                            // where there should be no other processes interacting with the repository.
+                            logger.warn(
+                                Strings.format(
+                                    "failed to clean up multipart upload [%s] of blob [%s][%s][%s]",
+                                    abortMultipartUploadRequest.getUploadId(),
+                                    blobStore.getRepositoryMetadata().name(),
+                                    abortMultipartUploadRequest.getBucketName(),
+                                    abortMultipartUploadRequest.getKey()
+                                ),
+                                e
+                            );
+                        }
+                    }
+                }
+            }

+            @Override
+            public void onFailure(Exception e) {
+                logger.log(
+                    MasterService.isPublishFailureException(e)
+                        || (e instanceof RepositoryException repositoryException
+                            && repositoryException.getCause() instanceof Exception cause
+                            && MasterService.isPublishFailureException(cause)) ? Level.DEBUG : Level.WARN,
+                    "failed to start cleanup of dangling multipart uploads",
+                    e
+                );
+            }
+        }, refs.acquire()));
+    }
 }
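The listener returned by `getMultipartUploadCleanupListener` defers the actual abort requests until it is completed. A hedged sketch of how a caller might wire it up; the real integration lives in `S3Repository`, which this excerpt does not show, so the `blobContainer` variable and the completion point here are assumptions:

    // Hold a ref until the cleanup round finishes; the listener releases its
    // ref via ActionListener.releaseAfter(..., refs.acquire()) above, and
    // ThreadedActionListener moves the abort calls onto the snapshot executor.
    try (var refs = new RefCountingRunnable(() -> logger.debug("multipart cleanup round complete"))) {
        final ActionListener<Void> cleanupListener = blobContainer.getMultipartUploadCleanupListener(1000, refs);
        // ... finalize the current batch of snapshot deletions ...
        cleanupListener.onResponse(null); // now abort the previously-collected dangling uploads
    }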
