6 changes: 6 additions & 0 deletions docs/changelog/136952.yaml
@@ -0,0 +1,6 @@
pr: 136952
summary: Restricts snapshot concurrency based on available heap memory
area: Snapshot/Restore
type: bug
issues:
- 131822
@@ -0,0 +1,16 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.common.bytes;

public class ByteSizeConstants {
    public static final int ONE_KILOBYTE_IN_BYTES = 1024;
    public static final int ONE_MEGABYTE_IN_BYTES = 1024 * ONE_KILOBYTE_IN_BYTES;
    public static final int ONE_GIGABYTE_IN_BYTES = 1024 * ONE_MEGABYTE_IN_BYTES;
}
@@ -655,6 +655,7 @@ public void apply(Settings value, Settings current, Settings previous) {
WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_REROUTE_INTERVAL_SETTING,
WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_MINIMUM_LOGGING_INTERVAL,
SamplingService.TTL_POLL_INTERVAL_SETTING,
-            BlobStoreRepository.MAX_HEAP_SIZE_FOR_SNAPSHOT_DELETION_SETTING
+            BlobStoreRepository.MAX_HEAP_SIZE_FOR_SNAPSHOT_DELETION_SETTING,
+            BlobStoreRepository.HEAP_SIZE_SETTING
);
}
@@ -447,6 +447,14 @@ public static String getRepositoryDataBlobName(long repositoryGeneration) {
);
private volatile int maxHeapSizeForSnapshotDeletion;

public static final Setting<ByteSizeValue> HEAP_SIZE_SETTING = Setting.memorySizeSetting(
@joshua-adams-1 (Contributor Author), Oct 22, 2025:

If there is an easier way to get the total heap memory then please let me know. I was following a similar approach to the one above with the MAX_HEAP_SIZE_FOR_SNAPSHOT_DELETION_SETTING setting.

My concern is that to modify HEAP_SIZE_SETTING inside the tests I had to use the Setting.Property.Dynamic property, but I don't want other users/code changing this percentage to something stupidly small, causing snapshotting to take too long and time out.

Member:

You can get the heap memory with Runtime.getRuntime().maxMemory() or JvmInfo.jvmInfo().getMem().getHeapMax(). Is that what you need?
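The first of these suggestions is plain JDK API, so it can be sanity-checked without any Elasticsearch code; a trivial probe (the class name here is made up for illustration):

```java
public class HeapProbe {
    // Returns the maximum heap the JVM may ever use, in bytes (effectively -Xmx)
    static long maxHeapBytes() {
        return Runtime.getRuntime().maxMemory();
    }

    public static void main(String[] args) {
        System.out.println(maxHeapBytes() > 0);
    }
}
```

`JvmInfo.jvmInfo().getMem().getHeapMax()` reports the same quantity through Elasticsearch's own wrapper.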

Contributor Author:

I did consider this, but I thought it too static, and it also conflicted with the approach I took in #133558

My thought process is that by using the existing memorySizeSetting here to return 10% of the available heap memory for use when loading IndexMetadata objects, the actual percentage can be updated dynamically without a code change, which is nice, and the ByteSizeValue handles all the maths in the background. I still have a concern about this setting being abused by someone setting the value really low (say 1%), which would force us to use only a single snapshot thread, but that concern isn't specific to the MAX_HEAP_SIZE_FOR_INDEX_METADATA_SETTING setting; it's a wider concern with any dynamically updatable setting.

My concern in:

If there is an easier way to get the total heap memory then please let me know

was because I was trying to get 100% of the heap via a setting, which seemed long-winded, but after this comment by David it seems the best approach. Do you agree?

Contributor:

I think it's ok. I mean there are two made-up fiddle factors in play here: the "max 10% of heap" is a guess, as is the "max 50MiB of heap per IMD instance", but then the concurrency number is just one divided by the other. I'm not sure that's the most user-friendly interface really, but my only other idea is to control the actual concurrency with a setting whose default is a function of JvmInfo.jvmInfo().getMem().getHeapMax(). One super-opaque setting or two slightly-less-opaque settings...

Contributor Author:

Before loading the IndexMetadata object into heap memory, can we check 1) how big the object we will be loading is, and 2) whether we have enough space to load it? Then, if we don't have space, we block until we do. It's not ideal, but better than OOMing?
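The check-then-block idea in this comment is essentially a counting semaphore over a byte budget; a minimal sketch under that interpretation (the class and method names are hypothetical, not from the PR):

```java
import java.util.concurrent.Semaphore;
import java.util.function.Supplier;

public class MemoryPermits {
    private final Semaphore permits;

    MemoryPermits(long budgetBytes) {
        // One permit per MiB of budget keeps the counter within int range
        this.permits = new Semaphore((int) (budgetBytes >> 20));
    }

    <T> T withReservation(long objectSizeBytes, Supplier<T> load) {
        // Require at least one permit so even tiny loads are gated
        int needed = (int) Math.max(1, objectSizeBytes >> 20);
        permits.acquireUninterruptibly(needed); // blocks until enough budget is free
        try {
            return load.get();
        } finally {
            permits.release(needed);
        }
    }
}
```

The blocking behaviour trades OOM risk for latency, which is the trade-off the comment acknowledges.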

Contributor:

Yet another possibility would be to introduce a variant of org.elasticsearch.cluster.metadata.IndexMetadata.Builder#fromXContent() which skips over any mappings. It's the mappings that take up most of the heap space in practice, but we simply don't need them here.

Member:

XContentParser can be configured to filter out keys. For snapshots, we control the XContentParser creation in ChecksumBlobStoreFormat, where we can configure it to skip the mappings key. I tested it briefly and it seems to be possible with the following change.

--- a/server/src/main/java/org/elasticsearch/repositories/blobstore/ChecksumBlobStoreFormat.java
+++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/ChecksumBlobStoreFormat.java
 
@@ -150,7 +151,8 @@ public final class ChecksumBlobStoreFormat<T> {
                 try (
                     XContentParser parser = XContentHelper.createParserNotCompressed(
                         XContentParserConfiguration.EMPTY.withRegistry(namedXContentRegistry)
-                            .withDeprecationHandler(LoggingDeprecationHandler.INSTANCE),
+                            .withDeprecationHandler(LoggingDeprecationHandler.INSTANCE)
+                            .withFiltering(null, Set.of("*.mappings"), false),
                         bytesReference,
                         XContentType.SMILE
                     )
@@ -161,7 +163,8 @@ public final class ChecksumBlobStoreFormat<T> {
                     try (
                         XContentParser parser = XContentHelper.createParserNotCompressed(
                             XContentParserConfiguration.EMPTY.withRegistry(namedXContentRegistry)
-                                .withDeprecationHandler(LoggingDeprecationHandler.INSTANCE),
+                                .withDeprecationHandler(LoggingDeprecationHandler.INSTANCE)
+                                .withFiltering(null, Set.of("*.mappings"), false),
                             bytesReference,
                             XContentType.SMILE
                         )

If something like this is feasible, I'd think we don't need the memory limit, which feels mostly like a guess.

Contributor:

Interesting, TIL, I didn't know there was a filter-on-parse option.

Can we just use the includeStrings parameter? Really we only need settings.index.number_of_shards, though I could imagine we might have to pull in a few more things to satisfy various invariants on IndexMetadata.

Contributor:

On reflection, maybe we don't even need to load this as a full IndexMetadata. We could define another MetadataStateFormat<...> which defines a different fromXContent that only reads settings.index.number_of_shards and skips everything else.

Contributor Author:

I have pushed a WIP PR here if you guys wouldn't mind taking a look. If you agree with the approach, I would close this PR in favour of the above.

"repositories.blobstore.heap_size",
"100%",
Setting.Property.Dynamic,
Setting.Property.NodeScope
);
private volatile int maxIndexDeletionConcurrency;

/**
* Repository settings that can be updated dynamically without having to create a new repository.
*/
@@ -573,6 +581,18 @@ protected BlobStoreRepository(
Math.min(maxHeapSizeForSnapshotDeletion.getBytes(), Integer.MAX_VALUE - ByteSizeUnit.MB.toBytes(1))
);
});
clusterSettings.initializeAndWatch(HEAP_SIZE_SETTING, heapSize -> {
// If the heap size is a fractional GB size, then the fractional part is discarded
long heapSizeInGb = heapSize.getGb();
this.maxIndexDeletionConcurrency = Math.min(
// Prevent smaller nodes from loading too many IndexMetadata objects in parallel
// and going OOME (ES-12538)
(int) Math.pow(2, heapSizeInGb),
@joshua-adams-1 (Contributor Author), Oct 22, 2025:

This was a heuristic since I didn't know what we wanted to throttle the snapshot threads to be.

For context to any reviewer, we currently have up to 10 snapshot threads on any size node, and in this instance a node with 1GiB heap OOMed because of this. I used a simple Math.pow(2, heapSizeInGb) heuristic, but this does mean that a node must have >4GB heap to use all 10 snapshot threads. I suspect this is far too conservative of an estimate so suggestions are appreciated. Happy to use a simple step function if people think that would suffice, ie if heap is less than 2GB, use 4 threads, else use 10

Member:

we currently have up to 10 snapshot threads on any size node

We do have fewer threads for smaller nodes, see here. The threshold is currently 750MB. Are we considering this to be too low and wanting to raise it to something like 1GB or more? Or is this change specific to snapshot deletion? For the latter, I'd probably just go with a simple heuristic like using half of the snapshot threads (minimum 1) if the heap is smaller than 2GB?

Contributor:

This change is specific to index-metadata-loading during snapshot deletion, which is something we do today concurrently on all the available snapshot threads and which we've seen to need unusually large amounts of heap. Some IndexMetadata instances can be 50MiB or more when deserialized. Most of the work that the snapshot threads do has much lower heap footprint than this.

I don't think it makes sense for the concurrency limit to be an exponentially-increasing function of the heap size though, since the resource usage is a linear function of the concurrency limit. I'd suggest we take the observed 50MiB as a reasonable estimate of a big IndexMetadata blob and make sure we don't use more than, say, 10% of heap for this. So if the node has 1GiB of heap then allow 2 threads, 2GiB means 4 threads, etc.
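The linear rule proposed here, a heap budget divided by a per-blob estimate, can be sketched as follows (the names and constants are illustrative, not the PR's actual code):

```java
public class LinearConcurrency {
    private static final long BIG_IMD_BYTES = 50L << 20; // observed ~50MiB IndexMetadata blob

    // heapBudgetBytes would be ~10% of the node's max heap;
    // the result is floored at 1 and capped at the snapshot pool size.
    static int maxConcurrency(long heapBudgetBytes, int snapshotPoolSize) {
        long byBudget = Math.max(1, heapBudgetBytes / BIG_IMD_BYTES);
        return (int) Math.min(byBudget, snapshotPoolSize);
    }
}
```

With a 1GiB heap the 10% budget is roughly 102MiB, allowing 2 threads; 2GiB allows 4, matching the figures in the comment.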

Contributor Author:

Thanks both, based on the suggestion above I have extended BlobStoreRepository with a setting to read 10% of available heap memory, in the same way I did for MAX_HEAP_SIZE_FOR_SNAPSHOT_DELETION_SETTING

// Each per-index process needs at least one snapshot thread at all times, so threadPool.info(SNAPSHOT).getMax()
// of them at once is enough to keep the thread pool fully utilized.
threadPool.info(ThreadPool.Names.SNAPSHOT).getMax()
);
});
}

@Override
@@ -892,6 +912,13 @@ protected final boolean isCompress() {
return compress;
}

/**
* @return the maximum concurrency permitted in the SnapshotsDeletion.IndexSnapshotsDeletion object
*/
protected int getMaxIndexDeletionConcurrency() {
return this.maxIndexDeletionConcurrency;
}

/**
* Returns data file chunk size.
* <p>
@@ -1268,18 +1295,16 @@ void runCleanup(ActionListener<DeleteResult> listener) {
private void writeUpdatedShardMetadataAndComputeDeletes(ActionListener<Void> listener) {
// noinspection resource -- closed safely at the end of the iteration
final var listeners = new RefCountingListener(listener);

-        // Each per-index process takes some nonzero amount of working memory to hold the relevant snapshot IDs and metadata generations
-        // etc. which we can keep under tighter limits and release sooner if we limit the number of concurrently processing indices.
-        // Each one needs at least one snapshot thread at all times, so threadPool.info(SNAPSHOT).getMax() of them at once is enough to
-        // keep the threadpool fully utilized.
ThrottledIterator.run(
originalRepositoryData.indicesToUpdateAfterRemovingSnapshot(snapshotIds),
(ref, indexId) -> ActionListener.run(
ActionListener.releaseAfter(listeners.acquire(), ref),
l -> new IndexSnapshotsDeletion(indexId).run(l)
),
-            threadPool.info(ThreadPool.Names.SNAPSHOT).getMax(),
+            // Each per-index process takes some nonzero amount of working memory to hold the relevant snapshot IDs
+            // and metadata generations etc. which we can keep under tighter limits and release sooner if we
+            // limit the number of concurrently processing indices.
+            maxIndexDeletionConcurrency,
listeners::close
);
}
Contributor Author:

I am extending this test to ensure that, depending on the available heap memory, we throttle the number of concurrent snapshot threads accordingly

@@ -16,6 +16,7 @@
import org.elasticsearch.common.blobstore.OperationPurpose;
import org.elasticsearch.common.blobstore.support.FilterBlobContainer;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.core.CheckedConsumer;
@@ -41,6 +42,7 @@
import java.util.Set;
import java.util.concurrent.CountDownLatch;

import static org.elasticsearch.common.bytes.ByteSizeConstants.ONE_GIGABYTE_IN_BYTES;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.lessThanOrEqualTo;

@@ -53,11 +55,17 @@ public class BlobStoreRepositoryDeleteThrottlingTests extends ESSingleNodeTestCa

private static final String TEST_REPO_TYPE = "concurrency-limiting-fs";
private static final String TEST_REPO_NAME = "test-repo";
-    private static final int MAX_SNAPSHOT_THREADS = 3;
+    private static int maxSnapshotThreads;

@Override
protected Settings nodeSettings() {
-        return Settings.builder().put(super.nodeSettings()).put("thread_pool.snapshot.max", MAX_SNAPSHOT_THREADS).build();
+        long heapSizeInBytes = randomLongBetween(0, 5L * ONE_GIGABYTE_IN_BYTES);
+        maxSnapshotThreads = Math.min((int) Math.pow(2, ByteSizeValue.ofBytes(heapSizeInBytes).getGb()), 10);
+        return Settings.builder()
+            .put(super.nodeSettings())
+            .put("thread_pool.snapshot.max", maxSnapshotThreads)
+            .put("repositories.blobstore.heap_size", heapSizeInBytes + "b")
+            .build();
}

protected Collection<Class<? extends Plugin>> getPlugins() {
@@ -98,7 +106,7 @@ protected BlobStore createBlobStore() throws Exception {
private static class ConcurrencyLimitingBlobStore implements BlobStore {
private final BlobStore delegate;
private final Set<String> activeIndices = ConcurrentCollections.newConcurrentSet();
-        private final CountDownLatch countDownLatch = new CountDownLatch(MAX_SNAPSHOT_THREADS);
+        private final CountDownLatch countDownLatch = new CountDownLatch(maxSnapshotThreads);

private ConcurrencyLimitingBlobStore(BlobStore delegate) {
this.delegate = delegate;
@@ -136,7 +144,7 @@ public InputStream readBlob(OperationPurpose purpose, String blobName) throws IO
if (pathParts.size() == 2 && pathParts.get(0).equals("indices") && blobName.startsWith(BlobStoreRepository.METADATA_PREFIX)) {
// reading index metadata, so mark index as active
assertTrue(activeIndices.add(pathParts.get(1)));
-                assertThat(activeIndices.size(), lessThanOrEqualTo(MAX_SNAPSHOT_THREADS));
+                assertThat(activeIndices.size(), lessThanOrEqualTo(maxSnapshotThreads));
countDownLatch.countDown();
safeAwait(countDownLatch); // ensure that we do use all the threads
}
@@ -168,7 +176,7 @@ public void testDeleteThrottling() {

// Create enough indices that we cannot process them all at once

-        for (int i = 0; i < 3 * MAX_SNAPSHOT_THREADS; i++) {
+        for (int i = 0; i < 3 * maxSnapshotThreads; i++) {
createIndex("index-" + i, indexSettings(between(1, 3), 0).build());
}

@@ -90,6 +90,7 @@
import java.util.function.Function;
import java.util.stream.Collectors;

import static org.elasticsearch.common.bytes.ByteSizeConstants.ONE_GIGABYTE_IN_BYTES;
import static org.elasticsearch.repositories.RepositoryDataTests.generateRandomRepoData;
import static org.elasticsearch.repositories.blobstore.BlobStoreRepository.INDEX_FILE_PREFIX;
import static org.elasticsearch.repositories.blobstore.BlobStoreRepository.MAX_HEAP_SIZE_FOR_SNAPSHOT_DELETION_SETTING;
@@ -114,6 +115,15 @@ public class BlobStoreRepositoryTests extends ESSingleNodeTestCase {
static final String REPO_TYPE = "fs";
private static final String TEST_REPO_NAME = "test-repo";

@Override
protected Settings nodeSettings() {
return Settings.builder()
.put(super.nodeSettings())
// Mimic production
.put("thread_pool.snapshot.max", 10)
.build();
}

public void testRetrieveSnapshots() {
final Client client = client();
final Path location = ESIntegTestCase.randomRepoPath(node().settings());
@@ -859,4 +869,41 @@ public void testWriteToShardBlobToDelete() {
.setPersistentSettings(Settings.builder().putNull(MAX_HEAP_SIZE_FOR_SNAPSHOT_DELETION_SETTING.getKey()).build())
.get();
}

/**
* Tests whether we adjust the maximum concurrency when deleting snapshots
* according to the size of the heap memory
*/
public void testMaxIndexDeletionConcurrency() {
Contributor Author:

This test is different from the previous one. It is just a basic unit test to ensure that the mapping of heap memory to snapshot threads is as expected, but it doesn't test whether all threads are properly utilised.

long heapSizeInBytes = randomLongBetween(0, 5L * ONE_GIGABYTE_IN_BYTES);
client().admin()
.cluster()
.prepareUpdateSettings(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT)
.setPersistentSettings(Settings.builder().put("repositories.blobstore.heap_size", heapSizeInBytes + "b").build())
.get();

final var repo = setupRepo();

int expectedMaxSnapshotThreadsUsed;
if (heapSizeInBytes < ONE_GIGABYTE_IN_BYTES) {
expectedMaxSnapshotThreadsUsed = 1;
} else if (heapSizeInBytes < 2L * ONE_GIGABYTE_IN_BYTES) {
expectedMaxSnapshotThreadsUsed = 2;
} else if (heapSizeInBytes < 3L * ONE_GIGABYTE_IN_BYTES) {
expectedMaxSnapshotThreadsUsed = 4;
} else if (heapSizeInBytes < 4L * ONE_GIGABYTE_IN_BYTES) {
expectedMaxSnapshotThreadsUsed = 8;
} else {
expectedMaxSnapshotThreadsUsed = 10;
}

assertEquals(expectedMaxSnapshotThreadsUsed, repo.getMaxIndexDeletionConcurrency());

// reset original default setting
client().admin()
.cluster()
.prepareUpdateSettings(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT)
.setPersistentSettings(Settings.builder().putNull("repositories.blobstore.heap_size").build())
.get();
}
}
@@ -69,6 +69,7 @@
import java.util.stream.Collectors;

import static org.apache.lucene.tests.util.LuceneTestCase.random;
import static org.elasticsearch.repositories.blobstore.BlobStoreRepository.HEAP_SIZE_SETTING;
import static org.elasticsearch.repositories.blobstore.BlobStoreRepository.MAX_HEAP_SIZE_FOR_SNAPSHOT_DELETION_SETTING;
import static org.elasticsearch.repositories.blobstore.BlobStoreRepository.METADATA_BLOB_NAME_SUFFIX;
import static org.elasticsearch.repositories.blobstore.BlobStoreRepository.METADATA_NAME_FORMAT;
@@ -462,6 +463,7 @@ private static ClusterService mockClusterService(ClusterState initialState) {
when(clusterApplierService.threadPool()).thenReturn(threadPool);
Set<Setting<?>> settingSet = new HashSet<>();
settingSet.add(MAX_HEAP_SIZE_FOR_SNAPSHOT_DELETION_SETTING);
settingSet.add(HEAP_SIZE_SETTING);
ClusterSettings mockClusterSettings = new ClusterSettings(Settings.EMPTY, settingSet);
when(clusterService.getClusterSettings()).thenReturn(mockClusterSettings);
return clusterService;