Skip to content

Commit 9b01b15

Browse files
Adding latency improvements
1 parent abcffc4 commit 9b01b15

File tree

10 files changed

+196
-50
lines changed

10 files changed

+196
-50
lines changed

server/src/main/java/org/elasticsearch/TransportVersions.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -357,6 +357,7 @@ static TransportVersion def(int id) {
357357
public static final TransportVersion ESQL_SAMPLE_OPERATOR_STATUS = def(9_127_0_00);
358358
public static final TransportVersion ALLOCATION_DECISION_NOT_PREFERRED = def(9_145_0_00);
359359
public static final TransportVersion ESQL_QUALIFIERS_IN_ATTRIBUTES = def(9_146_0_00);
360+
public static final TransportVersion INFERENCE_API_EIS_DIAGNOSTICS = def(9_147_0_00);
360361

361362
/*
362363
* STOP! READ THIS FIRST! No, really,

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/inference/action/GetInferenceDiagnosticsAction.java

Lines changed: 45 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
package org.elasticsearch.xpack.core.inference.action;
99

1010
import org.apache.http.pool.PoolStats;
11+
import org.elasticsearch.TransportVersions;
1112
import org.elasticsearch.action.ActionType;
1213
import org.elasticsearch.action.FailedNodeException;
1314
import org.elasticsearch.action.support.nodes.BaseNodeResponse;
@@ -115,30 +116,53 @@ public int hashCode() {
115116
}
116117

117118
public static class NodeResponse extends BaseNodeResponse implements ToXContentFragment {
118-
static final String CONNECTION_POOL_STATS_FIELD_NAME = "connection_pool_stats";
119+
private static final String EXTERNAL_FIELD = "external";
120+
private static final String EIS_FIELD = "eis_mtls";
121+
private static final String CONNECTION_POOL_STATS_FIELD_NAME = "connection_pool_stats";
119122

120-
private final ConnectionPoolStats connectionPoolStats;
123+
private final ConnectionPoolStats externalConnectionPoolStats;
124+
private final ConnectionPoolStats eisMtlsConnectionPoolStats;
121125

122-
public NodeResponse(DiscoveryNode node, PoolStats poolStats) {
126+
public NodeResponse(DiscoveryNode node, PoolStats poolStats, PoolStats eisPoolStats) {
123127
super(node);
124-
connectionPoolStats = ConnectionPoolStats.of(poolStats);
128+
externalConnectionPoolStats = ConnectionPoolStats.of(poolStats);
129+
eisMtlsConnectionPoolStats = ConnectionPoolStats.of(eisPoolStats);
125130
}
126131

127132
public NodeResponse(StreamInput in) throws IOException {
128133
super(in);
129134

130-
connectionPoolStats = new ConnectionPoolStats(in);
135+
externalConnectionPoolStats = new ConnectionPoolStats(in);
136+
if (in.getTransportVersion().onOrAfter(TransportVersions.INFERENCE_API_EIS_DIAGNOSTICS)) {
137+
eisMtlsConnectionPoolStats = new ConnectionPoolStats(in);
138+
} else {
139+
eisMtlsConnectionPoolStats = ConnectionPoolStats.EMPTY;
140+
}
131141
}
132142

133143
@Override
134144
public void writeTo(StreamOutput out) throws IOException {
135145
super.writeTo(out);
136-
connectionPoolStats.writeTo(out);
146+
externalConnectionPoolStats.writeTo(out);
147+
148+
if (out.getTransportVersion().onOrAfter(TransportVersions.INFERENCE_API_EIS_DIAGNOSTICS)) {
149+
eisMtlsConnectionPoolStats.writeTo(out);
150+
}
137151
}
138152

139153
@Override
140154
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
141-
builder.field(CONNECTION_POOL_STATS_FIELD_NAME, connectionPoolStats, params);
155+
builder.startObject(EXTERNAL_FIELD);
156+
{
157+
builder.field(CONNECTION_POOL_STATS_FIELD_NAME, externalConnectionPoolStats, params);
158+
}
159+
builder.endObject();
160+
161+
builder.startObject(EIS_FIELD);
162+
{
163+
builder.field(CONNECTION_POOL_STATS_FIELD_NAME, eisMtlsConnectionPoolStats, params);
164+
}
165+
builder.endObject();
142166
return builder;
143167
}
144168

@@ -147,23 +171,29 @@ public boolean equals(Object o) {
147171
if (this == o) return true;
148172
if (o == null || getClass() != o.getClass()) return false;
149173
NodeResponse response = (NodeResponse) o;
150-
return Objects.equals(connectionPoolStats, response.connectionPoolStats);
174+
return Objects.equals(externalConnectionPoolStats, response.externalConnectionPoolStats)
175+
&& Objects.equals(eisMtlsConnectionPoolStats, response.eisMtlsConnectionPoolStats);
151176
}
152177

153178
@Override
154179
public int hashCode() {
155-
return Objects.hash(connectionPoolStats);
180+
return Objects.hash(externalConnectionPoolStats, eisMtlsConnectionPoolStats);
181+
}
182+
183+
ConnectionPoolStats getExternalConnectionPoolStats() {
184+
return externalConnectionPoolStats;
156185
}
157186

158-
ConnectionPoolStats getConnectionPoolStats() {
159-
return connectionPoolStats;
187+
ConnectionPoolStats getEisMtlsConnectionPoolStats() {
188+
return eisMtlsConnectionPoolStats;
160189
}
161190

162191
static class ConnectionPoolStats implements ToXContentObject, Writeable {
163-
static final String LEASED_CONNECTIONS = "leased_connections";
164-
static final String PENDING_CONNECTIONS = "pending_connections";
165-
static final String AVAILABLE_CONNECTIONS = "available_connections";
166-
static final String MAX_CONNECTIONS = "max_connections";
192+
private static final String LEASED_CONNECTIONS = "leased_connections";
193+
private static final String PENDING_CONNECTIONS = "pending_connections";
194+
private static final String AVAILABLE_CONNECTIONS = "available_connections";
195+
private static final String MAX_CONNECTIONS = "max_connections";
196+
private static final ConnectionPoolStats EMPTY = new ConnectionPoolStats(0, 0, 0, 0);
167197

168198
static ConnectionPoolStats of(PoolStats poolStats) {
169199
return new ConnectionPoolStats(poolStats.getLeased(), poolStats.getPending(), poolStats.getAvailable(), poolStats.getMax());

x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/inference/action/GetInferenceDiagnosticsActionNodeResponseTests.java

Lines changed: 47 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,22 +8,25 @@
88
package org.elasticsearch.xpack.core.inference.action;
99

1010
import org.apache.http.pool.PoolStats;
11+
import org.elasticsearch.TransportVersion;
12+
import org.elasticsearch.TransportVersions;
1113
import org.elasticsearch.cluster.node.DiscoveryNode;
1214
import org.elasticsearch.cluster.node.DiscoveryNodeUtils;
1315
import org.elasticsearch.common.Strings;
1416
import org.elasticsearch.common.io.stream.Writeable;
15-
import org.elasticsearch.test.AbstractWireSerializingTestCase;
17+
import org.elasticsearch.xpack.core.ml.AbstractBWCWireSerializationTestCase;
1618

1719
import java.io.IOException;
1820
import java.io.UnsupportedEncodingException;
1921

20-
public class GetInferenceDiagnosticsActionNodeResponseTests extends AbstractWireSerializingTestCase<
22+
public class GetInferenceDiagnosticsActionNodeResponseTests extends AbstractBWCWireSerializationTestCase<
2123
GetInferenceDiagnosticsAction.NodeResponse> {
2224
public static GetInferenceDiagnosticsAction.NodeResponse createRandom() {
2325
DiscoveryNode node = DiscoveryNodeUtils.create("id");
24-
var randomPoolStats = new PoolStats(randomInt(), randomInt(), randomInt(), randomInt());
26+
var randomExternalPoolStats = new PoolStats(randomInt(), randomInt(), randomInt(), randomInt());
27+
var randomEisPoolStats = new PoolStats(randomInt(), randomInt(), randomInt(), randomInt());
2528

26-
return new GetInferenceDiagnosticsAction.NodeResponse(node, randomPoolStats);
29+
return new GetInferenceDiagnosticsAction.NodeResponse(node, randomExternalPoolStats, randomEisPoolStats);
2730
}
2831

2932
@Override
@@ -40,7 +43,8 @@ protected GetInferenceDiagnosticsAction.NodeResponse createTestInstance() {
4043
protected GetInferenceDiagnosticsAction.NodeResponse mutateInstance(GetInferenceDiagnosticsAction.NodeResponse instance)
4144
throws IOException {
4245
var select = randomIntBetween(0, 3);
43-
var connPoolStats = instance.getConnectionPoolStats();
46+
var connPoolStats = instance.getExternalConnectionPoolStats();
47+
var eisPoolStats = instance.getEisMtlsConnectionPoolStats();
4448

4549
return switch (select) {
4650
case 0 -> new GetInferenceDiagnosticsAction.NodeResponse(
@@ -50,6 +54,12 @@ protected GetInferenceDiagnosticsAction.NodeResponse mutateInstance(GetInference
5054
connPoolStats.getPendingConnections(),
5155
connPoolStats.getAvailableConnections(),
5256
connPoolStats.getMaxConnections()
57+
),
58+
new PoolStats(
59+
randomInt(),
60+
eisPoolStats.getPendingConnections(),
61+
eisPoolStats.getAvailableConnections(),
62+
eisPoolStats.getMaxConnections()
5363
)
5464
);
5565
case 1 -> new GetInferenceDiagnosticsAction.NodeResponse(
@@ -59,6 +69,12 @@ protected GetInferenceDiagnosticsAction.NodeResponse mutateInstance(GetInference
5969
randomInt(),
6070
connPoolStats.getAvailableConnections(),
6171
connPoolStats.getMaxConnections()
72+
),
73+
new PoolStats(
74+
eisPoolStats.getLeasedConnections(),
75+
randomInt(),
76+
eisPoolStats.getAvailableConnections(),
77+
eisPoolStats.getMaxConnections()
6278
)
6379
);
6480
case 2 -> new GetInferenceDiagnosticsAction.NodeResponse(
@@ -68,6 +84,12 @@ protected GetInferenceDiagnosticsAction.NodeResponse mutateInstance(GetInference
6884
connPoolStats.getPendingConnections(),
6985
randomInt(),
7086
connPoolStats.getMaxConnections()
87+
),
88+
new PoolStats(
89+
eisPoolStats.getLeasedConnections(),
90+
eisPoolStats.getPendingConnections(),
91+
randomInt(),
92+
eisPoolStats.getMaxConnections()
7193
)
7294
);
7395
case 3 -> new GetInferenceDiagnosticsAction.NodeResponse(
@@ -77,9 +99,29 @@ protected GetInferenceDiagnosticsAction.NodeResponse mutateInstance(GetInference
7799
connPoolStats.getPendingConnections(),
78100
connPoolStats.getAvailableConnections(),
79101
randomInt()
102+
),
103+
new PoolStats(
104+
eisPoolStats.getLeasedConnections(),
105+
eisPoolStats.getPendingConnections(),
106+
eisPoolStats.getAvailableConnections(),
107+
randomInt()
80108
)
81109
);
82110
default -> throw new UnsupportedEncodingException(Strings.format("Encountered unsupported case %s", select));
83111
};
84112
}
113+
114+
@Override
115+
protected GetInferenceDiagnosticsAction.NodeResponse mutateInstanceForVersion(GetInferenceDiagnosticsAction.NodeResponse instance, TransportVersion version) {
116+
if (version.before(TransportVersions.INFERENCE_API_EIS_DIAGNOSTICS)) {
117+
return new GetInferenceDiagnosticsAction.NodeResponse(instance.getNode(), new PoolStats(
118+
instance.getExternalConnectionPoolStats().getLeasedConnections(),
119+
instance.getExternalConnectionPoolStats().getPendingConnections(),
120+
instance.getExternalConnectionPoolStats().getAvailableConnections(),
121+
instance.getExternalConnectionPoolStats().getMaxConnections()
122+
), new PoolStats(0, 0, 0, 0));
123+
} else {
124+
return instance;
125+
}
126+
}
85127
}

x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/inference/action/GetInferenceDiagnosticsActionResponseTests.java

Lines changed: 27 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -11,15 +11,17 @@
1111
import org.elasticsearch.cluster.ClusterName;
1212
import org.elasticsearch.cluster.node.DiscoveryNodeUtils;
1313
import org.elasticsearch.common.io.stream.Writeable;
14+
import org.elasticsearch.common.xcontent.XContentHelper;
1415
import org.elasticsearch.test.AbstractWireSerializingTestCase;
1516
import org.elasticsearch.xcontent.XContentBuilder;
1617
import org.elasticsearch.xcontent.XContentFactory;
1718
import org.elasticsearch.xcontent.XContentType;
18-
import org.hamcrest.CoreMatchers;
1919

2020
import java.io.IOException;
2121
import java.util.List;
2222

23+
import static org.hamcrest.Matchers.is;
24+
2325
public class GetInferenceDiagnosticsActionResponseTests extends AbstractWireSerializingTestCase<GetInferenceDiagnosticsAction.Response> {
2426

2527
public static GetInferenceDiagnosticsAction.Response createRandom() {
@@ -33,20 +35,39 @@ public static GetInferenceDiagnosticsAction.Response createRandom() {
3335

3436
public void testToXContent() throws IOException {
3537
var node = DiscoveryNodeUtils.create("id");
36-
var poolStats = new PoolStats(1, 2, 3, 4);
38+
var externalPoolStats = new PoolStats(1, 2, 3, 4);
39+
var eisPoolStats = new PoolStats(5, 6, 7, 8);
3740
var entity = new GetInferenceDiagnosticsAction.Response(
3841
ClusterName.DEFAULT,
39-
List.of(new GetInferenceDiagnosticsAction.NodeResponse(node, poolStats)),
42+
List.of(new GetInferenceDiagnosticsAction.NodeResponse(node, externalPoolStats, eisPoolStats)),
4043
List.of()
4144
);
4245

4346
XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON);
4447
entity.toXContent(builder, null);
4548
String xContentResult = org.elasticsearch.common.Strings.toString(builder);
4649

47-
assertThat(xContentResult, CoreMatchers.is("""
48-
{"id":{"connection_pool_stats":{"leased_connections":1,"pending_connections":2,"available_connections":3,""" + """
49-
"max_connections":4}}}"""));
50+
assertThat(xContentResult, is(XContentHelper.stripWhitespace("""
51+
{
52+
"id":{
53+
"external": {
54+
"connection_pool_stats":{
55+
"leased_connections":1,
56+
"pending_connections":2,
57+
"available_connections":3,
58+
"max_connections":4
59+
}
60+
},
61+
"eis_mtls": {
62+
"connection_pool_stats":{
63+
"leased_connections":5,
64+
"pending_connections":6,
65+
"available_connections":7,
66+
"max_connections":8
67+
}
68+
}
69+
}
70+
}""")));
5071
}
5172

5273
@Override

x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/InferencePlugin.java

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -203,6 +203,7 @@ public class InferencePlugin extends Plugin
203203

204204
public static final String NAME = "inference";
205205
public static final String UTILITY_THREAD_POOL_NAME = "inference_utility";
206+
public static final String INFERENCE_RESPONSE_THREAD_POOL_NAME = "inference_response";
206207

207208
private static final Logger log = LogManager.getLogger(InferencePlugin.class);
208209

@@ -374,7 +375,10 @@ public Collection<?> createComponents(PluginServices services) {
374375

375376
components.add(serviceRegistry);
376377
components.add(modelRegistry.get());
377-
components.add(httpClientManager);
378+
components.add(new TransportGetInferenceDiagnosticsAction.ClientManagers(
379+
httpClientManager,
380+
elasticInferenceServiceHttpClientManager
381+
));
378382
components.add(inferenceStatsBinding);
379383

380384
// Only add InferenceServiceNodeLocalRateLimitCalculator (which is a ClusterStateListener) for cluster aware rate limiting,
@@ -497,10 +501,10 @@ protected Settings getSecretsIndexSettings() {
497501

498502
@Override
499503
public List<ExecutorBuilder<?>> getExecutorBuilders(Settings settingsToUse) {
500-
return List.of(inferenceUtilityExecutor(settings));
504+
return List.of(inferenceUtilityExecutor(settings), inferenceResponseExecutor());
501505
}
502506

503-
public static ExecutorBuilder<?> inferenceUtilityExecutor(Settings settings) {
507+
private static ExecutorBuilder<?> inferenceUtilityExecutor(Settings settings) {
504508
return new ScalingExecutorBuilder(
505509
UTILITY_THREAD_POOL_NAME,
506510
0,
@@ -511,6 +515,17 @@ public static ExecutorBuilder<?> inferenceUtilityExecutor(Settings settings) {
511515
);
512516
}
513517

518+
private static ExecutorBuilder<?> inferenceResponseExecutor() {
519+
return new ScalingExecutorBuilder(
520+
INFERENCE_RESPONSE_THREAD_POOL_NAME,
521+
0,
522+
10,
523+
TimeValue.timeValueMinutes(10),
524+
false,
525+
"xpack.inference.inference_response_thread_pool"
526+
);
527+
}
528+
514529
@Override
515530
public List<Setting<?>> getSettings() {
516531
return List.copyOf(getInferenceSettings());

x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/action/TransportGetInferenceDiagnosticsAction.java

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,15 +31,17 @@ public class TransportGetInferenceDiagnosticsAction extends TransportNodesAction
3131
GetInferenceDiagnosticsAction.NodeResponse,
3232
Void> {
3333

34-
private final HttpClientManager httpClientManager;
34+
public record ClientManagers(HttpClientManager externalHttpClientManager, HttpClientManager eisMtlsHttpClientManager) {}
35+
36+
private final ClientManagers managers;
3537

3638
@Inject
3739
public TransportGetInferenceDiagnosticsAction(
3840
ThreadPool threadPool,
3941
ClusterService clusterService,
4042
TransportService transportService,
4143
ActionFilters actionFilters,
42-
HttpClientManager httpClientManager
44+
ClientManagers managers
4345
) {
4446
super(
4547
GetInferenceDiagnosticsAction.NAME,
@@ -50,7 +52,7 @@ public TransportGetInferenceDiagnosticsAction(
5052
threadPool.executor(ThreadPool.Names.MANAGEMENT)
5153
);
5254

53-
this.httpClientManager = Objects.requireNonNull(httpClientManager);
55+
this.managers = Objects.requireNonNull(managers);
5456
}
5557

5658
@Override
@@ -74,6 +76,10 @@ protected GetInferenceDiagnosticsAction.NodeResponse newNodeResponse(StreamInput
7476

7577
@Override
7678
protected GetInferenceDiagnosticsAction.NodeResponse nodeOperation(GetInferenceDiagnosticsAction.NodeRequest request, Task task) {
77-
return new GetInferenceDiagnosticsAction.NodeResponse(transportService.getLocalNode(), httpClientManager.getPoolStats());
79+
return new GetInferenceDiagnosticsAction.NodeResponse(
80+
transportService.getLocalNode(),
81+
managers.externalHttpClientManager().getPoolStats(),
82+
managers.eisMtlsHttpClientManager().getPoolStats()
83+
);
7884
}
7985
}

0 commit comments

Comments
 (0)