diff --git a/modules/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureRepositoryPlugin.java b/modules/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureRepositoryPlugin.java index 24043bed8ddb2..058027bda1ec1 100644 --- a/modules/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureRepositoryPlugin.java +++ b/modules/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureRepositoryPlugin.java @@ -10,10 +10,12 @@ package org.elasticsearch.repositories.azure; import org.apache.lucene.util.SetOnce; +import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.core.TimeValue; import org.elasticsearch.env.Environment; import org.elasticsearch.indices.recovery.RecoverySettings; @@ -24,6 +26,7 @@ import org.elasticsearch.repositories.Repository; import org.elasticsearch.threadpool.ExecutorBuilder; import org.elasticsearch.threadpool.ScalingExecutorBuilder; +import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xcontent.NamedXContentRegistry; import java.util.Arrays; @@ -78,6 +81,7 @@ public Map getRepositories( public Collection createComponents(PluginServices services) { AzureClientProvider azureClientProvider = AzureClientProvider.create(services.threadPool(), settings); azureStoreService.set(createAzureStorageService(settings, azureClientProvider)); + assert assertRepositoryAzureMaxThreads(settings, services.threadPool()); return List.of(azureClientProvider); } @@ -107,12 +111,18 @@ public List> getSettings() { } @Override - public List> getExecutorBuilders(Settings settingsToUse) { - return List.of(executorBuilder(), nettyEventLoopExecutorBuilder(settingsToUse)); + public List> getExecutorBuilders(Settings settings) { + return List.of(executorBuilder(settings), nettyEventLoopExecutorBuilder(settings)); } - public static ExecutorBuilder executorBuilder() { - return new ScalingExecutorBuilder(REPOSITORY_THREAD_POOL_NAME, 0, 5, TimeValue.timeValueSeconds(30L), false); + public static ExecutorBuilder executorBuilder(Settings settings) { + int repositoryAzureMax = 5; + if (DiscoveryNode.isStateless(settings)) { + // REPOSITORY_THREAD_POOL_NAME is shared between snapshot and translogs/segments upload logic in serverless. In order to avoid + // snapshots to slow down other uploads due to rate limiting, we allow more threads in serverless. + repositoryAzureMax += ThreadPool.getMaxSnapshotThreadPoolSize(EsExecutors.allocatedProcessors(settings)); + } + return new ScalingExecutorBuilder(REPOSITORY_THREAD_POOL_NAME, 0, repositoryAzureMax, TimeValue.timeValueSeconds(30L), false); } public static ExecutorBuilder nettyEventLoopExecutorBuilder(Settings settings) { @@ -128,4 +138,19 @@ public void reload(Settings settingsToLoad) { assert storageService != null; storageService.refreshSettings(clientsSettings); } + + private static boolean assertRepositoryAzureMaxThreads(Settings settings, ThreadPool threadPool) { + if (DiscoveryNode.isStateless(settings)) { + var repositoryAzureMax = threadPool.info(REPOSITORY_THREAD_POOL_NAME).getMax(); + var snapshotMax = ThreadPool.getMaxSnapshotThreadPoolSize(EsExecutors.allocatedProcessors(settings)); + assert snapshotMax < repositoryAzureMax + : "thread pool [" + + REPOSITORY_THREAD_POOL_NAME + + "] should be large enough to allow all " + + snapshotMax + + " snapshot threads to run at once, but got: " + + repositoryAzureMax; + } + return true; + } } diff --git a/modules/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AbstractAzureServerTestCase.java b/modules/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AbstractAzureServerTestCase.java index 902096fe027ed..39fe222d47a04 100644 --- a/modules/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AbstractAzureServerTestCase.java +++ b/modules/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AbstractAzureServerTestCase.java @@ -76,7 +76,7 @@ public void setUp() throws Exception { serverlessMode = false; threadPool = new TestThreadPool( getTestClass().getName(), - AzureRepositoryPlugin.executorBuilder(), + AzureRepositoryPlugin.executorBuilder(Settings.EMPTY), AzureRepositoryPlugin.nettyEventLoopExecutorBuilder(Settings.EMPTY) ); httpServer = MockHttpServer.createHttp(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0), 0); diff --git a/modules/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureClientProviderTests.java b/modules/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureClientProviderTests.java index 2699438de8ac6..a66be65380da1 100644 --- a/modules/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureClientProviderTests.java +++ b/modules/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureClientProviderTests.java @@ -35,7 +35,7 @@ public class AzureClientProviderTests extends ESTestCase { public void setUpThreadPool() { threadPool = new TestThreadPool( getTestName(), - AzureRepositoryPlugin.executorBuilder(), + AzureRepositoryPlugin.executorBuilder(Settings.EMPTY), AzureRepositoryPlugin.nettyEventLoopExecutorBuilder(Settings.EMPTY) ); azureClientProvider = AzureClientProvider.create(threadPool, Settings.EMPTY); diff --git a/modules/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureRepositoryPluginTests.java b/modules/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureRepositoryPluginTests.java new file mode 100644 index 0000000000000..e8b420c2e58db --- /dev/null +++ b/modules/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureRepositoryPluginTests.java @@ -0,0 +1,46 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.repositories.azure; + +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.EsExecutors; +import org.elasticsearch.telemetry.metric.MeterRegistry; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.ThreadPool; + +import java.util.Map; + +import static org.elasticsearch.cluster.node.DiscoveryNode.STATELESS_ENABLED_SETTING_NAME; +import static org.hamcrest.Matchers.equalTo; + +public class AzureRepositoryPluginTests extends ESTestCase { + + public void testRepositoryAzureMaxThreads() { + final boolean isServerless = randomBoolean(); + final var settings = Settings.builder().put("node.name", getTestName()).put(STATELESS_ENABLED_SETTING_NAME, isServerless).build(); + + ThreadPool threadPool = null; + try { + threadPool = new ThreadPool( + settings, + MeterRegistry.NOOP, + (settings1, allocatedProcessors) -> Map.of(), + AzureRepositoryPlugin.executorBuilder(settings) + ); + + assertThat( + threadPool.info(AzureRepositoryPlugin.REPOSITORY_THREAD_POOL_NAME).getMax(), + equalTo(isServerless ? ThreadPool.getMaxSnapshotThreadPoolSize(EsExecutors.allocatedProcessors(settings)) + 5 : 5) + ); + } finally { + terminate(threadPool); + } + } +} diff --git a/modules/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureStorageServiceTests.java b/modules/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureStorageServiceTests.java index ef50644ac95e5..f4cb86cb5431f 100644 --- a/modules/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureStorageServiceTests.java +++ b/modules/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureStorageServiceTests.java @@ -50,7 +50,7 @@ public class AzureStorageServiceTests extends ESTestCase { public void setUpThreadPool() { threadPool = new TestThreadPool( AzureStorageServiceTests.class.getName(), - AzureRepositoryPlugin.executorBuilder(), + AzureRepositoryPlugin.executorBuilder(Settings.EMPTY), AzureRepositoryPlugin.nettyEventLoopExecutorBuilder(Settings.EMPTY) ); } diff --git a/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java b/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java index 1287dadd36928..225fc5bc2bd03 100644 --- a/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java +++ b/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java @@ -682,7 +682,7 @@ public static int searchOrGetThreadPoolSize(final int allocatedProcessors) { return ((allocatedProcessors * 3) / 2) + 1; } - static int getMaxSnapshotThreadPoolSize(int allocatedProcessors) { + public static int getMaxSnapshotThreadPoolSize(int allocatedProcessors) { final ByteSizeValue maxHeapSize = ByteSizeValue.ofBytes(Runtime.getRuntime().maxMemory()); return getMaxSnapshotThreadPoolSize(allocatedProcessors, maxHeapSize); }