Skip to content

Commit 65f5d6e

Browse files
DonalEvansrjernst
authored andcommitted
[ML] Return 429 status when RequestExecutorService queue full (elastic#134178)
Rather than always throwing an ElasticsearchStatusException with 500 status in ActionUtils.wrapFailuresInElasticsearchException(), determine the appropriate status from the unwrapped exception - Remove createInternalServerError() method from ActionUtils - Refactor AlibabaCloudSearch*Action classes to be consistent with SenderExecutableAction - Update tests to account for new behaviour
1 parent 623e5a2 commit 65f5d6e

File tree

8 files changed

+82
-59
lines changed

8 files changed

+82
-59
lines changed

docs/changelog/134178.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 134178
2+
summary: Return 429 status when `RequestExecutorService` queue full
3+
area: Machine Learning
4+
type: bug
5+
issues: []

x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/external/action/ActionUtils.java

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313
import org.elasticsearch.action.ActionListener;
1414
import org.elasticsearch.common.Strings;
1515
import org.elasticsearch.inference.InferenceServiceResults;
16-
import org.elasticsearch.rest.RestStatus;
1716

1817
public class ActionUtils {
1918

@@ -28,19 +27,17 @@ public static ActionListener<InferenceServiceResults> wrapFailuresInElasticsearc
2827
l.onFailure(esException);
2928
} else {
3029
l.onFailure(
31-
createInternalServerError(
32-
unwrappedException,
33-
Strings.format("%s. Cause: %s", errorMessage, unwrappedException.getMessage())
30+
// Determine the appropriate RestStatus from the unwrapped exception, then wrap in an ElasticsearchStatusException
31+
new ElasticsearchStatusException(
32+
Strings.format("%s. Cause: %s", errorMessage, unwrappedException.getMessage()),
33+
ExceptionsHelper.status(unwrappedException),
34+
unwrappedException
3435
)
3536
);
3637
}
3738
});
3839
}
3940

40-
public static ElasticsearchStatusException createInternalServerError(Throwable e, String message) {
41-
return new ElasticsearchStatusException(message, RestStatus.INTERNAL_SERVER_ERROR, e);
42-
}
43-
4441
public static String constructFailedToSendRequestMessage(String message) {
4542
return Strings.format("Failed to send %s request", message);
4643
}

x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/services/alibabacloudsearch/action/AlibabaCloudSearchCompletionAction.java

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99

1010
import org.apache.logging.log4j.LogManager;
1111
import org.apache.logging.log4j.Logger;
12-
import org.elasticsearch.ElasticsearchException;
1312
import org.elasticsearch.ElasticsearchStatusException;
1413
import org.elasticsearch.action.ActionListener;
1514
import org.elasticsearch.core.TimeValue;
@@ -27,7 +26,6 @@
2726
import java.util.Objects;
2827

2928
import static org.elasticsearch.xpack.inference.external.action.ActionUtils.constructFailedToSendRequestMessage;
30-
import static org.elasticsearch.xpack.inference.external.action.ActionUtils.createInternalServerError;
3129
import static org.elasticsearch.xpack.inference.external.action.ActionUtils.wrapFailuresInElasticsearchException;
3230

3331
public class AlibabaCloudSearchCompletionAction implements ExecutableAction {
@@ -61,16 +59,14 @@ public void execute(InferenceInputs inferenceInputs, TimeValue timeout, ActionLi
6159
return;
6260
}
6361

62+
ActionListener<InferenceServiceResults> wrappedListener = wrapFailuresInElasticsearchException(
63+
failedToSendRequestErrorMessage,
64+
listener
65+
);
6466
try {
65-
ActionListener<InferenceServiceResults> wrappedListener = wrapFailuresInElasticsearchException(
66-
failedToSendRequestErrorMessage,
67-
listener
68-
);
6967
sender.send(requestCreator, inferenceInputs, timeout, wrappedListener);
70-
} catch (ElasticsearchException e) {
71-
listener.onFailure(e);
7268
} catch (Exception e) {
73-
listener.onFailure(createInternalServerError(e, failedToSendRequestErrorMessage));
69+
wrappedListener.onFailure(e);
7470
}
7571
}
7672
}

x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/services/alibabacloudsearch/action/AlibabaCloudSearchEmbeddingsAction.java

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77

88
package org.elasticsearch.xpack.inference.services.alibabacloudsearch.action;
99

10-
import org.elasticsearch.ElasticsearchException;
1110
import org.elasticsearch.action.ActionListener;
1211
import org.elasticsearch.core.TimeValue;
1312
import org.elasticsearch.inference.InferenceServiceResults;
@@ -22,7 +21,6 @@
2221
import java.util.Objects;
2322

2423
import static org.elasticsearch.xpack.inference.external.action.ActionUtils.constructFailedToSendRequestMessage;
25-
import static org.elasticsearch.xpack.inference.external.action.ActionUtils.createInternalServerError;
2624
import static org.elasticsearch.xpack.inference.external.action.ActionUtils.wrapFailuresInElasticsearchException;
2725

2826
public class AlibabaCloudSearchEmbeddingsAction implements ExecutableAction {
@@ -42,16 +40,14 @@ public AlibabaCloudSearchEmbeddingsAction(Sender sender, AlibabaCloudSearchEmbed
4240

4341
@Override
4442
public void execute(InferenceInputs inferenceInputs, TimeValue timeout, ActionListener<InferenceServiceResults> listener) {
43+
ActionListener<InferenceServiceResults> wrappedListener = wrapFailuresInElasticsearchException(
44+
failedToSendRequestErrorMessage,
45+
listener
46+
);
4547
try {
46-
ActionListener<InferenceServiceResults> wrappedListener = wrapFailuresInElasticsearchException(
47-
failedToSendRequestErrorMessage,
48-
listener
49-
);
5048
sender.send(requestCreator, inferenceInputs, timeout, wrappedListener);
51-
} catch (ElasticsearchException e) {
52-
listener.onFailure(e);
5349
} catch (Exception e) {
54-
listener.onFailure(createInternalServerError(e, failedToSendRequestErrorMessage));
50+
wrappedListener.onFailure(e);
5551
}
5652
}
5753
}

x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/services/alibabacloudsearch/action/AlibabaCloudSearchRerankAction.java

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99

1010
import org.apache.logging.log4j.LogManager;
1111
import org.apache.logging.log4j.Logger;
12-
import org.elasticsearch.ElasticsearchException;
1312
import org.elasticsearch.action.ActionListener;
1413
import org.elasticsearch.core.TimeValue;
1514
import org.elasticsearch.inference.InferenceServiceResults;
@@ -24,7 +23,6 @@
2423
import java.util.Objects;
2524

2625
import static org.elasticsearch.xpack.inference.external.action.ActionUtils.constructFailedToSendRequestMessage;
27-
import static org.elasticsearch.xpack.inference.external.action.ActionUtils.createInternalServerError;
2826
import static org.elasticsearch.xpack.inference.external.action.ActionUtils.wrapFailuresInElasticsearchException;
2927

3028
public class AlibabaCloudSearchRerankAction implements ExecutableAction {
@@ -46,16 +44,14 @@ public AlibabaCloudSearchRerankAction(Sender sender, AlibabaCloudSearchRerankMod
4644

4745
@Override
4846
public void execute(InferenceInputs inferenceInputs, TimeValue timeout, ActionListener<InferenceServiceResults> listener) {
47+
ActionListener<InferenceServiceResults> wrappedListener = wrapFailuresInElasticsearchException(
48+
failedToSendRequestErrorMessage,
49+
listener
50+
);
4951
try {
50-
ActionListener<InferenceServiceResults> wrappedListener = wrapFailuresInElasticsearchException(
51-
failedToSendRequestErrorMessage,
52-
listener
53-
);
5452
sender.send(requestCreator, inferenceInputs, timeout, wrappedListener);
55-
} catch (ElasticsearchException e) {
56-
listener.onFailure(e);
5753
} catch (Exception e) {
58-
listener.onFailure(createInternalServerError(e, failedToSendRequestErrorMessage));
54+
wrappedListener.onFailure(e);
5955
}
6056
}
6157
}

x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/services/alibabacloudsearch/action/AlibabaCloudSearchSparseAction.java

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99

1010
import org.apache.logging.log4j.LogManager;
1111
import org.apache.logging.log4j.Logger;
12-
import org.elasticsearch.ElasticsearchException;
1312
import org.elasticsearch.action.ActionListener;
1413
import org.elasticsearch.core.TimeValue;
1514
import org.elasticsearch.inference.InferenceServiceResults;
@@ -24,7 +23,6 @@
2423
import java.util.Objects;
2524

2625
import static org.elasticsearch.xpack.inference.external.action.ActionUtils.constructFailedToSendRequestMessage;
27-
import static org.elasticsearch.xpack.inference.external.action.ActionUtils.createInternalServerError;
2826
import static org.elasticsearch.xpack.inference.external.action.ActionUtils.wrapFailuresInElasticsearchException;
2927

3028
public class AlibabaCloudSearchSparseAction implements ExecutableAction {
@@ -46,16 +44,14 @@ public AlibabaCloudSearchSparseAction(Sender sender, AlibabaCloudSearchSparseMod
4644

4745
@Override
4846
public void execute(InferenceInputs inferenceInputs, TimeValue timeout, ActionListener<InferenceServiceResults> listener) {
47+
ActionListener<InferenceServiceResults> wrappedListener = wrapFailuresInElasticsearchException(
48+
failedToSendRequestErrorMessage,
49+
listener
50+
);
4951
try {
50-
ActionListener<InferenceServiceResults> wrappedListener = wrapFailuresInElasticsearchException(
51-
failedToSendRequestErrorMessage,
52-
listener
53-
);
5452
sender.send(requestCreator, inferenceInputs, timeout, wrappedListener);
55-
} catch (ElasticsearchException e) {
56-
listener.onFailure(e);
5753
} catch (Exception e) {
58-
listener.onFailure(createInternalServerError(e, failedToSendRequestErrorMessage));
54+
wrappedListener.onFailure(e);
5955
}
6056
}
6157
}

x-pack/plugin/inference/src/test/java/org/elasticsearch/xpack/inference/external/action/SenderExecutableActionTests.java

Lines changed: 50 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,10 @@
1010
import org.elasticsearch.ElasticsearchException;
1111
import org.elasticsearch.ElasticsearchStatusException;
1212
import org.elasticsearch.action.ActionListener;
13+
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
1314
import org.elasticsearch.core.TimeValue;
1415
import org.elasticsearch.inference.InferenceServiceResults;
16+
import org.elasticsearch.rest.RestStatus;
1517
import org.elasticsearch.test.ESTestCase;
1618
import org.elasticsearch.xpack.inference.external.http.sender.InferenceInputs;
1719
import org.elasticsearch.xpack.inference.external.http.sender.RequestManager;
@@ -89,31 +91,67 @@ public void testSenderReturnedElasticsearchExceptionIsUnwrapped() {
8991
@SuppressWarnings("unchecked")
9092
public void testSendThrowingExceptionIsWrapped() {
9193
var expectedException = new IllegalStateException("test");
92-
var actualException = new AtomicReference<Exception>();
94+
var actualExceptionReference = new AtomicReference<Exception>();
9395

9496
doThrow(expectedException).when(sender)
9597
.send(eq(requestManager), any(InferenceInputs.class), any(TimeValue.class), any(ActionListener.class));
9698

97-
execute(actualException);
99+
execute(actualExceptionReference);
98100

99-
assertThat(actualException.get(), notNullValue());
100-
assertThat(actualException.get().getMessage(), is(failureExceptionMessage));
101-
assertThat(actualException.get(), instanceOf(ElasticsearchStatusException.class));
102-
assertThat(actualException.get().getCause(), sameInstance(expectedException));
101+
Exception actualException = actualExceptionReference.get();
102+
assertThat(actualException, notNullValue());
103+
assertThat(actualException.getMessage(), is(failureExceptionMessage));
104+
assertThat(actualException, instanceOf(ElasticsearchStatusException.class));
105+
assertThat(actualException.getCause(), sameInstance(expectedException));
106+
assertThat(((ElasticsearchStatusException) actualException).status(), is(RestStatus.INTERNAL_SERVER_ERROR));
103107
}
104108

105109
public void testSenderReturnedExceptionIsWrapped() {
106110
var expectedException = new IllegalStateException("test");
107-
var actualException = new AtomicReference<Exception>();
111+
var actualExceptionReference = new AtomicReference<Exception>();
108112

109113
mockSender(listener -> listener.onFailure(expectedException));
110114

111-
execute(actualException);
115+
execute(actualExceptionReference);
112116

113-
assertThat(actualException.get(), notNullValue());
114-
assertThat(actualException.get().getMessage(), is(failureExceptionMessage));
115-
assertThat(actualException.get(), instanceOf(ElasticsearchStatusException.class));
116-
assertThat(actualException.get().getCause(), sameInstance(expectedException));
117+
Exception actualException = actualExceptionReference.get();
118+
assertThat(actualException, notNullValue());
119+
assertThat(actualException.getMessage(), is(failureExceptionMessage));
120+
assertThat(actualException, instanceOf(ElasticsearchStatusException.class));
121+
assertThat(actualException.getCause(), sameInstance(expectedException));
122+
assertThat(((ElasticsearchStatusException) actualException).status(), is(RestStatus.INTERNAL_SERVER_ERROR));
123+
}
124+
125+
public void testSenderReturnedExceptionHasCorrectStatus_whenExceptionIsIllegalArgumentException() {
126+
var expectedException = new IllegalArgumentException("test");
127+
var actualExceptionReference = new AtomicReference<Exception>();
128+
129+
mockSender(listener -> listener.onFailure(expectedException));
130+
131+
execute(actualExceptionReference);
132+
133+
Exception actualException = actualExceptionReference.get();
134+
assertThat(actualException, notNullValue());
135+
assertThat(actualException.getMessage(), is(failureExceptionMessage));
136+
assertThat(actualException, instanceOf(ElasticsearchStatusException.class));
137+
assertThat(actualException.getCause(), sameInstance(expectedException));
138+
assertThat(((ElasticsearchStatusException) actualException).status(), is(RestStatus.BAD_REQUEST));
139+
}
140+
141+
public void testSenderReturnedExceptionHasCorrectStatus_whenExceptionIsEsRejectedExecutionException() {
142+
var expectedException = new EsRejectedExecutionException("test");
143+
var actualExceptionReference = new AtomicReference<Exception>();
144+
145+
mockSender(listener -> listener.onFailure(expectedException));
146+
147+
execute(actualExceptionReference);
148+
149+
Exception actualException = actualExceptionReference.get();
150+
assertThat(actualException, notNullValue());
151+
assertThat(actualException.getMessage(), is(failureExceptionMessage));
152+
assertThat(actualException, instanceOf(ElasticsearchStatusException.class));
153+
assertThat(actualException.getCause(), sameInstance(expectedException));
154+
assertThat(((ElasticsearchStatusException) actualException).status(), is(RestStatus.TOO_MANY_REQUESTS));
117155
}
118156

119157
@SuppressWarnings("unchecked")

x-pack/plugin/inference/src/test/java/org/elasticsearch/xpack/inference/services/alibabacloudsearch/action/AlibabaCloudSearchCompletionActionTests.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,6 @@
4242
import static org.elasticsearch.xpack.core.inference.results.ChatCompletionResultsTests.buildExpectationCompletion;
4343
import static org.elasticsearch.xpack.inference.Utils.inferenceUtilityExecutors;
4444
import static org.elasticsearch.xpack.inference.Utils.mockClusterServiceEmpty;
45-
import static org.elasticsearch.xpack.inference.external.action.ActionUtils.constructFailedToSendRequestMessage;
4645
import static org.elasticsearch.xpack.inference.services.settings.DefaultSecretSettingsTests.getSecretSettingsMap;
4746
import static org.hamcrest.Matchers.is;
4847
import static org.mockito.ArgumentMatchers.any;
@@ -110,8 +109,8 @@ public void testExecute_ListenerThrowsInternalServerError_WhenSenderThrowsExcept
110109
PlainActionFuture<InferenceServiceResults> listener = new PlainActionFuture<>();
111110
action.execute(new ChatCompletionInput(List.of(randomAlphaOfLength(10))), InferenceAction.Request.DEFAULT_TIMEOUT, listener);
112111

113-
var thrownException = expectThrows(ElasticsearchException.class, () -> listener.actionGet(TIMEOUT));
114-
assertThat(thrownException.getMessage(), is(constructFailedToSendRequestMessage("AlibabaCloud Search completion")));
112+
var thrownException = expectThrows(ElasticsearchStatusException.class, () -> listener.actionGet(TIMEOUT));
113+
assertThat(thrownException.getMessage(), is("Failed to send AlibabaCloud Search completion request. Cause: error"));
115114
}
116115

117116
public void testExecute_ThrowsIllegalArgumentException_WhenInputIsNotChatCompletionInput() {

0 commit comments

Comments
 (0)