Skip to content

Commit f7911fe

Browse files
authored
[ML] Limit in flight requests when indexing model download parts (#112992) (#113514)
Restores the changes from #111684, which use multiple streams to reduce the time taken to download and install the built-in ML models. The first iteration had a problem where the number of in-flight requests was not properly limited; that is fixed here. Additionally, there are now circuit breaker checks when allocating the buffer used to store the model definition.
1 parent a218801 commit f7911fe

File tree

12 files changed

+896
-173
lines changed

12 files changed

+896
-173
lines changed

docs/changelog/111684.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 111684
2+
summary: Write downloaded model parts async
3+
area: Machine Learning
4+
type: enhancement
5+
issues: []

x-pack/plugin/inference/qa/inference-service-tests/src/javaRestTest/java/org/elasticsearch/xpack/inference/TextEmbeddingCrudIT.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77

88
package org.elasticsearch.xpack.inference;
99

10-
import org.apache.lucene.tests.util.LuceneTestCase;
1110
import org.elasticsearch.client.Request;
1211
import org.elasticsearch.common.Strings;
1312
import org.elasticsearch.inference.TaskType;
@@ -19,11 +18,11 @@
1918

2019
import static org.hamcrest.Matchers.containsString;
2120

22-
// Tests disabled in CI due to the models being too large to download. Can be enabled (commented out) for local testing
23-
@LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/105198")
21+
// This test was previously disabled in CI due to the models being too large
22+
// See "https://github.com/elastic/elasticsearch/issues/105198".
2423
public class TextEmbeddingCrudIT extends InferenceBaseRestTest {
2524

26-
public void testPutE5Small_withNoModelVariant() throws IOException {
25+
public void testPutE5Small_withNoModelVariant() {
2726
{
2827
String inferenceEntityId = randomAlphaOfLength(10).toLowerCase();
2928
expectThrows(
@@ -51,6 +50,7 @@ public void testPutE5Small_withPlatformAgnosticVariant() throws IOException {
5150
deleteTextEmbeddingModel(inferenceEntityId);
5251
}
5352

53+
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/105198")
5454
public void testPutE5Small_withPlatformSpecificVariant() throws IOException {
5555
String inferenceEntityId = randomAlphaOfLength(10).toLowerCase();
5656
if ("linux-x86_64".equals(Platforms.PLATFORM_NAME)) {
@@ -124,7 +124,7 @@ private String noModelIdVariantJsonEntity() {
124124
private String platformAgnosticModelVariantJsonEntity() {
125125
return """
126126
{
127-
"service": "text_embedding",
127+
"service": "elasticsearch",
128128
"service_settings": {
129129
"num_allocations": 1,
130130
"num_threads": 1,
@@ -137,7 +137,7 @@ private String platformAgnosticModelVariantJsonEntity() {
137137
private String platformSpecificModelVariantJsonEntity() {
138138
return """
139139
{
140-
"service": "text_embedding",
140+
"service": "elasticsearch",
141141
"service_settings": {
142142
"num_allocations": 1,
143143
"num_threads": 1,
@@ -150,7 +150,7 @@ private String platformSpecificModelVariantJsonEntity() {
150150
private String fakeModelVariantJsonEntity() {
151151
return """
152152
{
153-
"service": "text_embedding",
153+
"service": "elasticsearch",
154154
"service_settings": {
155155
"num_allocations": 1,
156156
"num_threads": 1,

x-pack/plugin/ml-package-loader/src/main/java/org/elasticsearch/xpack/ml/packageloader/MachineLearningPackageLoader.java

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,17 @@
1515
import org.elasticsearch.common.Strings;
1616
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
1717
import org.elasticsearch.common.settings.Setting;
18+
import org.elasticsearch.common.settings.Settings;
19+
import org.elasticsearch.common.util.concurrent.EsExecutors;
1820
import org.elasticsearch.plugins.ActionPlugin;
1921
import org.elasticsearch.plugins.Plugin;
2022
import org.elasticsearch.tasks.Task;
23+
import org.elasticsearch.threadpool.ExecutorBuilder;
24+
import org.elasticsearch.threadpool.FixedExecutorBuilder;
2125
import org.elasticsearch.xpack.core.ml.packageloader.action.GetTrainedModelPackageConfigAction;
2226
import org.elasticsearch.xpack.core.ml.packageloader.action.LoadTrainedModelPackageAction;
2327
import org.elasticsearch.xpack.ml.packageloader.action.ModelDownloadTask;
28+
import org.elasticsearch.xpack.ml.packageloader.action.ModelImporter;
2429
import org.elasticsearch.xpack.ml.packageloader.action.TransportGetTrainedModelPackageConfigAction;
2530
import org.elasticsearch.xpack.ml.packageloader.action.TransportLoadTrainedModelPackage;
2631

@@ -44,16 +49,15 @@ public class MachineLearningPackageLoader extends Plugin implements ActionPlugin
4449
Setting.Property.Dynamic
4550
);
4651

47-
// re-using thread pool setup by the ml plugin
48-
public static final String UTILITY_THREAD_POOL_NAME = "ml_utility";
49-
5052
// This link will be invalid for serverless, but serverless will never be
5153
// air-gapped, so this message should never be needed.
5254
private static final String MODEL_REPOSITORY_DOCUMENTATION_LINK = format(
5355
"https://www.elastic.co/guide/en/machine-learning/%s/ml-nlp-elser.html#air-gapped-install",
5456
Build.current().version().replaceFirst("^(\\d+\\.\\d+).*", "$1")
5557
);
5658

59+
public static final String MODEL_DOWNLOAD_THREADPOOL_NAME = "model_download";
60+
5761
public MachineLearningPackageLoader() {}
5862

5963
@Override
@@ -81,6 +85,24 @@ public List<NamedWriteableRegistry.Entry> getNamedWriteables() {
8185
);
8286
}
8387

88+
@Override
89+
public List<ExecutorBuilder<?>> getExecutorBuilders(Settings settings) {
90+
return List.of(modelDownloadExecutor(settings));
91+
}
92+
93+
public static FixedExecutorBuilder modelDownloadExecutor(Settings settings) {
94+
// Threadpool with a fixed number of threads for
95+
// downloading the model definition files
96+
return new FixedExecutorBuilder(
97+
settings,
98+
MODEL_DOWNLOAD_THREADPOOL_NAME,
99+
ModelImporter.NUMBER_OF_STREAMS,
100+
-1, // unbounded queue size
101+
"xpack.ml.model_download_thread_pool",
102+
EsExecutors.TaskTrackingConfig.DO_NOT_TRACK
103+
);
104+
}
105+
84106
@Override
85107
public List<BootstrapCheck> getBootstrapChecks() {
86108
return List.of(new BootstrapCheck() {

0 commit comments

Comments
 (0)