Skip to content

Commit 03424aa

Browse files
vidokelasticsearchmachine
andauthored
Support mTLS in Elastic Inference Service plugin (#116423) (#119679)
* Support mTLS in Elastic Inference Service plugin (#116423) * Introduce new SSL settings under `xpack.inference.elastic.http.ssl`. * Support mTLS connection between Elasticsearch and Elastic Inference Service. * Update docs/changelog/119679.yaml * Apply new changelog * [CI] Auto commit changes from spotless --------- Co-authored-by: elasticsearchmachine <[email protected]>
1 parent f7f8ab0 commit 03424aa

File tree

22 files changed

+314
-76
lines changed

22 files changed

+314
-76
lines changed

docs/changelog/119679.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 119679
2+
summary: Support mTLS for the Elastic Inference Service integration inside the inference API
3+
area: Machine Learning
4+
type: feature
5+
issues: []

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ssl/SSLService.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -596,6 +596,8 @@ static Map<String, Settings> getSSLSettingsMap(Settings settings) {
596596
sslSettingsMap.put(WatcherField.EMAIL_NOTIFICATION_SSL_PREFIX, settings.getByPrefix(WatcherField.EMAIL_NOTIFICATION_SSL_PREFIX));
597597
sslSettingsMap.put(XPackSettings.TRANSPORT_SSL_PREFIX, settings.getByPrefix(XPackSettings.TRANSPORT_SSL_PREFIX));
598598
sslSettingsMap.putAll(getTransportProfileSSLSettings(settings));
599+
// Mount Elastic Inference Service (part of the Inference plugin) configuration
600+
sslSettingsMap.put("xpack.inference.elastic.http.ssl", settings.getByPrefix("xpack.inference.elastic.http.ssl."));
599601
// Only build remote cluster server SSL if the port is enabled
600602
if (REMOTE_CLUSTER_SERVER_ENABLED.get(settings)) {
601603
sslSettingsMap.put(XPackSettings.REMOTE_CLUSTER_SERVER_SSL_PREFIX, getRemoteClusterServerSslSettings(settings));

x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/LocalStateCompositeXPackPlugin.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -623,7 +623,7 @@ public Map<String, IndexStorePlugin.SnapshotCommitSupplier> getSnapshotCommitSup
623623
}
624624

625625
@SuppressWarnings("unchecked")
626-
private <T> List<T> filterPlugins(Class<T> type) {
626+
protected <T> List<T> filterPlugins(Class<T> type) {
627627
return plugins.stream().filter(x -> type.isAssignableFrom(x.getClass())).map(p -> ((T) p)).collect(Collectors.toList());
628628
}
629629

x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ssl/SSLServiceTests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -614,7 +614,8 @@ public void testGetConfigurationByContextName() throws Exception {
614614
"xpack.security.authc.realms.ldap.realm1.ssl",
615615
"xpack.security.authc.realms.saml.realm2.ssl",
616616
"xpack.monitoring.exporters.mon1.ssl",
617-
"xpack.monitoring.exporters.mon2.ssl" };
617+
"xpack.monitoring.exporters.mon2.ssl",
618+
"xpack.inference.elastic.http.ssl" };
618619

619620
assumeTrue("Not enough cipher suites are available to support this test", getCipherSuites.length >= contextNames.length);
620621

x-pack/plugin/inference/src/internalClusterTest/java/org/elasticsearch/xpack/inference/action/filter/ShardBulkInferenceActionFilterIT.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
import org.elasticsearch.plugins.Plugin;
2828
import org.elasticsearch.search.builder.SearchSourceBuilder;
2929
import org.elasticsearch.test.ESIntegTestCase;
30-
import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin;
30+
import org.elasticsearch.xpack.inference.LocalStateInferencePlugin;
3131
import org.elasticsearch.xpack.inference.Utils;
3232
import org.elasticsearch.xpack.inference.mock.TestDenseInferenceServiceExtension;
3333
import org.elasticsearch.xpack.inference.mock.TestSparseInferenceServiceExtension;
@@ -74,7 +74,7 @@ public void setup() throws Exception {
7474

7575
@Override
7676
protected Collection<Class<? extends Plugin>> nodePlugins() {
77-
return Arrays.asList(Utils.TestInferencePlugin.class, LocalStateCompositeXPackPlugin.class);
77+
return Arrays.asList(LocalStateInferencePlugin.class);
7878
}
7979

8080
@Override

x-pack/plugin/inference/src/internalClusterTest/java/org/elasticsearch/xpack/inference/integration/ModelRegistryIT.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,7 @@
3131
import org.elasticsearch.threadpool.ThreadPool;
3232
import org.elasticsearch.xcontent.ToXContentObject;
3333
import org.elasticsearch.xcontent.XContentBuilder;
34-
import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin;
35-
import org.elasticsearch.xpack.inference.InferencePlugin;
34+
import org.elasticsearch.xpack.inference.LocalStateInferencePlugin;
3635
import org.elasticsearch.xpack.inference.chunking.ChunkingSettingsTests;
3736
import org.elasticsearch.xpack.inference.registry.ModelRegistry;
3837
import org.elasticsearch.xpack.inference.services.elasticsearch.ElasticsearchInternalModel;
@@ -77,7 +76,7 @@ public void createComponents() {
7776

7877
@Override
7978
protected Collection<Class<? extends Plugin>> getPlugins() {
80-
return pluginList(ReindexPlugin.class, InferencePlugin.class, LocalStateCompositeXPackPlugin.class);
79+
return pluginList(ReindexPlugin.class, LocalStateInferencePlugin.class);
8180
}
8281

8382
public void testStoreModel() throws Exception {

x-pack/plugin/inference/src/main/java/module-info.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
requires software.amazon.awssdk.retries.api;
3535
requires org.reactivestreams;
3636
requires org.elasticsearch.logging;
37+
requires org.elasticsearch.sslconfig;
3738

3839
exports org.elasticsearch.xpack.inference.action;
3940
exports org.elasticsearch.xpack.inference.registry;

x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/InferencePlugin.java

Lines changed: 72 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import org.elasticsearch.inference.InferenceServiceRegistry;
3232
import org.elasticsearch.license.License;
3333
import org.elasticsearch.license.LicensedFeature;
34+
import org.elasticsearch.license.XPackLicenseState;
3435
import org.elasticsearch.node.PluginComponentBinding;
3536
import org.elasticsearch.plugins.ActionPlugin;
3637
import org.elasticsearch.plugins.ExtensiblePlugin;
@@ -49,6 +50,7 @@
4950
import org.elasticsearch.threadpool.ThreadPool;
5051
import org.elasticsearch.xcontent.ParseField;
5152
import org.elasticsearch.xpack.core.ClientHelper;
53+
import org.elasticsearch.xpack.core.XPackPlugin;
5254
import org.elasticsearch.xpack.core.action.XPackUsageFeatureAction;
5355
import org.elasticsearch.xpack.core.inference.action.DeleteInferenceEndpointAction;
5456
import org.elasticsearch.xpack.core.inference.action.GetInferenceDiagnosticsAction;
@@ -58,6 +60,7 @@
5860
import org.elasticsearch.xpack.core.inference.action.PutInferenceModelAction;
5961
import org.elasticsearch.xpack.core.inference.action.UnifiedCompletionAction;
6062
import org.elasticsearch.xpack.core.inference.action.UpdateInferenceModelAction;
63+
import org.elasticsearch.xpack.core.ssl.SSLService;
6164
import org.elasticsearch.xpack.inference.action.TransportDeleteInferenceEndpointAction;
6265
import org.elasticsearch.xpack.inference.action.TransportGetInferenceDiagnosticsAction;
6366
import org.elasticsearch.xpack.inference.action.TransportGetInferenceModelAction;
@@ -126,7 +129,6 @@
126129
import java.util.Map;
127130
import java.util.function.Predicate;
128131
import java.util.function.Supplier;
129-
import java.util.stream.Collectors;
130132
import java.util.stream.Stream;
131133

132134
import static java.util.Collections.singletonList;
@@ -166,6 +168,7 @@ public class InferencePlugin extends Plugin implements ActionPlugin, ExtensibleP
166168
private final Settings settings;
167169
private final SetOnce<HttpRequestSender.Factory> httpFactory = new SetOnce<>();
168170
private final SetOnce<AmazonBedrockRequestSender.Factory> amazonBedrockFactory = new SetOnce<>();
171+
private final SetOnce<HttpRequestSender.Factory> elasicInferenceServiceFactory = new SetOnce<>();
169172
private final SetOnce<ServiceComponents> serviceComponents = new SetOnce<>();
170173
// This is mainly so that the rest handlers can access the ThreadPool in a way that avoids potential null pointers from it
171174
// not being initialized yet
@@ -252,31 +255,31 @@ public Collection<?> createComponents(PluginServices services) {
252255
var inferenceServices = new ArrayList<>(inferenceServiceExtensions);
253256
inferenceServices.add(this::getInferenceServiceFactories);
254257

255-
// Set elasticInferenceUrl based on feature flags to support transitioning to the new Elastic Inference Service URL without exposing
256-
// internal names like "eis" or "gateway".
257-
ElasticInferenceServiceSettings inferenceServiceSettings = new ElasticInferenceServiceSettings(settings);
258-
259-
String elasticInferenceUrl = null;
258+
if (isElasticInferenceServiceEnabled()) {
259+
// Create a separate instance of HTTPClientManager with its own SSL configuration (`xpack.inference.elastic.http.ssl.*`).
260+
var elasticInferenceServiceHttpClientManager = HttpClientManager.create(
261+
settings,
262+
services.threadPool(),
263+
services.clusterService(),
264+
throttlerManager,
265+
getSslService()
266+
);
260267

261-
if (ELASTIC_INFERENCE_SERVICE_FEATURE_FLAG.isEnabled()) {
262-
elasticInferenceUrl = inferenceServiceSettings.getElasticInferenceServiceUrl();
263-
} else if (DEPRECATED_ELASTIC_INFERENCE_SERVICE_FEATURE_FLAG.isEnabled()) {
264-
log.warn(
265-
"Deprecated flag {} detected for enabling {}. Please use {}.",
266-
ELASTIC_INFERENCE_SERVICE_IDENTIFIER,
267-
DEPRECATED_ELASTIC_INFERENCE_SERVICE_FEATURE_FLAG,
268-
ELASTIC_INFERENCE_SERVICE_FEATURE_FLAG
268+
var elasticInferenceServiceRequestSenderFactory = new HttpRequestSender.Factory(
269+
serviceComponents.get(),
270+
elasticInferenceServiceHttpClientManager,
271+
services.clusterService()
269272
);
270-
elasticInferenceUrl = inferenceServiceSettings.getEisGatewayUrl();
271-
}
273+
elasicInferenceServiceFactory.set(elasticInferenceServiceRequestSenderFactory);
272274

273-
if (elasticInferenceUrl != null) {
275+
ElasticInferenceServiceSettings inferenceServiceSettings = new ElasticInferenceServiceSettings(settings);
276+
String elasticInferenceUrl = this.getElasticInferenceServiceUrl(inferenceServiceSettings);
274277
elasticInferenceServiceComponents.set(new ElasticInferenceServiceComponents(elasticInferenceUrl));
275278

276279
inferenceServices.add(
277280
() -> List.of(
278281
context -> new ElasticInferenceService(
279-
httpFactory.get(),
282+
elasicInferenceServiceFactory.get(),
280283
serviceComponents.get(),
281284
elasticInferenceServiceComponents.get()
282285
)
@@ -400,16 +403,21 @@ public static ExecutorBuilder<?> inferenceUtilityExecutor(Settings settings) {
400403

401404
@Override
402405
public List<Setting<?>> getSettings() {
403-
return Stream.of(
404-
HttpSettings.getSettingsDefinitions(),
405-
HttpClientManager.getSettingsDefinitions(),
406-
ThrottlerManager.getSettingsDefinitions(),
407-
RetrySettings.getSettingsDefinitions(),
408-
ElasticInferenceServiceSettings.getSettingsDefinitions(),
409-
Truncator.getSettingsDefinitions(),
410-
RequestExecutorServiceSettings.getSettingsDefinitions(),
411-
List.of(SKIP_VALIDATE_AND_START)
412-
).flatMap(Collection::stream).collect(Collectors.toList());
406+
ArrayList<Setting<?>> settings = new ArrayList<>();
407+
settings.addAll(HttpSettings.getSettingsDefinitions());
408+
settings.addAll(HttpClientManager.getSettingsDefinitions());
409+
settings.addAll(ThrottlerManager.getSettingsDefinitions());
410+
settings.addAll(RetrySettings.getSettingsDefinitions());
411+
settings.addAll(Truncator.getSettingsDefinitions());
412+
settings.addAll(RequestExecutorServiceSettings.getSettingsDefinitions());
413+
settings.add(SKIP_VALIDATE_AND_START);
414+
415+
// Register Elastic Inference Service settings definitions if the corresponding feature flag is enabled.
416+
if (isElasticInferenceServiceEnabled()) {
417+
settings.addAll(ElasticInferenceServiceSettings.getSettingsDefinitions());
418+
}
419+
420+
return settings;
413421
}
414422

415423
@Override
@@ -466,7 +474,10 @@ public List<QueryRewriteInterceptor> getQueryRewriteInterceptors() {
466474
@Override
467475
public List<RetrieverSpec<?>> getRetrievers() {
468476
return List.of(
469-
new RetrieverSpec<>(new ParseField(TextSimilarityRankBuilder.NAME), TextSimilarityRankRetrieverBuilder::fromXContent),
477+
new RetrieverSpec<>(
478+
new ParseField(TextSimilarityRankBuilder.NAME),
479+
(parser, context) -> TextSimilarityRankRetrieverBuilder.fromXContent(parser, context, getLicenseState())
480+
),
470481
new RetrieverSpec<>(new ParseField(RandomRankBuilder.NAME), RandomRankRetrieverBuilder::fromXContent)
471482
);
472483
}
@@ -475,4 +486,36 @@ public List<RetrieverSpec<?>> getRetrievers() {
475486
public Map<String, Highlighter> getHighlighters() {
476487
return Map.of(SemanticTextHighlighter.NAME, new SemanticTextHighlighter());
477488
}
489+
490+
// Get Elastic Inference service URL based on feature flags to support transitioning
491+
// to the new Elastic Inference Service URL.
492+
private String getElasticInferenceServiceUrl(ElasticInferenceServiceSettings settings) {
493+
String elasticInferenceUrl = null;
494+
495+
if (ELASTIC_INFERENCE_SERVICE_FEATURE_FLAG.isEnabled()) {
496+
elasticInferenceUrl = settings.getElasticInferenceServiceUrl();
497+
} else if (DEPRECATED_ELASTIC_INFERENCE_SERVICE_FEATURE_FLAG.isEnabled()) {
498+
log.warn(
499+
"Deprecated flag {} detected for enabling {}. Please use {}.",
500+
ELASTIC_INFERENCE_SERVICE_IDENTIFIER,
501+
DEPRECATED_ELASTIC_INFERENCE_SERVICE_FEATURE_FLAG,
502+
ELASTIC_INFERENCE_SERVICE_FEATURE_FLAG
503+
);
504+
elasticInferenceUrl = settings.getEisGatewayUrl();
505+
}
506+
507+
return elasticInferenceUrl;
508+
}
509+
510+
protected Boolean isElasticInferenceServiceEnabled() {
511+
return (ELASTIC_INFERENCE_SERVICE_FEATURE_FLAG.isEnabled() || DEPRECATED_ELASTIC_INFERENCE_SERVICE_FEATURE_FLAG.isEnabled());
512+
}
513+
514+
protected SSLService getSslService() {
515+
return XPackPlugin.getSharedSslService();
516+
}
517+
518+
protected XPackLicenseState getLicenseState() {
519+
return XPackPlugin.getSharedLicenseState();
520+
}
478521
}

x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/external/http/HttpClientManager.java

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,14 @@
77

88
package org.elasticsearch.xpack.inference.external.http;
99

10+
import org.apache.http.config.Registry;
11+
import org.apache.http.config.RegistryBuilder;
1012
import org.apache.http.impl.nio.conn.PoolingNHttpClientConnectionManager;
1113
import org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor;
1214
import org.apache.http.impl.nio.reactor.IOReactorConfig;
15+
import org.apache.http.nio.conn.NoopIOSessionStrategy;
16+
import org.apache.http.nio.conn.SchemeIOSessionStrategy;
17+
import org.apache.http.nio.conn.ssl.SSLIOSessionStrategy;
1318
import org.apache.http.nio.reactor.ConnectingIOReactor;
1419
import org.apache.http.nio.reactor.IOReactorException;
1520
import org.apache.http.pool.PoolStats;
@@ -21,18 +26,21 @@
2126
import org.elasticsearch.common.settings.Settings;
2227
import org.elasticsearch.core.TimeValue;
2328
import org.elasticsearch.threadpool.ThreadPool;
29+
import org.elasticsearch.xpack.core.ssl.SSLService;
2430
import org.elasticsearch.xpack.inference.logging.ThrottlerManager;
2531

2632
import java.io.Closeable;
2733
import java.io.IOException;
2834
import java.util.List;
2935

3036
import static org.elasticsearch.core.Strings.format;
37+
import static org.elasticsearch.xpack.inference.services.elastic.ElasticInferenceServiceSettings.ELASTIC_INFERENCE_SERVICE_SSL_CONFIGURATION_PREFIX;
3138

3239
public class HttpClientManager implements Closeable {
3340
private static final Logger logger = LogManager.getLogger(HttpClientManager.class);
3441
/**
3542
* The maximum number of total connections the connection pool can lease to all routes.
43+
* The configuration applies to each instance of HTTPClientManager (max_total_connections=10 and instances=5 leads to 50 connections).
3644
* From googling around the connection pools maxTotal value should be close to the number of available threads.
3745
*
3846
* https://stackoverflow.com/questions/30989637/how-to-decide-optimal-settings-for-setmaxtotal-and-setdefaultmaxperroute
@@ -47,6 +55,7 @@ public class HttpClientManager implements Closeable {
4755

4856
/**
4957
* The max number of connections a single route can lease.
58+
* This configuration applies to each instance of HttpClientManager.
5059
*/
5160
public static final Setting<Integer> MAX_ROUTE_CONNECTIONS = Setting.intSetting(
5261
"xpack.inference.http.max_route_connections",
@@ -98,6 +107,22 @@ public static HttpClientManager create(
98107
return new HttpClientManager(settings, connectionManager, threadPool, clusterService, throttlerManager);
99108
}
100109

110+
public static HttpClientManager create(
111+
Settings settings,
112+
ThreadPool threadPool,
113+
ClusterService clusterService,
114+
ThrottlerManager throttlerManager,
115+
SSLService sslService
116+
) {
117+
// Set the sslStrategy to ensure an encrypted connection, as Elastic Inference Service requires it.
118+
SSLIOSessionStrategy sslioSessionStrategy = sslService.sslIOSessionStrategy(
119+
sslService.getSSLConfiguration(ELASTIC_INFERENCE_SERVICE_SSL_CONFIGURATION_PREFIX)
120+
);
121+
122+
PoolingNHttpClientConnectionManager connectionManager = createConnectionManager(sslioSessionStrategy);
123+
return new HttpClientManager(settings, connectionManager, threadPool, clusterService, throttlerManager);
124+
}
125+
101126
// Default for testing
102127
HttpClientManager(
103128
Settings settings,
@@ -121,6 +146,25 @@ public static HttpClientManager create(
121146
this.addSettingsUpdateConsumers(clusterService);
122147
}
123148

149+
private static PoolingNHttpClientConnectionManager createConnectionManager(SSLIOSessionStrategy sslStrategy) {
150+
ConnectingIOReactor ioReactor;
151+
try {
152+
var configBuilder = IOReactorConfig.custom().setSoKeepAlive(true);
153+
ioReactor = new DefaultConnectingIOReactor(configBuilder.build());
154+
} catch (IOReactorException e) {
155+
var message = "Failed to initialize HTTP client manager with SSL.";
156+
logger.error(message, e);
157+
throw new ElasticsearchException(message, e);
158+
}
159+
160+
Registry<SchemeIOSessionStrategy> registry = RegistryBuilder.<SchemeIOSessionStrategy>create()
161+
.register("http", NoopIOSessionStrategy.INSTANCE)
162+
.register("https", sslStrategy)
163+
.build();
164+
165+
return new PoolingNHttpClientConnectionManager(ioReactor, registry);
166+
}
167+
124168
private static PoolingNHttpClientConnectionManager createConnectionManager() {
125169
ConnectingIOReactor ioReactor;
126170
try {

x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/rank/textsimilarity/TextSimilarityRankRetrieverBuilder.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import org.elasticsearch.features.NodeFeature;
1313
import org.elasticsearch.index.query.QueryBuilder;
1414
import org.elasticsearch.license.LicenseUtils;
15+
import org.elasticsearch.license.XPackLicenseState;
1516
import org.elasticsearch.search.builder.SearchSourceBuilder;
1617
import org.elasticsearch.search.rank.RankDoc;
1718
import org.elasticsearch.search.retriever.CompoundRetrieverBuilder;
@@ -21,7 +22,6 @@
2122
import org.elasticsearch.xcontent.ParseField;
2223
import org.elasticsearch.xcontent.XContentBuilder;
2324
import org.elasticsearch.xcontent.XContentParser;
24-
import org.elasticsearch.xpack.core.XPackPlugin;
2525

2626
import java.io.IOException;
2727
import java.util.List;
@@ -78,8 +78,11 @@ public class TextSimilarityRankRetrieverBuilder extends CompoundRetrieverBuilder
7878
RetrieverBuilder.declareBaseParserFields(TextSimilarityRankBuilder.NAME, PARSER);
7979
}
8080

81-
public static TextSimilarityRankRetrieverBuilder fromXContent(XContentParser parser, RetrieverParserContext context)
82-
throws IOException {
81+
public static TextSimilarityRankRetrieverBuilder fromXContent(
82+
XContentParser parser,
83+
RetrieverParserContext context,
84+
XPackLicenseState licenceState
85+
) throws IOException {
8386
if (context.clusterSupportsFeature(TEXT_SIMILARITY_RERANKER_RETRIEVER_SUPPORTED) == false) {
8487
throw new ParsingException(parser.getTokenLocation(), "unknown retriever [" + TextSimilarityRankBuilder.NAME + "]");
8588
}
@@ -88,7 +91,7 @@ public static TextSimilarityRankRetrieverBuilder fromXContent(XContentParser par
8891
"[text_similarity_reranker] retriever composition feature is not supported by all nodes in the cluster"
8992
);
9093
}
91-
if (TextSimilarityRankBuilder.TEXT_SIMILARITY_RERANKER_FEATURE.check(XPackPlugin.getSharedLicenseState()) == false) {
94+
if (TextSimilarityRankBuilder.TEXT_SIMILARITY_RERANKER_FEATURE.check(licenceState) == false) {
9295
throw LicenseUtils.newComplianceException(TextSimilarityRankBuilder.NAME);
9396
}
9497
return PARSER.apply(parser, context);

0 commit comments

Comments
 (0)