diff --git a/docs/changelog/136952.yaml b/docs/changelog/136952.yaml
new file mode 100644
index 0000000000000..d73c10b18580a
--- /dev/null
+++ b/docs/changelog/136952.yaml
@@ -0,0 +1,6 @@
+pr: 136952
+summary: Restricts snapshot concurrency based on available heap memory
+area: Snapshot/Restore
+type: bug
+issues:
+ - 131822
diff --git a/server/src/main/java/org/elasticsearch/common/bytes/ByteSizeConstants.java b/server/src/main/java/org/elasticsearch/common/bytes/ByteSizeConstants.java
new file mode 100644
index 0000000000000..adf5d6d070b95
--- /dev/null
+++ b/server/src/main/java/org/elasticsearch/common/bytes/ByteSizeConstants.java
@@ -0,0 +1,21 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the "Elastic License
+ * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
+ * Public License v 1"; you may not use this file except in compliance with, at
+ * your election, the "Elastic License 2.0", the "GNU Affero General Public
+ * License v3.0 only", or the "Server Side Public License, v 1".
+ */
+
+package org.elasticsearch.common.bytes;
+
+/**
+ * Binary byte-size multiples, expressed as a number of bytes. Not instantiable.
+ */
+public final class ByteSizeConstants {
+    public static final int ONE_KILOBYTE_IN_BYTES = 1024;
+    public static final int ONE_MEGABYTE_IN_BYTES = 1024 * ONE_KILOBYTE_IN_BYTES;
+    public static final int ONE_GIGABYTE_IN_BYTES = 1024 * ONE_MEGABYTE_IN_BYTES;
+
+    private ByteSizeConstants() {}
+}
diff --git a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java
index 595149ff26bb5..5a5ed84ef97af 100644
--- a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java
+++ b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java
@@ -656,6 +656,7 @@ public void apply(Settings value, Settings current, Settings previous) {
         WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_REROUTE_INTERVAL_SETTING,
         WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_MINIMUM_LOGGING_INTERVAL,
         SamplingService.TTL_POLL_INTERVAL_SETTING,
-        BlobStoreRepository.MAX_HEAP_SIZE_FOR_SNAPSHOT_DELETION_SETTING
+        BlobStoreRepository.MAX_HEAP_SIZE_FOR_SNAPSHOT_DELETION_SETTING,
+        BlobStoreRepository.MAX_HEAP_SIZE_FOR_INDEX_METADATA_SETTING
     );
 }
diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java
index fdac63cc5466c..d3d2388705a4a 100644
--- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java
+++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java
@@ -447,6 +447,14 @@ public static String getRepositoryDataBlobName(long repositoryGeneration) {
     );
     private volatile int maxHeapSizeForSnapshotDeletion;
 
+    public static final Setting<ByteSizeValue> MAX_HEAP_SIZE_FOR_INDEX_METADATA_SETTING = Setting.memorySizeSetting(
+        "repositories.blobstore.max_heap_size_for_index_metadata",
+        "10%",
+        Setting.Property.Dynamic,
+        Setting.Property.NodeScope
+    );
+    private volatile int maxIndexDeletionConcurrency;
+
     /**
      * Repository settings that can be updated dynamically without having to create a new repository.
      */
@@ -573,6 +581,18 @@ protected BlobStoreRepository(
                 Math.min(maxHeapSizeForSnapshotDeletion.getBytes(), Integer.MAX_VALUE - ByteSizeUnit.MB.toBytes(1))
             );
         });
+        clusterSettings.initializeAndWatch(MAX_HEAP_SIZE_FOR_INDEX_METADATA_SETTING, maxHeapSizeForIndexMetadata -> {
+            this.maxIndexDeletionConcurrency = Math.toIntExact(
+                Math.min(
+                    // Prevent smaller nodes from loading too many IndexMetadata objects in parallel and running out of heap (ES-12538).
+                    // We budget 50MB per large IndexMetadata object, within the configured heap limit (10% of heap by default).
+                    Math.max(1, maxHeapSizeForIndexMetadata.getMb() / 50),
+                    // Each per-index process needs at least one snapshot thread at all times, so threadPool.info(SNAPSHOT).getMax()
+                    // of them at once is enough to keep the thread pool fully utilized.
+                    threadPool.info(ThreadPool.Names.SNAPSHOT).getMax()
+                )
+            );
+        });
     }
 
     @Override
@@ -892,6 +912,13 @@ protected final boolean isCompress() {
         return compress;
     }
 
+    /**
+     * @return the maximum number of {@code IndexSnapshotsDeletion} tasks that a snapshot deletion runs concurrently
+     */
+    protected int getMaxIndexDeletionConcurrency() {
+        return this.maxIndexDeletionConcurrency;
+    }
+
     /**
      * Returns data file chunk size.
      * <p>
@@ -1268,18 +1295,16 @@ void runCleanup(ActionListener listener) {
         private void writeUpdatedShardMetadataAndComputeDeletes(ActionListener<Void> listener) {
             // noinspection resource -- closed safely at the end of the iteration
             final var listeners = new RefCountingListener(listener);
-
-            // Each per-index process takes some nonzero amount of working memory to hold the relevant snapshot IDs and metadata generations
-            // etc. which we can keep under tighter limits and release sooner if we limit the number of concurrently processing indices.
-            // Each one needs at least one snapshot thread at all times, so threadPool.info(SNAPSHOT).getMax() of them at once is enough to
-            // keep the threadpool fully utilized.
             ThrottledIterator.run(
                 originalRepositoryData.indicesToUpdateAfterRemovingSnapshot(snapshotIds),
                 (ref, indexId) -> ActionListener.run(
                     ActionListener.releaseAfter(listeners.acquire(), ref),
                     l -> new IndexSnapshotsDeletion(indexId).run(l)
                 ),
-                threadPool.info(ThreadPool.Names.SNAPSHOT).getMax(),
+                // Each per-index process takes some nonzero amount of working memory to hold the relevant snapshot IDs
+                // and metadata generations etc. which we can keep under tighter limits and release sooner if we
+                // limit the number of concurrently processing indices.
+                maxIndexDeletionConcurrency,
                 listeners::close
             );
         }
diff --git a/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryDeleteThrottlingTests.java b/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryDeleteThrottlingTests.java
index 0d8011830dcec..58c53aa3fa0c5 100644
--- a/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryDeleteThrottlingTests.java
+++ b/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryDeleteThrottlingTests.java
@@ -16,6 +16,8 @@
 import org.elasticsearch.common.blobstore.OperationPurpose;
 import org.elasticsearch.common.blobstore.support.FilterBlobContainer;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.unit.ByteSizeUnit;
+import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
 import org.elasticsearch.core.CheckedConsumer;
@@ -41,6 +43,7 @@
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 
+import static org.elasticsearch.common.bytes.ByteSizeConstants.ONE_GIGABYTE_IN_BYTES;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
 import static org.hamcrest.Matchers.lessThanOrEqualTo;
 
@@ -53,11 +56,36 @@ public class BlobStoreRepositoryDeleteThrottlingTests extends ESSingleNodeTestCa
 
     private static final String TEST_REPO_TYPE = "concurrency-limiting-fs";
     private static final String TEST_REPO_NAME = "test-repo";
-    private static final int MAX_SNAPSHOT_THREADS = 3;
+    /**
+     * The maximum size of the snapshot thread pool configured on the test node
+     */
+    private static final int THREAD_POOL_MAX_SNAPSHOT_THREADS = 10;
+    /**
+     * The maximum number of snapshot threads we expect the BlobStore to use when deleting snapshots
+     */
+    private static int maxSnapshotThreads;
 
     @Override
     protected Settings nodeSettings() {
-        return Settings.builder().put(super.nodeSettings()).put("thread_pool.snapshot.max", MAX_SNAPSHOT_THREADS).build();
+        // Randomly generate a heap size up to 5GB.
+        long heapSizeInBytes = randomLongBetween(0, 5L * ONE_GIGABYTE_IN_BYTES);
+        // We expect at most only 10% of the total heap space to be used when loading index metadata
+        long heapAvailableForIndexMetaData = heapSizeInBytes / 10;
+
+        // Calculate the maximum number of snapshot threads the BlobStore should use given the heap restrictions above
+        maxSnapshotThreads = Math.toIntExact(
+            Math.min(
+                Math.max(1, ByteSizeValue.of(heapAvailableForIndexMetaData, ByteSizeUnit.BYTES).getMb() / 50),
+                THREAD_POOL_MAX_SNAPSHOT_THREADS
+            )
+        );
+
+        return Settings.builder()
+            .put(super.nodeSettings())
+            // Mimic production by setting to 10
+            .put("thread_pool.snapshot.max", THREAD_POOL_MAX_SNAPSHOT_THREADS)
+            .put("repositories.blobstore.max_heap_size_for_index_metadata", heapAvailableForIndexMetaData + "b")
+            .build();
     }
 
     protected Collection<Class<? extends Plugin>> getPlugins() {
@@ -98,7 +126,7 @@ protected BlobStore createBlobStore() throws Exception {
     private static class ConcurrencyLimitingBlobStore implements BlobStore {
         private final BlobStore delegate;
         private final Set<String> activeIndices = ConcurrentCollections.newConcurrentSet();
-        private final CountDownLatch countDownLatch = new CountDownLatch(MAX_SNAPSHOT_THREADS);
+        private final CountDownLatch countDownLatch = new CountDownLatch(maxSnapshotThreads);
 
         private ConcurrencyLimitingBlobStore(BlobStore delegate) {
             this.delegate = delegate;
@@ -136,7 +164,7 @@ public InputStream readBlob(OperationPurpose purpose, String blobName) throws IO
                 if (pathParts.size() == 2 && pathParts.get(0).equals("indices") && blobName.startsWith(BlobStoreRepository.METADATA_PREFIX)) {
                     // reading index metadata, so mark index as active
                     assertTrue(activeIndices.add(pathParts.get(1)));
-                    assertThat(activeIndices.size(), lessThanOrEqualTo(MAX_SNAPSHOT_THREADS));
+                    assertThat(activeIndices.size(), lessThanOrEqualTo(maxSnapshotThreads));
                     countDownLatch.countDown();
                     safeAwait(countDownLatch); // ensure that we do use all the threads
                 }
@@ -168,7 +196,7 @@ public void testDeleteThrottling() {
 
         // Create enough indices that we cannot process them all at once
 
-        for (int i = 0; i < 3 * MAX_SNAPSHOT_THREADS; i++) {
+        for (int i = 0; i < 3 * maxSnapshotThreads; i++) {
             createIndex("index-" + i, indexSettings(between(1, 3), 0).build());
         }
 
diff --git a/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryTests.java b/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryTests.java
index e210452aaaa34..488f52a84a3a9 100644
--- a/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryTests.java
+++ b/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryTests.java
@@ -44,6 +44,7 @@
 import org.elasticsearch.common.settings.ClusterSettings;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.ByteSizeUnit;
+import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.util.MockBigArrays;
 import org.elasticsearch.common.util.concurrent.EsExecutors;
 import org.elasticsearch.core.TimeValue;
@@ -90,6 +91,7 @@
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
+import static org.elasticsearch.common.bytes.ByteSizeConstants.ONE_GIGABYTE_IN_BYTES;
 import static org.elasticsearch.repositories.RepositoryDataTests.generateRandomRepoData;
 import static org.elasticsearch.repositories.blobstore.BlobStoreRepository.INDEX_FILE_PREFIX;
 import static org.elasticsearch.repositories.blobstore.BlobStoreRepository.MAX_HEAP_SIZE_FOR_SNAPSHOT_DELETION_SETTING;
@@ -113,6 +115,19 @@ public class BlobStoreRepositoryTests extends ESSingleNodeTestCase {
 
     static final String REPO_TYPE = "fs";
     private static final String TEST_REPO_NAME = "test-repo";
+    /**
+     * The maximum size of the snapshot thread pool configured on the test node
+     */
+    private static final int THREAD_POOL_MAX_SNAPSHOT_THREADS = 10;
+
+    @Override
+    protected Settings nodeSettings() {
+        return Settings.builder()
+            .put(super.nodeSettings())
+            // Mimic production
+            .put("thread_pool.snapshot.max", THREAD_POOL_MAX_SNAPSHOT_THREADS)
+            .build();
+    }
 
     public void testRetrieveSnapshots() {
         final Client client = client();
@@ -859,4 +874,39 @@ public void testWriteToShardBlobToDelete() {
             .setPersistentSettings(Settings.builder().putNull(MAX_HEAP_SIZE_FOR_SNAPSHOT_DELETION_SETTING.getKey()).build())
             .get();
     }
+
+    /**
+     * Tests whether we adjust the maximum concurrency when deleting snapshots
+     * according to the size of the heap memory
+     */
+    public void testMaxIndexDeletionConcurrency() {
+        // Randomly generate a heap size up to 10GB.
+        long heapSizeInBytes = randomLongBetween(0, 10L * ONE_GIGABYTE_IN_BYTES);
+        // We expect at most only 10% of the total heap space to be used when loading index metadata
+        long heapAvailableForIndexMetaData = heapSizeInBytes / 10;
+        client().admin()
+            .cluster()
+            .prepareUpdateSettings(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT)
+            .setPersistentSettings(
+                Settings.builder()
+                    .put("repositories.blobstore.max_heap_size_for_index_metadata", heapAvailableForIndexMetaData + "b")
+                    .build()
+            )
+            .get();
+        try {
+            final var repo = setupRepo();
+
+            // We use 50MB as a heuristic of how many index metadata objects we could load concurrently
+            long maxConcurrentThreads = Math.max(1, ByteSizeValue.of(heapAvailableForIndexMetaData, ByteSizeUnit.BYTES).getMb() / 50);
+
+            assertEquals(Math.min(maxConcurrentThreads, THREAD_POOL_MAX_SNAPSHOT_THREADS), repo.getMaxIndexDeletionConcurrency());
+        } finally {
+            // reset original default setting, even if the assertion above fails, so the override cannot leak into other tests
+            client().admin()
+                .cluster()
+                .prepareUpdateSettings(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT)
+                .setPersistentSettings(Settings.builder().putNull("repositories.blobstore.max_heap_size_for_index_metadata").build())
+                .get();
+        }
+    }
 }
diff --git a/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreTestUtil.java b/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreTestUtil.java
index 9d09f53e05b15..e1f20fdaf29db 100644
--- a/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreTestUtil.java
+++ b/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreTestUtil.java
@@ -69,6 +69,7 @@
 import java.util.stream.Collectors;
 
 import static org.apache.lucene.tests.util.LuceneTestCase.random;
+import static org.elasticsearch.repositories.blobstore.BlobStoreRepository.MAX_HEAP_SIZE_FOR_INDEX_METADATA_SETTING;
 import static org.elasticsearch.repositories.blobstore.BlobStoreRepository.MAX_HEAP_SIZE_FOR_SNAPSHOT_DELETION_SETTING;
 import static org.elasticsearch.repositories.blobstore.BlobStoreRepository.METADATA_BLOB_NAME_SUFFIX;
 import static org.elasticsearch.repositories.blobstore.BlobStoreRepository.METADATA_NAME_FORMAT;
@@ -462,6 +463,7 @@ private static ClusterService mockClusterService(ClusterState initialState) {
         when(clusterApplierService.threadPool()).thenReturn(threadPool);
         Set<Setting<?>> settingSet = new HashSet<>();
         settingSet.add(MAX_HEAP_SIZE_FOR_SNAPSHOT_DELETION_SETTING);
+        settingSet.add(MAX_HEAP_SIZE_FOR_INDEX_METADATA_SETTING);
         ClusterSettings mockClusterSettings = new ClusterSettings(Settings.EMPTY, settingSet);
         when(clusterService.getClusterSettings()).thenReturn(mockClusterSettings);
         return clusterService;