
Conversation

@joshua-adams-1 (Contributor) commented Aug 26, 2025

Modifies `BlobStoreRepository.ShardBlobsToDelete.shardDeleteResults` to have a variable size depending on the remaining heap space, rather than a hard-coded 2GiB size which caused smaller nodes with less heap space to hit an OutOfMemoryError.

Relates to #131822
Closes #116379

Closes ES-12540

@joshua-adams-1 marked this pull request as ready for review September 4, 2025 15:32
@joshua-adams-1 requested a review from a team as a code owner September 4, 2025 15:32
@elasticsearchmachine added the needs:triage label Sep 4, 2025
@joshua-adams-1 self-assigned this Sep 4, 2025
@joshua-adams-1 added the :Distributed Coordination/Distributed label Sep 4, 2025
@elasticsearchmachine added the Team:Distributed Coordination label Sep 4, 2025
```java
@TestLogging(reason = "test includes assertions about logging", value = "org.elasticsearch.repositories.blobstore:WARN")
public void testShardBlobsToDeleteWithLimitedHeapSpace() {
    // Limit the heap size so we force it to truncate the stream
    int totalBytesRequired = randomIntBetween(1000, 10000);
```
@joshua-adams-1 (Author):

Arbitrarily chosen values, but tested with both bounds, and we are always guaranteed to be writing more data than we can hold

Contributor:

Hmm, could we instead check `org.elasticsearch.repositories.blobstore.BlobStoreRepository.ShardBlobsToDelete#sizeInBytes` and keep going until at least this has reached the limit we chose (but maybe not going much further than that)? I think in most cases this test is going to do enormously more work than needed to verify what we're trying to verify.
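A minimal sketch of that loop, with illustrative names (`shardBlobsToDelete`, `sizeLimitInBytes`, and the `addRandomResult` helper are assumptions for illustration, not the PR's actual code):

```java
// Keep adding shard delete results only until the tracked size reaches the
// chosen limit, instead of iterating a fixed (and potentially huge) count.
while (shardBlobsToDelete.sizeInBytes() < sizeLimitInBytes) {
    addRandomResult(shardBlobsToDelete); // hypothetical helper: adds one random shard delete result
}
```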

@DaveCTurner (Contributor) left a comment:

Thanks Josh, looking good, just a few more comments.

Comment on lines 1773 to 1774:

```java
if (compressed == null || shardDeleteResults == null) {
    // No output stream: nothing to return
```
Contributor:

Likewise here I don't think we should change anything with respect to these values being null.

Comment on lines 1819 to 1821:

```java
if (resources.isEmpty()) {
    return;
}
```
Contributor:

Likewise here, let's always track these resources even if the limit is zero.

Comment on lines 779 to 785:

```java
for (int index = between(0, 1000); index > 0; index--) {
    final var indexId = new IndexId(randomIdentifier(), randomUUID());
    for (int shard = between(1, 30); shard > 0; shard--) {
        final var shardId = shard;
        final var shardGeneration = new ShardGeneration(randomUUID());
        expectedShardGenerations.put(indexId, shard, shardGeneration);
        final var blobsToDelete = generateRandomBlobsToDelete(0, 100);
```
Contributor:

Max 1000 indices × max 30 shards × max 100 blobs is a max of 3M items. That seems like a lot. Does this really give us much more coverage than a smaller test?

A much more interesting test here would be to see what happens if we stop writing just shy of the limit, such that the final flush pushes us over. Could we instead pick a lower limit, write until we get very close to the limit (according to `shardBlobsToDelete.sizeInBytes()`) and then verify that we didn't lose anything?
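A hedged sketch of that boundary case, again with illustrative names: stop writing just shy of the limit, then check that the final flush, which may push the stream over the limit, loses nothing that was already written:

```java
// Write until we are close to, but still under, the limit...
while (shardBlobsToDelete.sizeInBytes() < sizeLimitInBytes - flushMargin) {
    addRandomResult(shardBlobsToDelete); // hypothetical helper, as above
}
// ...then complete the stream and assert that every result written so far can
// still be read back, even though the final flush may exceed the limit.
assertAllWrittenResultsReadable(shardBlobsToDelete); // hypothetical assertion
```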


```java
final var expectedShardGenerations = ShardGenerations.builder().put(indexId, shardId, shardGeneration).build();

Settings.Builder settings = Settings.builder().put("repositories.blobstore.max_shard_delete_results_size", "0b");
```
Contributor:

Likewise here I think this can just be folded into `testShardBlobsToDeleteWithLimitedHeapSpace`.

```java
}

return Iterators.flatMap(Iterators.forRange(0, resultCount, i -> {
    List<String> blobPaths = new ArrayList<>();
```
Contributor:

Materializing this entire collection of paths as an `ArrayList<String>` is exactly what we're trying to avoid doing in the first place!
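For contrast, a self-contained JDK sketch of producing paths lazily rather than materializing them first (the path contents are placeholders, not the PR's data):

```java
import java.util.Iterator;
import java.util.stream.IntStream;
import java.util.stream.Stream;

class LazyBlobPaths {
    // Returns an iterator that generates each result's blob paths on demand,
    // instead of collecting them all into an ArrayList up front.
    static Iterator<String> blobPaths(int resultCount) {
        return IntStream.range(0, resultCount)
            .boxed()
            .flatMap(i -> Stream.of("shard-" + i + "/blob-a", "shard-" + i + "/blob-b"))
            .iterator();
    }
}
```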

Comment on lines 1732 to 1735:

```java
if (maxHeapSizeInBytes > maxShardDeleteResultsSize) {
    return maxShardDeleteResultsSize;
}
return (int) maxHeapSizeInBytes;
```
Contributor:

This is a rather elaborate way to write `Math.min()` :) I'd suggest keeping it as a `long` throughout even though we know it will always be less than `Integer.MAX_VALUE`, but you can try a `Math.toIntExact` if you'd prefer.
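A minimal self-contained illustration of that suggestion: clamp with `Math.min`, stay in `long`, and convert only at the boundary where an `int` is genuinely required:

```java
public class ClampExample {
    // Clamp the delete-results buffer size to the configured maximum, staying in long.
    static long clampedSize(long maxHeapSizeInBytes, long maxShardDeleteResultsSize) {
        return Math.min(maxHeapSizeInBytes, maxShardDeleteResultsSize);
    }

    public static void main(String[] args) {
        long limit = clampedSize(512L << 20, 2L << 30); // 512MiB heap budget vs 2GiB cap
        int asInt = Math.toIntExact(limit); // throws ArithmeticException if it ever exceeds Integer.MAX_VALUE
        System.out.println(asInt); // 536870912
    }
}
```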

```java
ShardBlobsToDelete() {
    // Gets 25% of the heap size to be allocated to the shard_delete_results stream
    public final Setting<ByteSizeValue> MAX_SHARD_DELETE_RESULTS_SIZE_SETTING = Setting.memorySizeSetting(
        "repositories.blobstore.max_shard_delete_results_size",
```
Contributor:

You have to register this setting in `ClusterSettings`.
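For context, a hedged sketch of what that registration typically looks like, assuming the setting is made `static` so it can be referenced from `ClusterSettings` (the exact collection name may differ by version):

```java
// In org.elasticsearch.common.settings.ClusterSettings, add the new setting to
// the built-in set so nodes recognize it at startup:
BlobStoreRepository.MAX_SHARD_DELETE_RESULTS_SIZE_SETTING,
```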

```java
public final Setting<ByteSizeValue> MAX_SHARD_DELETE_RESULTS_SIZE_SETTING = Setting.memorySizeSetting(
    "repositories.blobstore.max_shard_delete_results_size",
    "25%",
    Setting.Property.NodeScope
```
Contributor:

Could we make this a dynamic setting too?

```java
new ShardSnapshotMetaDeleteResult(Objects.requireNonNull(indexId.getId()), shardId, blobsToDelete).writeTo(compressed);
resultCount += 1;
// Only write if we have capacity
if (shardDeleteResults.size() < this.shardDeleteResultsMaxSize) {
```
Contributor:

Could we make this call `TruncatedOutputStream#hasCapacity`? No need for a comment that way, and also it's important that we use the same has-capacity computation as the underlying stream here.
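For illustration, a self-contained sketch of a truncating stream whose writers share one `hasCapacity()` check; the name and behavior here are assumptions based on the discussion, not the PR's actual `TruncatedOutputStream`:

```java
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;

class TruncatingOutputStream extends FilterOutputStream {
    private final long maxSize;
    private long bytesWritten;

    TruncatingOutputStream(OutputStream out, long maxSize) {
        super(out);
        this.maxSize = maxSize;
    }

    // Single source of truth for the has-capacity check, so callers and the
    // stream itself can never disagree about whether writes are still accepted.
    boolean hasCapacity() {
        return bytesWritten < maxSize;
    }

    @Override
    public void write(int b) throws IOException {
        if (hasCapacity()) {
            out.write(b);
            bytesWritten++;
        } // otherwise silently drop: the stream is truncated at the limit
    }

    @Override
    public void write(byte[] b, int off, int len) throws IOException {
        if (hasCapacity()) {
            out.write(b, off, len); // may overshoot slightly; callers use hasCapacity() to detect partial writes
            bytesWritten += len;
        }
    }
}
```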

```java
new ShardSnapshotMetaDeleteResult(Objects.requireNonNull(indexId.getId()), shardId, blobsToDelete).writeTo(compressed);
// We only want to read this shard delete result if we were able to write the entire object.
// Otherwise, for partial writes, an EOFException will be thrown upon reading
if (shardDeleteResults.size() < this.shardDeleteResultsMaxSize) {
```
Contributor:

Likewise here, this should call `TruncatedOutputStream#hasCapacity`.

Comment on lines 1755 to 1760:

```java
logger.warn(
    "Failure to clean up the following dangling blobs, {}, for index {} and shard {}",
    blobsToDelete,
    indexId,
    shardId
);
```
Contributor:

We can't reasonably log every skipped blob at WARN like this - we've already captured several (compressed) GiB of blob names before getting to this point, so it wouldn't be surprising if there were several GiB more. We wouldn't expect users to go through these logs and delete the blobs manually - indeed we would strongly discourage that kind of behaviour.

Instead, let's log this at DEBUG and keep count of the number of blobs we skipped. Then at the end we can log at WARN how many blobs we've leaked.

Also, nit: it's not really a "failure"; we're deliberately skipping this work because of resource constraints. We should mention in the user-facing WARN message that these dangling blobs will be cleaned up by subsequent deletes, and perhaps suggest that the master node needs a larger heap size to perform such large snapshot deletes in future.
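A hedged sketch of that shape, with illustrative names (Elasticsearch logs via Log4j, so the sketch uses it too):

```java
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.List;

class LeakedBlobAccounting {
    private static final Logger logger = LogManager.getLogger(LeakedBlobAccounting.class);
    private long leakedBlobsCount;

    // Per-shard skip: DEBUG only, since there may be a huge number of these.
    void recordSkippedShard(String indexId, int shardId, List<String> blobsToDelete) {
        logger.debug("skipping cleanup of {} dangling blobs for index {} shard {}", blobsToDelete.size(), indexId, shardId);
        leakedBlobsCount += blobsToDelete.size();
    }

    // Single summary at the end: WARN, with guidance rather than a blob listing.
    void logSummary() {
        if (leakedBlobsCount > 0) {
            logger.warn(
                "skipped cleanup of {} dangling blobs due to memory constraints; subsequent snapshot deletes will "
                    + "clean these up, but consider a larger master-node heap for deletes of this size",
                leakedBlobsCount
            );
        }
    }
}
```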

Comment on lines 1751 to 1753:

```java
if (shardDeleteResults.size() < this.shardDeleteResultsMaxSize) {
    resultCount += 1;
}
```
Contributor:

This `if` also needs an `else` to keep track of the blobs that leaked because we ran out of capacity during the write.

```java
// We only want to read this shard delete result if we were able to write the entire object.
// Otherwise, for partial writes, an EOFException will be thrown upon reading
if (this.truncatedShardDeleteResultsOutputStream.hasCapacity()) {
    successfullyWrittenBlobsCount += 1;
```
Contributor:

This replaces `resultCount`, but it's a count of the number of successfully recorded shards, not blobs.

```java
if (this.truncatedShardDeleteResultsOutputStream.hasCapacity()) {
    successfullyWrittenBlobsCount += 1;
} else {
    leakedBlobsCount += 1;
```
Contributor:

Likewise, this is recording the number of shards with leaked blobs rather than the number of leaked blobs. However, rather than just renaming the variable, I think we should actually count the number of leaked blobs (i.e. `+= blobsToDelete.size()` here).

Comment on lines +1753 to +1754:

```java
logger.debug(
    "Unable to clean up the following dangling blobs, {}, for index {} and shard {} "
```
Contributor:

This also applies to the other branch that increases `leakedBlobsCount`.

```java
public static final Setting<ByteSizeValue> MAX_HEAP_SIZE_FOR_SNAPSHOT_DELETION_SETTING = Setting.memorySizeSetting(
    "repositories.blobstore.max_shard_delete_results_size",
    "25%",
    Setting.Property.Dynamic,
```
Contributor:

Yet more trappiness: you now need to register a listener for updates to this setting (e.g. call `clusterSettings.initializeAndWatch(...)`). You can get a `clusterSettings` from `clusterService.getClusterSettings()`. I think I'd be inclined to do that in the `BlobStoreRepository` constructor rather than doing it each time we create a `SnapshotsDeletion`.
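A brief sketch of that wiring, per the suggestion (the placement and the field being updated are assumptions):

```java
// In the BlobStoreRepository constructor: initialize the field from the current
// setting value and update it whenever the dynamic setting changes.
clusterService.getClusterSettings()
    .initializeAndWatch(MAX_HEAP_SIZE_FOR_SNAPSHOT_DELETION_SETTING, value -> this.maxShardDeleteResultsSize = value);
```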

Labels
:Distributed Coordination/Distributed, >non-issue, Team:Distributed Coordination, v9.3.0
Development

Successfully merging this pull request may close these issues:

Snapshot delete tasks do not complete if blobs-to-delete list exceeds 2GiB
3 participants