Skip to content

Commit ab7e442

Browse files
committed
remove routing stats
1 parent 93f0f19 commit ab7e442

File tree

7 files changed

+42
-80
lines changed

7 files changed

+42
-80
lines changed

server/src/main/java/org/elasticsearch/TransportVersions.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -325,6 +325,7 @@ static TransportVersion def(int id) {
325325
public static final TransportVersion MAX_HEAP_SIZE_PER_NODE_IN_CLUSTER_INFO = def(9_159_0_00);
326326
public static final TransportVersion TIMESERIES_DEFAULT_LIMIT = def(9_160_0_00);
327327
public static final TransportVersion INFERENCE_API_OPENAI_HEADERS = def(9_161_0_00);
328+
public static final TransportVersion INFERENCE_REQUEST_ADAPTIVE_RATE_LIMITING_REMOVED = def(9_162_0_00);
328329

329330
/*
330331
* STOP! READ THIS FIRST! No, really,

server/src/main/java/org/elasticsearch/inference/telemetry/InferenceStats.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -61,10 +61,6 @@ public static Map<String, Object> modelAttributes(Model model) {
6161
return modelAttributesMap;
6262
}
6363

64-
public static Map<String, Object> routingAttributes(boolean hasBeenRerouted, String nodeIdHandlingRequest) {
65-
return Map.of("rerouted", hasBeenRerouted, "node_id", nodeIdHandlingRequest);
66-
}
67-
6864
public static Map<String, Object> modelAttributes(UnparsedModel model) {
6965
return Map.of("service", model.service(), "task_type", model.taskType().toString());
7066
}

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/inference/action/BaseInferenceActionRequest.java

Lines changed: 8 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,6 @@
2323
*/
2424
public abstract class BaseInferenceActionRequest extends LegacyActionRequest {
2525

26-
private boolean hasBeenRerouted;
27-
2826
private final InferenceContext context;
2927

3028
public BaseInferenceActionRequest(InferenceContext context) {
@@ -34,12 +32,9 @@ public BaseInferenceActionRequest(InferenceContext context) {
3432

3533
public BaseInferenceActionRequest(StreamInput in) throws IOException {
3634
super(in);
37-
if (in.getTransportVersion().onOrAfter(TransportVersions.INFERENCE_REQUEST_ADAPTIVE_RATE_LIMITING)) {
38-
this.hasBeenRerouted = in.readBoolean();
39-
} else {
40-
// For backwards compatibility, we treat all inference requests coming from ES nodes having
41-
// a version pre-node-local-rate-limiting as already rerouted to maintain pre-node-local-rate-limiting behavior.
42-
this.hasBeenRerouted = true;
35+
if (in.getTransportVersion().onOrAfter(TransportVersions.INFERENCE_REQUEST_ADAPTIVE_RATE_LIMITING) &&
36+
in.getTransportVersion().before(TransportVersions.INFERENCE_REQUEST_ADAPTIVE_RATE_LIMITING_REMOVED)) {
37+
in.readBoolean();
4338
}
4439

4540
if (in.getTransportVersion().onOrAfter(TransportVersions.INFERENCE_CONTEXT)
@@ -56,23 +51,16 @@ public BaseInferenceActionRequest(StreamInput in) throws IOException {
5651

5752
public abstract String getInferenceEntityId();
5853

59-
public void setHasBeenRerouted(boolean hasBeenRerouted) {
60-
this.hasBeenRerouted = hasBeenRerouted;
61-
}
62-
63-
public boolean hasBeenRerouted() {
64-
return hasBeenRerouted;
65-
}
66-
6754
public InferenceContext getContext() {
6855
return context;
6956
}
7057

7158
@Override
7259
public void writeTo(StreamOutput out) throws IOException {
7360
super.writeTo(out);
74-
if (out.getTransportVersion().onOrAfter(TransportVersions.INFERENCE_REQUEST_ADAPTIVE_RATE_LIMITING)) {
75-
out.writeBoolean(hasBeenRerouted);
61+
if (out.getTransportVersion().onOrAfter(TransportVersions.INFERENCE_REQUEST_ADAPTIVE_RATE_LIMITING)
62+
&& out.getTransportVersion().before(TransportVersions.INFERENCE_REQUEST_ADAPTIVE_RATE_LIMITING_REMOVED)) {
63+
out.writeBoolean(true);
7664
}
7765

7866
if (out.getTransportVersion().onOrAfter(TransportVersions.INFERENCE_CONTEXT)
@@ -86,11 +74,11 @@ public boolean equals(Object o) {
8674
if (this == o) return true;
8775
if (o == null || getClass() != o.getClass()) return false;
8876
BaseInferenceActionRequest that = (BaseInferenceActionRequest) o;
89-
return hasBeenRerouted == that.hasBeenRerouted && Objects.equals(context, that.context);
77+
return Objects.equals(context, that.context);
9078
}
9179

9280
@Override
9381
public int hashCode() {
94-
return Objects.hash(hasBeenRerouted, context);
82+
return Objects.hash(context);
9583
}
9684
}

x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/inference/action/InferenceActionRequestTests.java

Lines changed: 33 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -691,13 +691,6 @@ protected InferenceAction.Request mutateInstanceForVersion(InferenceAction.Reque
691691
mutated = instance;
692692
}
693693

694-
// We always assume that a request has been rerouted, if it came from a node without adaptive rate limiting
695-
if (version.before(TransportVersions.INFERENCE_REQUEST_ADAPTIVE_RATE_LIMITING)) {
696-
mutated.setHasBeenRerouted(true);
697-
} else {
698-
mutated.setHasBeenRerouted(instance.hasBeenRerouted());
699-
}
700-
701694
return mutated;
702695
}
703696

@@ -749,7 +742,7 @@ public void testWriteTo_WhenVersionIsBeforeInputTypeAdded_ShouldSetInputTypeToUn
749742
assertThat(deserializedInstance.getInputType(), is(InputType.UNSPECIFIED));
750743
}
751744

752-
public void testWriteTo_WhenVersionIsBeforeAdaptiveRateLimiting_ShouldSetHasBeenReroutedToTrue() throws IOException {
745+
public void testWriteTo_ForHasBeenReroutedChanges() throws IOException {
753746
var instance = new InferenceAction.Request(
754747
TaskType.TEXT_EMBEDDING,
755748
"model",
@@ -763,15 +756,39 @@ public void testWriteTo_WhenVersionIsBeforeAdaptiveRateLimiting_ShouldSetHasBeen
763756
false
764757
);
765758

766-
InferenceAction.Request deserializedInstance = copyWriteable(
767-
instance,
768-
getNamedWriteableRegistry(),
769-
instanceReader(),
770-
TransportVersions.V_8_13_0
771-
);
759+
{
760+
// From a version before the rerouting logic was added
761+
InferenceAction.Request deserializedInstance = copyWriteable(
762+
instance,
763+
getNamedWriteableRegistry(),
764+
instanceReader(),
765+
TransportVersions.V_8_17_0
766+
);
767+
768+
assertEquals(instance, deserializedInstance);
769+
}
770+
{
771+
// From a version with rerouting
772+
InferenceAction.Request deserializedInstance = copyWriteable(
773+
instance,
774+
getNamedWriteableRegistry(),
775+
instanceReader(),
776+
TransportVersions.INFERENCE_REQUEST_ADAPTIVE_RATE_LIMITING
777+
);
772778

773-
// Verify that hasBeenRerouted is true after deserializing a request coming from an older transport version
774-
assertTrue(deserializedInstance.hasBeenRerouted());
779+
assertEquals(instance, deserializedInstance);
780+
}
781+
{
782+
// From a version with rerouting removed
783+
InferenceAction.Request deserializedInstance = copyWriteable(
784+
instance,
785+
getNamedWriteableRegistry(),
786+
instanceReader(),
787+
TransportVersions.INFERENCE_REQUEST_ADAPTIVE_RATE_LIMITING_REMOVED
788+
);
789+
790+
assertEquals(instance, deserializedInstance);
791+
}
775792
}
776793

777794
public void testWriteTo_WhenVersionIsBeforeInferenceContext_ShouldSetContextToEmptyContext() throws IOException {

x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/inference/action/UnifiedCompletionActionRequestTests.java

Lines changed: 0 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -68,25 +68,6 @@ public void testValidation_ReturnsNull_When_TaskType_IsAny() {
6868
assertNull(request.validate());
6969
}
7070

71-
public void testWriteTo_WhenVersionIsBeforeAdaptiveRateLimiting_ShouldSetHasBeenReroutedToTrue() throws IOException {
72-
var instance = new UnifiedCompletionAction.Request(
73-
"model",
74-
TaskType.ANY,
75-
UnifiedCompletionRequest.of(List.of(UnifiedCompletionRequestTests.randomMessage())),
76-
TimeValue.timeValueSeconds(10)
77-
);
78-
79-
UnifiedCompletionAction.Request deserializedInstance = copyWriteable(
80-
instance,
81-
getNamedWriteableRegistry(),
82-
instanceReader(),
83-
TransportVersions.ELASTIC_INFERENCE_SERVICE_UNIFIED_CHAT_COMPLETIONS_INTEGRATION
84-
);
85-
86-
// Verify that hasBeenRerouted is true after deserializing a request coming from an older transport version
87-
assertTrue(deserializedInstance.hasBeenRerouted());
88-
}
89-
9071
public void testWriteTo_WhenVersionIsBeforeInferenceContext_ShouldSetContextToEmptyContext() throws IOException {
9172
var instance = new UnifiedCompletionAction.Request(
9273
"model",

x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/action/BaseTransportInferenceAction.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,6 @@
5252
import static org.elasticsearch.inference.telemetry.InferenceStats.modelAndResponseAttributes;
5353
import static org.elasticsearch.inference.telemetry.InferenceStats.modelAttributes;
5454
import static org.elasticsearch.inference.telemetry.InferenceStats.responseAttributes;
55-
import static org.elasticsearch.inference.telemetry.InferenceStats.routingAttributes;
5655
import static org.elasticsearch.xpack.inference.InferencePlugin.INFERENCE_API_FEATURE;
5756

5857
/**
@@ -272,7 +271,6 @@ protected <T> Flow.Publisher<T> streamErrorHandler(Flow.Publisher<T> upstream) {
272271
private void recordRequestCountMetrics(Model model, Request request, String localNodeId) {
273272
Map<String, Object> requestCountAttributes = new HashMap<>();
274273
requestCountAttributes.putAll(modelAttributes(model));
275-
requestCountAttributes.putAll(routingAttributes(request.hasBeenRerouted(), localNodeId));
276274

277275
inferenceStats.requestCount().incrementBy(1, requestCountAttributes);
278276
}
@@ -286,7 +284,6 @@ private void recordRequestDurationMetrics(
286284
) {
287285
Map<String, Object> metricAttributes = new HashMap<>();
288286
metricAttributes.putAll(modelAndResponseAttributes(model, unwrapCause(t)));
289-
metricAttributes.putAll(routingAttributes(request.hasBeenRerouted(), localNodeId));
290287

291288
inferenceStats.inferenceDuration().record(timer.elapsedMillis(), metricAttributes);
292289
}

x-pack/plugin/inference/src/test/java/org/elasticsearch/xpack/inference/action/BaseTransportInferenceActionTestCase.java

Lines changed: 0 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,6 @@ public abstract class BaseTransportInferenceActionTestCase<Request extends BaseI
6464
protected static final String serviceId = "serviceId";
6565
protected final TaskType taskType;
6666
protected static final String inferenceId = "inferenceEntityId";
67-
protected static final String localNodeId = "local-node-id";
6867
protected InferenceServiceRegistry serviceRegistry;
6968
protected InferenceStats inferenceStats;
7069
protected TransportService transportService;
@@ -100,7 +99,6 @@ public void setUp() throws Exception {
10099
);
101100

102101
mockValidLicenseState();
103-
mockNodeClient();
104102
}
105103

106104
protected abstract BaseTransportInferenceAction<Request> createAction(
@@ -237,8 +235,6 @@ public void testMetricsAfterInferError() {
237235
assertThat(attributes.get("model_id"), nullValue());
238236
assertThat(attributes.get("status_code"), nullValue());
239237
assertThat(attributes.get("error.type"), is(expectedError));
240-
assertThat(attributes.get("rerouted"), is(Boolean.FALSE));
241-
assertThat(attributes.get("node_id"), is(localNodeId));
242238
}));
243239
}
244240

@@ -261,8 +257,6 @@ public void testMetricsAfterStreamUnsupported() {
261257
assertThat(attributes.get("model_id"), nullValue());
262258
assertThat(attributes.get("status_code"), is(expectedStatus.getStatus()));
263259
assertThat(attributes.get("error.type"), is(expectedError));
264-
assertThat(attributes.get("rerouted"), is(Boolean.FALSE));
265-
assertThat(attributes.get("node_id"), is(localNodeId));
266260
}));
267261
}
268262

@@ -278,8 +272,6 @@ public void testMetricsAfterInferSuccess() {
278272
assertThat(attributes.get("model_id"), nullValue());
279273
assertThat(attributes.get("status_code"), is(200));
280274
assertThat(attributes.get("error.type"), nullValue());
281-
assertThat(attributes.get("rerouted"), is(Boolean.FALSE));
282-
assertThat(attributes.get("node_id"), is(localNodeId));
283275
}));
284276
}
285277

@@ -291,8 +283,6 @@ public void testMetricsAfterStreamInferSuccess() {
291283
assertThat(attributes.get("model_id"), nullValue());
292284
assertThat(attributes.get("status_code"), is(200));
293285
assertThat(attributes.get("error.type"), nullValue());
294-
assertThat(attributes.get("rerouted"), is(Boolean.FALSE));
295-
assertThat(attributes.get("node_id"), is(localNodeId));
296286
}));
297287
}
298288

@@ -306,8 +296,6 @@ public void testMetricsAfterStreamInferFailure() {
306296
assertThat(attributes.get("model_id"), nullValue());
307297
assertThat(attributes.get("status_code"), nullValue());
308298
assertThat(attributes.get("error.type"), is(expectedError));
309-
assertThat(attributes.get("rerouted"), is(Boolean.FALSE));
310-
assertThat(attributes.get("node_id"), is(localNodeId));
311299
}));
312300
}
313301

@@ -341,8 +329,6 @@ public void onComplete() {
341329
assertThat(attributes.get("model_id"), nullValue());
342330
assertThat(attributes.get("status_code"), is(200));
343331
assertThat(attributes.get("error.type"), nullValue());
344-
assertThat(attributes.get("rerouted"), is(Boolean.FALSE));
345-
assertThat(attributes.get("node_id"), is(localNodeId));
346332
}));
347333
}
348334

@@ -445,8 +431,4 @@ protected Model mockModel(TaskType expectedTaskType) {
445431
protected void mockValidLicenseState() {
446432
when(licenseState.isAllowed(InferencePlugin.INFERENCE_API_FEATURE)).thenReturn(true);
447433
}
448-
449-
private void mockNodeClient() {
450-
when(nodeClient.getLocalNodeId()).thenReturn(localNodeId);
451-
}
452434
}

0 commit comments

Comments
 (0)