Skip to content

Commit 74a4484

Browse files
authored
Support mTLS in Elastic Inference Service plugin (#116423)
* Introduce new SSL settings under `xpack.inference.elastic.http.ssl`.
* Support mTLS connections between Elasticsearch and the Elastic Inference Service.
1 parent e7a4436 commit 74a4484

File tree

22 files changed

+314
-69
lines changed

22 files changed

+314
-69
lines changed

docs/changelog/116423.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 116423
2+
summary: Support mTLS for the Elastic Inference Service integration inside the inference API
3+
area: Machine Learning
4+
type: feature
5+
issues: []

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ssl/SSLService.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -596,6 +596,8 @@ static Map<String, Settings> getSSLSettingsMap(Settings settings) {
596596
sslSettingsMap.put(WatcherField.EMAIL_NOTIFICATION_SSL_PREFIX, settings.getByPrefix(WatcherField.EMAIL_NOTIFICATION_SSL_PREFIX));
597597
sslSettingsMap.put(XPackSettings.TRANSPORT_SSL_PREFIX, settings.getByPrefix(XPackSettings.TRANSPORT_SSL_PREFIX));
598598
sslSettingsMap.putAll(getTransportProfileSSLSettings(settings));
599+
// Mount Elastic Inference Service (part of the Inference plugin) configuration
600+
sslSettingsMap.put("xpack.inference.elastic.http.ssl", settings.getByPrefix("xpack.inference.elastic.http.ssl."));
599601
// Only build remote cluster server SSL if the port is enabled
600602
if (REMOTE_CLUSTER_SERVER_ENABLED.get(settings)) {
601603
sslSettingsMap.put(XPackSettings.REMOTE_CLUSTER_SERVER_SSL_PREFIX, getRemoteClusterServerSslSettings(settings));

x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/LocalStateCompositeXPackPlugin.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -623,7 +623,7 @@ public Map<String, IndexStorePlugin.SnapshotCommitSupplier> getSnapshotCommitSup
623623
}
624624

625625
@SuppressWarnings("unchecked")
626-
private <T> List<T> filterPlugins(Class<T> type) {
626+
protected <T> List<T> filterPlugins(Class<T> type) {
627627
return plugins.stream().filter(x -> type.isAssignableFrom(x.getClass())).map(p -> ((T) p)).collect(Collectors.toList());
628628
}
629629

x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ssl/SSLServiceTests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -614,7 +614,8 @@ public void testGetConfigurationByContextName() throws Exception {
614614
"xpack.security.authc.realms.ldap.realm1.ssl",
615615
"xpack.security.authc.realms.saml.realm2.ssl",
616616
"xpack.monitoring.exporters.mon1.ssl",
617-
"xpack.monitoring.exporters.mon2.ssl" };
617+
"xpack.monitoring.exporters.mon2.ssl",
618+
"xpack.inference.elastic.http.ssl" };
618619

619620
assumeTrue("Not enough cipher suites are available to support this test", getCipherSuites.length >= contextNames.length);
620621

x-pack/plugin/inference/src/internalClusterTest/java/org/elasticsearch/xpack/inference/action/filter/ShardBulkInferenceActionFilterIT.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.elasticsearch.plugins.Plugin;
2323
import org.elasticsearch.search.builder.SearchSourceBuilder;
2424
import org.elasticsearch.test.ESIntegTestCase;
25+
import org.elasticsearch.xpack.inference.LocalStateInferencePlugin;
2526
import org.elasticsearch.xpack.inference.Utils;
2627
import org.elasticsearch.xpack.inference.mock.TestDenseInferenceServiceExtension;
2728
import org.elasticsearch.xpack.inference.mock.TestSparseInferenceServiceExtension;
@@ -58,7 +59,7 @@ public void setup() throws Exception {
5859

5960
@Override
6061
protected Collection<Class<? extends Plugin>> nodePlugins() {
61-
return Arrays.asList(Utils.TestInferencePlugin.class);
62+
return Arrays.asList(LocalStateInferencePlugin.class);
6263
}
6364

6465
public void testBulkOperations() throws Exception {

x-pack/plugin/inference/src/internalClusterTest/java/org/elasticsearch/xpack/inference/integration/ModelRegistryIT.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131
import org.elasticsearch.threadpool.ThreadPool;
3232
import org.elasticsearch.xcontent.ToXContentObject;
3333
import org.elasticsearch.xcontent.XContentBuilder;
34-
import org.elasticsearch.xpack.inference.InferencePlugin;
34+
import org.elasticsearch.xpack.inference.LocalStateInferencePlugin;
3535
import org.elasticsearch.xpack.inference.chunking.ChunkingSettingsTests;
3636
import org.elasticsearch.xpack.inference.registry.ModelRegistry;
3737
import org.elasticsearch.xpack.inference.services.elasticsearch.ElasticsearchInternalModel;
@@ -76,7 +76,7 @@ public void createComponents() {
7676

7777
@Override
7878
protected Collection<Class<? extends Plugin>> getPlugins() {
79-
return pluginList(ReindexPlugin.class, InferencePlugin.class);
79+
return pluginList(ReindexPlugin.class, LocalStateInferencePlugin.class);
8080
}
8181

8282
public void testStoreModel() throws Exception {

x-pack/plugin/inference/src/main/java/module-info.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
requires software.amazon.awssdk.retries.api;
3535
requires org.reactivestreams;
3636
requires org.elasticsearch.logging;
37+
requires org.elasticsearch.sslconfig;
3738

3839
exports org.elasticsearch.xpack.inference.action;
3940
exports org.elasticsearch.xpack.inference.registry;

x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/InferencePlugin.java

Lines changed: 72 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.elasticsearch.indices.SystemIndexDescriptor;
2929
import org.elasticsearch.inference.InferenceServiceExtension;
3030
import org.elasticsearch.inference.InferenceServiceRegistry;
31+
import org.elasticsearch.license.XPackLicenseState;
3132
import org.elasticsearch.node.PluginComponentBinding;
3233
import org.elasticsearch.plugins.ActionPlugin;
3334
import org.elasticsearch.plugins.ExtensiblePlugin;
@@ -44,6 +45,7 @@
4445
import org.elasticsearch.threadpool.ScalingExecutorBuilder;
4546
import org.elasticsearch.xcontent.ParseField;
4647
import org.elasticsearch.xpack.core.ClientHelper;
48+
import org.elasticsearch.xpack.core.XPackPlugin;
4749
import org.elasticsearch.xpack.core.action.XPackUsageFeatureAction;
4850
import org.elasticsearch.xpack.core.inference.action.DeleteInferenceEndpointAction;
4951
import org.elasticsearch.xpack.core.inference.action.GetInferenceDiagnosticsAction;
@@ -53,6 +55,7 @@
5355
import org.elasticsearch.xpack.core.inference.action.PutInferenceModelAction;
5456
import org.elasticsearch.xpack.core.inference.action.UnifiedCompletionAction;
5557
import org.elasticsearch.xpack.core.inference.action.UpdateInferenceModelAction;
58+
import org.elasticsearch.xpack.core.ssl.SSLService;
5659
import org.elasticsearch.xpack.inference.action.TransportDeleteInferenceEndpointAction;
5760
import org.elasticsearch.xpack.inference.action.TransportGetInferenceDiagnosticsAction;
5861
import org.elasticsearch.xpack.inference.action.TransportGetInferenceModelAction;
@@ -116,7 +119,6 @@
116119
import java.util.Map;
117120
import java.util.function.Predicate;
118121
import java.util.function.Supplier;
119-
import java.util.stream.Collectors;
120122
import java.util.stream.Stream;
121123

122124
import static java.util.Collections.singletonList;
@@ -150,6 +152,7 @@ public class InferencePlugin extends Plugin implements ActionPlugin, ExtensibleP
150152
private final Settings settings;
151153
private final SetOnce<HttpRequestSender.Factory> httpFactory = new SetOnce<>();
152154
private final SetOnce<AmazonBedrockRequestSender.Factory> amazonBedrockFactory = new SetOnce<>();
155+
private final SetOnce<HttpRequestSender.Factory> elasicInferenceServiceFactory = new SetOnce<>();
153156
private final SetOnce<ServiceComponents> serviceComponents = new SetOnce<>();
154157
private final SetOnce<ElasticInferenceServiceComponents> elasticInferenceServiceComponents = new SetOnce<>();
155158
private final SetOnce<InferenceServiceRegistry> inferenceServiceRegistry = new SetOnce<>();
@@ -232,31 +235,31 @@ public Collection<?> createComponents(PluginServices services) {
232235
var inferenceServices = new ArrayList<>(inferenceServiceExtensions);
233236
inferenceServices.add(this::getInferenceServiceFactories);
234237

235-
// Set elasticInferenceUrl based on feature flags to support transitioning to the new Elastic Inference Service URL without exposing
236-
// internal names like "eis" or "gateway".
237-
ElasticInferenceServiceSettings inferenceServiceSettings = new ElasticInferenceServiceSettings(settings);
238-
239-
String elasticInferenceUrl = null;
238+
if (isElasticInferenceServiceEnabled()) {
239+
// Create a separate instance of HttpClientManager with its own SSL configuration (`xpack.inference.elastic.http.ssl.*`).
240+
var elasticInferenceServiceHttpClientManager = HttpClientManager.create(
241+
settings,
242+
services.threadPool(),
243+
services.clusterService(),
244+
throttlerManager,
245+
getSslService()
246+
);
240247

241-
if (ELASTIC_INFERENCE_SERVICE_FEATURE_FLAG.isEnabled()) {
242-
elasticInferenceUrl = inferenceServiceSettings.getElasticInferenceServiceUrl();
243-
} else if (DEPRECATED_ELASTIC_INFERENCE_SERVICE_FEATURE_FLAG.isEnabled()) {
244-
log.warn(
245-
"Deprecated flag {} detected for enabling {}. Please use {}.",
246-
ELASTIC_INFERENCE_SERVICE_IDENTIFIER,
247-
DEPRECATED_ELASTIC_INFERENCE_SERVICE_FEATURE_FLAG,
248-
ELASTIC_INFERENCE_SERVICE_FEATURE_FLAG
248+
var elasticInferenceServiceRequestSenderFactory = new HttpRequestSender.Factory(
249+
serviceComponents.get(),
250+
elasticInferenceServiceHttpClientManager,
251+
services.clusterService()
249252
);
250-
elasticInferenceUrl = inferenceServiceSettings.getEisGatewayUrl();
251-
}
253+
elasicInferenceServiceFactory.set(elasticInferenceServiceRequestSenderFactory);
252254

253-
if (elasticInferenceUrl != null) {
255+
ElasticInferenceServiceSettings inferenceServiceSettings = new ElasticInferenceServiceSettings(settings);
256+
String elasticInferenceUrl = this.getElasticInferenceServiceUrl(inferenceServiceSettings);
254257
elasticInferenceServiceComponents.set(new ElasticInferenceServiceComponents(elasticInferenceUrl));
255258

256259
inferenceServices.add(
257260
() -> List.of(
258261
context -> new ElasticInferenceService(
259-
httpFactory.get(),
262+
elasicInferenceServiceFactory.get(),
260263
serviceComponents.get(),
261264
elasticInferenceServiceComponents.get()
262265
)
@@ -379,16 +382,21 @@ public static ExecutorBuilder<?> inferenceUtilityExecutor(Settings settings) {
379382

380383
@Override
381384
public List<Setting<?>> getSettings() {
382-
return Stream.of(
383-
HttpSettings.getSettingsDefinitions(),
384-
HttpClientManager.getSettingsDefinitions(),
385-
ThrottlerManager.getSettingsDefinitions(),
386-
RetrySettings.getSettingsDefinitions(),
387-
ElasticInferenceServiceSettings.getSettingsDefinitions(),
388-
Truncator.getSettingsDefinitions(),
389-
RequestExecutorServiceSettings.getSettingsDefinitions(),
390-
List.of(SKIP_VALIDATE_AND_START)
391-
).flatMap(Collection::stream).collect(Collectors.toList());
385+
ArrayList<Setting<?>> settings = new ArrayList<>();
386+
settings.addAll(HttpSettings.getSettingsDefinitions());
387+
settings.addAll(HttpClientManager.getSettingsDefinitions());
388+
settings.addAll(ThrottlerManager.getSettingsDefinitions());
389+
settings.addAll(RetrySettings.getSettingsDefinitions());
390+
settings.addAll(Truncator.getSettingsDefinitions());
391+
settings.addAll(RequestExecutorServiceSettings.getSettingsDefinitions());
392+
settings.add(SKIP_VALIDATE_AND_START);
393+
394+
// Register Elastic Inference Service settings definitions if the corresponding feature flag is enabled.
395+
if (isElasticInferenceServiceEnabled()) {
396+
settings.addAll(ElasticInferenceServiceSettings.getSettingsDefinitions());
397+
}
398+
399+
return settings;
392400
}
393401

394402
@Override
@@ -431,7 +439,10 @@ public List<QuerySpec<?>> getQueries() {
431439
@Override
432440
public List<RetrieverSpec<?>> getRetrievers() {
433441
return List.of(
434-
new RetrieverSpec<>(new ParseField(TextSimilarityRankBuilder.NAME), TextSimilarityRankRetrieverBuilder::fromXContent),
442+
new RetrieverSpec<>(
443+
new ParseField(TextSimilarityRankBuilder.NAME),
444+
(parser, context) -> TextSimilarityRankRetrieverBuilder.fromXContent(parser, context, getLicenseState())
445+
),
435446
new RetrieverSpec<>(new ParseField(RandomRankBuilder.NAME), RandomRankRetrieverBuilder::fromXContent)
436447
);
437448
}
@@ -440,4 +451,36 @@ public List<RetrieverSpec<?>> getRetrievers() {
440451
public Map<String, Highlighter> getHighlighters() {
441452
return Map.of(SemanticTextHighlighter.NAME, new SemanticTextHighlighter());
442453
}
454+
455+
// Get the Elastic Inference Service URL based on feature flags to support transitioning
456+
// to the new Elastic Inference Service URL.
457+
private String getElasticInferenceServiceUrl(ElasticInferenceServiceSettings settings) {
458+
String elasticInferenceUrl = null;
459+
460+
if (ELASTIC_INFERENCE_SERVICE_FEATURE_FLAG.isEnabled()) {
461+
elasticInferenceUrl = settings.getElasticInferenceServiceUrl();
462+
} else if (DEPRECATED_ELASTIC_INFERENCE_SERVICE_FEATURE_FLAG.isEnabled()) {
463+
log.warn(
464+
"Deprecated flag {} detected for enabling {}. Please use {}.",
465+
ELASTIC_INFERENCE_SERVICE_IDENTIFIER,
466+
DEPRECATED_ELASTIC_INFERENCE_SERVICE_FEATURE_FLAG,
467+
ELASTIC_INFERENCE_SERVICE_FEATURE_FLAG
468+
);
469+
elasticInferenceUrl = settings.getEisGatewayUrl();
470+
}
471+
472+
return elasticInferenceUrl;
473+
}
474+
475+
protected Boolean isElasticInferenceServiceEnabled() {
476+
return (ELASTIC_INFERENCE_SERVICE_FEATURE_FLAG.isEnabled() || DEPRECATED_ELASTIC_INFERENCE_SERVICE_FEATURE_FLAG.isEnabled());
477+
}
478+
479+
protected SSLService getSslService() {
480+
return XPackPlugin.getSharedSslService();
481+
}
482+
483+
protected XPackLicenseState getLicenseState() {
484+
return XPackPlugin.getSharedLicenseState();
485+
}
443486
}

x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/external/http/HttpClientManager.java

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,14 @@
77

88
package org.elasticsearch.xpack.inference.external.http;
99

10+
import org.apache.http.config.Registry;
11+
import org.apache.http.config.RegistryBuilder;
1012
import org.apache.http.impl.nio.conn.PoolingNHttpClientConnectionManager;
1113
import org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor;
1214
import org.apache.http.impl.nio.reactor.IOReactorConfig;
15+
import org.apache.http.nio.conn.NoopIOSessionStrategy;
16+
import org.apache.http.nio.conn.SchemeIOSessionStrategy;
17+
import org.apache.http.nio.conn.ssl.SSLIOSessionStrategy;
1318
import org.apache.http.nio.reactor.ConnectingIOReactor;
1419
import org.apache.http.nio.reactor.IOReactorException;
1520
import org.apache.http.pool.PoolStats;
@@ -21,18 +26,21 @@
2126
import org.elasticsearch.common.settings.Settings;
2227
import org.elasticsearch.core.TimeValue;
2328
import org.elasticsearch.threadpool.ThreadPool;
29+
import org.elasticsearch.xpack.core.ssl.SSLService;
2430
import org.elasticsearch.xpack.inference.logging.ThrottlerManager;
2531

2632
import java.io.Closeable;
2733
import java.io.IOException;
2834
import java.util.List;
2935

3036
import static org.elasticsearch.core.Strings.format;
37+
import static org.elasticsearch.xpack.inference.services.elastic.ElasticInferenceServiceSettings.ELASTIC_INFERENCE_SERVICE_SSL_CONFIGURATION_PREFIX;
3138

3239
public class HttpClientManager implements Closeable {
3340
private static final Logger logger = LogManager.getLogger(HttpClientManager.class);
3441
/**
3542
* The maximum number of total connections the connection pool can lease to all routes.
43+
* This configuration applies to each instance of HttpClientManager (e.g. max_total_connections=10 with 5 instances allows up to 50 connections in total).
3644
* From googling around, the connection pool's maxTotal value should be close to the number of available threads.
3745
*
3846
* https://stackoverflow.com/questions/30989637/how-to-decide-optimal-settings-for-setmaxtotal-and-setdefaultmaxperroute
@@ -47,6 +55,7 @@ public class HttpClientManager implements Closeable {
4755

4856
/**
4957
* The max number of connections a single route can lease.
58+
* This configuration applies to each instance of HttpClientManager.
5059
*/
5160
public static final Setting<Integer> MAX_ROUTE_CONNECTIONS = Setting.intSetting(
5261
"xpack.inference.http.max_route_connections",
@@ -98,6 +107,22 @@ public static HttpClientManager create(
98107
return new HttpClientManager(settings, connectionManager, threadPool, clusterService, throttlerManager);
99108
}
100109

110+
public static HttpClientManager create(
111+
Settings settings,
112+
ThreadPool threadPool,
113+
ClusterService clusterService,
114+
ThrottlerManager throttlerManager,
115+
SSLService sslService
116+
) {
117+
// Set the sslStrategy to ensure an encrypted connection, as Elastic Inference Service requires it.
118+
SSLIOSessionStrategy sslioSessionStrategy = sslService.sslIOSessionStrategy(
119+
sslService.getSSLConfiguration(ELASTIC_INFERENCE_SERVICE_SSL_CONFIGURATION_PREFIX)
120+
);
121+
122+
PoolingNHttpClientConnectionManager connectionManager = createConnectionManager(sslioSessionStrategy);
123+
return new HttpClientManager(settings, connectionManager, threadPool, clusterService, throttlerManager);
124+
}
125+
101126
// Default for testing
102127
HttpClientManager(
103128
Settings settings,
@@ -121,6 +146,25 @@ public static HttpClientManager create(
121146
this.addSettingsUpdateConsumers(clusterService);
122147
}
123148

149+
private static PoolingNHttpClientConnectionManager createConnectionManager(SSLIOSessionStrategy sslStrategy) {
150+
ConnectingIOReactor ioReactor;
151+
try {
152+
var configBuilder = IOReactorConfig.custom().setSoKeepAlive(true);
153+
ioReactor = new DefaultConnectingIOReactor(configBuilder.build());
154+
} catch (IOReactorException e) {
155+
var message = "Failed to initialize HTTP client manager with SSL.";
156+
logger.error(message, e);
157+
throw new ElasticsearchException(message, e);
158+
}
159+
160+
Registry<SchemeIOSessionStrategy> registry = RegistryBuilder.<SchemeIOSessionStrategy>create()
161+
.register("http", NoopIOSessionStrategy.INSTANCE)
162+
.register("https", sslStrategy)
163+
.build();
164+
165+
return new PoolingNHttpClientConnectionManager(ioReactor, registry);
166+
}
167+
124168
private static PoolingNHttpClientConnectionManager createConnectionManager() {
125169
ConnectingIOReactor ioReactor;
126170
try {

x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/rank/textsimilarity/TextSimilarityRankRetrieverBuilder.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import org.elasticsearch.features.NodeFeature;
1313
import org.elasticsearch.index.query.QueryBuilder;
1414
import org.elasticsearch.license.LicenseUtils;
15+
import org.elasticsearch.license.XPackLicenseState;
1516
import org.elasticsearch.search.builder.SearchSourceBuilder;
1617
import org.elasticsearch.search.rank.RankDoc;
1718
import org.elasticsearch.search.retriever.CompoundRetrieverBuilder;
@@ -21,7 +22,6 @@
2122
import org.elasticsearch.xcontent.ParseField;
2223
import org.elasticsearch.xcontent.XContentBuilder;
2324
import org.elasticsearch.xcontent.XContentParser;
24-
import org.elasticsearch.xpack.core.XPackPlugin;
2525

2626
import java.io.IOException;
2727
import java.util.List;
@@ -73,8 +73,11 @@ public class TextSimilarityRankRetrieverBuilder extends CompoundRetrieverBuilder
7373
RetrieverBuilder.declareBaseParserFields(TextSimilarityRankBuilder.NAME, PARSER);
7474
}
7575

76-
public static TextSimilarityRankRetrieverBuilder fromXContent(XContentParser parser, RetrieverParserContext context)
77-
throws IOException {
76+
public static TextSimilarityRankRetrieverBuilder fromXContent(
77+
XContentParser parser,
78+
RetrieverParserContext context,
79+
XPackLicenseState licenceState
80+
) throws IOException {
7881
if (context.clusterSupportsFeature(TEXT_SIMILARITY_RERANKER_RETRIEVER_SUPPORTED) == false) {
7982
throw new ParsingException(parser.getTokenLocation(), "unknown retriever [" + TextSimilarityRankBuilder.NAME + "]");
8083
}
@@ -83,7 +86,7 @@ public static TextSimilarityRankRetrieverBuilder fromXContent(XContentParser par
8386
"[text_similarity_reranker] retriever composition feature is not supported by all nodes in the cluster"
8487
);
8588
}
86-
if (TextSimilarityRankBuilder.TEXT_SIMILARITY_RERANKER_FEATURE.check(XPackPlugin.getSharedLicenseState()) == false) {
89+
if (TextSimilarityRankBuilder.TEXT_SIMILARITY_RERANKER_FEATURE.check(licenceState) == false) {
8790
throw LicenseUtils.newComplianceException(TextSimilarityRankBuilder.NAME);
8891
}
8992
return PARSER.apply(parser, context);

0 commit comments

Comments
 (0)