Skip to content

Commit 27a3eb0

Browse files
authored
Increase repository_azure max. threads on serverless (#128130)
On Serverless, the `repository_azure` thread pool is shared between snapshots and translogs/segments upload logic. Because snapshots can be rate-limited when executing in the repository_azure thread pool, we want to leave enough room for the other upload threads to be executed. Relates ES-11391
1 parent 18c6079 commit 27a3eb0

File tree

6 files changed

+79
-8
lines changed

6 files changed

+79
-8
lines changed

modules/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureRepositoryPlugin.java

Lines changed: 29 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,12 @@
1010
package org.elasticsearch.repositories.azure;
1111

1212
import org.apache.lucene.util.SetOnce;
13+
import org.elasticsearch.cluster.node.DiscoveryNode;
1314
import org.elasticsearch.cluster.service.ClusterService;
1415
import org.elasticsearch.common.settings.Setting;
1516
import org.elasticsearch.common.settings.Settings;
1617
import org.elasticsearch.common.util.BigArrays;
18+
import org.elasticsearch.common.util.concurrent.EsExecutors;
1719
import org.elasticsearch.core.TimeValue;
1820
import org.elasticsearch.env.Environment;
1921
import org.elasticsearch.indices.recovery.RecoverySettings;
@@ -24,6 +26,7 @@
2426
import org.elasticsearch.repositories.Repository;
2527
import org.elasticsearch.threadpool.ExecutorBuilder;
2628
import org.elasticsearch.threadpool.ScalingExecutorBuilder;
29+
import org.elasticsearch.threadpool.ThreadPool;
2730
import org.elasticsearch.xcontent.NamedXContentRegistry;
2831

2932
import java.util.Arrays;
@@ -78,6 +81,7 @@ public Map<String, Repository.Factory> getRepositories(
7881
public Collection<?> createComponents(PluginServices services) {
7982
AzureClientProvider azureClientProvider = AzureClientProvider.create(services.threadPool(), settings);
8083
azureStoreService.set(createAzureStorageService(settings, azureClientProvider));
84+
assert assertRepositoryAzureMaxThreads(settings, services.threadPool());
8185
return List.of(azureClientProvider);
8286
}
8387

@@ -107,12 +111,18 @@ public List<Setting<?>> getSettings() {
107111
}
108112

109113
@Override
110-
public List<ExecutorBuilder<?>> getExecutorBuilders(Settings settingsToUse) {
111-
return List.of(executorBuilder(), nettyEventLoopExecutorBuilder(settingsToUse));
114+
public List<ExecutorBuilder<?>> getExecutorBuilders(Settings settings) {
115+
return List.of(executorBuilder(settings), nettyEventLoopExecutorBuilder(settings));
112116
}
113117

114-
public static ExecutorBuilder<?> executorBuilder() {
115-
return new ScalingExecutorBuilder(REPOSITORY_THREAD_POOL_NAME, 0, 5, TimeValue.timeValueSeconds(30L), false);
118+
public static ExecutorBuilder<?> executorBuilder(Settings settings) {
119+
int repositoryAzureMax = 5;
120+
if (DiscoveryNode.isStateless(settings)) {
121+
// REPOSITORY_THREAD_POOL_NAME is shared between snapshot and translogs/segments upload logic in serverless. In order to avoid
122+
// snapshots to slow down other uploads due to rate limiting, we allow more threads in serverless.
123+
repositoryAzureMax += ThreadPool.getMaxSnapshotThreadPoolSize(EsExecutors.allocatedProcessors(settings));
124+
}
125+
return new ScalingExecutorBuilder(REPOSITORY_THREAD_POOL_NAME, 0, repositoryAzureMax, TimeValue.timeValueSeconds(30L), false);
116126
}
117127

118128
public static ExecutorBuilder<?> nettyEventLoopExecutorBuilder(Settings settings) {
@@ -128,4 +138,19 @@ public void reload(Settings settingsToLoad) {
128138
assert storageService != null;
129139
storageService.refreshSettings(clientsSettings);
130140
}
141+
142+
private static boolean assertRepositoryAzureMaxThreads(Settings settings, ThreadPool threadPool) {
143+
if (DiscoveryNode.isStateless(settings)) {
144+
var repositoryAzureMax = threadPool.info(REPOSITORY_THREAD_POOL_NAME).getMax();
145+
var snapshotMax = ThreadPool.getMaxSnapshotThreadPoolSize(EsExecutors.allocatedProcessors(settings));
146+
assert snapshotMax < repositoryAzureMax
147+
: "thread pool ["
148+
+ REPOSITORY_THREAD_POOL_NAME
149+
+ "] should be large enough to allow all "
150+
+ snapshotMax
151+
+ " snapshot threads to run at once, but got: "
152+
+ repositoryAzureMax;
153+
}
154+
return true;
155+
}
131156
}

modules/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AbstractAzureServerTestCase.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ public void setUp() throws Exception {
7676
serverlessMode = false;
7777
threadPool = new TestThreadPool(
7878
getTestClass().getName(),
79-
AzureRepositoryPlugin.executorBuilder(),
79+
AzureRepositoryPlugin.executorBuilder(Settings.EMPTY),
8080
AzureRepositoryPlugin.nettyEventLoopExecutorBuilder(Settings.EMPTY)
8181
);
8282
httpServer = MockHttpServer.createHttp(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0), 0);

modules/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureClientProviderTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ public class AzureClientProviderTests extends ESTestCase {
3535
public void setUpThreadPool() {
3636
threadPool = new TestThreadPool(
3737
getTestName(),
38-
AzureRepositoryPlugin.executorBuilder(),
38+
AzureRepositoryPlugin.executorBuilder(Settings.EMPTY),
3939
AzureRepositoryPlugin.nettyEventLoopExecutorBuilder(Settings.EMPTY)
4040
);
4141
azureClientProvider = AzureClientProvider.create(threadPool, Settings.EMPTY);
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.repositories.azure;
11+
12+
import org.elasticsearch.common.settings.Settings;
13+
import org.elasticsearch.common.util.concurrent.EsExecutors;
14+
import org.elasticsearch.telemetry.metric.MeterRegistry;
15+
import org.elasticsearch.test.ESTestCase;
16+
import org.elasticsearch.threadpool.ThreadPool;
17+
18+
import java.util.Map;
19+
20+
import static org.elasticsearch.cluster.node.DiscoveryNode.STATELESS_ENABLED_SETTING_NAME;
21+
import static org.hamcrest.Matchers.equalTo;
22+
23+
public class AzureRepositoryPluginTests extends ESTestCase {
24+
25+
public void testRepositoryAzureMaxThreads() {
26+
final boolean isServerless = randomBoolean();
27+
final var settings = Settings.builder().put("node.name", getTestName()).put(STATELESS_ENABLED_SETTING_NAME, isServerless).build();
28+
29+
ThreadPool threadPool = null;
30+
try {
31+
threadPool = new ThreadPool(
32+
settings,
33+
MeterRegistry.NOOP,
34+
(settings1, allocatedProcessors) -> Map.of(),
35+
AzureRepositoryPlugin.executorBuilder(settings)
36+
);
37+
38+
assertThat(
39+
threadPool.info(AzureRepositoryPlugin.REPOSITORY_THREAD_POOL_NAME).getMax(),
40+
equalTo(isServerless ? ThreadPool.getMaxSnapshotThreadPoolSize(EsExecutors.allocatedProcessors(settings)) + 5 : 5)
41+
);
42+
} finally {
43+
terminate(threadPool);
44+
}
45+
}
46+
}

modules/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureStorageServiceTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ public class AzureStorageServiceTests extends ESTestCase {
5050
public void setUpThreadPool() {
5151
threadPool = new TestThreadPool(
5252
AzureStorageServiceTests.class.getName(),
53-
AzureRepositoryPlugin.executorBuilder(),
53+
AzureRepositoryPlugin.executorBuilder(Settings.EMPTY),
5454
AzureRepositoryPlugin.nettyEventLoopExecutorBuilder(Settings.EMPTY)
5555
);
5656
}

server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -682,7 +682,7 @@ public static int searchOrGetThreadPoolSize(final int allocatedProcessors) {
682682
return ((allocatedProcessors * 3) / 2) + 1;
683683
}
684684

685-
static int getMaxSnapshotThreadPoolSize(int allocatedProcessors) {
685+
public static int getMaxSnapshotThreadPoolSize(int allocatedProcessors) {
686686
final ByteSizeValue maxHeapSize = ByteSizeValue.ofBytes(Runtime.getRuntime().maxMemory());
687687
return getMaxSnapshotThreadPoolSize(allocatedProcessors, maxHeapSize);
688688
}

0 commit comments

Comments
 (0)