Skip to content

Commit f48a1c6

Browse files
authored
[8.15] [ML] Downloaded and write model parts using multiple streams (elastic#112869)
Manual backport of - [ML] Downloaded and write model parts using multiple streams (elastic#111684)
1 parent 45be659 commit f48a1c6

File tree

13 files changed

+902
-178
lines changed

13 files changed

+902
-178
lines changed

docs/changelog/111684.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 111684
2+
summary: Write downloaded model parts async
3+
area: Machine Learning
4+
type: enhancement
5+
issues: []

docs/changelog/112869.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 112869
2+
summary: "[8.15] [ML] Downloaded and write model parts using multiple streams"
3+
area: Machine Learning
4+
type: enhancement
5+
issues: []

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/common/notifications/Level.java

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,23 @@
99
import java.util.Locale;
1010

1111
public enum Level {
12-
INFO,
13-
WARNING,
14-
ERROR;
12+
INFO {
13+
public org.apache.logging.log4j.Level log4jLevel() {
14+
return org.apache.logging.log4j.Level.INFO;
15+
}
16+
},
17+
WARNING {
18+
public org.apache.logging.log4j.Level log4jLevel() {
19+
return org.apache.logging.log4j.Level.WARN;
20+
}
21+
},
22+
ERROR {
23+
public org.apache.logging.log4j.Level log4jLevel() {
24+
return org.apache.logging.log4j.Level.ERROR;
25+
}
26+
};
27+
28+
public abstract org.apache.logging.log4j.Level log4jLevel();
1529

1630
/**
1731
* Case-insensitive from string method.

x-pack/plugin/ml-package-loader/src/main/java/org/elasticsearch/xpack/ml/packageloader/MachineLearningPackageLoader.java

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,17 @@
1515
import org.elasticsearch.common.Strings;
1616
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
1717
import org.elasticsearch.common.settings.Setting;
18+
import org.elasticsearch.common.settings.Settings;
19+
import org.elasticsearch.common.util.concurrent.EsExecutors;
1820
import org.elasticsearch.plugins.ActionPlugin;
1921
import org.elasticsearch.plugins.Plugin;
2022
import org.elasticsearch.tasks.Task;
23+
import org.elasticsearch.threadpool.ExecutorBuilder;
24+
import org.elasticsearch.threadpool.FixedExecutorBuilder;
2125
import org.elasticsearch.xpack.core.ml.packageloader.action.GetTrainedModelPackageConfigAction;
2226
import org.elasticsearch.xpack.core.ml.packageloader.action.LoadTrainedModelPackageAction;
2327
import org.elasticsearch.xpack.ml.packageloader.action.ModelDownloadTask;
28+
import org.elasticsearch.xpack.ml.packageloader.action.ModelImporter;
2429
import org.elasticsearch.xpack.ml.packageloader.action.TransportGetTrainedModelPackageConfigAction;
2530
import org.elasticsearch.xpack.ml.packageloader.action.TransportLoadTrainedModelPackage;
2631

@@ -44,16 +49,15 @@ public class MachineLearningPackageLoader extends Plugin implements ActionPlugin
4449
Setting.Property.Dynamic
4550
);
4651

47-
// re-using thread pool setup by the ml plugin
48-
public static final String UTILITY_THREAD_POOL_NAME = "ml_utility";
49-
5052
// This link will be invalid for serverless, but serverless will never be
5153
// air-gapped, so this message should never be needed.
5254
private static final String MODEL_REPOSITORY_DOCUMENTATION_LINK = format(
5355
"https://www.elastic.co/guide/en/machine-learning/%s/ml-nlp-elser.html#air-gapped-install",
5456
Build.current().version().replaceFirst("^(\\d+\\.\\d+).*", "$1")
5557
);
5658

59+
public static final String MODEL_DOWNLOAD_THREADPOOL_NAME = "model_download";
60+
5761
public MachineLearningPackageLoader() {}
5862

5963
@Override
@@ -81,6 +85,24 @@ public List<NamedWriteableRegistry.Entry> getNamedWriteables() {
8185
);
8286
}
8387

88+
@Override
89+
public List<ExecutorBuilder<?>> getExecutorBuilders(Settings settings) {
90+
return List.of(modelDownloadExecutor(settings));
91+
}
92+
93+
public static FixedExecutorBuilder modelDownloadExecutor(Settings settings) {
94+
// Threadpool with a fixed number of threads for
95+
// downloading the model definition files
96+
return new FixedExecutorBuilder(
97+
settings,
98+
MODEL_DOWNLOAD_THREADPOOL_NAME,
99+
ModelImporter.NUMBER_OF_STREAMS,
100+
-1, // unbounded queue size
101+
"xpack.ml.model_download_thread_pool",
102+
EsExecutors.TaskTrackingConfig.DO_NOT_TRACK
103+
);
104+
}
105+
84106
@Override
85107
public List<BootstrapCheck> getBootstrapChecks() {
86108
return List.of(new BootstrapCheck() {

0 commit comments

Comments
 (0)