diff --git a/server/src/main/java/org/elasticsearch/common/io/stream/StreamInput.java b/server/src/main/java/org/elasticsearch/common/io/stream/StreamInput.java
index b45f3ccbdc5f3..5e09c396a3120 100644
--- a/server/src/main/java/org/elasticsearch/common/io/stream/StreamInput.java
+++ b/server/src/main/java/org/elasticsearch/common/io/stream/StreamInput.java
@@ -72,7 +72,7 @@
* it means that the "barrier to entry" for adding new methods to this class is relatively low even though it is a shared class with code
* everywhere. That being said, this class deals primarily with {@code List}s rather than Arrays. For the most part calls should adapt to
* lists, either by storing {@code List}s internally or just converting to and from a {@code List} when calling. This comment is repeated
- * on {@link StreamInput}.
+ * on {@link StreamOutput}.
*/
public abstract class StreamInput extends InputStream {
diff --git a/server/src/main/java/org/elasticsearch/common/io/stream/TruncatedOutputStream.java b/server/src/main/java/org/elasticsearch/common/io/stream/TruncatedOutputStream.java
new file mode 100644
index 0000000000000..0e7fc704a2ef5
--- /dev/null
+++ b/server/src/main/java/org/elasticsearch/common/io/stream/TruncatedOutputStream.java
@@ -0,0 +1,60 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the "Elastic License
+ * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
+ * Public License v 1"; you may not use this file except in compliance with, at
+ * your election, the "Elastic License 2.0", the "GNU Affero General Public
+ * License v3.0 only", or the "Server Side Public License, v 1".
+ */
+
+package org.elasticsearch.common.io.stream;
+
+import java.io.FilterOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.function.IntSupplier;
+
+/**
+ * Discards writes once the max size has been reached.
+ * However, when writing byte arrays, the stream does not check whether there is capacity for the full
+ * array prior to writing, so the underlying stream may overspill by up to b.length - 1 bytes.
+ */
+public class TruncatedOutputStream extends FilterOutputStream {
+ private final IntSupplier currentSizeSupplier;
+ private final int maxSize;
+ private boolean hasCapacity = true;
+
+ public TruncatedOutputStream(OutputStream out, IntSupplier currentSizeSupplier, int maxSize) {
+ super(out);
+ this.currentSizeSupplier = currentSizeSupplier;
+ this.maxSize = maxSize;
+ }
+
+ public boolean hasCapacity() {
+ if (hasCapacity) {
+ hasCapacity = currentSizeSupplier.getAsInt() < maxSize;
+ }
+ return hasCapacity;
+ }
+
+ @Override
+ public void write(int b) throws IOException {
+ if (hasCapacity()) {
+ out.write(b);
+ }
+ }
+
+ @Override
+ public void write(byte[] b) throws IOException {
+ if (hasCapacity()) {
+ out.write(b);
+ }
+ }
+
+ @Override
+ public void write(byte[] b, int off, int len) throws IOException {
+ if (hasCapacity()) {
+ out.write(b, off, len);
+ }
+ }
+}
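As a minimal usage sketch (illustrative only, not part of the patch): with a ByteArrayOutputStream as the delegate and its own size() as the current-size supplier, writes pass through until the limit is reached and are silently discarded afterwards, with the documented overspill on array writes. The example class name is hypothetical.

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import org.elasticsearch.common.io.stream.TruncatedOutputStream;

    public class TruncatedOutputStreamExample {
        public static void main(String[] args) throws IOException {
            ByteArrayOutputStream target = new ByteArrayOutputStream();
            // 10-byte limit, tracked via the delegate's own size()
            TruncatedOutputStream truncated = new TruncatedOutputStream(target, target::size, 10);
            truncated.write(new byte[8]); // size 0 < 10: written in full
            truncated.write(new byte[8]); // size 8 < 10: written in full, overshooting the limit by 6 bytes
            truncated.write(new byte[8]); // size 16 >= 10: silently discarded
            System.out.println(target.size()); // prints 16: overspill of up to b.length - 1 is expected
        }
    }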
diff --git a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java
index 7006b5adbe886..c136a4a764a5e 100644
--- a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java
+++ b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java
@@ -119,6 +119,7 @@
import org.elasticsearch.persistent.decider.EnableAssignmentDecider;
import org.elasticsearch.plugins.PluginsService;
import org.elasticsearch.readiness.ReadinessService;
+import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
import org.elasticsearch.repositories.fs.FsRepository;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.script.ScriptService;
@@ -650,6 +651,7 @@ public void apply(Settings value, Settings current, Settings previous) {
WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_HIGH_UTILIZATION_THRESHOLD_SETTING,
WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_HIGH_UTILIZATION_DURATION_SETTING,
WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_QUEUE_LATENCY_THRESHOLD_SETTING,
- WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_REROUTE_INTERVAL_SETTING
+ WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_REROUTE_INTERVAL_SETTING,
+ BlobStoreRepository.MAX_HEAP_SIZE_FOR_SNAPSHOT_DELETION_SETTING
);
}
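Registering the setting in ClusterSettings makes it a recognized node setting. As a rough sketch of how the memory-size setting resolves (hypothetical values and class name, not part of the patch), either an absolute byte value or a heap percentage is accepted:

    import org.elasticsearch.common.settings.Settings;
    import org.elasticsearch.common.unit.ByteSizeValue;
    import org.elasticsearch.repositories.blobstore.BlobStoreRepository;

    public class MaxShardDeleteResultsSizeExample {
        public static void main(String[] args) {
            // Hypothetical node settings: "10%" resolves against the JVM's max heap; "512mb" would be absolute.
            Settings nodeSettings = Settings.builder()
                .put("repositories.blobstore.max_shard_delete_results_size", "10%")
                .build();
            ByteSizeValue maxSize = BlobStoreRepository.MAX_HEAP_SIZE_FOR_SNAPSHOT_DELETION_SETTING.get(nodeSettings);
            System.out.println(maxSize.getBytes()); // with the unset default of "25%", a quarter of the heap
        }
    }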
diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java
index f4275801fff1e..e036a4af12818 100644
--- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java
+++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java
@@ -71,6 +71,7 @@
import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.io.stream.TruncatedOutputStream;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.lucene.store.InputStreamIndexInput;
import org.elasticsearch.common.settings.Setting;
@@ -432,6 +433,14 @@ public static String getRepositoryDataBlobName(long repositoryGeneration) {
Setting.Property.NodeScope
);
+ // Defines the max size of the shard_delete_results stream as a fraction of the total heap (defaults to 25%)
+ public static final Setting<ByteSizeValue> MAX_HEAP_SIZE_FOR_SNAPSHOT_DELETION_SETTING = Setting.memorySizeSetting(
+ "repositories.blobstore.max_shard_delete_results_size",
+ "25%",
+ Setting.Property.Dynamic,
+ Setting.Property.NodeScope
+ );
+
/**
* Repository settings that can be updated dynamically without having to create a new repository.
*/
@@ -1006,7 +1015,8 @@ private void createSnapshotsDeletion(
SnapshotsServiceUtils.minCompatibleVersion(minimumNodeVersion, originalRepositoryData, snapshotIds),
originalRootBlobs,
blobStore().blobContainer(indicesPath()).children(OperationPurpose.SNAPSHOT_DATA),
- originalRepositoryData
+ originalRepositoryData,
+ metadata.settings()
);
}));
}
@@ -1096,7 +1106,7 @@ class SnapshotsDeletion {
/**
* Tracks the shard-level blobs which can be deleted once all the metadata updates have completed.
*/
- private final ShardBlobsToDelete shardBlobsToDelete = new ShardBlobsToDelete();
+ private final ShardBlobsToDelete shardBlobsToDelete;
SnapshotsDeletion(
Collection<SnapshotId> snapshotIds,
@@ -1104,7 +1114,8 @@ class SnapshotsDeletion {
IndexVersion repositoryFormatIndexVersion,
Map<String, BlobMetadata> originalRootBlobs,
Map<String, BlobContainer> originalIndexContainers,
- RepositoryData originalRepositoryData
+ RepositoryData originalRepositoryData,
+ Settings settings
) {
this.snapshotIds = snapshotIds;
this.originalRepositoryDataGeneration = originalRepositoryDataGeneration;
@@ -1113,6 +1124,8 @@ class SnapshotsDeletion {
this.originalRootBlobs = originalRootBlobs;
this.originalIndexContainers = originalIndexContainers;
this.originalRepositoryData = originalRepositoryData;
+
+ shardBlobsToDelete = new ShardBlobsToDelete(settings);
}
// ---------------------------------------------------------------------------------------------------------------------------------
@@ -1678,28 +1691,46 @@ void writeTo(StreamOutput out) throws IOException {
* need no further synchronization.
*
*/
- // If the size of this continues to be a problem even after compression, consider either a hard limit on its size (preferring leaked
- // blobs over an OOME on the master) or else offloading it to disk or to the repository itself.
- private final BytesStreamOutput shardDeleteResults = new ReleasableBytesStreamOutput(bigArrays);
-
- private int resultCount = 0;
-
- private final StreamOutput compressed = new OutputStreamStreamOutput(
- new BufferedOutputStream(
- new DeflaterOutputStream(Streams.flushOnCloseStream(shardDeleteResults)),
- DeflateCompressor.BUFFER_SIZE
- )
- );
+ private final BytesStreamOutput shardDeleteResults;
+ private final TruncatedOutputStream truncatedShardDeleteResultsOutputStream;
+ private final StreamOutput compressed;
+ private final int shardDeleteResultsMaxSize;
+ private int successfullyWrittenBlobsCount = 0;
+ private int leakedBlobsCount = 0;
private final ArrayList<Closeable> resources = new ArrayList<>();
-
private final ShardGenerations.Builder shardGenerationsBuilder = ShardGenerations.builder();
- ShardBlobsToDelete() {
+ ShardBlobsToDelete(Settings settings) {
+ this.shardDeleteResultsMaxSize = calculateMaximumShardDeleteResultsSize(settings);
+ this.shardDeleteResults = new ReleasableBytesStreamOutput(bigArrays);
+ this.truncatedShardDeleteResultsOutputStream = new TruncatedOutputStream(
+ new BufferedOutputStream(
+ new DeflaterOutputStream(Streams.flushOnCloseStream(shardDeleteResults)),
+ DeflateCompressor.BUFFER_SIZE
+ ),
+ shardDeleteResults::size,
+ this.shardDeleteResultsMaxSize
+ );
+ this.compressed = new OutputStreamStreamOutput(this.truncatedShardDeleteResultsOutputStream);
resources.add(compressed);
resources.add(LeakTracker.wrap((Releasable) shardDeleteResults));
}
+ /**
+ * Calculates the maximum size of the shardDeleteResults BytesStreamOutput.
+ * The size cannot exceed 2GB, beyond which {@code BytesStreamOutput} throws an IAE, but should also be
+ * no more than the configured fraction of the total heap (25% by default).
+ * A 1MB buffer is reserved so that even when the stream is at its maximum size there is room to flush.
+ * @return The maximum number of bytes the shardDeleteResults BytesStreamOutput can consume in the heap
+ */
+ int calculateMaximumShardDeleteResultsSize(Settings settings) {
+ long maxHeapSizeInBytes = MAX_HEAP_SIZE_FOR_SNAPSHOT_DELETION_SETTING.get(settings).getBytes();
+ int oneMBBuffer = 1024 * 1024;
+ int maxShardDeleteResultsSize = Integer.MAX_VALUE - oneMBBuffer;
+ return Math.toIntExact(Math.min(maxHeapSizeInBytes, maxShardDeleteResultsSize));
+ }
+
synchronized void addShardDeleteResult(
IndexId indexId,
int shardId,
@@ -1708,8 +1739,26 @@ synchronized void addShardDeleteResult(
) {
try {
shardGenerationsBuilder.put(indexId, shardId, newGeneration);
- new ShardSnapshotMetaDeleteResult(Objects.requireNonNull(indexId.getId()), shardId, blobsToDelete).writeTo(compressed);
- resultCount += 1;
+ // Proceed only if at least 1 byte of capacity remains for writing
+ if (this.truncatedShardDeleteResultsOutputStream.hasCapacity()) {
+ new ShardSnapshotMetaDeleteResult(Objects.requireNonNull(indexId.getId()), shardId, blobsToDelete).writeTo(compressed);
+ // We only want to read this shard delete result if we were able to write the entire object.
+ // Otherwise, for partial writes, an EOFException will be thrown upon reading
+ if (this.truncatedShardDeleteResultsOutputStream.hasCapacity()) {
+ successfullyWrittenBlobsCount += 1;
+ } else {
+ leakedBlobsCount += 1;
+ }
+ } else {
+ logger.debug(
+ "Unable to clean up the following dangling blobs, {}, for index {} and shard {} "
+ + "due to insufficient heap space on the master node.",
+ blobsToDelete,
+ indexId,
+ shardId
+ );
+ leakedBlobsCount += 1;
+ }
} catch (IOException e) {
assert false : e; // no IO actually happens here
throw new UncheckedIOException(e);
public Iterator<String> getBlobPaths() {
throw new UncheckedIOException(e);
}
- return Iterators.flatMap(Iterators.forRange(0, resultCount, i -> {
+ if (leakedBlobsCount > 0) {
+ logger.warn(
+ "Skipped cleanup of {} dangling snapshot blobs due to memory constraints on the master node. "
+ + "These blobs will be cleaned up automatically by future snapshot deletions. "
+ + "If you routinely delete large snapshots, consider increasing the master node's heap size "
+ + "to allow for more efficient cleanup.",
+ leakedBlobsCount
+ );
+ }
+
+ return Iterators.flatMap(Iterators.forRange(0, successfullyWrittenBlobsCount, i -> {
try {
return new ShardSnapshotMetaDeleteResult(input);
} catch (IOException e) {
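To make the clamping in calculateMaximumShardDeleteResultsSize concrete, a worked example with hypothetical heap sizes (illustrative fragment, not part of the patch):

    // Hypothetical 8 GiB heap with the default "25%" value:
    long fromSetting = (8L << 30) / 4;                           // 2_147_483_648 bytes (2 GiB)
    int clamp = Integer.MAX_VALUE - (1024 * 1024);               // 2_146_435_071 bytes, leaving 1MB of headroom
    int maxSize = Math.toIntExact(Math.min(fromSetting, clamp)); // clamped to 2_146_435_071

    // Hypothetical 4 GiB heap: 25% is 1 GiB (1_073_741_824 bytes), well under the clamp, so it is used as-is.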
diff --git a/server/src/test/java/org/elasticsearch/common/io/stream/TruncatedOutputStreamTests.java b/server/src/test/java/org/elasticsearch/common/io/stream/TruncatedOutputStreamTests.java
new file mode 100644
index 0000000000000..bf6540e81cab9
--- /dev/null
+++ b/server/src/test/java/org/elasticsearch/common/io/stream/TruncatedOutputStreamTests.java
@@ -0,0 +1,122 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the "Elastic License
+ * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
+ * Public License v 1"; you may not use this file except in compliance with, at
+ * your election, the "Elastic License 2.0", the "GNU Affero General Public
+ * License v3.0 only", or the "Server Side Public License, v 1".
+ */
+
+package org.elasticsearch.common.io.stream;
+
+import org.elasticsearch.test.ESTestCase;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+public class TruncatedOutputStreamTests extends ESTestCase {
+
+ public void testWriteSingleBytes() throws IOException {
+ ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+ int maxSize = randomIntBetween(0, 100);
+ TruncatedOutputStream truncatedOutputStream = new TruncatedOutputStream(
+ byteArrayOutputStream,
+ byteArrayOutputStream::size,
+ maxSize
+ );
+
+ byte[] values = new byte[maxSize];
+
+ // Write enough bytes within the defined maxSize
+ for (int i = 0; i < maxSize; i++) {
+ byte b = randomByte();
+ truncatedOutputStream.write(b);
+ values[i] = b;
+ }
+
+ // The stream should be truncated now that it is filled
+ for (int i = 0; i < randomIntBetween(0, 20); i++) {
+ truncatedOutputStream.write(randomByte());
+ }
+
+ assertArrayEquals(values, byteArrayOutputStream.toByteArray());
+ }
+
+ public void testWriteByteArray() throws IOException {
+ ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+ int maxSize = randomIntBetween(100, 200);
+ TruncatedOutputStream truncatedOutputStream = new TruncatedOutputStream(
+ byteArrayOutputStream,
+ byteArrayOutputStream::size,
+ maxSize
+ );
+
+ List<Byte> values = new ArrayList<>();
+ int bytesWritten = 0;
+ // Write beyond the stream's capacity
+ while (bytesWritten <= maxSize * 2) {
+ byte[] bytes = randomByteArrayOfLength(randomIntBetween(0, 20));
+ truncatedOutputStream.write(bytes);
+
+ // If there was capacity before writing, then the stream wrote the entire array
+ // even if that meant overflowing
+ if (bytesWritten < maxSize) {
+ for (byte b : bytes) {
+ values.add(b);
+ }
+ }
+
+ bytesWritten += bytes.length;
+ }
+
+ byte[] valuesAsByteArray = new byte[values.size()];
+ int i = 0;
+ for (byte b : values) {
+ valuesAsByteArray[i] = b;
+ i++;
+ }
+
+ assertArrayEquals(valuesAsByteArray, byteArrayOutputStream.toByteArray());
+ }
+
+ public void testWriteByteArrayWithOffsetAndLength() throws IOException {
+ ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+ int maxSize = randomIntBetween(100, 200);
+ TruncatedOutputStream truncatedOutputStream = new TruncatedOutputStream(
+ byteArrayOutputStream,
+ byteArrayOutputStream::size,
+ maxSize
+ );
+
+ List<Byte> values = new ArrayList<>();
+ int bytesWritten = 0;
+ // Write beyond the stream's capacity
+ while (bytesWritten <= maxSize * 2) {
+ byte[] bytes = randomByteArrayOfLength(randomIntBetween(0, 20));
+ int offset = randomIntBetween(0, bytes.length);
+ int length = randomIntBetween(0, bytes.length - offset);
+ truncatedOutputStream.write(bytes, offset, length);
+
+ // If there was capacity before writing, then the stream wrote the sub array
+ // even if that meant overflowing
+ if (bytesWritten < maxSize) {
+ for (int i = offset; i < offset + length; i++) {
+ values.add(bytes[i]);
+ }
+ }
+
+ bytesWritten += length;
+ }
+
+ byte[] valuesAsByteArray = new byte[values.size()];
+ int i = 0;
+ for (byte b : values) {
+ valuesAsByteArray[i] = b;
+ i++;
+ }
+
+ assertArrayEquals(valuesAsByteArray, byteArrayOutputStream.toByteArray());
+ }
+}
diff --git a/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryTests.java b/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryTests.java
index 802e5a86afa35..0299bcd80f99b 100644
--- a/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryTests.java
+++ b/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryTests.java
@@ -68,6 +68,7 @@
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.ESSingleNodeTestCase;
import org.elasticsearch.test.MockLog;
+import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.threadpool.ThreadPool;
import org.junit.After;
@@ -453,10 +454,14 @@ private static void writeIndexGen(BlobStoreRepository repository, RepositoryData
}
private BlobStoreRepository setupRepo() {
+ return setupRepo(Settings.builder());
+ }
+
+ private BlobStoreRepository setupRepo(Settings.Builder repoSettings) {
final Client client = client();
final Path location = ESIntegTestCase.randomRepoPath(node().settings());
- Settings.Builder repoSettings = Settings.builder().put(node().settings()).put("location", location);
+ repoSettings.put(node().settings()).put("location", location);
boolean compress = randomBoolean();
if (compress == false) {
repoSettings.put(BlobStoreRepository.COMPRESS_SETTING.getKey(), false);
@@ -631,57 +636,6 @@ private Environment createEnvironment() {
);
}
- public void testShardBlobsToDelete() {
- final var repo = setupRepo();
- try (var shardBlobsToDelete = repo.new ShardBlobsToDelete()) {
- final var expectedShardGenerations = ShardGenerations.builder();
- final var expectedBlobsToDelete = new HashSet<String>();
-
- final var countDownLatch = new CountDownLatch(1);
- int blobCount = 0;
- try (var refs = new RefCountingRunnable(countDownLatch::countDown)) {
- for (int index = between(0, 1000); index > 0; index--) {
- final var indexId = new IndexId(randomIdentifier(), randomUUID());
- for (int shard = between(1, 30); shard > 0; shard--) {
- final var shardId = shard;
- final var shardGeneration = new ShardGeneration(randomUUID());
- expectedShardGenerations.put(indexId, shard, shardGeneration);
- final var blobsToDelete = randomList(
- 100,
- () -> randomFrom(METADATA_PREFIX, INDEX_FILE_PREFIX, SNAPSHOT_PREFIX) + randomUUID() + randomFrom(
- "",
- METADATA_BLOB_NAME_SUFFIX
- )
- );
- blobCount += blobsToDelete.size();
- final var indexPath = repo.basePath()
- .add("indices")
- .add(indexId.getId())
- .add(Integer.toString(shard))
- .buildAsString();
- for (final var blobToDelete : blobsToDelete) {
- expectedBlobsToDelete.add(indexPath + blobToDelete);
- }
-
- repo.threadPool()
- .generic()
- .execute(
- ActionRunnable.run(
- refs.acquireListener(),
- () -> shardBlobsToDelete.addShardDeleteResult(indexId, shardId, shardGeneration, blobsToDelete)
- )
- );
- }
- }
- }
- safeAwait(countDownLatch);
- assertEquals(expectedShardGenerations.build(), shardBlobsToDelete.getUpdatedShardGenerations());
- shardBlobsToDelete.getBlobPaths().forEachRemaining(s -> assertTrue(expectedBlobsToDelete.remove(s)));
- assertThat(expectedBlobsToDelete, empty());
- assertThat(shardBlobsToDelete.sizeInBytes(), lessThanOrEqualTo(Math.max(ByteSizeUnit.KB.toIntBytes(1), 20 * blobCount)));
- }
- }
-
public void testUuidCreationLogging() {
final var repo = setupRepo();
final var repoMetadata = repo.getMetadata();
@@ -798,4 +752,160 @@ public void testUuidCreationLogging() {
)
);
}
+
+ private MockLog mockLog;
+
+ @Override
+ public void setUp() throws Exception {
+ super.setUp();
+ mockLog = MockLog.capture(BlobStoreRepository.class);
+ }
+
+ private void resetMockLog() {
+ mockLog.close();
+ mockLog = MockLog.capture(BlobStoreRepository.class);
+ }
+
+ @Override
+ public void tearDown() throws Exception {
+ mockLog.close();
+ super.tearDown();
+ }
+
+ /**
+ * Tests writing multiple blobs to ShardBlobsToDelete when its underlying stream has a configurable maximum size.
+ * Initially, while there is capacity, we write N blobs to ShardBlobsToDelete and expect each of them to be
+ * compressed and written to the underlying stream.
+ * Once capacity is reached, we write M subsequent blobs and expect that they will not be written to the
+ * underlying stream.
+ * When we read from the stream, we expect only the successfully written results to be returned.
+ */
+ @TestLogging(reason = "test includes assertions about logging", value = "org.elasticsearch.repositories.blobstore:WARN")
+ public void testWriteToShardBlobToDelete() {
+ resetMockLog();
+ int heapMemory = randomIntBetween(1, 20000);
+ Settings.Builder settings = Settings.builder().put("repositories.blobstore.max_shard_delete_results_size", heapMemory + "b");
+ final var repo = setupRepo(settings);
+ try (var shardBlobsToDelete = repo.new ShardBlobsToDelete(settings.build())) {
+ final var expectedShardGenerations = ShardGenerations.builder();
+ final var expectedBlobsToDelete = new HashSet<String>();
+ CountDownLatch countDownLatch;
+ int blobCount = 0;
+
+ // First, write blobs until capacity is exceeded
+
+ mockLog.addExpectation(
+ new MockLog.SeenEventExpectation(
+ "skipped cleanup warning",
+ BlobStoreRepository.class.getCanonicalName(),
+ Level.WARN,
+ "*Skipped cleanup of 1 dangling snapshot blobs due to memory constraints "
+ + "on the master node. These blobs will be cleaned up automatically by future snapshot deletions. "
+ + "If you routinely delete large snapshots, consider increasing the master node's heap size to allow "
+ + "for more efficient cleanup."
+ )
+ );
+
+ // Write while we have capacity
+ while (shardBlobsToDelete.sizeInBytes() < heapMemory) {
+ // Generate the next blob to write
+ final var indexId = new IndexId(randomIdentifier(), randomUUID());
+ final var shardId = between(1, 30);
+ final var shardGeneration = new ShardGeneration(randomUUID());
+ final var blobsToDelete = randomList(
+ 100,
+ () -> randomFrom(METADATA_PREFIX, INDEX_FILE_PREFIX, SNAPSHOT_PREFIX) + randomUUID() + randomFrom(
+ "",
+ METADATA_BLOB_NAME_SUFFIX
+ )
+ );
+
+ expectedShardGenerations.put(indexId, shardId, shardGeneration);
+ final var indexPath = repo.basePath().add("indices").add(indexId.getId()).add(Integer.toString(shardId)).buildAsString();
+
+ countDownLatch = new CountDownLatch(1);
+ try (var refs = new RefCountingRunnable(countDownLatch::countDown)) {
+ repo.threadPool()
+ .generic()
+ .execute(
+ ActionRunnable.run(
+ refs.acquireListener(),
+ () -> shardBlobsToDelete.addShardDeleteResult(indexId, shardId, shardGeneration, blobsToDelete)
+ )
+ );
+ }
+ safeAwait(countDownLatch);
+
+ // Only if the entire blob was written to memory do we expect to see it returned
+ if (shardBlobsToDelete.sizeInBytes() < heapMemory) {
+ for (final var blobToDelete : blobsToDelete) {
+ expectedBlobsToDelete.add(indexPath + blobToDelete);
+ }
+ blobCount += blobsToDelete.size();
+ }
+ }
+
+ assertEquals(expectedShardGenerations.build(), shardBlobsToDelete.getUpdatedShardGenerations());
+ shardBlobsToDelete.getBlobPaths().forEachRemaining(s -> assertTrue(expectedBlobsToDelete.remove(s)));
+ assertThat(expectedBlobsToDelete, empty());
+ assertThat(shardBlobsToDelete.sizeInBytes(), lessThanOrEqualTo(Math.max(ByteSizeUnit.KB.toIntBytes(1), 20 * blobCount)));
+
+ mockLog.assertAllExpectationsMatched();
+ resetMockLog();
+
+ // Second, now that we're at capacity, verify that subsequent writes are accepted without throwing an error
+
+ int numberOfOverflowedWrites = randomIntBetween(1, 20);
+ mockLog.addExpectation(
+ new MockLog.SeenEventExpectation(
+ "skipped cleanup warning",
+ BlobStoreRepository.class.getCanonicalName(),
+ Level.WARN,
+ "*Skipped cleanup of "
+ + (numberOfOverflowedWrites + 1)
+ + " dangling snapshot blobs due to memory constraints "
+ + "on the master node. These blobs will be cleaned up automatically by future snapshot deletions. "
+ + "If you routinely delete large snapshots, consider increasing the master node's heap size to allow "
+ + "for more efficient cleanup."
+ )
+ );
+
+ for (int i = 0; i < numberOfOverflowedWrites; i++) {
+ // Generate the next blob to write
+ final var indexId = new IndexId(randomIdentifier(), randomUUID());
+ final var shardId = between(1, 30);
+ final var shardGeneration = new ShardGeneration(randomUUID());
+ final var blobsToDelete = randomList(
+ 100,
+ () -> randomFrom(METADATA_PREFIX, INDEX_FILE_PREFIX, SNAPSHOT_PREFIX) + randomUUID() + randomFrom(
+ "",
+ METADATA_BLOB_NAME_SUFFIX
+ )
+ );
+ expectedShardGenerations.put(indexId, shardId, shardGeneration);
+
+ countDownLatch = new CountDownLatch(1);
+ try (var refs = new RefCountingRunnable(countDownLatch::countDown)) {
+ repo.threadPool()
+ .generic()
+ .execute(
+ ActionRunnable.run(
+ refs.acquireListener(),
+ () -> shardBlobsToDelete.addShardDeleteResult(indexId, shardId, shardGeneration, blobsToDelete)
+ )
+ );
+ }
+ safeAwait(countDownLatch);
+ }
+
+ // Shard generations are still tracked, even for results that were dropped due to memory constraints
+ assertEquals(expectedShardGenerations.build(), shardBlobsToDelete.getUpdatedShardGenerations());
+
+ // We expect the blob paths to be only those that were written before we exceeded the threshold
+ List<String> blobPaths = new ArrayList<>();
+ shardBlobsToDelete.getBlobPaths().forEachRemaining(blobPaths::add);
+ assertEquals(blobCount, blobPaths.size());
+
+ mockLog.assertAllExpectationsMatched();
+ resetMockLog();
+ }
+ }
}