Skip to content

Commit 65dcdf2

Browse files
jonathan-buttnerelasticsearchmachine
andauthored
[ML] Implementing latency improvements for EIS integration (#133861)
* Adding latency improvements * Update docs/changelog/133861.yaml * [CI] Auto commit changes from spotless * Renaming test executor getter and adding response executor * [CI] Auto commit changes from spotless * Address feedback --------- Co-authored-by: elasticsearchmachine <[email protected]>
1 parent ad5ecc8 commit 65dcdf2

File tree

12 files changed

+223
-89
lines changed

12 files changed

+223
-89
lines changed

docs/changelog/133861.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 133861
2+
summary: Implementing latency improvements for EIS integration
3+
area: Machine Learning
4+
type: bug
5+
issues: []

server/src/main/java/org/elasticsearch/TransportVersions.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -352,6 +352,7 @@ static TransportVersion def(int id) {
352352
public static final TransportVersion GEMINI_THINKING_BUDGET_ADDED = def(9_153_0_00);
353353
public static final TransportVersion VISIT_PERCENTAGE = def(9_154_0_00);
354354
public static final TransportVersion TIME_SERIES_TELEMETRY = def(9_155_0_00);
355+
public static final TransportVersion INFERENCE_API_EIS_DIAGNOSTICS = def(9_156_0_00);
355356

356357
/*
357358
* STOP! READ THIS FIRST! No, really,

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/inference/action/GetInferenceDiagnosticsAction.java

Lines changed: 45 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
package org.elasticsearch.xpack.core.inference.action;
99

1010
import org.apache.http.pool.PoolStats;
11+
import org.elasticsearch.TransportVersions;
1112
import org.elasticsearch.action.ActionType;
1213
import org.elasticsearch.action.FailedNodeException;
1314
import org.elasticsearch.action.support.nodes.BaseNodeResponse;
@@ -115,30 +116,53 @@ public int hashCode() {
115116
}
116117

117118
public static class NodeResponse extends BaseNodeResponse implements ToXContentFragment {
118-
static final String CONNECTION_POOL_STATS_FIELD_NAME = "connection_pool_stats";
119+
private static final String EXTERNAL_FIELD = "external";
120+
private static final String EIS_FIELD = "eis_mtls";
121+
private static final String CONNECTION_POOL_STATS_FIELD_NAME = "connection_pool_stats";
119122

120-
private final ConnectionPoolStats connectionPoolStats;
123+
private final ConnectionPoolStats externalConnectionPoolStats;
124+
private final ConnectionPoolStats eisMtlsConnectionPoolStats;
121125

122-
public NodeResponse(DiscoveryNode node, PoolStats poolStats) {
126+
public NodeResponse(DiscoveryNode node, PoolStats poolStats, PoolStats eisPoolStats) {
123127
super(node);
124-
connectionPoolStats = ConnectionPoolStats.of(poolStats);
128+
externalConnectionPoolStats = ConnectionPoolStats.of(poolStats);
129+
eisMtlsConnectionPoolStats = ConnectionPoolStats.of(eisPoolStats);
125130
}
126131

127132
public NodeResponse(StreamInput in) throws IOException {
128133
super(in);
129134

130-
connectionPoolStats = new ConnectionPoolStats(in);
135+
externalConnectionPoolStats = new ConnectionPoolStats(in);
136+
if (in.getTransportVersion().onOrAfter(TransportVersions.INFERENCE_API_EIS_DIAGNOSTICS)) {
137+
eisMtlsConnectionPoolStats = new ConnectionPoolStats(in);
138+
} else {
139+
eisMtlsConnectionPoolStats = ConnectionPoolStats.EMPTY;
140+
}
131141
}
132142

133143
@Override
134144
public void writeTo(StreamOutput out) throws IOException {
135145
super.writeTo(out);
136-
connectionPoolStats.writeTo(out);
146+
externalConnectionPoolStats.writeTo(out);
147+
148+
if (out.getTransportVersion().onOrAfter(TransportVersions.INFERENCE_API_EIS_DIAGNOSTICS)) {
149+
eisMtlsConnectionPoolStats.writeTo(out);
150+
}
137151
}
138152

139153
@Override
140154
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
141-
builder.field(CONNECTION_POOL_STATS_FIELD_NAME, connectionPoolStats, params);
155+
builder.startObject(EXTERNAL_FIELD);
156+
{
157+
builder.field(CONNECTION_POOL_STATS_FIELD_NAME, externalConnectionPoolStats, params);
158+
}
159+
builder.endObject();
160+
161+
builder.startObject(EIS_FIELD);
162+
{
163+
builder.field(CONNECTION_POOL_STATS_FIELD_NAME, eisMtlsConnectionPoolStats, params);
164+
}
165+
builder.endObject();
142166
return builder;
143167
}
144168

@@ -147,23 +171,29 @@ public boolean equals(Object o) {
147171
if (this == o) return true;
148172
if (o == null || getClass() != o.getClass()) return false;
149173
NodeResponse response = (NodeResponse) o;
150-
return Objects.equals(connectionPoolStats, response.connectionPoolStats);
174+
return Objects.equals(externalConnectionPoolStats, response.externalConnectionPoolStats)
175+
&& Objects.equals(eisMtlsConnectionPoolStats, response.eisMtlsConnectionPoolStats);
151176
}
152177

153178
@Override
154179
public int hashCode() {
155-
return Objects.hash(connectionPoolStats);
180+
return Objects.hash(externalConnectionPoolStats, eisMtlsConnectionPoolStats);
181+
}
182+
183+
ConnectionPoolStats getExternalConnectionPoolStats() {
184+
return externalConnectionPoolStats;
156185
}
157186

158-
ConnectionPoolStats getConnectionPoolStats() {
159-
return connectionPoolStats;
187+
ConnectionPoolStats getEisMtlsConnectionPoolStats() {
188+
return eisMtlsConnectionPoolStats;
160189
}
161190

162191
static class ConnectionPoolStats implements ToXContentObject, Writeable {
163-
static final String LEASED_CONNECTIONS = "leased_connections";
164-
static final String PENDING_CONNECTIONS = "pending_connections";
165-
static final String AVAILABLE_CONNECTIONS = "available_connections";
166-
static final String MAX_CONNECTIONS = "max_connections";
192+
private static final String LEASED_CONNECTIONS = "leased_connections";
193+
private static final String PENDING_CONNECTIONS = "pending_connections";
194+
private static final String AVAILABLE_CONNECTIONS = "available_connections";
195+
private static final String MAX_CONNECTIONS = "max_connections";
196+
private static final ConnectionPoolStats EMPTY = new ConnectionPoolStats(0, 0, 0, 0);
167197

168198
static ConnectionPoolStats of(PoolStats poolStats) {
169199
return new ConnectionPoolStats(poolStats.getLeased(), poolStats.getPending(), poolStats.getAvailable(), poolStats.getMax());

x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/inference/action/GetInferenceDiagnosticsActionNodeResponseTests.java

Lines changed: 57 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -8,22 +8,25 @@
88
package org.elasticsearch.xpack.core.inference.action;
99

1010
import org.apache.http.pool.PoolStats;
11+
import org.elasticsearch.TransportVersion;
12+
import org.elasticsearch.TransportVersions;
1113
import org.elasticsearch.cluster.node.DiscoveryNode;
1214
import org.elasticsearch.cluster.node.DiscoveryNodeUtils;
1315
import org.elasticsearch.common.Strings;
1416
import org.elasticsearch.common.io.stream.Writeable;
15-
import org.elasticsearch.test.AbstractWireSerializingTestCase;
17+
import org.elasticsearch.xpack.core.ml.AbstractBWCWireSerializationTestCase;
1618

1719
import java.io.IOException;
1820
import java.io.UnsupportedEncodingException;
1921

20-
public class GetInferenceDiagnosticsActionNodeResponseTests extends AbstractWireSerializingTestCase<
22+
public class GetInferenceDiagnosticsActionNodeResponseTests extends AbstractBWCWireSerializationTestCase<
2123
GetInferenceDiagnosticsAction.NodeResponse> {
2224
public static GetInferenceDiagnosticsAction.NodeResponse createRandom() {
2325
DiscoveryNode node = DiscoveryNodeUtils.create("id");
24-
var randomPoolStats = new PoolStats(randomInt(), randomInt(), randomInt(), randomInt());
26+
var randomExternalPoolStats = new PoolStats(randomInt(), randomInt(), randomInt(), randomInt());
27+
var randomEisPoolStats = new PoolStats(randomInt(), randomInt(), randomInt(), randomInt());
2528

26-
return new GetInferenceDiagnosticsAction.NodeResponse(node, randomPoolStats);
29+
return new GetInferenceDiagnosticsAction.NodeResponse(node, randomExternalPoolStats, randomEisPoolStats);
2730
}
2831

2932
@Override
@@ -39,47 +42,61 @@ protected GetInferenceDiagnosticsAction.NodeResponse createTestInstance() {
3942
@Override
4043
protected GetInferenceDiagnosticsAction.NodeResponse mutateInstance(GetInferenceDiagnosticsAction.NodeResponse instance)
4144
throws IOException {
42-
var select = randomIntBetween(0, 3);
43-
var connPoolStats = instance.getConnectionPoolStats();
45+
if (randomBoolean()) {
46+
PoolStats mutatedConnPoolStats = mutatePoolStats(instance.getExternalConnectionPoolStats());
47+
PoolStats eisPoolStats = copyPoolStats(instance.getEisMtlsConnectionPoolStats());
48+
return new GetInferenceDiagnosticsAction.NodeResponse(instance.getNode(), mutatedConnPoolStats, eisPoolStats);
49+
} else {
50+
PoolStats connPoolStats = copyPoolStats(instance.getExternalConnectionPoolStats());
51+
PoolStats mutatedEisPoolStats = mutatePoolStats(instance.getEisMtlsConnectionPoolStats());
52+
return new GetInferenceDiagnosticsAction.NodeResponse(instance.getNode(), connPoolStats, mutatedEisPoolStats);
53+
}
54+
}
4455

56+
private PoolStats mutatePoolStats(GetInferenceDiagnosticsAction.NodeResponse.ConnectionPoolStats stats)
57+
throws UnsupportedEncodingException {
58+
var select = randomIntBetween(0, 3);
4559
return switch (select) {
46-
case 0 -> new GetInferenceDiagnosticsAction.NodeResponse(
47-
instance.getNode(),
48-
new PoolStats(
49-
randomInt(),
50-
connPoolStats.getPendingConnections(),
51-
connPoolStats.getAvailableConnections(),
52-
connPoolStats.getMaxConnections()
53-
)
54-
);
55-
case 1 -> new GetInferenceDiagnosticsAction.NodeResponse(
56-
instance.getNode(),
57-
new PoolStats(
58-
connPoolStats.getLeasedConnections(),
59-
randomInt(),
60-
connPoolStats.getAvailableConnections(),
61-
connPoolStats.getMaxConnections()
62-
)
63-
);
64-
case 2 -> new GetInferenceDiagnosticsAction.NodeResponse(
65-
instance.getNode(),
66-
new PoolStats(
67-
connPoolStats.getLeasedConnections(),
68-
connPoolStats.getPendingConnections(),
69-
randomInt(),
70-
connPoolStats.getMaxConnections()
71-
)
60+
case 0 -> new PoolStats(randomInt(), stats.getPendingConnections(), stats.getAvailableConnections(), stats.getMaxConnections());
61+
case 1 -> new PoolStats(stats.getLeasedConnections(), randomInt(), stats.getAvailableConnections(), stats.getMaxConnections());
62+
case 2 -> new PoolStats(stats.getLeasedConnections(), stats.getPendingConnections(), randomInt(), stats.getMaxConnections());
63+
case 3 -> new PoolStats(
64+
stats.getLeasedConnections(),
65+
stats.getPendingConnections(),
66+
stats.getAvailableConnections(),
67+
randomInt()
7268
);
73-
case 3 -> new GetInferenceDiagnosticsAction.NodeResponse(
69+
default -> throw new UnsupportedEncodingException(Strings.format("Encountered unsupported case %s", select));
70+
};
71+
}
72+
73+
private PoolStats copyPoolStats(GetInferenceDiagnosticsAction.NodeResponse.ConnectionPoolStats stats) {
74+
return new PoolStats(
75+
stats.getLeasedConnections(),
76+
stats.getPendingConnections(),
77+
stats.getAvailableConnections(),
78+
stats.getMaxConnections()
79+
);
80+
}
81+
82+
@Override
83+
protected GetInferenceDiagnosticsAction.NodeResponse mutateInstanceForVersion(
84+
GetInferenceDiagnosticsAction.NodeResponse instance,
85+
TransportVersion version
86+
) {
87+
if (version.before(TransportVersions.INFERENCE_API_EIS_DIAGNOSTICS)) {
88+
return new GetInferenceDiagnosticsAction.NodeResponse(
7489
instance.getNode(),
7590
new PoolStats(
76-
connPoolStats.getLeasedConnections(),
77-
connPoolStats.getPendingConnections(),
78-
connPoolStats.getAvailableConnections(),
79-
randomInt()
80-
)
91+
instance.getExternalConnectionPoolStats().getLeasedConnections(),
92+
instance.getExternalConnectionPoolStats().getPendingConnections(),
93+
instance.getExternalConnectionPoolStats().getAvailableConnections(),
94+
instance.getExternalConnectionPoolStats().getMaxConnections()
95+
),
96+
new PoolStats(0, 0, 0, 0)
8197
);
82-
default -> throw new UnsupportedEncodingException(Strings.format("Encountered unsupported case %s", select));
83-
};
98+
} else {
99+
return instance;
100+
}
84101
}
85102
}

x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/inference/action/GetInferenceDiagnosticsActionResponseTests.java

Lines changed: 27 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -11,15 +11,17 @@
1111
import org.elasticsearch.cluster.ClusterName;
1212
import org.elasticsearch.cluster.node.DiscoveryNodeUtils;
1313
import org.elasticsearch.common.io.stream.Writeable;
14+
import org.elasticsearch.common.xcontent.XContentHelper;
1415
import org.elasticsearch.test.AbstractWireSerializingTestCase;
1516
import org.elasticsearch.xcontent.XContentBuilder;
1617
import org.elasticsearch.xcontent.XContentFactory;
1718
import org.elasticsearch.xcontent.XContentType;
18-
import org.hamcrest.CoreMatchers;
1919

2020
import java.io.IOException;
2121
import java.util.List;
2222

23+
import static org.hamcrest.Matchers.is;
24+
2325
public class GetInferenceDiagnosticsActionResponseTests extends AbstractWireSerializingTestCase<GetInferenceDiagnosticsAction.Response> {
2426

2527
public static GetInferenceDiagnosticsAction.Response createRandom() {
@@ -33,20 +35,39 @@ public static GetInferenceDiagnosticsAction.Response createRandom() {
3335

3436
public void testToXContent() throws IOException {
3537
var node = DiscoveryNodeUtils.create("id");
36-
var poolStats = new PoolStats(1, 2, 3, 4);
38+
var externalPoolStats = new PoolStats(1, 2, 3, 4);
39+
var eisPoolStats = new PoolStats(5, 6, 7, 8);
3740
var entity = new GetInferenceDiagnosticsAction.Response(
3841
ClusterName.DEFAULT,
39-
List.of(new GetInferenceDiagnosticsAction.NodeResponse(node, poolStats)),
42+
List.of(new GetInferenceDiagnosticsAction.NodeResponse(node, externalPoolStats, eisPoolStats)),
4043
List.of()
4144
);
4245

4346
XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON);
4447
entity.toXContent(builder, null);
4548
String xContentResult = org.elasticsearch.common.Strings.toString(builder);
4649

47-
assertThat(xContentResult, CoreMatchers.is("""
48-
{"id":{"connection_pool_stats":{"leased_connections":1,"pending_connections":2,"available_connections":3,""" + """
49-
"max_connections":4}}}"""));
50+
assertThat(xContentResult, is(XContentHelper.stripWhitespace("""
51+
{
52+
"id":{
53+
"external": {
54+
"connection_pool_stats":{
55+
"leased_connections":1,
56+
"pending_connections":2,
57+
"available_connections":3,
58+
"max_connections":4
59+
}
60+
},
61+
"eis_mtls": {
62+
"connection_pool_stats":{
63+
"leased_connections":5,
64+
"pending_connections":6,
65+
"available_connections":7,
66+
"max_connections":8
67+
}
68+
}
69+
}
70+
}""")));
5071
}
5172

5273
@Override

x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/InferencePlugin.java

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -203,6 +203,7 @@ public class InferencePlugin extends Plugin
203203

204204
public static final String NAME = "inference";
205205
public static final String UTILITY_THREAD_POOL_NAME = "inference_utility";
206+
public static final String INFERENCE_RESPONSE_THREAD_POOL_NAME = "inference_response";
206207

207208
private static final Logger log = LogManager.getLogger(InferencePlugin.class);
208209

@@ -374,7 +375,9 @@ public Collection<?> createComponents(PluginServices services) {
374375

375376
components.add(serviceRegistry);
376377
components.add(modelRegistry.get());
377-
components.add(httpClientManager);
378+
components.add(
379+
new TransportGetInferenceDiagnosticsAction.ClientManagers(httpClientManager, elasticInferenceServiceHttpClientManager)
380+
);
378381
components.add(inferenceStatsBinding);
379382

380383
// Only add InferenceServiceNodeLocalRateLimitCalculator (which is a ClusterStateListener) for cluster aware rate limiting,
@@ -497,10 +500,10 @@ protected Settings getSecretsIndexSettings() {
497500

498501
@Override
499502
public List<ExecutorBuilder<?>> getExecutorBuilders(Settings settingsToUse) {
500-
return List.of(inferenceUtilityExecutor(settings));
503+
return List.of(inferenceUtilityExecutor(), inferenceResponseExecutor());
501504
}
502505

503-
public static ExecutorBuilder<?> inferenceUtilityExecutor(Settings settings) {
506+
private static ExecutorBuilder<?> inferenceUtilityExecutor() {
504507
return new ScalingExecutorBuilder(
505508
UTILITY_THREAD_POOL_NAME,
506509
0,
@@ -511,6 +514,17 @@ public static ExecutorBuilder<?> inferenceUtilityExecutor(Settings settings) {
511514
);
512515
}
513516

517+
private static ExecutorBuilder<?> inferenceResponseExecutor() {
518+
return new ScalingExecutorBuilder(
519+
INFERENCE_RESPONSE_THREAD_POOL_NAME,
520+
0,
521+
10,
522+
TimeValue.timeValueMinutes(10),
523+
false,
524+
"xpack.inference.inference_response_thread_pool"
525+
);
526+
}
527+
514528
@Override
515529
public List<Setting<?>> getSettings() {
516530
return List.copyOf(getInferenceSettings());

0 commit comments

Comments
 (0)