Skip to content

Commit 639d702

Browse files
committed
[ML] Create endpoint cache feature
We want to gate the inference endpoint cache behind a NodeFeature so that it will only enable if every node in the cluster can support it. This closes the risk that a master node can be an earlier version of the cache, potentially handling the ClearCache action without having the code for it. Resolve #134809
1 parent 39a53dc commit 639d702

File tree

4 files changed

+29
-23
lines changed

4 files changed

+29
-23
lines changed

qa/lucene-index-compatibility/src/javaRestTest/java/org/elasticsearch/lucene/AbstractIndexCompatibilityTestCase.java

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,7 @@
2626
import org.elasticsearch.index.mapper.MapperService;
2727
import org.elasticsearch.test.XContentTestUtils;
2828
import org.elasticsearch.test.cluster.ElasticsearchCluster;
29-
import org.elasticsearch.test.cluster.local.DefaultSettingsProvider;
3029
import org.elasticsearch.test.cluster.local.LocalClusterConfigProvider;
31-
import org.elasticsearch.test.cluster.local.LocalClusterSpec;
3230
import org.elasticsearch.test.cluster.local.distribution.DistributionType;
3331
import org.elasticsearch.test.cluster.util.Version;
3432
import org.elasticsearch.test.rest.ESRestTestCase;
@@ -80,16 +78,6 @@ public abstract class AbstractIndexCompatibilityTestCase extends ESRestTestCase
8078
.setting("xpack.security.enabled", "false")
8179
.setting("xpack.ml.enabled", "false")
8280
.setting("path.repo", () -> REPOSITORY_PATH.getRoot().getPath())
83-
.settings(new DefaultSettingsProvider() {
84-
@Override
85-
public Map<String, String> get(LocalClusterSpec.LocalNodeSpec nodeSpec) {
86-
var settings = super.get(nodeSpec);
87-
if (nodeSpec.getVersion().onOrAfter(Version.fromString("9.2.0"))) {
88-
settings.put("xpack.inference.endpoint.cache.enabled", "false");
89-
}
90-
return settings;
91-
}
92-
})
9381
.apply(() -> clusterConfig)
9482
.build();
9583

x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/InferenceFeatures.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,13 @@ public class InferenceFeatures implements FeatureSpecification {
5050
public static final NodeFeature SEMANTIC_TEXT_HIGHLIGHTING_FLAT = new NodeFeature("semantic_text.highlighter.flat_index_options");
5151
private static final NodeFeature SEMANTIC_TEXT_FIELDS_CHUNKS_FORMAT = new NodeFeature("semantic_text.fields_chunks_format");
5252

53+
public static final NodeFeature INFERENCE_ENDPOINT_CACHE = new NodeFeature("inference.endpoint.cache");
54+
55+
@Override
56+
public Set<NodeFeature> getFeatures() {
57+
return Set.of(INFERENCE_ENDPOINT_CACHE);
58+
}
59+
5360
@Override
5461
public Set<NodeFeature> getTestFeatures() {
5562
var testFeatures = new HashSet<>(
@@ -90,6 +97,7 @@ public Set<NodeFeature> getTestFeatures() {
9097
TEXT_SIMILARITY_RERANKER_SNIPPETS
9198
)
9299
);
100+
testFeatures.addAll(getFeatures());
93101
return testFeatures;
94102
}
95103
}

x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/InferencePlugin.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -391,7 +391,8 @@ public Collection<?> createComponents(PluginServices services) {
391391
settings,
392392
modelRegistry.get(),
393393
serviceRegistry,
394-
services.projectResolver()
394+
services.projectResolver(),
395+
services.featureService()
395396
)
396397
);
397398

x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/registry/InferenceEndpointRegistry.java

Lines changed: 19 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,14 @@
1919
import org.elasticsearch.common.settings.Setting;
2020
import org.elasticsearch.common.settings.Settings;
2121
import org.elasticsearch.core.TimeValue;
22+
import org.elasticsearch.features.FeatureService;
2223
import org.elasticsearch.inference.InferenceServiceRegistry;
2324
import org.elasticsearch.inference.Model;
25+
import org.elasticsearch.xpack.inference.InferenceFeatures;
2426

2527
import java.util.Collection;
2628
import java.util.List;
29+
import java.util.function.Supplier;
2730

2831
/**
2932
* A registry that assembles and caches Inference Endpoints, {@link Model}, for reuse.
@@ -64,14 +67,16 @@ public static Collection<? extends Setting<?>> getSettingsDefinitions() {
6467
private final InferenceServiceRegistry serviceRegistry;
6568
private final ProjectResolver projectResolver;
6669
private final Cache<InferenceIdAndProject, Model> cache;
67-
private volatile boolean cacheEnabled;
70+
private final Supplier<Boolean> cacheEnabledViaFeature;
71+
private volatile boolean cacheEnabledViaSetting;
6872

6973
public InferenceEndpointRegistry(
7074
ClusterService clusterService,
7175
Settings settings,
7276
ModelRegistry modelRegistry,
7377
InferenceServiceRegistry serviceRegistry,
74-
ProjectResolver projectResolver
78+
ProjectResolver projectResolver,
79+
FeatureService featureService
7580
) {
7681
this.modelRegistry = modelRegistry;
7782
this.serviceRegistry = serviceRegistry;
@@ -80,15 +85,19 @@ public InferenceEndpointRegistry(
8085
.setMaximumWeight(INFERENCE_ENDPOINT_CACHE_WEIGHT.get(settings))
8186
.setExpireAfterWrite(INFERENCE_ENDPOINT_CACHE_EXPIRY.get(settings))
8287
.build();
83-
this.cacheEnabled = INFERENCE_ENDPOINT_CACHE_ENABLED.get(settings);
88+
this.cacheEnabledViaFeature = () -> {
89+
var state = clusterService.state();
90+
return state.clusterRecovered() && featureService.clusterHasFeature(state, InferenceFeatures.INFERENCE_ENDPOINT_CACHE);
91+
};
92+
this.cacheEnabledViaSetting = INFERENCE_ENDPOINT_CACHE_ENABLED.get(settings);
8493

8594
clusterService.getClusterSettings()
86-
.addSettingsUpdateConsumer(INFERENCE_ENDPOINT_CACHE_ENABLED, enabled -> this.cacheEnabled = enabled);
95+
.addSettingsUpdateConsumer(INFERENCE_ENDPOINT_CACHE_ENABLED, enabled -> this.cacheEnabledViaSetting = enabled);
8796
}
8897

8998
public void getEndpoint(String inferenceEntityId, ActionListener<Model> listener) {
9099
var key = new InferenceIdAndProject(inferenceEntityId, projectResolver.getProjectId());
91-
var cachedModel = cacheEnabled ? cache.get(key) : null;
100+
var cachedModel = cacheEnabled() ? cache.get(key) : null;
92101
if (cachedModel != null) {
93102
log.trace("Retrieved [{}] from cache.", inferenceEntityId);
94103
listener.onResponse(cachedModel);
@@ -98,7 +107,7 @@ public void getEndpoint(String inferenceEntityId, ActionListener<Model> listener
98107
}
99108

100109
void invalidateAll(ProjectId projectId) {
101-
if (cacheEnabled) {
110+
if (cacheEnabled()) {
102111
var cacheKeys = cache.keys().iterator();
103112
while (cacheKeys.hasNext()) {
104113
if (cacheKeys.next().projectId.equals(projectId)) {
@@ -126,23 +135,23 @@ private void loadFromIndex(InferenceIdAndProject idAndProject, ActionListener<Mo
126135
unparsedModel.secrets()
127136
);
128137

129-
if (cacheEnabled) {
138+
if (cacheEnabled()) {
130139
cache.put(idAndProject, model);
131140
}
132141
l.onResponse(model);
133142
}));
134143
}
135144

136145
public Cache.Stats stats() {
137-
return cacheEnabled ? cache.stats() : EMPTY;
146+
return cacheEnabled() ? cache.stats() : EMPTY;
138147
}
139148

140149
public int cacheCount() {
141-
return cacheEnabled ? cache.count() : 0;
150+
return cacheEnabled() ? cache.count() : 0;
142151
}
143152

144153
public boolean cacheEnabled() {
145-
return cacheEnabled;
154+
return cacheEnabledViaSetting && cacheEnabledViaFeature.get();
146155
}
147156

148157
private record InferenceIdAndProject(String inferenceEntityId, ProjectId projectId) {}

0 commit comments

Comments
 (0)