6 changes: 6 additions & 0 deletions docs/changelog/136952.yaml
@@ -0,0 +1,6 @@
pr: 136952
summary: Restricts snapshot concurrency based on available heap memory
area: Snapshot/Restore
type: bug
issues:
- 131822
@@ -0,0 +1,16 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.common.bytes;

public class ByteSizeConstants {
public static final int ONE_KILOBYTE_IN_BYTES = 1024;
public static final int ONE_MEGABYTE_IN_BYTES = 1024 * ONE_KILOBYTE_IN_BYTES;
public static final int ONE_GIGABYTE_IN_BYTES = 1024 * ONE_MEGABYTE_IN_BYTES;
}
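A note on these constants (our own caution, not part of the change): they are ints, so multiplying them in int arithmetic can silently overflow. Callers that need multi-gigabyte values must widen to long first, as the tests below do with the 5L literal:

// Sketch (illustrative, not part of the PR):
long ok = 5L * ONE_GIGABYTE_IN_BYTES; // 5368709120 bytes; the long literal forces 64-bit arithmetic
int bad = 5 * ONE_GIGABYTE_IN_BYTES; // wraps to 1073741824 under 32-bit int arithmetic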
@@ -656,6 +656,7 @@ public void apply(Settings value, Settings current, Settings previous) {
WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_REROUTE_INTERVAL_SETTING,
WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_MINIMUM_LOGGING_INTERVAL,
SamplingService.TTL_POLL_INTERVAL_SETTING,
BlobStoreRepository.MAX_HEAP_SIZE_FOR_SNAPSHOT_DELETION_SETTING
BlobStoreRepository.MAX_HEAP_SIZE_FOR_SNAPSHOT_DELETION_SETTING,
BlobStoreRepository.MAX_HEAP_SIZE_FOR_INDEX_METADATA_SETTING
);
}
@@ -447,6 +447,14 @@ public static String getRepositoryDataBlobName(long repositoryGeneration) {
);
private volatile int maxHeapSizeForSnapshotDeletion;

public static final Setting<ByteSizeValue> MAX_HEAP_SIZE_FOR_INDEX_METADATA_SETTING = Setting.memorySizeSetting(
"repositories.blobstore.max_heap_size_for_index_metadata",
"10%",
Setting.Property.Dynamic,
Setting.Property.NodeScope
);
private volatile int maxIndexDeletionConcurrency;

/**
* Repository settings that can be updated dynamically without having to create a new repository.
*/
@@ -573,6 +581,18 @@ protected BlobStoreRepository(
Math.min(maxHeapSizeForSnapshotDeletion.getBytes(), Integer.MAX_VALUE - ByteSizeUnit.MB.toBytes(1))
);
});
clusterSettings.initializeAndWatch(MAX_HEAP_SIZE_FOR_INDEX_METADATA_SETTING, maxHeapSizeForIndexMetadata -> {
this.maxIndexDeletionConcurrency = Math.toIntExact(
Math.min(
// Prevent smaller nodes from loading too many IndexMetadata objects in parallel and running out of memory (ES-12538).
// We use 50MB as an estimate for a large IndexMetadata object, and don't want to exceed 10% of the total heap space.
Math.max(1, maxHeapSizeForIndexMetadata.getMb() / 50),
// Each per-index process needs at least one snapshot thread at all times, so threadPool.info(SNAPSHOT).getMax()
// of them at once is enough to keep the thread pool fully utilized.
threadPool.info(ThreadPool.Names.SNAPSHOT).getMax()
)
);
});
}
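Condensed, the watcher above computes min(max(1, heapBudgetInMb / 50), snapshotPoolMax). A standalone sketch of that arithmetic, with a worked example (the method name and parameters are ours; the real logic lives in the setting watcher above):

// Illustrative sketch of the concurrency calculation, assuming ~50MB of working
// memory per concurrently loaded IndexMetadata object.
static int maxIndexDeletionConcurrency(long heapBudgetBytes, int snapshotPoolMax) {
    long byHeap = Math.max(1, heapBudgetBytes / ByteSizeUnit.MB.toBytes(50)); // at least one, even on tiny heaps
    return Math.toIntExact(Math.min(byHeap, snapshotPoolMax)); // never more than the pool can run at once
}

// Worked example: a 4GB heap with the default 10% budget yields 429496729 bytes
// (409MB); 409 / 50 = 8 concurrent index deletions, below a pool max of 10.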

@Override
@@ -892,6 +912,13 @@ protected final boolean isCompress() {
return compress;
}

/**
* @return the maximum number of SnapshotsDeletion.IndexSnapshotsDeletion processes allowed to run concurrently
*/
protected int getMaxIndexDeletionConcurrency() {
return this.maxIndexDeletionConcurrency;
}

/**
* Returns data file chunk size.
* <p>
@@ -1268,18 +1295,16 @@ void runCleanup(ActionListener<DeleteResult> listener) {
private void writeUpdatedShardMetadataAndComputeDeletes(ActionListener<Void> listener) {
// noinspection resource -- closed safely at the end of the iteration
final var listeners = new RefCountingListener(listener);

// Each per-index process takes some nonzero amount of working memory to hold the relevant snapshot IDs and metadata generations
// etc. which we can keep under tighter limits and release sooner if we limit the number of concurrently processing indices.
// Each one needs at least one snapshot thread at all times, so threadPool.info(SNAPSHOT).getMax() of them at once is enough to
// keep the threadpool fully utilized.
ThrottledIterator.run(
originalRepositoryData.indicesToUpdateAfterRemovingSnapshot(snapshotIds),
(ref, indexId) -> ActionListener.run(
ActionListener.releaseAfter(listeners.acquire(), ref),
l -> new IndexSnapshotsDeletion(indexId).run(l)
),
threadPool.info(ThreadPool.Names.SNAPSHOT).getMax(),
// Each per-index process takes some nonzero amount of working memory to hold the relevant snapshot IDs
// and metadata generations etc. which we can keep under tighter limits and release sooner if we
// limit the number of concurrently processing indices.
maxIndexDeletionConcurrency,
listeners::close
);
}
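For readers unfamiliar with ThrottledIterator.run: it drains the iterator with at most maxIndexDeletionConcurrency items in flight, starting the next item whenever a running one releases its ref. A rough semantic equivalent using a plain Semaphore (our simplification; the real implementation is non-blocking and ref-counted):

import java.util.Iterator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Semaphore;
import java.util.function.Consumer;

// Illustrative only: process items with at most maxConcurrency running at once.
static <T> void throttledRun(Iterator<T> items, Consumer<T> task, int maxConcurrency, ExecutorService executor)
    throws InterruptedException {
    final Semaphore permits = new Semaphore(maxConcurrency);
    while (items.hasNext()) {
        final T item = items.next();
        permits.acquire(); // block until one of the maxConcurrency slots frees up
        executor.execute(() -> {
            try {
                task.accept(item);
            } finally {
                permits.release(); // free the slot so the next item can start
            }
        });
    }
    permits.acquire(maxConcurrency); // wait for all in-flight items to complete
}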
Author: I am extending this test to ensure that, depending on the available heap memory, we throttle the number of concurrent snapshot threads accordingly.

@@ -16,6 +16,8 @@
import org.elasticsearch.common.blobstore.OperationPurpose;
import org.elasticsearch.common.blobstore.support.FilterBlobContainer;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.core.CheckedConsumer;
@@ -41,6 +43,7 @@
import java.util.Set;
import java.util.concurrent.CountDownLatch;

import static org.elasticsearch.common.bytes.ByteSizeConstants.ONE_GIGABYTE_IN_BYTES;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.lessThanOrEqualTo;

@@ -53,11 +56,36 @@ public class BlobStoreRepositoryDeleteThrottlingTests extends ESSingleNodeTestCase {

private static final String TEST_REPO_TYPE = "concurrency-limiting-fs";
private static final String TEST_REPO_NAME = "test-repo";
private static final int MAX_SNAPSHOT_THREADS = 3;
/**
* The maximum number of snapshot threads the thread pool can run
*/
private static final int THREAD_POOL_MAX_SNAPSHOT_THREADS = 10;
/**
* The maximum number of snapshot threads we expect the BlobStore to use when deleting snapshots
*/
private static int maxSnapshotThreads;

@Override
protected Settings nodeSettings() {
return Settings.builder().put(super.nodeSettings()).put("thread_pool.snapshot.max", MAX_SNAPSHOT_THREADS).build();
// Randomly generate a heap size up to 5GB.
long heapSizeInBytes = randomLongBetween(0, 5L * ONE_GIGABYTE_IN_BYTES);
// We expect at most 10% of the total heap space to be used when loading index metadata
long heapAvailableForIndexMetaData = heapSizeInBytes / 10;

// Calculate the maximum number of snapshot threads the BlobStore should use given the heap restrictions above
maxSnapshotThreads = Math.toIntExact(
Math.min(
Math.max(1, ByteSizeValue.of(heapAvailableForIndexMetaData, ByteSizeUnit.BYTES).getMb() / 50),
THREAD_POOL_MAX_SNAPSHOT_THREADS
)
);

return Settings.builder()
.put(super.nodeSettings())
// Mimic production by setting to 10
.put("thread_pool.snapshot.max", THREAD_POOL_MAX_SNAPSHOT_THREADS)
.put("repositories.blobstore.max_heap_size_for_index_metadata", heapAvailableForIndexMetaData + "b")
.build();
}
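To make the randomisation concrete, a few sample draws under this formula (our arithmetic, using the 50MB-per-IndexMetadata heuristic above):

// Sample draws (illustrative): heap size -> 10% budget -> expected maxSnapshotThreads
// 5GB heap -> 512MB budget -> 512 / 50 = 10 -> min(10, 10) = 10 (pool-bound)
// 1GB heap -> 102MB budget -> 102 / 50 = 2 -> min(2, 10) = 2 (heap-bound)
// 100MB heap -> 10MB budget -> max(1, 10 / 50) = 1 (floor of one thread)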

protected Collection<Class<? extends Plugin>> getPlugins() {
@@ -98,7 +126,7 @@ protected BlobStore createBlobStore() throws Exception {
private static class ConcurrencyLimitingBlobStore implements BlobStore {
private final BlobStore delegate;
private final Set<String> activeIndices = ConcurrentCollections.newConcurrentSet();
private final CountDownLatch countDownLatch = new CountDownLatch(MAX_SNAPSHOT_THREADS);
private final CountDownLatch countDownLatch = new CountDownLatch(maxSnapshotThreads);

private ConcurrencyLimitingBlobStore(BlobStore delegate) {
this.delegate = delegate;
@@ -136,7 +164,7 @@ public InputStream readBlob(OperationPurpose purpose, String blobName) throws IOException {
if (pathParts.size() == 2 && pathParts.get(0).equals("indices") && blobName.startsWith(BlobStoreRepository.METADATA_PREFIX)) {
// reading index metadata, so mark index as active
assertTrue(activeIndices.add(pathParts.get(1)));
assertThat(activeIndices.size(), lessThanOrEqualTo(MAX_SNAPSHOT_THREADS));
assertThat(activeIndices.size(), lessThanOrEqualTo(maxSnapshotThreads));
countDownLatch.countDown();
safeAwait(countDownLatch); // ensure that we do use all the threads
}
@@ -168,7 +196,7 @@ public void testDeleteThrottling() {

// Create enough indices that we cannot process them all at once

for (int i = 0; i < 3 * MAX_SNAPSHOT_THREADS; i++) {
for (int i = 0; i < 3 * maxSnapshotThreads; i++) {
createIndex("index-" + i, indexSettings(between(1, 3), 0).build());
}

@@ -44,6 +44,7 @@
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.MockBigArrays;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.core.TimeValue;
@@ -90,6 +91,7 @@
import java.util.function.Function;
import java.util.stream.Collectors;

import static org.elasticsearch.common.bytes.ByteSizeConstants.ONE_GIGABYTE_IN_BYTES;
import static org.elasticsearch.repositories.RepositoryDataTests.generateRandomRepoData;
import static org.elasticsearch.repositories.blobstore.BlobStoreRepository.INDEX_FILE_PREFIX;
import static org.elasticsearch.repositories.blobstore.BlobStoreRepository.MAX_HEAP_SIZE_FOR_SNAPSHOT_DELETION_SETTING;
@@ -113,6 +115,19 @@ public class BlobStoreRepositoryTests extends ESSingleNodeTestCase {

static final String REPO_TYPE = "fs";
private static final String TEST_REPO_NAME = "test-repo";
/**
* The maximum number of snapshot threads the thread pool can run
*/
private static final int THREAD_POOL_MAX_SNAPSHOT_THREADS = 10;

@Override
protected Settings nodeSettings() {
return Settings.builder()
.put(super.nodeSettings())
// Mimic production
.put("thread_pool.snapshot.max", THREAD_POOL_MAX_SNAPSHOT_THREADS)
.build();
}

public void testRetrieveSnapshots() {
final Client client = client();
@@ -859,4 +874,38 @@ public void testWriteToShardBlobToDelete() {
.setPersistentSettings(Settings.builder().putNull(MAX_HEAP_SIZE_FOR_SNAPSHOT_DELETION_SETTING.getKey()).build())
.get();
}

/**
* Tests that we adjust the maximum snapshot-deletion concurrency
* according to the available heap memory
*/
public void testMaxIndexDeletionConcurrency() {
Author: This test is different from the previous one. It is just a basic unit test ensuring that the mapping from heap memory to snapshot threads is as expected, but it doesn't test whether all threads are properly utilised.

// Randomly generate a heap size up to 10GB.
long heapSizeInBytes = randomLongBetween(0, 10L * ONE_GIGABYTE_IN_BYTES);
// We expect at most 10% of the total heap space to be used when loading index metadata
long heapAvailableForIndexMetaData = heapSizeInBytes / 10;
client().admin()
.cluster()
.prepareUpdateSettings(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT)
.setPersistentSettings(
Settings.builder()
.put("repositories.blobstore.max_heap_size_for_index_metadata", heapAvailableForIndexMetaData + "b")
.build()
)
.get();

final var repo = setupRepo();

// We use 50MB per index metadata object as a heuristic for how many we can load concurrently
long maxConcurrentThreads = Math.max(1, ByteSizeValue.of(heapAvailableForIndexMetaData, ByteSizeUnit.BYTES).getMb() / 50);

assertEquals(Math.min(maxConcurrentThreads, THREAD_POOL_MAX_SNAPSHOT_THREADS), repo.getMaxIndexDeletionConcurrency());

// reset original default setting
client().admin()
.cluster()
.prepareUpdateSettings(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT)
.setPersistentSettings(Settings.builder().putNull("repositories.blobstore.max_heap_size_for_index_metadata").build())
.get();
}
}
@@ -69,6 +69,7 @@
import java.util.stream.Collectors;

import static org.apache.lucene.tests.util.LuceneTestCase.random;
import static org.elasticsearch.repositories.blobstore.BlobStoreRepository.MAX_HEAP_SIZE_FOR_INDEX_METADATA_SETTING;
import static org.elasticsearch.repositories.blobstore.BlobStoreRepository.MAX_HEAP_SIZE_FOR_SNAPSHOT_DELETION_SETTING;
import static org.elasticsearch.repositories.blobstore.BlobStoreRepository.METADATA_BLOB_NAME_SUFFIX;
import static org.elasticsearch.repositories.blobstore.BlobStoreRepository.METADATA_NAME_FORMAT;
@@ -462,6 +463,7 @@ private static ClusterService mockClusterService(ClusterState initialState) {
when(clusterApplierService.threadPool()).thenReturn(threadPool);
Set<Setting<?>> settingSet = new HashSet<>();
settingSet.add(MAX_HEAP_SIZE_FOR_SNAPSHOT_DELETION_SETTING);
settingSet.add(MAX_HEAP_SIZE_FOR_INDEX_METADATA_SETTING);
ClusterSettings mockClusterSettings = new ClusterSettings(Settings.EMPTY, settingSet);
when(clusterService.getClusterSettings()).thenReturn(mockClusterSettings);
return clusterService;