38 commits
9870591
Limit size of shardDeleteResults
joshua-adams-1 Aug 22, 2025
97e9969
[CI] Auto commit changes from spotless
Aug 26, 2025
24b7a62
Minor tweaks
joshua-adams-1 Aug 26, 2025
d888113
Merge branch 'limit-shard-blobs-to-delete' of github.com:joshua-adams…
joshua-adams-1 Aug 26, 2025
ee89eb2
Ran ./gradlew spotlessApply precommit
joshua-adams-1 Aug 26, 2025
92991b9
TBR - Add TODO
joshua-adams-1 Aug 27, 2025
3190772
Uses a setting to control the max `shardDeleteResults` size
joshua-adams-1 Sep 1, 2025
a16856c
Remove TODOs
joshua-adams-1 Sep 1, 2025
381d294
Fix failing unit tests
joshua-adams-1 Sep 1, 2025
203d513
Merge branch 'main' into limit-shard-blobs-to-delete
joshua-adams-1 Sep 1, 2025
daf09b6
Moved the limit logic out of the streams submodule and into
joshua-adams-1 Sep 3, 2025
dc70d5b
Merge branch 'main' of github.com:elastic/elasticsearch into limit-sh…
joshua-adams-1 Sep 3, 2025
0355c2a
Add tests
joshua-adams-1 Sep 4, 2025
f072128
Run ./gradlew spotlessApply precommit
joshua-adams-1 Sep 4, 2025
abb2d4c
Merge branch 'limit-shard-blobs-to-delete' of github.com:joshua-adams…
joshua-adams-1 Sep 4, 2025
a0d728f
Merge branch 'main' into limit-shard-blobs-to-delete
joshua-adams-1 Sep 4, 2025
654ebf2
Creates BoundedOutputStream
joshua-adams-1 Sep 11, 2025
5ef0111
Merge branch 'limit-shard-blobs-to-delete' of github.com:joshua-adams…
joshua-adams-1 Sep 11, 2025
bd9217b
Merge branch 'main' into limit-shard-blobs-to-delete
joshua-adams-1 Sep 11, 2025
ba81bcf
[CI] Auto commit changes from spotless
Sep 11, 2025
acd2182
Revert StreamOutput and delete tests
joshua-adams-1 Sep 11, 2025
be05a1f
Adds BoundedOutputStreamTests
joshua-adams-1 Sep 11, 2025
8d66c1e
Creates TruncatedOutputStream
joshua-adams-1 Sep 12, 2025
ce64bf5
Merge branch 'main' into limit-shard-blobs-to-delete
joshua-adams-1 Sep 12, 2025
0fa5099
Spotless commit
joshua-adams-1 Sep 12, 2025
3575240
Merge branch 'limit-shard-blobs-to-delete' of github.com:joshua-adams…
joshua-adams-1 Sep 12, 2025
d55893d
Add skippedResultsCount
joshua-adams-1 Sep 12, 2025
3725a3c
Rewrite the unit tests
joshua-adams-1 Sep 18, 2025
ed00f1a
Spotless apply
joshua-adams-1 Sep 18, 2025
ce6195d
Merge branch 'main' into limit-shard-blobs-to-delete
joshua-adams-1 Sep 18, 2025
37404a5
[CI] Update transport version definitions
Oct 2, 2025
fc41d60
Merge branch 'main' into limit-shard-blobs-to-delete
joshua-adams-1 Oct 2, 2025
d1e81f7
David comments
joshua-adams-1 Oct 7, 2025
2f0ea30
Fix test
joshua-adams-1 Oct 7, 2025
6babba9
Merge branch 'limit-shard-blobs-to-delete' of https://github.com/josh…
joshua-adams-1 Oct 7, 2025
1ff464d
Merge branch 'main' into limit-shard-blobs-to-delete
joshua-adams-1 Oct 7, 2025
0d01264
spotless
joshua-adams-1 Oct 7, 2025
d73ffef
Modify comment
joshua-adams-1 Oct 7, 2025
@@ -72,7 +72,7 @@
* it means that the "barrier to entry" for adding new methods to this class is relatively low even though it is a shared class with code
* everywhere. That being said, this class deals primarily with {@code List}s rather than Arrays. For the most part calls should adapt to
* lists, either by storing {@code List}s internally or just converting to and from a {@code List} when calling. This comment is repeated
* on {@link StreamInput}.
* on {@link StreamOutput}.
*/
public abstract class StreamInput extends InputStream {

@@ -0,0 +1,60 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.common.io.stream;

import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.function.IntSupplier;

/**
* Truncates writes once the max size is exceeded.
* However, when writing byte arrays, the stream does not check whether there is capacity for the full
* array before writing, so a single array write may overspill the limit by up to b.length - 1 bytes.
*/
public class TruncatedOutputStream extends FilterOutputStream {
private final IntSupplier currentSizeSupplier;
private final int maxSize;
private boolean hasCapacity = true;

public TruncatedOutputStream(OutputStream out, IntSupplier currentSizeSupplier, int maxSize) {
super(out);
this.currentSizeSupplier = currentSizeSupplier;
this.maxSize = maxSize;
}

public boolean hasCapacity() {
if (hasCapacity) {
hasCapacity = currentSizeSupplier.getAsInt() < maxSize;
}
return hasCapacity;
}

@Override
public void write(int b) throws IOException {
if (hasCapacity()) {
out.write(b);
}
}

@Override
public void write(byte[] b) throws IOException {
if (hasCapacity()) {
out.write(b);
}
}

@Override
public void write(byte[] b, int off, int len) throws IOException {
if (hasCapacity()) {
out.write(b, off, len);
}
}
}
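
To make the overspill behaviour described in the class javadoc concrete, here is a minimal usage sketch (illustrative only, not part of the change; the class name TruncatedOutputStreamDemo is made up). A whole-array write is still accepted as long as at least one byte of capacity remains, so the underlying stream can end up larger than maxSize:

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import org.elasticsearch.common.io.stream.TruncatedOutputStream;

public class TruncatedOutputStreamDemo {
    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream target = new ByteArrayOutputStream();
        // maxSize = 4 bytes, measured against the target's current size
        try (TruncatedOutputStream truncated = new TruncatedOutputStream(target, target::size, 4)) {
            truncated.write(new byte[3]);  // size 0 < 4: all 3 bytes are written
            truncated.write(new byte[10]); // size 3 < 4: all 10 bytes are written, overspilling the limit
            truncated.write(new byte[5]);  // size 13 >= 4: dropped, and hasCapacity() stays false
        }
        System.out.println(target.size()); // prints 13, i.e. 9 bytes (b.length - 1) beyond maxSize
    }
}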
@@ -119,6 +119,7 @@
import org.elasticsearch.persistent.decider.EnableAssignmentDecider;
import org.elasticsearch.plugins.PluginsService;
import org.elasticsearch.readiness.ReadinessService;
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
import org.elasticsearch.repositories.fs.FsRepository;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.script.ScriptService;
@@ -650,6 +651,7 @@ public void apply(Settings value, Settings current, Settings previous) {
WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_HIGH_UTILIZATION_THRESHOLD_SETTING,
WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_HIGH_UTILIZATION_DURATION_SETTING,
WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_QUEUE_LATENCY_THRESHOLD_SETTING,
WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_REROUTE_INTERVAL_SETTING
WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_REROUTE_INTERVAL_SETTING,
BlobStoreRepository.MAX_HEAP_SIZE_FOR_SNAPSHOT_DELETION_SETTING
);
}
@@ -71,6 +71,7 @@
import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.TruncatedOutputStream;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.lucene.store.InputStreamIndexInput;
import org.elasticsearch.common.settings.Setting;
@@ -432,6 +433,14 @@ public static String getRepositoryDataBlobName(long repositoryGeneration) {
Setting.Property.NodeScope
);

// Defines the max size of the shard_delete_results stream as a percentage of available heap memory
public static final Setting<ByteSizeValue> MAX_HEAP_SIZE_FOR_SNAPSHOT_DELETION_SETTING = Setting.memorySizeSetting(
"repositories.blobstore.max_shard_delete_results_size",
"25%",
Setting.Property.Dynamic,
Reviewer comment:
Yet more trappiness: you now need to register a listener for updates to this setting (e.g. call clusterSettings.initializeAndWatch(...)). You can get a clusterSettings from clusterService.getClusterSettings(). I think I'd be inclined to do that in the BlobStoreRepository constructor rather than doing it each time we create a SnapshotsDeletion.

Setting.Property.NodeScope
);
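
A rough sketch of the registration the comment above asks for (hypothetical wiring, not part of this diff; the field name maxShardDeleteResultsSize is invented for illustration), done once in the BlobStoreRepository constructor rather than per SnapshotsDeletion:

// Hypothetical field on BlobStoreRepository, kept up to date by the watcher below.
private volatile ByteSizeValue maxShardDeleteResultsSize;

// In the BlobStoreRepository constructor, as the reviewer suggests:
clusterService.getClusterSettings()
    .initializeAndWatch(MAX_HEAP_SIZE_FOR_SNAPSHOT_DELETION_SETTING, value -> this.maxShardDeleteResultsSize = value);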

/**
* Repository settings that can be updated dynamically without having to create a new repository.
*/
@@ -1006,7 +1015,8 @@ private void createSnapshotsDeletion(
SnapshotsServiceUtils.minCompatibleVersion(minimumNodeVersion, originalRepositoryData, snapshotIds),
originalRootBlobs,
blobStore().blobContainer(indicesPath()).children(OperationPurpose.SNAPSHOT_DATA),
originalRepositoryData
originalRepositoryData,
metadata.settings()
);
}));
}
@@ -1096,15 +1106,16 @@ class SnapshotsDeletion {
/**
* Tracks the shard-level blobs which can be deleted once all the metadata updates have completed.
*/
private final ShardBlobsToDelete shardBlobsToDelete = new ShardBlobsToDelete();
private final ShardBlobsToDelete shardBlobsToDelete;

SnapshotsDeletion(
Collection<SnapshotId> snapshotIds,
long originalRepositoryDataGeneration,
IndexVersion repositoryFormatIndexVersion,
Map<String, BlobMetadata> originalRootBlobs,
Map<String, BlobContainer> originalIndexContainers,
RepositoryData originalRepositoryData
RepositoryData originalRepositoryData,
Settings settings
) {
this.snapshotIds = snapshotIds;
this.originalRepositoryDataGeneration = originalRepositoryDataGeneration;
@@ -1113,6 +1124,8 @@ class SnapshotsDeletion {
this.originalRootBlobs = originalRootBlobs;
this.originalIndexContainers = originalIndexContainers;
this.originalRepositoryData = originalRepositoryData;

shardBlobsToDelete = new ShardBlobsToDelete(settings);
}

// ---------------------------------------------------------------------------------------------------------------------------------
@@ -1678,28 +1691,46 @@ void writeTo(StreamOutput out) throws IOException {
* need no further synchronization.
* </p>
*/
// If the size of this continues to be a problem even after compression, consider either a hard limit on its size (preferring leaked
// blobs over an OOME on the master) or else offloading it to disk or to the repository itself.
private final BytesStreamOutput shardDeleteResults = new ReleasableBytesStreamOutput(bigArrays);

private int resultCount = 0;

private final StreamOutput compressed = new OutputStreamStreamOutput(
new BufferedOutputStream(
new DeflaterOutputStream(Streams.flushOnCloseStream(shardDeleteResults)),
DeflateCompressor.BUFFER_SIZE
)
);
private final BytesStreamOutput shardDeleteResults;
private final TruncatedOutputStream truncatedShardDeleteResultsOutputStream;
private final StreamOutput compressed;

private final int shardDeleteResultsMaxSize;
private int successfullyWrittenBlobsCount = 0;
private int leakedBlobsCount = 0;
private final ArrayList<Closeable> resources = new ArrayList<>();

private final ShardGenerations.Builder shardGenerationsBuilder = ShardGenerations.builder();

ShardBlobsToDelete() {
ShardBlobsToDelete(Settings settings) {
this.shardDeleteResultsMaxSize = calculateMaximumShardDeleteResultsSize(settings);
this.shardDeleteResults = new ReleasableBytesStreamOutput(bigArrays);
this.truncatedShardDeleteResultsOutputStream = new TruncatedOutputStream(
new BufferedOutputStream(
new DeflaterOutputStream(Streams.flushOnCloseStream(shardDeleteResults)),
DeflateCompressor.BUFFER_SIZE
),
shardDeleteResults::size,
this.shardDeleteResultsMaxSize
);
this.compressed = new OutputStreamStreamOutput(this.truncatedShardDeleteResultsOutputStream);
resources.add(compressed);
resources.add(LeakTracker.wrap((Releasable) shardDeleteResults));
}

/**
* Calculates the maximum size of the shardDeleteResults BytesStreamOutput.
* The size cannot exceed 2GB, beyond which {@code BytesStreamOutput} throws an IAE,
* but should also be no more than the configured fraction of the heap (25% by default).
* A 1MB buffer is reserved so that, even when the stream is at its maximum size, there is room to flush.
* @return The maximum number of bytes the shardDeleteResults BytesStreamOutput can consume in the heap
*/
int calculateMaximumShardDeleteResultsSize(Settings settings) {
long maxHeapSizeInBytes = MAX_HEAP_SIZE_FOR_SNAPSHOT_DELETION_SETTING.get(settings).getBytes();
int oneMBBuffer = 1024 * 1024;
int maxShardDeleteResultsSize = Integer.MAX_VALUE - oneMBBuffer;
return Math.toIntExact(Math.min(maxHeapSizeInBytes, maxShardDeleteResultsSize));
}
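// Worked example (illustrative figures, not from the change): with the default "25%" and a 4 GB heap the
// setting resolves to 1 GiB, which is below Integer.MAX_VALUE - 1 MiB, so 1 GiB is returned; with a 16 GB
// heap, 25% is 4 GiB, which exceeds the clamp, so Integer.MAX_VALUE - (1024 * 1024) bytes is returned instead.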

synchronized void addShardDeleteResult(
IndexId indexId,
int shardId,
@@ -1708,8 +1739,26 @@ synchronized void addShardDeleteResult(
) {
try {
shardGenerationsBuilder.put(indexId, shardId, newGeneration);
new ShardSnapshotMetaDeleteResult(Objects.requireNonNull(indexId.getId()), shardId, blobsToDelete).writeTo(compressed);
resultCount += 1;
// There is a minimum of 1 byte available for writing
if (this.truncatedShardDeleteResultsOutputStream.hasCapacity()) {
new ShardSnapshotMetaDeleteResult(Objects.requireNonNull(indexId.getId()), shardId, blobsToDelete).writeTo(compressed);
// We only want to read this shard delete result if we were able to write the entire object.
// Otherwise, for partial writes, an EOFException will be thrown upon reading
if (this.truncatedShardDeleteResultsOutputStream.hasCapacity()) {
successfullyWrittenBlobsCount += 1;
Reviewer comment:
This replaces resultCount, but it is the count of successfully recorded shards, not blobs.

} else {
leakedBlobsCount += 1;
Reviewer comment:
Likewise this is recording the number of shards with leaked blobs rather than the number of leaked blobs. However, rather than just renaming the variable I think we should actually count the number of leaked blobs (i.e. += blobsToDelete.size() here).

}
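// Sketch of the reviewer's suggestion above (hypothetical, not in this diff): count individual leaked blobs
// rather than shards, e.g. leakedBlobsCount += blobsToDelete.size();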
} else {
logger.debug(
"Unable to clean up the following dangling blobs, {}, for index {} and shard {} "
Reviewer comment on lines +1753 to +1754:
This also applies to the other branch that increases leakedBlobsCount.

+ "due to insufficient heap space on the master node.",
blobsToDelete,
indexId,
shardId
);
leakedBlobsCount += 1;
}
} catch (IOException e) {
assert false : e; // no IO actually happens here
throw new UncheckedIOException(e);
@@ -1736,7 +1785,17 @@ public Iterator<String> getBlobPaths() {
throw new UncheckedIOException(e);
}

return Iterators.flatMap(Iterators.forRange(0, resultCount, i -> {
if (leakedBlobsCount > 0) {
logger.warn(
"Skipped cleanup of {} dangling snapshot blobs due to memory constraints on the master node. "
+ "These blobs will be cleaned up automatically by future snapshot deletions. "
+ "If you routinely delete large snapshots, consider increasing the master node's heap size "
+ "to allow for more efficient cleanup.",
leakedBlobsCount
);
}

return Iterators.flatMap(Iterators.forRange(0, successfullyWrittenBlobsCount, i -> {
try {
return new ShardSnapshotMetaDeleteResult(input);
} catch (IOException e) {
@@ -0,0 +1,122 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.common.io.stream;

import org.elasticsearch.test.ESTestCase;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class TruncatedOutputStreamTests extends ESTestCase {

public void testWriteSingleBytes() throws IOException {
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
int maxSize = randomIntBetween(0, 100);
TruncatedOutputStream truncatedOutputStream = new TruncatedOutputStream(
byteArrayOutputStream,
byteArrayOutputStream::size,
maxSize
);

byte[] values = new byte[maxSize];

// Write enough bytes within the defined maxSize
for (int i = 0; i < maxSize; i++) {
byte b = randomByte();
truncatedOutputStream.write(b);
values[i] = b;
}

// The stream should be truncated now that it is filled
for (int i = 0; i < randomIntBetween(0, 20); i++) {
truncatedOutputStream.write(randomByte());
}

assertArrayEquals(values, byteArrayOutputStream.toByteArray());
}

public void testWriteByteArray() throws IOException {
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
int maxSize = randomIntBetween(100, 200);
TruncatedOutputStream truncatedOutputStream = new TruncatedOutputStream(
byteArrayOutputStream,
byteArrayOutputStream::size,
maxSize
);

List<Byte> values = new ArrayList<>();
int bytesWritten = 0;
// Write beyond the stream's capacity
while (bytesWritten <= maxSize * 2) {
byte[] bytes = randomByteArrayOfLength(randomIntBetween(0, 20));
truncatedOutputStream.write(bytes);

// If there was capacity before writing, then the stream wrote the entire array
// even if that meant overflowing
if (bytesWritten < maxSize) {
for (byte b : bytes) {
values.add(b);
}
}

bytesWritten += bytes.length;
}

byte[] valuesAsByteArray = new byte[values.size()];
int i = 0;
for (byte b : values) {
valuesAsByteArray[i] = b;
i++;
}

assertArrayEquals(valuesAsByteArray, byteArrayOutputStream.toByteArray());
}

public void testWriteByteArrayWithOffsetAndLength() throws IOException {
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
int maxSize = randomIntBetween(100, 200);
TruncatedOutputStream truncatedOutputStream = new TruncatedOutputStream(
byteArrayOutputStream,
byteArrayOutputStream::size,
maxSize
);

List<Byte> values = new ArrayList<>();
int bytesWritten = 0;
// Write beyond the stream's capacity
while (bytesWritten <= maxSize * 2) {
byte[] bytes = randomByteArrayOfLength(randomIntBetween(0, 20));
int offset = randomIntBetween(0, bytes.length);
int length = randomIntBetween(0, bytes.length - offset);
truncatedOutputStream.write(bytes, offset, length);

// If there was capacity before writing, then the stream wrote the sub array
// even if that meant overflowing
if (bytesWritten < maxSize) {
for (int i = offset; i < offset + length; i++) {
values.add(bytes[i]);
}
}

bytesWritten += length;
}

byte[] valuesAsByteArray = new byte[values.size()];
int i = 0;
for (byte b : values) {
valuesAsByteArray[i] = b;
i++;
}

assertArrayEquals(valuesAsByteArray, byteArrayOutputStream.toByteArray());
}
}