Skip to content

Commit 93137e6

Browse files
jonathan-buttnerelasticsearchmachine
andauthored
[ML] Creating CCM feature flag, index, and storage logic (#137582)
* Starting CCM index * Renaming * Building request * Adding feature flag and tests * [CI] Auto commit changes from spotless * Adding alias * Using search instead of get in case of rollover --------- Co-authored-by: elasticsearchmachine <[email protected]>
1 parent e3598de commit 93137e6

File tree

9 files changed

+508
-24
lines changed

9 files changed

+508
-24
lines changed

test/test-clusters/src/main/java/org/elasticsearch/test/cluster/FeatureFlag.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,8 @@ public enum FeatureFlag {
2626
Version.fromString("9.2.0"),
2727
null
2828
),
29-
RANDOM_SAMPLING("es.random_sampling_feature_flag_enabled=true", Version.fromString("9.2.0"), null);
29+
RANDOM_SAMPLING("es.random_sampling_feature_flag_enabled=true", Version.fromString("9.2.0"), null),
30+
INFERENCE_API_CCM("es.inference_api_ccm_feature_flag_enabled=true", Version.fromString("9.3.0"), null);
3031

3132
public final String systemProperty;
3233
public final Version from;
Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.inference.integration;
9+
10+
import org.elasticsearch.ElasticsearchException;
11+
import org.elasticsearch.ResourceNotFoundException;
12+
import org.elasticsearch.action.DocWriteResponse;
13+
import org.elasticsearch.action.support.PlainActionFuture;
14+
import org.elasticsearch.action.support.WriteRequest;
15+
import org.elasticsearch.common.settings.SecureString;
16+
import org.elasticsearch.core.TimeValue;
17+
import org.elasticsearch.plugins.Plugin;
18+
import org.elasticsearch.test.ESSingleNodeTestCase;
19+
import org.elasticsearch.xcontent.XContentType;
20+
import org.elasticsearch.xpack.inference.LocalStateInferencePlugin;
21+
import org.elasticsearch.xpack.inference.services.elastic.ccm.CCMIndex;
22+
import org.elasticsearch.xpack.inference.services.elastic.ccm.CCMModel;
23+
import org.elasticsearch.xpack.inference.services.elastic.ccm.CCMStorageService;
24+
import org.junit.Before;
25+
26+
import java.util.Collection;
27+
28+
import static org.hamcrest.Matchers.containsString;
29+
import static org.hamcrest.Matchers.is;
30+
31+
public class CCMStorageServiceIT extends ESSingleNodeTestCase {
32+
private CCMStorageService ccmStorageService;
33+
34+
@Before
35+
public void createComponents() {
36+
ccmStorageService = node().injector().getInstance(CCMStorageService.class);
37+
}
38+
39+
@Override
40+
protected Collection<Class<? extends Plugin>> getPlugins() {
41+
return pluginList(LocalStateInferencePlugin.class);
42+
}
43+
44+
public void testStoreAndGetCCMModel() {
45+
var ccmModel = new CCMModel(new SecureString("secret".toCharArray()));
46+
var storeListener = new PlainActionFuture<Void>();
47+
ccmStorageService.store(ccmModel, storeListener);
48+
49+
assertNull(storeListener.actionGet(TimeValue.THIRTY_SECONDS));
50+
51+
var getListener = new PlainActionFuture<CCMModel>();
52+
ccmStorageService.get(getListener);
53+
54+
assertThat(getListener.actionGet(TimeValue.THIRTY_SECONDS), is(ccmModel));
55+
}
56+
57+
public void testGet_ThrowsResourceNotFoundException_WhenCCMIndexDoesNotExist() {
58+
var getListener = new PlainActionFuture<CCMModel>();
59+
ccmStorageService.get(getListener);
60+
61+
var exception = expectThrows(ResourceNotFoundException.class, () -> getListener.actionGet(TimeValue.THIRTY_SECONDS));
62+
assertThat(exception.getMessage(), is("CCM configuration not found"));
63+
}
64+
65+
public void testGet_ThrowsResourceNotFoundException_WhenCCMConfigurationDocumentDoesNotExist() {
66+
storeCorruptCCMModel("id");
67+
68+
var getListener = new PlainActionFuture<CCMModel>();
69+
ccmStorageService.get(getListener);
70+
71+
var exception = expectThrows(ResourceNotFoundException.class, () -> getListener.actionGet(TimeValue.THIRTY_SECONDS));
72+
assertThat(exception.getMessage(), is("CCM configuration not found"));
73+
}
74+
75+
public void testGetCCMModel_ThrowsException_WhenStoredModelIsCorrupted() {
76+
storeCorruptCCMModel(CCMStorageService.CCM_DOC_ID);
77+
78+
var getListener = new PlainActionFuture<CCMModel>();
79+
ccmStorageService.get(getListener);
80+
81+
var exception = expectThrows(ElasticsearchException.class, () -> getListener.actionGet(TimeValue.THIRTY_SECONDS));
82+
assertThat(exception.getMessage(), containsString("Failed to retrieve CCM configuration"));
83+
assertThat(exception.getCause().getMessage(), containsString("Required [api_key]"));
84+
}
85+
86+
private void storeCorruptCCMModel(String id) {
87+
var corruptedSource = """
88+
{
89+
90+
}
91+
""";
92+
93+
var response = client().prepareIndex()
94+
.setSource(corruptedSource, XContentType.JSON)
95+
.setIndex(CCMIndex.INDEX_NAME)
96+
.setId(id)
97+
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
98+
.execute()
99+
.actionGet(TimeValue.THIRTY_SECONDS);
100+
101+
assertThat(response.getResult(), is(DocWriteResponse.Result.CREATED));
102+
}
103+
104+
}

x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/InferencePlugin.java

Lines changed: 48 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,9 @@
134134
import org.elasticsearch.xpack.inference.services.elastic.ElasticInferenceServiceComponents;
135135
import org.elasticsearch.xpack.inference.services.elastic.ElasticInferenceServiceSettings;
136136
import org.elasticsearch.xpack.inference.services.elastic.authorization.ElasticInferenceServiceAuthorizationRequestHandler;
137+
import org.elasticsearch.xpack.inference.services.elastic.ccm.CCMFeatureFlag;
138+
import org.elasticsearch.xpack.inference.services.elastic.ccm.CCMIndex;
139+
import org.elasticsearch.xpack.inference.services.elastic.ccm.CCMStorageService;
137140
import org.elasticsearch.xpack.inference.services.elasticsearch.ElasticsearchInternalService;
138141
import org.elasticsearch.xpack.inference.services.googleaistudio.GoogleAiStudioService;
139142
import org.elasticsearch.xpack.inference.services.googlevertexai.GoogleVertexAiService;
@@ -160,6 +163,7 @@
160163
import java.util.Set;
161164
import java.util.function.Predicate;
162165
import java.util.function.Supplier;
166+
import java.util.stream.Stream;
163167

164168
import static java.util.Collections.singletonList;
165169
import static org.elasticsearch.xpack.inference.action.filter.ShardBulkInferenceActionFilter.INDICES_INFERENCE_BATCH_SIZE;
@@ -403,6 +407,10 @@ public Collection<?> createComponents(PluginServices services) {
403407
)
404408
);
405409

410+
if (CCMFeatureFlag.FEATURE_FLAG.isEnabled()) {
411+
components.add(new CCMStorageService(services.client()));
412+
}
413+
406414
return components;
407415
}
408416

@@ -491,6 +499,20 @@ public List<NamedXContentRegistry.Entry> getNamedXContent() {
491499

492500
@Override
493501
public Collection<SystemIndexDescriptor> getSystemIndexDescriptors(Settings settings) {
502+
List<SystemIndexDescriptor> ccmIndexDescriptor = CCMFeatureFlag.FEATURE_FLAG.isEnabled()
503+
? List.of(
504+
SystemIndexDescriptor.builder()
505+
.setType(SystemIndexDescriptor.Type.INTERNAL_MANAGED)
506+
.setIndexPattern(CCMIndex.INDEX_PATTERN)
507+
.setPrimaryIndex(CCMIndex.INDEX_NAME)
508+
.setDescription("Contains Elastic Inference Service Cloud Connected Mode settings")
509+
.setMappings(CCMIndex.mappings())
510+
.setSettings(CCMIndex.settings())
511+
.setOrigin(ClientHelper.INFERENCE_ORIGIN)
512+
.setNetNew()
513+
.build()
514+
)
515+
: List.of();
494516

495517
var inferenceIndexV1Descriptor = SystemIndexDescriptor.builder()
496518
.setType(SystemIndexDescriptor.Type.INTERNAL_MANAGED)
@@ -503,29 +525,32 @@ public Collection<SystemIndexDescriptor> getSystemIndexDescriptors(Settings sett
503525
.setOrigin(ClientHelper.INFERENCE_ORIGIN)
504526
.build();
505527

506-
return List.of(
507-
SystemIndexDescriptor.builder()
508-
.setType(SystemIndexDescriptor.Type.INTERNAL_MANAGED)
509-
.setIndexPattern(InferenceIndex.INDEX_PATTERN)
510-
.setAliasName(InferenceIndex.INDEX_ALIAS)
511-
.setPrimaryIndex(InferenceIndex.INDEX_NAME)
512-
.setDescription("Contains inference service and model configuration")
513-
.setMappings(InferenceIndex.mappings())
514-
.setSettings(getIndexSettings())
515-
.setOrigin(ClientHelper.INFERENCE_ORIGIN)
516-
.setPriorSystemIndexDescriptors(List.of(inferenceIndexV1Descriptor))
517-
.build(),
518-
SystemIndexDescriptor.builder()
519-
.setType(SystemIndexDescriptor.Type.INTERNAL_MANAGED)
520-
.setIndexPattern(InferenceSecretsIndex.INDEX_PATTERN)
521-
.setPrimaryIndex(InferenceSecretsIndex.INDEX_NAME)
522-
.setDescription("Contains inference service secrets")
523-
.setMappings(InferenceSecretsIndex.mappings())
524-
.setSettings(getSecretsIndexSettings())
525-
.setOrigin(ClientHelper.INFERENCE_ORIGIN)
526-
.setNetNew()
527-
.build()
528-
);
528+
return Stream.of(
529+
List.of(
530+
SystemIndexDescriptor.builder()
531+
.setType(SystemIndexDescriptor.Type.INTERNAL_MANAGED)
532+
.setIndexPattern(InferenceIndex.INDEX_PATTERN)
533+
.setAliasName(InferenceIndex.INDEX_ALIAS)
534+
.setPrimaryIndex(InferenceIndex.INDEX_NAME)
535+
.setDescription("Contains inference service and model configuration")
536+
.setMappings(InferenceIndex.mappings())
537+
.setSettings(getIndexSettings())
538+
.setOrigin(ClientHelper.INFERENCE_ORIGIN)
539+
.setPriorSystemIndexDescriptors(List.of(inferenceIndexV1Descriptor))
540+
.build(),
541+
SystemIndexDescriptor.builder()
542+
.setType(SystemIndexDescriptor.Type.INTERNAL_MANAGED)
543+
.setIndexPattern(InferenceSecretsIndex.INDEX_PATTERN)
544+
.setPrimaryIndex(InferenceSecretsIndex.INDEX_NAME)
545+
.setDescription("Contains inference service secrets")
546+
.setMappings(InferenceSecretsIndex.mappings())
547+
.setSettings(getSecretsIndexSettings())
548+
.setOrigin(ClientHelper.INFERENCE_ORIGIN)
549+
.setNetNew()
550+
.build()
551+
),
552+
ccmIndexDescriptor
553+
).flatMap(List::stream).toList();
529554
}
530555

531556
// Overridable for tests
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.inference.services.elastic.ccm;
9+
10+
import org.elasticsearch.common.util.FeatureFlag;
11+
12+
public class CCMFeatureFlag {
13+
14+
/**
15+
* {@link org.elasticsearch.xpack.inference.services.custom.CustomService} feature flag. When the feature is complete,
16+
* this flag will be removed.
17+
* Enable feature via JVM option: `-Des.inference_api_ccm_feature_flag_enabled=true`.
18+
*/
19+
public static final FeatureFlag FEATURE_FLAG = new FeatureFlag("inference_api_ccm");
20+
21+
private CCMFeatureFlag() {}
22+
}
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.inference.services.elastic.ccm;
9+
10+
import org.elasticsearch.cluster.metadata.IndexMetadata;
11+
import org.elasticsearch.common.settings.Settings;
12+
import org.elasticsearch.indices.SystemIndexDescriptor;
13+
import org.elasticsearch.xcontent.XContentBuilder;
14+
15+
import java.io.IOException;
16+
import java.io.UncheckedIOException;
17+
18+
import static org.elasticsearch.index.mapper.MapperService.SINGLE_MAPPING_NAME;
19+
import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder;
20+
21+
public class CCMIndex {
22+
23+
private CCMIndex() {}
24+
25+
public static final String INDEX_NAME = ".ccm-inference";
26+
public static final String INDEX_PATTERN = INDEX_NAME + "*";
27+
28+
// Increment this version number when the mappings change
29+
private static final int INDEX_MAPPING_VERSION = 1;
30+
31+
public static Settings settings() {
32+
return builder().build();
33+
}
34+
35+
// Public to allow tests to create the index with custom settings
36+
public static Settings.Builder builder() {
37+
return Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_AUTO_EXPAND_REPLICAS, "0-1");
38+
}
39+
40+
public static XContentBuilder mappings() {
41+
try {
42+
return jsonBuilder().startObject()
43+
.startObject(SINGLE_MAPPING_NAME)
44+
.startObject("_meta")
45+
.field(SystemIndexDescriptor.VERSION_META_KEY, INDEX_MAPPING_VERSION)
46+
.endObject()
47+
.field("dynamic", "strict")
48+
.startObject("properties")
49+
.startObject("api_key")
50+
.field("type", "keyword")
51+
.endObject()
52+
.endObject()
53+
.endObject()
54+
.endObject();
55+
} catch (IOException e) {
56+
throw new UncheckedIOException("Failed to build mappings for index " + INDEX_NAME, e);
57+
}
58+
}
59+
}
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.inference.services.elastic.ccm;
9+
10+
import org.elasticsearch.common.bytes.BytesReference;
11+
import org.elasticsearch.common.io.stream.StreamInput;
12+
import org.elasticsearch.common.io.stream.StreamOutput;
13+
import org.elasticsearch.common.io.stream.Writeable;
14+
import org.elasticsearch.common.settings.SecureString;
15+
import org.elasticsearch.common.xcontent.XContentHelper;
16+
import org.elasticsearch.xcontent.ConstructingObjectParser;
17+
import org.elasticsearch.xcontent.ParseField;
18+
import org.elasticsearch.xcontent.ToXContent;
19+
import org.elasticsearch.xcontent.ToXContentObject;
20+
import org.elasticsearch.xcontent.XContentBuilder;
21+
import org.elasticsearch.xcontent.XContentParserConfiguration;
22+
import org.elasticsearch.xcontent.XContentType;
23+
24+
import java.io.IOException;
25+
import java.util.Objects;
26+
27+
import static org.elasticsearch.xcontent.ConstructingObjectParser.constructorArg;
28+
29+
public record CCMModel(SecureString apiKey) implements Writeable, ToXContentObject {
30+
31+
private static final String API_KEY_FIELD = "api_key";
32+
private static final ConstructingObjectParser<CCMModel, Void> PARSER = new ConstructingObjectParser<>(
33+
CCMModel.class.getSimpleName(),
34+
true,
35+
args -> new CCMModel(new SecureString(((String) args[0]).toCharArray()))
36+
);
37+
38+
static {
39+
PARSER.declareString(constructorArg(), new ParseField(API_KEY_FIELD));
40+
}
41+
42+
public static CCMModel parse(org.elasticsearch.xcontent.XContentParser parser) throws IOException {
43+
return PARSER.parse(parser, null);
44+
}
45+
46+
public CCMModel {
47+
Objects.requireNonNull(apiKey);
48+
}
49+
50+
public CCMModel(StreamInput in) throws IOException {
51+
this(in.readSecureString());
52+
}
53+
54+
@Override
55+
public void writeTo(StreamOutput out) throws IOException {
56+
out.writeSecureString(apiKey);
57+
}
58+
59+
@Override
60+
public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException {
61+
builder.startObject();
62+
builder.field(API_KEY_FIELD, apiKey.toString());
63+
builder.endObject();
64+
return builder;
65+
}
66+
67+
public static CCMModel fromXContentBytes(BytesReference bytes) throws IOException {
68+
try (var parser = XContentHelper.createParserNotCompressed(XContentParserConfiguration.EMPTY, bytes, XContentType.JSON)) {
69+
return parse(parser);
70+
}
71+
}
72+
}

0 commit comments

Comments
 (0)