-
Notifications
You must be signed in to change notification settings - Fork 25.6k
Restricts snapshot concurrency based on available heap memory #136952
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 1 commit
8f9d2ba
a5137b8
4b9d4b0
84358c7
8ac4fbc
1b9884c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -447,6 +447,14 @@ public static String getRepositoryDataBlobName(long repositoryGeneration) { | |
| ); | ||
| private volatile int maxHeapSizeForSnapshotDeletion; | ||
|
|
||
| public static final Setting<ByteSizeValue> HEAP_SIZE_SETTING = Setting.memorySizeSetting( | ||
| "repositories.blobstore.heap_size", | ||
| "100%", | ||
| Setting.Property.Dynamic, | ||
| Setting.Property.NodeScope | ||
| ); | ||
| private volatile int maxIndexDeletionConcurrency; | ||
|
|
||
| /** | ||
| * Repository settings that can be updated dynamically without having to create a new repository. | ||
| */ | ||
|
|
@@ -573,6 +581,18 @@ protected BlobStoreRepository( | |
| Math.min(maxHeapSizeForSnapshotDeletion.getBytes(), Integer.MAX_VALUE - ByteSizeUnit.MB.toBytes(1)) | ||
| ); | ||
| }); | ||
| clusterSettings.initializeAndWatch(HEAP_SIZE_SETTING, heapSize -> { | ||
| // If the heap size is a fractional GB size, then the fractional part is discarded | ||
| long heapSizeInGb = heapSize.getGb(); | ||
| this.maxIndexDeletionConcurrency = Math.min( | ||
| // Prevent smaller nodes from loading too many IndexMetadata objects in parallel | ||
| // and going OOMe (ES-12538) | ||
| (int) Math.pow(2, heapSizeInGb), | ||
|
||
| // Each per-index process needs at least one snapshot thread at all times, so threadPool.info(SNAPSHOT).getMax() | ||
| // of them at once is enough to keep the thread pool fully utilized. | ||
| threadPool.info(ThreadPool.Names.SNAPSHOT).getMax() | ||
| ); | ||
| }); | ||
| } | ||
|
|
||
| @Override | ||
|
|
@@ -892,6 +912,13 @@ protected final boolean isCompress() { | |
| return compress; | ||
| } | ||
|
|
||
| /** | ||
| * @return the maximum concurrency permitted in the SnapshotsDeletion.IndexSnapshotsDeletion object | ||
| */ | ||
| protected int getMaxIndexDeletionConcurrency() { | ||
| return this.maxIndexDeletionConcurrency; | ||
| } | ||
|
|
||
| /** | ||
| * Returns data file chunk size. | ||
| * <p> | ||
|
|
@@ -1268,18 +1295,16 @@ void runCleanup(ActionListener<DeleteResult> listener) { | |
| private void writeUpdatedShardMetadataAndComputeDeletes(ActionListener<Void> listener) { | ||
| // noinspection resource -- closed safely at the end of the iteration | ||
| final var listeners = new RefCountingListener(listener); | ||
|
|
||
| // Each per-index process takes some nonzero amount of working memory to hold the relevant snapshot IDs and metadata generations | ||
| // etc. which we can keep under tighter limits and release sooner if we limit the number of concurrently processing indices. | ||
| // Each one needs at least one snapshot thread at all times, so threadPool.info(SNAPSHOT).getMax() of them at once is enough to | ||
| // keep the threadpool fully utilized. | ||
| ThrottledIterator.run( | ||
| originalRepositoryData.indicesToUpdateAfterRemovingSnapshot(snapshotIds), | ||
| (ref, indexId) -> ActionListener.run( | ||
| ActionListener.releaseAfter(listeners.acquire(), ref), | ||
| l -> new IndexSnapshotsDeletion(indexId).run(l) | ||
| ), | ||
| threadPool.info(ThreadPool.Names.SNAPSHOT).getMax(), | ||
| // Each per-index process takes some nonzero amount of working memory to hold the relevant snapshot IDs | ||
| // and metadata generations etc. which we can keep under tighter limits and release sooner if we | ||
| // limit the number of concurrently processing indices. | ||
| maxIndexDeletionConcurrency, | ||
| listeners::close | ||
| ); | ||
| } | ||
|
|
||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I am extending this test to ensure that, depending on the available heap memory, we throttle the number of concurrent snapshot threads accordingly |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -90,6 +90,7 @@ | |
| import java.util.function.Function; | ||
| import java.util.stream.Collectors; | ||
|
|
||
| import static org.elasticsearch.common.bytes.ByteSizeConstants.ONE_GIGABYTE_IN_BYTES; | ||
| import static org.elasticsearch.repositories.RepositoryDataTests.generateRandomRepoData; | ||
| import static org.elasticsearch.repositories.blobstore.BlobStoreRepository.INDEX_FILE_PREFIX; | ||
| import static org.elasticsearch.repositories.blobstore.BlobStoreRepository.MAX_HEAP_SIZE_FOR_SNAPSHOT_DELETION_SETTING; | ||
|
|
@@ -114,6 +115,15 @@ public class BlobStoreRepositoryTests extends ESSingleNodeTestCase { | |
| static final String REPO_TYPE = "fs"; | ||
| private static final String TEST_REPO_NAME = "test-repo"; | ||
|
|
||
| @Override | ||
| protected Settings nodeSettings() { | ||
| return Settings.builder() | ||
| .put(super.nodeSettings()) | ||
| // Mimic production | ||
| .put("thread_pool.snapshot.max", 10) | ||
| .build(); | ||
| } | ||
|
|
||
| public void testRetrieveSnapshots() { | ||
| final Client client = client(); | ||
| final Path location = ESIntegTestCase.randomRepoPath(node().settings()); | ||
|
|
@@ -859,4 +869,41 @@ public void testWriteToShardBlobToDelete() { | |
| .setPersistentSettings(Settings.builder().putNull(MAX_HEAP_SIZE_FOR_SNAPSHOT_DELETION_SETTING.getKey()).build()) | ||
| .get(); | ||
| } | ||
|
|
||
| /** | ||
| * Tests whether we adjust the maximum concurrency when deleting snapshots | ||
| * according to the size of the heap memory | ||
| */ | ||
| public void testMaxIndexDeletionConcurrency() { | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This test is different to the previous. This is just a basic unit test to ensure that the function of heap memory -> snapshot threads is as expected, but doesn't test whether all threads are properly utilised |
||
| long heapSizeInBytes = randomLongBetween(0, 5L * ONE_GIGABYTE_IN_BYTES); | ||
| client().admin() | ||
| .cluster() | ||
| .prepareUpdateSettings(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT) | ||
| .setPersistentSettings(Settings.builder().put("repositories.blobstore.heap_size", heapSizeInBytes + "b").build()) | ||
| .get(); | ||
|
|
||
| final var repo = setupRepo(); | ||
|
|
||
| int expectedMaxSnapshotThreadsUsed; | ||
| if (heapSizeInBytes < ONE_GIGABYTE_IN_BYTES) { | ||
| expectedMaxSnapshotThreadsUsed = 1; | ||
| } else if (heapSizeInBytes < 2L * ONE_GIGABYTE_IN_BYTES) { | ||
| expectedMaxSnapshotThreadsUsed = 2; | ||
| } else if (heapSizeInBytes < 3L * ONE_GIGABYTE_IN_BYTES) { | ||
| expectedMaxSnapshotThreadsUsed = 4; | ||
| } else if (heapSizeInBytes < 4L * ONE_GIGABYTE_IN_BYTES) { | ||
| expectedMaxSnapshotThreadsUsed = 8; | ||
| } else { | ||
| expectedMaxSnapshotThreadsUsed = 10; | ||
| } | ||
|
|
||
| assertEquals(expectedMaxSnapshotThreadsUsed, repo.getMaxIndexDeletionConcurrency()); | ||
|
|
||
| // reset original default setting | ||
| client().admin() | ||
| .cluster() | ||
| .prepareUpdateSettings(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT) | ||
| .setPersistentSettings(Settings.builder().putNull("repositories.blobstore.heap_size").build()) | ||
| .get(); | ||
| } | ||
| } | ||
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If there is an easier way to get the total heap memory then please let me know. I was following a similar approach to above with the
`MAX_HEAP_SIZE_FOR_SNAPSHOT_DELETION_SETTING` setting. My concern is that to modify
`HEAP_SIZE_SETTING` inside the tests I had to use the `Setting.Property.Dynamic` property, but I don't want other users/code changing this percentage to something stupidly small and then the snapshotting takes too long and times out.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can get the heap memory with
`Runtime.getRuntime().maxMemory()` or `JvmInfo.jvmInfo().getMem().getHeapMax()`. Is that what you need?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I did consider this, but I thought it too static, and it also conflicted with the approach I took in #133558
My thought process is that by utilising the existing
`memorySizeSetting` here to return 10% of the available heap memory for use when loading `IndexMetadata` objects, the actual % can be dynamically updated without requiring a code change, which is nice, and the `ByteSizeValue` handles all the maths in the background. I still have a concern about this setting being abused by someone setting the value really low (say 1%), which will force us to use only a single snapshot thread, but this concern isn't specific to the `MAX_HEAP_SIZE_FOR_INDEX_METADATA_SETTING` setting — it is a wider concern about any dynamically updatable setting.
My concern in:
was because I was trying to get 100% of the heap via a setting which seemed long-winded, but after this comment by David it seems the best approach. Do you agree?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it's ok. I mean there's two made-up fiddle factors in play here, the "max 10% of heap" is a guess, as is the "max 50MiB of heap per IMD instance", but then the concurrency number is just one divided by the other. I'm not sure that's the most user-friendly interface really but my only other idea is to control the actual concurrency with a setting whose default is a function of
JvmInfo.jvmInfo().getMem().getHeapMax(). One super-opaque setting or two slightly-less-opaque settings...There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Prior to loading the
`IndexMetadata` object into heap memory, can we check 1) how big the object we will be loading is, and 2) whether we have enough space to load it? Then, if we don't have space, we block until we do. It's not ideal, but better than OOMing?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yet another possibility would be to introduce a variant of
`org.elasticsearch.cluster.metadata.IndexMetadata.Builder#fromXContent()` which skips over any mappings. It's the mappings that take up most of the heap space in practice, but we simply don't need them here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
`XContentParser` can be configured to filter out keys. For snapshots, we control the `XContentParser` creation in `ChecksumBlobStoreFormat`, where we can configure it to skip the `mappings` key. I tested it briefly and it seems to be possible with the following change.
If something like this is feasible, I'd think we don't need the memory limit, which feels mostly a guess.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Interesting, TIL, I didn't know there was a filter-on-parse option.
Can we just use the
`includeStrings` parameter? Really we only need `settings.index.number_of_shards`, though I could imagine we might have to pull a few more things in to satisfy various invariants on `IndexMetadata`.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
On reflection, maybe we don't even need to load this as a full
`IndexMetadata`. We could define another `MetadataStateFormat<...>` which defines a different `fromXContent` that only reads `settings.index.number_of_shards` and skips everything else.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have pushed a WIP PR here if you guys wouldn't mind taking a look. If you agree with the approach, I would close this PR in favour of the above.